[jira] [Issue Comment Deleted] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-08 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10304:
---
Comment: was deleted

(was: https://github.com/apache/kafka/pull/9224)

> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.8.0
>
>
> In a different MM2 change (https://github.com/apache/kafka/pull/9029), some 
> concerns about the tests were raised. It may be a good time to revisit and 
> refactor the tests, possibly along the following lines:
> (1) Are 100 messages enough for the integration tests?
> (2) What happens if a broker fails in the middle of an integration test?
> (3) What other validations should we check (e.g. topic config sync)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-08 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192664#comment-17192664
 ] 

Ning Zhang commented on KAFKA-10304:


Hi [~mimaison] [~ryannedolan], this is mostly a refactoring PR for the MM2 
integration tests. It has two purposes: (1) to address the concerns raised in 
the previous PR (https://github.com/apache/kafka/pull/9029), and (2) to prepare 
for future development (e.g. by extracting common functions). I think the 
current PR (https://github.com/apache/kafka/pull/9224) is just a starting 
point, and I would greatly appreciate your feedback on what else to test and 
how to get closer to real-world scenarios.



[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-08 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10304:
---
Description: 
In a different MM2 change (https://github.com/apache/kafka/pull/9029), some 
concerns about the tests were raised. It may be a good time to revisit and 
refactor the tests, possibly along the following lines:

(1) Are 100 messages enough for the integration tests?
(2) What happens if a broker fails in the middle of an integration test?
(3) What other validations should we check (e.g. topic config sync)?

  was:
Due to the rapid development of Kafka MM2, the unit and integration tests of 
MirrorMaker 2 were written just to cover each individual feature, and some of 
them are simply copied from the existing tests with small tweaks. It may be a 
good time to revisit and improve the tests, possibly along the following lines:

(1) Are 100 messages enough for the integration tests?
(2) What happens if something fails in the middle of an integration test?
(3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset 
sync) beyond the mirrored messages in the integration tests?




[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests

2020-09-08 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r485363204



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
##
@@ -396,27 +330,67 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException
 try (Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
 "group.id", "consumer-group-1"), "test-topic-2")) {
 // we need to wait until all the records are consumed, so that MM2 replicates the expected offsets
-waitForConsumingAllRecords(consumer1);
+waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
 }
 
 // create a consumer at backup cluster with same consumer group Id to 
consume old and new topic
 consumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
 "group.id", "consumer-group-1"), "primary.test-topic-1", 
"primary.test-topic-2");
 
-waitForConsumerGroupOffsetSync(consumer, 
Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), 
"consumer-group-1");
+waitForConsumerGroupOffsetSync(backup, consumer, 
Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), 
+"consumer-group-1", NUM_RECORDS_PRODUCED);
 
 records = consumer.poll(Duration.ofMillis(500));
 // similar reasoning as above, no more records to consume by the same 
consumer group at backup cluster
 assertEquals("consumer record size is not zero", 0, records.count());
 consumer.close();
-
 }
+
+@Test
+public void testWithBrokerRestart() throws InterruptedException {
+// test with a higher number of records
+int numRecords = NUM_RECORDS_PRODUCED * 100;
+
+produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), 
numRecords);
 
-private void deleteAllTopics(EmbeddedKafkaCluster cluster) {
-Admin client = cluster.createAdminClient();
-try {
-client.deleteTopics(client.listTopics().names().get());
-} catch (Throwable e) {
-}
+// one way replication from primary to backup
+mm2Props.put("backup->primary.enabled", "false");
+mm2Config = new MirrorMakerConfig(mm2Props);
+   
+waitUntilMirrorMakerIsRunning(backup, SOURCE_CONNECTOR, mm2Config, 
"primary", "backup");
+
+// sleep a little so MM2 is ready for the kafka broker restart that follows
+Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+
+// restart kafka broker at backup cluster
+restartKafkaBroker(backup);
+
+Consumer consumer = 
backup.kafka().createConsumerAndSubscribeTo(
+Collections.singletonMap("group.id", "consumer-group-1"), 
"primary.test-topic-1");
+// verify that the records consumed equal the records produced
+waitForConsumingAllRecords(consumer, numRecords);
+consumer.commitAsync();
+
+// produce another set of records
+produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), 
numRecords);
+// restart kafka broker at primary cluster
+restartKafkaBroker(primary);
+// verify that the records consumed equal the records produced
+waitForConsumingAllRecords(consumer, numRecords);
+
+consumer.close();
+}
+
+void createTopics() {
+// to verify that the topic config will be synced across clusters
+Map topicConfig = new HashMap<>();
+topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+// create these topics before starting the connectors so we don't need 
to wait for discovery
+primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, 
topicConfig);

Review comment:
   When creating the `test-topic-1` topic on the primary cluster, add a topic 
config. Later on, we will check whether that config is synced from the primary 
to the backup cluster.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #9269: MINOR: add ImplicitLinkedHashCollection#moveToEnd

2020-09-08 Thread GitBox


cmccabe opened a new pull request #9269:
URL: https://github.com/apache/kafka/pull/9269


   Add ImplicitLinkedHashCollection#moveToEnd.
   
   Refactor ImplicitLinkedHashCollectionIterator to be a little bit more
   robust against concurrent modifications to the map.







[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests

2020-09-08 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r485362458



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
##
@@ -207,13 +173,16 @@ public void testReplication() throws InterruptedException 
{
 
 mm2Config = new MirrorMakerConfig(mm2Props);
 
-waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
+waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
"primary", "backup");
 
-waitUntilMirrorMakerIsRunning(primary, mm2Config, "backup", "primary");
+waitUntilMirrorMakerIsRunning(primary, CONNECTOR_LIST, mm2Config, 
"backup", "primary");   
 
 MirrorClient primaryClient = new 
MirrorClient(mm2Config.clientConfig("primary"));
 MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig("backup"));
-
+
+assertEquals("topic config was not synced", 
TopicConfig.CLEANUP_POLICY_COMPACT, 
+getTopicConfig(backup.kafka(), "primary.test-topic-1", 
TopicConfig.CLEANUP_POLICY_CONFIG));

Review comment:
   Add a check for topic config sync, since the topic created on the primary 
cluster has a `cleanup.policy` config; a sketch of such a check follows.
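
   For reference, a minimal sketch of how such a check can be implemented with 
the Admin API (the helper name `getTopicConfig` matches the call site above, 
and `createAdminClient` appears elsewhere in this PR, but the PR's actual 
implementation of the helper may differ):

   ```java
   import java.util.Collections;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.Config;
   import org.apache.kafka.common.config.ConfigResource;
   import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;

   // Fetch a single topic config value, so the test can assert that the
   // replicated topic on the backup cluster carries the same "cleanup.policy"
   // as the source topic on the primary cluster.
   static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName)
           throws Exception {
       try (Admin admin = cluster.createAdminClient()) {
           ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
           Config config = admin.describeConfigs(Collections.singleton(resource))
                   .all().get().get(resource);
           return config.get(configName).value();
       }
   }
   ```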









[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests

2020-09-08 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r485361003



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
##
@@ -32,4 +73,166 @@
 }
 return props;
 }
+
+/*
+ * launch the connectors on kafka connect cluster, then check if they are 
running
+ */
+public static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster 
connectCluster, List connectorClasses,

Review comment:
   This is mostly copy-paste from `MirrorConnectorsIntegrationTest`.









[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"

2020-09-08 Thread Swayam Raina (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192630#comment-17192630
 ] 

Swayam Raina commented on KAFKA-10467:
--

[~huxi_2b] the topic gets created by the broker and exists (see description)

> kafka-topic --describe fails for topic created by "produce"
> ---
>
> Key: KAFKA-10467
> URL: https://issues.apache.org/jira/browse/KAFKA-10467
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.1
> Environment: MacOS 
>Reporter: Swayam Raina
>Priority: Minor
>
> {code:java}
> > kafka-topics --version
> 2.3.1 (Commit:18a913733fb71c01){code}
>  
> While producing to a topic that does not already exist
> {code:java}
> producer.send("does-not-exists", "msg-1")
> {code}
>  
> broker creates the topic
> {code:java}
> // partition file
> > ls /tmp/kafka-logs/
> does-not-exists-0{code}
>  
> If I try to list the topics, the new topic also shows up
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --list
> does-not-exists-0
> {code}
> Now, while trying to describe the auto-created topic, the following error is 
> thrown:
>  
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --topic does-not-exists 
> >--describe
> Error while executing topic command : 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.Error while executing topic 
> command : org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.[2020-09-08 
> 00:21:30,890] ERROR java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3(TopicCommand.scala:228)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3$adapted(TopicCommand.scala:225)
>  at scala.collection.Iterator.foreach(Iterator.scala:941) at 
> scala.collection.Iterator.foreach$(Iterator.scala:941) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:225)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at 
> kafka.admin.TopicCommand.main(TopicCommand.scala)Caused by: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. (kafka.admin.TopicCommand$)
>  
> {code}
>  





[GitHub] [kafka] guozhangwang commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-09-08 Thread GitBox


guozhangwang commented on pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#issuecomment-689304026


   I took a quick look and it LGTM. @abbccdda If your pass is good, please 
feel free to merge.







[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests

2020-09-08 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r485337705



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
##
@@ -32,4 +71,141 @@
 }
 return props;
 }
+
+public static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster 
connectCluster, List mirrorClasses,

Review comment:
   This is a simple move from `MirrorConnectorsIntegrationTest`, with the 
connector class generalized; a sketch of the generalized shape follows.
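
   A rough sketch of the generalized helper (the mail renderer stripped the 
generic type parameters above, so the exact signature and import paths are 
assumptions; `configureConnector`, `connectorStatus`, and `waitForCondition` 
are existing test utilities, but the PR's implementation may differ):

   ```java
   import java.util.List;

   import org.apache.kafka.connect.connector.Connector;
   import org.apache.kafka.connect.mirror.MirrorMakerConfig;
   import org.apache.kafka.connect.util.SourceAndTarget;
   import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

   import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
   import static org.apache.kafka.test.TestUtils.waitForCondition;

   // Configure each connector class on the Connect cluster, then block until
   // every one of them reports RUNNING or the setup timeout elapses.
   public static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
           List<Class<? extends Connector>> connectorClasses, MirrorMakerConfig mm2Config,
           String primary, String backup) throws InterruptedException {
       SourceAndTarget sourceAndTarget = new SourceAndTarget(primary, backup);
       for (Class<? extends Connector> connector : connectorClasses) {
           connectCluster.configureConnector(connector.getSimpleName(),
                   mm2Config.connectorBaseConfig(sourceAndTarget, connector));
       }
       for (Class<? extends Connector> connector : connectorClasses) {
           waitForCondition(() -> "RUNNING".equals(
                   connectCluster.connectorStatus(connector.getSimpleName()).connector().state()),
                   CONNECTOR_SETUP_DURATION_MS,
                   "Connector " + connector.getSimpleName() + " did not start in time");
       }
   }
   ```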









[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests

2020-09-08 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r485337316



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
##
@@ -16,12 +16,51 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+
+import static org.apache.kafka.connect.mirror.TestUtils.NUM_WORKERS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
+import static org.junit.Assert.assertTrue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestUtils {

Review comment:
   I propose `TestUtils` as the central place to host the common functions 
used by the integration tests; one such helper is sketched below.
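
   For illustration, one shared helper could look roughly like this, inferred 
from the call sites in this PR (`EmbeddedKafkaCluster.produce` is an existing 
test utility; the actual shared implementation may differ):

   ```java
   import java.util.List;

   import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

   // Produce numRecords records to each listed topic on each listed cluster,
   // so individual tests don't have to repeat the nested produce loops.
   public static void produceRecords(List<EmbeddedConnectCluster> clusters,
           List<String> topics, int numRecords) {
       for (EmbeddedConnectCluster cluster : clusters) {
           for (String topic : topics) {
               for (int i = 0; i < numRecords; i++) {
                   cluster.kafka().produce(topic, "message-" + i);
               }
           }
       }
   }
   ```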









[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-09-08 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r485326873



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -652,10 +644,10 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future
 } else if (error == Errors.MEMBER_ID_REQUIRED) {
 // Broker requires a concrete member id to be allowed to join 
the group. Update member id
 // and send another join group request in next cycle.
+String memberId = joinResponse.data().memberId();
+log.debug("Attempt to join group returned {} error. Will set 
the member id as {} and then rejoin", error, memberId);
 synchronized (AbstractCoordinator.this) {
-AbstractCoordinator.this.generation = new 
Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
-joinResponse.data().memberId(), null);
-AbstractCoordinator.this.resetStateAndRejoin();

Review comment:
   Yes, this is redundant since we are raising this error and 
`resetStateAndRejoin()` would still be executed at the handler anyways.









[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-09-08 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r485324085



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
 resetJoinGroupFuture();
 needsJoinPrepare = true;
 } else {
-log.info("Generation data was cleared by heartbeat thread. 
Initiating rejoin.");
+log.info("Generation data was cleared by heartbeat thread 
to {} and state is now {} before " +
+ "the rebalance callback is triggered, marking this 
rebalance as failed and retry",
+ generation, state);
 resetStateAndRejoin();

Review comment:
   That's a good question. I just thought about this, and I think I can change 
the caller of `resetGeneration` (which is the only place where the heartbeat 
thread can reset the generation) and move the `state = MemberState.UNJOINED;` 
assignment into the callee, to make sure the two are always changed together; 
a sketch follows.
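
   A self-contained sketch of that idea (the names mirror the snippets above, 
but this is not the merged patch):

   ```java
   // Sketch: fold the state transition into resetGeneration() so that the
   // generation and the member state can only ever change together, under the
   // same lock, no matter which thread triggers the reset.
   class CoordinatorSketch {
       enum MemberState { UNJOINED, REBALANCING, STABLE }

       static final String NO_GENERATION = "NO_GENERATION"; // stand-in for Generation.NO_GENERATION

       private String generation = NO_GENERATION;
       private MemberState state = MemberState.STABLE;

       // The heartbeat thread's only entry point for clearing the generation
       // now also moves the member to UNJOINED; callers no longer set it
       // separately, so the two fields cannot be observed out of sync.
       synchronized void resetGeneration() {
           generation = NO_GENERATION;
           state = MemberState.UNJOINED;
       }
   }
   ```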

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -433,7 +440,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 generationSnapshot = this.generation;
 }
 
-if (generationSnapshot != Generation.NO_GENERATION) {
+if (generationSnapshot != Generation.NO_GENERATION && state == 
MemberState.STABLE) {

Review comment:
   Ack, will do.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9264: KAFKA-5636: Add Sliding Windows documentation

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9264:
URL: https://github.com/apache/kafka/pull/9264#discussion_r485314713



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3286,13 +3306,33 @@ KTable-KTable 
Foreign-Key
 
 Sliding time windows
 Sliding windows are actually quite different from 
hopping and tumbling windows.  In Kafka Streams, sliding windows
-are used only for join 
operations, and can be specified through the
-JoinWindows class.
-A sliding window models a fixed-size window that 
slides continuously over the time axis; here, two data records are
+are used for join 
operations, specified by using the
+JoinWindows class, and windowed aggregations, 
specified by using the code class="docutils literal">SlidingWindows class.

Review comment:
   ```suggestion
   JoinWindows class, and windowed aggregations, 
specified by using the SlidingWindows class.
   ```

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3286,13 +3306,33 @@ KTable-KTable 
Foreign-Key
 
 Sliding time windows
 Sliding windows are actually quite different from 
hopping and tumbling windows.  In Kafka Streams, sliding windows
-are used only for join 
operations, and can be specified through the
-JoinWindows class.
-A sliding window models a fixed-size window that 
slides continuously over the time axis; here, two data records are
+are used for join 
operations, specified by using the
+JoinWindows class, and windowed aggregations, 
specified by using the code class="docutils literal">SlidingWindows class.
+A sliding window models a fixed-size window that 
slides continuously over the time axis. In this model, two data records are
 said to be included in the same window if (in the 
case of symmetric windows) the difference of their timestamps is
-within the window size.  Thus, sliding windows are 
not aligned to the epoch, but to the data record timestamps.  In
-contrast to hopping and tumbling windows, the 
lower and upper window time interval bounds of sliding windows are
-both inclusive.
+within the window size. As a sliding window moves 
along the time axis, records may fall into multiple snapshots of
+the sliding window, but each unique combination of 
records appears only in one sliding window snapshot.
+The following code defines a sliding window with a 
time difference of 10 minutes and a grace period of 30 minutes:
+import org.apache.kafka.streams.kstream.SlidingWindows;
+
+// A sliding time window with a time difference of 10 minutes 
and grace period of 30 minutes
+Duration timeDifferenceMs = Duration.ofMinutes(10);
+Duration gracePeriodMs = Duration.ofMinutes(30);
+SlidingWindows.withTimeDifferenceAndGrace(timeDifferenceMs,gracePeriodMs);
+
+ 
+ Note
+ Sliding windows require that you set 
a grace period, as shown above. For time windows and session windows,
+ setting the grace period is optional and 
defaults to 24 hours.
+ 
+ 
+ 
+ This diagram shows windowing a stream of data records with 
sliding windows. The overlap of
+ the sliding window snapshots varies depending 
on the record times. In this diagram, the time numbers represent milliseconds. 
For example,
+ t=5 means “at the five millisecond 
mark”.

Review comment:
   I rendered the site locally and there's something weird going on here. It 
seems you can't use a literal `"`. Look at what the other captions do; it 
seems they use `“`.









[GitHub] [kafka] ableegoldman commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-08 Thread GitBox


ableegoldman commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-689272887


   I think this is ready for @vvcephei to review, once the early records PR is 
merged and this one is rebased







[GitHub] [kafka] ableegoldman commented on pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#issuecomment-689272607


   Not sure what the deal is with that, but I ran it locally myself and 
everything passed. Can we merge this, @mjsax?







[GitHub] [kafka] ableegoldman commented on pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#issuecomment-689272398


   Looks like the three Java builds all passed, but the CI build failed due to
   ```
   [2020-09-09T02:35:12.324Z] + mkdir test-streams-archetype
   [2020-09-09T02:35:12.324Z] mkdir: cannot create directory 
'test-streams-archetype': File exists
   [2020-09-09T02:35:12.325Z] + echo Could not create test directory for stream 
quickstart archetype
   [2020-09-09T02:35:12.325Z] Could not create test directory for stream 
quickstart archetype
   ```
   
   I guess there was some issue with cleaning up or running this twice...?







[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"

2020-09-08 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192585#comment-17192585
 ] 

huxihx commented on KAFKA-10467:


It seems the problem is caused by another issue, since it should have thrown 
`Topic 'does-not-exists' does not exist` as expected. Could you manually create 
a topic using TopicCommand and then describe it to see if everything works?



[GitHub] [kafka] chia7712 commented on pull request #9257: KAFKA-10463 the necessary utilities in Dockerfile should include git

2020-09-08 Thread GitBox


chia7712 commented on pull request #9257:
URL: https://github.com/apache/kafka/pull/9257#issuecomment-689265404


   @ijuma Could you take a look? I want to run the system tests on different 
JDKs. I configure ```jdk_version``` to change the base OS of the Kafka 
dockerfile. Without this patch, it is impossible to switch quickly to other 
popular JDK images (for example, openjdk:11 and azul/zulu-openjdk:11), since 
```git``` is not installed.







[GitHub] [kafka] chia7712 commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…

2020-09-08 Thread GitBox


chia7712 commented on pull request #9182:
URL: https://github.com/apache/kafka/pull/9182#issuecomment-689256679


   ```
Execution failed for task ':connect:mirror:integrationTest'.
   01:19:16  > Process 'Gradle Test Executor 48' finished with non-zero exit 
value 1
   01:19:16This problem might be caused by incorrect test process 
configuration.
   01:19:16Please refer to the test execution section in the User Manual at
   ```
   
   The error is unrelated to this PR.







[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-08 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-689256048


   ```
   Build / JDK 15 / 
kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault
   Build / JDK 11 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota
   Build / JDK 11 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```
   ```
   Module: kafkatest.tests.connect.connect_distributed_test
   Class:  ConnectDistributedTest
   Method: test_bounce
   Arguments:
   {
 "clean": true,
 "connect_protocol": "sessioned"
   }
   ```
   
   On my local machine, they are also flaky on the trunk branch.







[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-09-08 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-689247748


   @omkreddy , any comment for this PR? Thanks.







[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter

2020-09-08 Thread GitBox


showuon commented on pull request #9104:
URL: https://github.com/apache/kafka/pull/9104#issuecomment-689247469


   @kkonstantine  , could you review this PR? Thanks.







[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

2020-09-08 Thread GitBox


showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-689247342


   @tombentley @ijuma , could you help review this PR? Thank you.







[GitHub] [kafka] showuon commented on pull request #9179: KAFKA-10390: Remove ignore case option when grep process info to be more specific

2020-09-08 Thread GitBox


showuon commented on pull request #9179:
URL: https://github.com/apache/kafka/pull/9179#issuecomment-689244826


   @cmccabe  @lbradstreet  , any other comments for this PR? Thank you.







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-08 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-689244512


   @ijuma  , could you help review this PR again? Thanks.







[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-08 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-689241869


   @ijuma @hachikuji @rajinisivaram : I think this PR is ready to be merged. 
Any further comments from you?







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485266653



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##
@@ -246,4 +238,31 @@ private void assertOutputKeyValueTimestamp(final 
TestOutputTopic testRecord = new 
TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
 assertThat(nonWindowedRecord, equalTo(testRecord));
 }
+
+private void assertOutputKeyValueNotOrdered(final Set> results) {

Review comment:
   This makes it sound like you want to assert that the output is not 
ordered, which I don't think is the point here?
   
   Also, since you're only calling this from one place and are asserting a 
specific output that corresponds to a specific test, I would just inline this 
check in the test instead of moving it out to a new method

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier
 inputTopic.pipeInput("2", "B", 1000L);
 inputTopic.pipeInput("3", "C", 600L);
 }
-assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-// processing A@500
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-// processing A@999
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-// processing A@600
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-// processing B@500
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-// processing B@600
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-// processing B@700
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-// processing C@501
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-// processing first A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-// processing first B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-// processing C@600
-new KeyValueTimestamp<>(new Windowed<>("3", new 
Ti

[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-09-08 Thread GitBox


showuon commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-689196185


   @mjsax could you review this PR again? Thanks.







[jira] [Resolved] (KAFKA-5636) Add Sliding-Window support for Aggregations

2020-09-08 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas resolved KAFKA-5636.

Resolution: Fixed

> Add Sliding-Window support for Aggregations
> ---
>
> Key: KAFKA-5636
> URL: https://issues.apache.org/jira/browse/KAFKA-5636
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Michael G. Noll
>Assignee: Leah Thomas
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.7.0
>
>
> We support three windowing types for aggregations in the DSL right now:
>  * Tumbling windows
>  * Hopping windows (note: some stream processing tools call these "sliding 
> windows")
>  * Session windows
> Some users have expressed the need for sliding windows. While we do use 
> sliding windows for joins, we do not yet support sliding window aggregations 
> in the DSL.





[jira] [Updated] (KAFKA-5636) Add Sliding-Window support for Aggregations

2020-09-08 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas updated KAFKA-5636:
---
Fix Version/s: 2.7.0



[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485249087



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -439,6 +442,185 @@ public void testJoin() {
 }
 }
 
+@SuppressWarnings("unchecked")

Review comment:
   It's probably some weird Java thing where it lazily types the generics 
and doesn't force the cast until you put it in the map. (I just made that up, 
but @vvcephei  would probably know)









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485248486



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -439,6 +442,185 @@ public void testJoin() {
 }
 }
 
+@SuppressWarnings("unchecked")

Review comment:
   Yeah, I do find it odd that there are no warnings earlier; it's something 
about the Map that's triggering them.









[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485244413



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.

Review comment:
   Yep, makes sense. I've updated it now. Just noting here that this means 
we're changing a 
[test](https://github.com/apache/kafka/pull/9186/files#diff-e3715715832b244da2d8787362b0c570R230) 
that we previously had.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485239998



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -439,6 +442,185 @@ public void testJoin() {
 }
 }
 
+@SuppressWarnings("unchecked")

Review comment:
   That seems weird to me. Guessing it's ultimately due to the 
`supplier.theCapturedProcessor().processed()` we loop over. But then wouldn't 
we get the warning a bit earlier? 🤷‍♀️ 









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485235880



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -439,6 +442,185 @@ public void testJoin() {
 }
 }
 
+@SuppressWarnings("unchecked")

Review comment:
   Without it, there are warnings with the transition from `K, V` to `Long, 
ValueAndTimestamp` when adding and updating the hash map that holds the results.
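
   A self-contained illustration of the kind of unchecked warning being 
suppressed (an assumed minimal reproduction, not the test code itself): when 
results come back through erased or wildcard generics, narrowing them to 
concrete types needs a cast the compiler cannot verify.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class UncheckedCastExample {

       // The cast to the concretely-typed map is "unchecked": after erasure
       // the runtime cannot verify the key/value types, so the compiler warns
       // unless the warning is suppressed.
       @SuppressWarnings("unchecked")
       static Map<Long, String> narrow(final Map<?, ?> raw) {
           return (Map<Long, String>) raw;
       }

       public static void main(final String[] args) {
           final Map<Object, Object> results = new HashMap<>();
           results.put(1L, "windowed-value");
           System.out.println(narrow(results).get(1L)); // prints "windowed-value"
       }
   }
   ```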









[GitHub] [kafka] ableegoldman commented on pull request #9264: KAFKA-5636: Add Sliding Windows documentation

2020-09-08 Thread GitBox


ableegoldman commented on pull request #9264:
URL: https://github.com/apache/kafka/pull/9264#issuecomment-689173489


   cc @guozhangwang for secondary review & merge







[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-09-08 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192513#comment-17192513
 ] 

Sophie Blee-Goldman commented on KAFKA-10134:
-

Hey [~davispw],

It looks like you might have run into a few distinct issues: the rebalancing 
problems, the "insufficient bytes available" IllegalStateException, and the 
"Active task 3_0 should have been suspended" IllegalStateException.

The rebalancing seems to point to this issue, as the full fix did not make it 
into 2.6.0 in time. It would be great if you could test out the patch and see 
if that helps (building from 
[pull/8834|https://github.com/apache/kafka/pull/8834] specifically, which is 
not yet merged). The patch I linked also includes a fix for KAFKA-10122, 
another cause of unnecessary rebalances.

For the two IllegalStateException issues, could you open separate tickets? They 
seem unrelated to this, and to each other, but definitely merit a closer look. 
Any logs you have from the time of the exceptions would help a lot. 

Thanks!

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer3.log.2020-08-20.log, 
> consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance under 
> some load (some tasks run longer than 30s), the CPU goes sky-high. It reads 
> ~700% in our metrics, so several threads must be in a tight loop. We have 
> several consumer threads consuming from different partitions during the 
> rebalance. This is reproducible with both the new CooperativeStickyAssignor 
> and the old eager rebalance protocol. The difference is that with the old 
> eager protocol the high CPU usage drops after the rebalance is done, whereas 
> with the cooperative one the consumer threads seem stuck on something and 
> cannot finish the rebalance, so the high CPU usage does not drop until we 
> stop the load. A small load without long-running tasks does not cause 
> continuous high CPU usage either, since the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue. So it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  





[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485222996



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+Long previousRecordTimestamp = null;
+
 try (
 final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
 key,
 key,
-timestamp - 2 * windows.timeDifferenceMs(),
+Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
 // to catch the current record's right window, if it 
exists, without more calls to the store
-timestamp + 1)
+inputRecordTimestamp + 1)
 ) {
 while (iterator.hasNext()) {
-final KeyValue, ValueAndTimestamp> next = 
iterator.next();
-windowStartTimes.add(next.key.window().start());
-final long startTime = next.key.window().start();
+final KeyValue, ValueAndTimestamp> 
windowBeingProcessed = iterator.next();
+final long startTime = 
windowBeingProcessed.key.window().start();
+windowStartTimes.add(startTime);
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-if (endTime < timestamp) {
-leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
-} else if (endTime == timestamp) {
+if (endTime < inputRecordTimestamp) {
+leftWinAgg = windowBeingProcessed.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+} else if (endTime == inputRecordTimestamp) {
 leftWinAlreadyCreated = true;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else if (endTime > timestamp && startTime <= timestamp) {
-rightWinAgg = next.value;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+rightWinAgg = windowBeingProcessed.value;
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (startTime == inputRecordTimestamp + 1) {
 rightWinAlreadyCreated = true;
+} else {
+throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
 }
 }
 }
 
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+if (previousRecordTimestamp != null) {
+final long previousRightWinStart = previousRecordTimestamp + 1;
+if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
+final TimeWindow window = new 
TimeWindow(previousRightWinStart, previousRightWinStart + 
windows.timeDifferenceMs());
+final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+   

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485221931




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485221052



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+Long previousRecordTimestamp = null;
+
 try (
 final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
 key,
 key,
-timestamp - 2 * windows.timeDifferenceMs(),
+Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
 // to catch the current record's right window, if it 
exists, without more calls to the store
-timestamp + 1)
+inputRecordTimestamp + 1)
 ) {
 while (iterator.hasNext()) {
-final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
-windowStartTimes.add(next.key.window().start());
-final long startTime = next.key.window().start();
+final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+final long startTime = 
windowBeingProcessed.key.window().start();
+windowStartTimes.add(startTime);
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-if (endTime < timestamp) {
-leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
-} else if (endTime == timestamp) {
+if (endTime < inputRecordTimestamp) {
+leftWinAgg = windowBeingProcessed.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+} else if (endTime == inputRecordTimestamp) {
 leftWinAlreadyCreated = true;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else if (endTime > timestamp && startTime <= timestamp) {
-rightWinAgg = next.value;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+rightWinAgg = windowBeingProcessed.value;
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (startTime == inputRecordTimestamp + 1) {
 rightWinAlreadyCreated = true;
+} else {
+throw new IllegalStateException("Unexpected window 
found when processing sliding windows");

Review comment:
   nit: log an error and include the relevant info (eg `windowStart` and 
`inputRecordTimestamp` at least). Same for the IllegalStateException in 
`processEarly`
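
   A sketch of what the suggested change could look like (logger setup and 
message wording are illustrative, not the actual patch):
   ```
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class SlidingWindowErrorLogging {
       private static final Logger log = LoggerFactory.getLogger(SlidingWindowErrorLogging.class);

       // Hypothetical helper mirroring the nit: surface the window start and the
       // input record timestamp before the IllegalStateException is thrown.
       static IllegalStateException unexpectedWindow(final long windowStart,
                                                     final long inputRecordTimestamp) {
           log.error("Unexpected window with start [{}] found while processing record with timestamp [{}]",
                     windowStart, inputRecordTimestamp);
           return new IllegalStateException("Unexpected window found when processing sliding windows");
       }
   }
   ```
   The call site would then read `throw unexpectedWindow(startTime, inputRecordTimestamp);`.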





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure

2020-09-08 Thread Rohit Shekhar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192504#comment-17192504
 ] 

Rohit Shekhar commented on KAFKA-10471:
---

+1 [~junrao], that will detect unclean shutdown more reliably.

[~dhruvilshah] I agree with your comment too; I'm just not sure about its cost 
during log load time, so could we do the sanity test in a lazy fashion?
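
For illustration, a minimal sketch of a lazily triggered sanity check (a 
hypothetical shape, not Kafka's actual lazy index loading):

{noformat}
import java.util.function.Supplier;

// Hypothetical: defer loading and sanity-checking until first access, so log
// loading stays cheap while corruption is still caught before the index is used.
final class LazyChecked<T> {
    private final Supplier<T> loader;
    private T value;

    LazyChecked(final Supplier<T> loader) {
        this.loader = loader;
    }

    synchronized T get() {
        if (value == null) {
            final T loaded = loader.get();
            sanityCheck(loaded); // throw if the on-disk state is inconsistent
            value = loaded;
        }
        return value;
    }

    private void sanityCheck(final T candidate) {
        // placeholder: validate entry count, last entry, file size, etc.
    }
}
{noformat}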

> TimeIndex handling may cause data loss in certain back to back failure
> --
>
> Key: KAFKA-10471
> URL: https://issues.apache.org/jira/browse/KAFKA-10471
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Reporter: Rohit Shekhar
>Priority: Critical
>
> # Active segment for log A going clean shutdown - trim the time index to the 
> latest fill value, set the clean shutdown marker.
>  # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
> log A recovers with the previous active segment as current. It also resized 
> the TimeIndex to the max.
>  #  Before all the log loads, the broker had a hard shutdown causing a clean 
> shutdown marker left as is.
>  #  Broker restarts, log A skips recovery due to the presence of a clean 
> shutdown marker but the TimeIndex file assumes the resized file from the 
> previous instance is all full (it assumes either file is newly created or is 
> full with valid value).
>  # The first append to the active segment will result in roll and TimeIndex 
> will be rolled with the timestamp value of the last valid entry (0)
>  # Segment's largest timestamp gives 0 (this can cause premature deletion of 
> data due to retention.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485218625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.

Review comment:
   Well, if `mappedKey` is null then there can't be a match in the global 
table since we can't do a lookup with a null key. I think what @mjsax means 
here (correct me if wrong) is just that we could phrase it a bit differently to 
say something like
   ```
   // If the mappedKey is null, we ignore it as invalid. This should never 
happen for KTables 
   // since keyMapper just returns the key, but for GlobalKTables a non-null 
key can result 
   // in a null mappedKey. There can't be a match for a null mappedKey, so we 
drop it
   ```
   
   ...or something. Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure

2020-09-08 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192503#comment-17192503
 ] 

Jun Rao commented on KAFKA-10471:
-

[~dhruvilshah] : I agree that the sanity check in the index file is useful to 
catch bugs like this. For this particular bug, the root cause is that the clean 
shutdown file doesn't reflect the actual on-disk state correctly. So, it would 
be useful to fix that directly too.

> TimeIndex handling may cause data loss in certain back to back failure
> --
>
> Key: KAFKA-10471
> URL: https://issues.apache.org/jira/browse/KAFKA-10471
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Reporter: Rohit Shekhar
>Priority: Critical
>
> # Active segment for log A going clean shutdown - trim the time index to the 
> latest fill value, set the clean shutdown marker.
>  # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
> log A recovers with the previous active segment as current. It also resized 
> the TimeIndex to the max.
>  #  Before all the log loads, the broker had a hard shutdown causing a clean 
> shutdown marker left as is.
>  #  Broker restarts, log A skips recovery due to the presence of a clean 
> shutdown marker but the TimeIndex file assumes the resized file from the 
> previous instance is all full (it assumes either file is newly created or is 
> full with valid value).
>  # The first append to the active segment will result in roll and TimeIndex 
> will be rolled with the timestamp value of the last valid entry (0)
>  # Segment's largest timestamp gives 0 (this can cause premature deletion of 
> data due to retention.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485218625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.

Review comment:
   Well, if `mappedKey` is null then there can't be a match in the global 
table since we can't do a lookup with a null key. I think what @mjsax means 
here (correct me if wrong) is just that we could phrase it a bit differently to 
say something like
   ```
   // If the mappedKey is null, we ignore it as invalid. This should never 
happen for KTables since keyMapper 
   // just returns the key, but for GlobalKTables a non-null key can result in 
a null mappedKey. Since there 
   // can't be a match for a null mappedKey, we drop it
   ```
   
   ...or something. Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485218625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.

Review comment:
   Well, if `mappedKey` is null then there can't be a match in the global 
table since we can't do a lookup with a null key. I think what @mjsax means 
here (correct me if wrong) is just that we could phrase it a bit differently to 
say something like
   "If the mappedKey is null, we ignore it as invalid. This should never happen 
for KTables since keyMapper just returns the key, but for GlobalKTables a 
non-null key can result in a null mappedKey. Since there can't be a match for a 
null mappedKey, we drop it"
   
   ...or something. Thoughts?






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485217278



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join

Review comment:
   Ah yeah sorry, I'm getting things mixed up here... this comment is 
referring to when the mappedKey is null, while the condition I cited below now 
only applies to when the value is null. Your reasoning sounds correct: we 
should still process the record in that case if it's a left join. But we should 
also remove this comment, since if the mappedKey is null then we drop it, 
regardless of whether it's a left join or any other kind of join.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure

2020-09-08 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192494#comment-17192494
 ] 

Dhruvil Shah commented on KAFKA-10471:
--

It may be nice to use the time index sanity check to catch issues where an 
index file is not in a state we expect. We used to perform a sanity check for 
indices associated with all segments, but with the changes to load segments 
lazily, we dropped those checks. If we could safely reintroduce those checks, 
that may be sufficient to catch and fix such cases. We are taking a 
similar approach in https://issues.apache.org/jira/browse/KAFKA-10207, so that 
might help here too.

> TimeIndex handling may cause data loss in certain back to back failure
> --
>
> Key: KAFKA-10471
> URL: https://issues.apache.org/jira/browse/KAFKA-10471
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Reporter: Rohit Shekhar
>Priority: Critical
>
> # Active segment for log A going clean shutdown - trim the time index to the 
> latest fill value, set the clean shutdown marker.
>  # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
> log A recovers with the previous active segment as current. It also resized 
> the TimeIndex to the max.
>  #  Before all the log loads, the broker had a hard shutdown causing a clean 
> shutdown marker left as is.
>  #  Broker restarts, log A skips recovery due to the presence of a clean 
> shutdown marker but the TimeIndex file assumes the resized file from the 
> previous instance is all full (it assumes either file is newly created or is 
> full with valid value).
>  # The first append to the active segment will result in roll and TimeIndex 
> will be rolled with the timestamp value of the last valid entry (0)
>  # Segment's largest timestamp gives 0 (this can cause premature deletion of 
> data due to retention.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure

2020-09-08 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192492#comment-17192492
 ] 

Jun Rao commented on KAFKA-10471:
-

[~rshekhar]: Thanks for reporting this. This is a very good finding. If the 
TimeIndex doesn't start in a clean state, it can cause multiple things to go 
wrong afterward.

One way to fix this issue is to check for the presence of the clean shutdown 
file in LogManager.loadLogs() before loading each individual log. We then 
delete the clean shutdown file and pass a clean-shutdown flag to Log, using 
that flag in Log.recoverLog(). That way, in step 4 above, the TimeIndex will 
be rebuilt properly.
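
A rough sketch of that flow (hypothetical names, not the actual LogManager/Log 
code; the marker file name is an assumption):

{noformat}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class LogLoaderSketch {
    // Check and delete the marker once, up front, so a hard kill during log
    // loading can no longer leave a stale clean-shutdown marker behind.
    static void loadLogs(final Path logDir, final Iterable<Path> logPaths) throws IOException {
        final Path marker = logDir.resolve(".kafka_cleanshutdown"); // assumed marker name
        final boolean hadCleanShutdown = Files.deleteIfExists(marker);
        for (final Path logPath : logPaths) {
            recoverLog(logPath, hadCleanShutdown);
        }
    }

    static void recoverLog(final Path logPath, final boolean hadCleanShutdown) {
        if (hadCleanShutdown) {
            return; // trust the on-disk segments and indexes, as today
        }
        // otherwise run full recovery, rebuilding the indexes (including the TimeIndex)
    }
}
{noformat}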

> TimeIndex handling may cause data loss in certain back to back failure
> --
>
> Key: KAFKA-10471
> URL: https://issues.apache.org/jira/browse/KAFKA-10471
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Reporter: Rohit Shekhar
>Priority: Critical
>
> # Active segment for log A going clean shutdown - trim the time index to the 
> latest fill value, set the clean shutdown marker.
>  # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
> log A recovers with the previous active segment as current. It also resized 
> the TimeIndex to the max.
>  #  Before all the log loads, the broker had a hard shutdown causing a clean 
> shutdown marker left as is.
>  #  Broker restarts, log A skips recovery due to the presence of a clean 
> shutdown marker but the TimeIndex file assumes the resized file from the 
> previous instance is all full (it assumes either file is newly created or is 
> full with valid value).
>  # The first append to the active segment will result in roll and TimeIndex 
> will be rolled with the timestamp value of the last valid entry (0)
>  # Segment's largest timestamp gives 0 (this can cause premature deletion of 
> data due to retention.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context

2020-09-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192491#comment-17192491
 ] 

Matthias J. Sax commented on KAFKA-10448:
-

Thanks, feel free to start a discussion on the dev mailing list directly.

> Preserve Source Partition in Kafka Streams from context
> ---
>
> Key: KAFKA-10448
> URL: https://issues.apache.org/jira/browse/KAFKA-10448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: satya
>Priority: Minor
>  Labels: needs-kip
>
> Currently, Kafka Streams sink nodes use the default partitioner or allow a 
> custom partitioner, which has to derive the partition from the key/value. I 
> am looking for an enhancement of the sink node to ensure the source partition 
> is preserved instead of deriving the partition again from the key/value. One 
> of our use cases has producers with custom partitioners that we don't have 
> access to, as it is a third-party application. Simply preserving the 
> partition through context.partition() would be helpful.
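
A workaround some applications use today, sketched under the assumption that 
the upstream partition can be carried inside the record value; PartitionedValue 
and PreservingPartitioner are hypothetical names, not part of the Streams API:

{noformat}
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical wrapper: the source partition (captured upstream, e.g. via
// ProcessorContext#partition()) travels with the payload.
class PartitionedValue<V> {
    final int sourcePartition;
    final V value;

    PartitionedValue(final int sourcePartition, final V value) {
        this.sourcePartition = sourcePartition;
        this.value = value;
    }
}

// StreamPartitioner only sees the topic, key, value, and partition count,
// which is exactly the limitation described above -- hence the wrapper.
class PreservingPartitioner<K, V> implements StreamPartitioner<K, PartitionedValue<V>> {
    @Override
    public Integer partition(final String topic, final K key,
                             final PartitionedValue<V> value, final int numPartitions) {
        return value.sourcePartition % numPartitions;
    }
}
{noformat}

The partitioner could then be attached to the sink via 
Produced.streamPartitioner(...); a first-class solution would expose the 
source partition to the sink node directly, which is what this ticket asks for.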



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure

2020-09-08 Thread Rohit Shekhar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rohit Shekhar updated KAFKA-10471:
--
Description: 
# Active segment for log A going clean shutdown - trim the time index to the 
latest fill value, set the clean shutdown marker.
 # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
log A recovers with the previous active segment as current. It also resized the 
TimeIndex to the max.
 #  Before all the log loads, the broker had a hard shutdown causing a clean 
shutdown marker left as is.
 #  Broker restarts, log A skips recovery due to the presence of a clean 
shutdown marker but the TimeIndex file assumes the resized file from the 
previous instance is all full (it assumes either file is newly created or is 
full with valid value).
 # The first append to the active segment will result in roll and TimeIndex 
will be rolled with the timestamp value of the last valid entry (0)
 # Segment's largest timestamp gives 0 (this can cause premature deletion of 
data due to retention.)

  was:
# Active segment for log A going clean shutdown - trim the time index to the 
latest fill value, set the clean shutdown marker.
 # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
log A recovers with the previous active segment as current. It also resized the 
TimeIndex to the max.
 #  Before all the log loads, the broker had a hard shutdown causing a clean 
shutdown marker left as is.
 #  Broker restarts, log A skips recovery due to the presence of a clean 
shutdown marker but the TimeIndex file assumes the resized file from the 
previous instance is all full (it assumes either file is newly created or is 
full with valid value).
 # The first append to the active segment will result in roll and TimeIndex 
will be rolled with the timestamp value of the last valid entry (0)
 # Segment.the largest timestamp gives 0 (this can cause premature deletion of 
data due to retention.


> TimeIndex handling may cause data loss in certain back to back failure
> --
>
> Key: KAFKA-10471
> URL: https://issues.apache.org/jira/browse/KAFKA-10471
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Reporter: Rohit Shekhar
>Priority: Critical
>
> # Active segment for log A going clean shutdown - trim the time index to the 
> latest fill value, set the clean shutdown marker.
>  # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
> log A recovers with the previous active segment as current. It also resized 
> the TimeIndex to the max.
>  #  Before all the log loads, the broker had a hard shutdown causing a clean 
> shutdown marker left as is.
>  #  Broker restarts, log A skips recovery due to the presence of a clean 
> shutdown marker but the TimeIndex file assumes the resized file from the 
> previous instance is all full (it assumes either file is newly created or is 
> full with valid value).
>  # The first append to the active segment will result in roll and TimeIndex 
> will be rolled with the timestamp value of the last valid entry (0)
>  # Segment's largest timestamp gives 0 (this can cause premature deletion of 
> data due to retention.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure

2020-09-08 Thread Rohit Shekhar (Jira)
Rohit Shekhar created KAFKA-10471:
-

 Summary: TimeIndex handling may cause data loss in certain back to 
back failure
 Key: KAFKA-10471
 URL: https://issues.apache.org/jira/browse/KAFKA-10471
 Project: Kafka
  Issue Type: Bug
  Components: core, log
Reporter: Rohit Shekhar


# Active segment for log A going clean shutdown - trim the time index to the 
latest fill value, set the clean shutdown marker.
 # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
log A recovers with the previous active segment as current. It also resized the 
TimeIndex to the max.
 #  Before all the log loads, the broker had a hard shutdown causing a clean 
shutdown marker left as is.
 #  Broker restarts, log A skips recovery due to the presence of a clean 
shutdown marker but the TimeIndex file assumes the resized file from the 
previous instance is all full (it assumes either file is newly created or is 
full with valid value).
 # The first append to the active segment will result in roll and TimeIndex 
will be rolled with the timestamp value of the last valid entry (0)
 # Segment.the largest timestamp gives 0 (this can cause premature deletion of 
data due to retention.
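
To see why the last step yields a largest timestamp of 0, here is a toy 
illustration assuming Kafka's 12-byte time index entry layout (an 8-byte 
timestamp plus a 4-byte relative offset); this is a sketch, not the broker code:

{noformat}
import java.nio.ByteBuffer;

class TimeIndexToy {
    // If a resized-to-max, zero-filled index file is treated as "full of valid
    // entries", the last "entry" parses as timestamp 0. After the roll described
    // above, 0 then becomes the segment's largest timestamp.
    static long lastTimestampAssumingFull(final ByteBuffer index) {
        final int entrySize = 12; // 8-byte timestamp + 4-byte relative offset
        final int entries = index.capacity() / entrySize; // assumed > 0
        return index.getLong((entries - 1) * entrySize);  // 0L for a zeroed tail
    }
}
{noformat}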



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485129599



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -117,25 +117,44 @@ public void process(final K key, final V value) {
 return;
 }
 
-final long timestamp = context().timestamp();
-//don't process records that don't fall within a full sliding 
window
-if (timestamp < windows.timeDifferenceMs()) {
+final long inputRecordTimestamp = context().timestamp();
+observedStreamTime = Math.max(observedStreamTime, 
inputRecordTimestamp);
+final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+if (inputRecordTimestamp + 1 + windows.timeDifferenceMs() <= 
closeTime) {

Review comment:
   nit: `1` -> `1L`

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+Long previousRecordTimestamp = null;
+
 try (
 final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
 key,
 key,
-timestamp - 2 * windows.timeDifferenceMs(),
+Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
 // to catch the current record's right window, if it 
exists, without more calls to the store
-timestamp + 1)
+inputRecordTimestamp + 1)
 ) {
 while (iterator.hasNext()) {
-final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
-windowStartTimes.add(next.key.window().start());
-final long startTime = next.key.window().start();
+final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+final long startTime = 
windowBeingProcessed.key.window().start();
+windowStartTimes.add(startTime);
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-if (endTime < timestamp) {
-leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
-} else if (endTime == timestamp) {
+if (endTime < inputRecordTimestamp) {
+leftWinAgg = windowBeingProcessed.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+} else if (endTime == inputRecordTimestamp) {
 leftWinAlreadyCreated = true;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else if (endTime > timestamp && startTime <= timestamp) {
-rightWinAgg = next.value;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+rightWinAgg = windowBeingProcessed.value;
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (startTime == inputRecordTimestamp + 1) {
 rightWinAlreadyCreated = true;
+} else {
+throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
 }
 }
 }
 
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindo

[GitHub] [kafka] lct45 commented on pull request #9264: KAFKA-5636: Add Sliding Windows documentation

2020-09-08 Thread GitBox


lct45 commented on pull request #9264:
URL: https://github.com/apache/kafka/pull/9264#issuecomment-689115428


   @ableegoldman for review



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#issuecomment-689114712


   One unrelated test failure: 
`kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0

2020-09-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10432.
-
Fix Version/s: 2.6.1
   Resolution: Fixed

> LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0
> -
>
> Key: KAFKA-10432
> URL: https://issues.apache.org/jira/browse/KAFKA-10432
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Lucas Bradstreet
>Priority: Major
> Fix For: 2.6.1
>
>
> I added some functionality to the system tests to compare epoch cache 
> lineages ([https://github.com/apache/kafka/pull/9213]), and I found a bug in 
> leader epoch cache recovery.
> The test hard-kills a broker before the cache has been flushed; the broker 
> then starts up and goes through log recovery. After recovery there is 
> divergence in the epoch caches for epoch 0:
> {noformat}
> AssertionError: leader epochs for output-topic-1 didn't match
>  [{0: 9393L, 2: 9441L, 4: 42656L},
>  {0: 0L, 2: 9441L, 4: 42656L}, 
>  {0: 0L, 2: 9441L, 4: 42656L}]
>   
>   
> {noformat}
> The cache is supposed to include the offset for epoch 0 but in recovery it 
> skips it 
> [https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364]
>  due to 
> [https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392].
>  Then it stamps the epoch with a later offset when fetching from the leader.
> I'm not sure why the recovery code includes the condition 
> `batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and 
> he believes it may have been intended to avoid assigning negative epochs but 
> is not sure why it was added. None of the tests fail with this check removed.
> {noformat}
>   leaderEpochCache.foreach { cache =>
> if (batch.partitionLeaderEpoch > 0 && 
> cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
>   cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
>   }
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9219: KAFKA-10432: LeaderEpochCache is incorrectly recovered for leader epoch 0

2020-09-08 Thread GitBox


hachikuji merged pull request #9219:
URL: https://github.com/apache/kafka/pull/9219


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-08 Thread GitBox


guozhangwang commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485132348



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -706,13 +662,17 @@ void runOnce() {
 totalProcessed += processed;
 }
 
+log.debug("TaskManager#process handled {} records; invoking 
TaskManager#punctuate", processed);

Review comment:
   Nit: I'd suggest we do not expose internal class names in log entries, 
e.g. here we can say "Processed {} records with {} iterations, invoking 
punctuation now", ditto below.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -689,6 +644,7 @@ void runOnce() {
  *  6. Otherwise, increment N.
  */
 do {
+log.debug("Invoking TaskManager#process with {} iterations.", 
numIterations);

Review comment:
   What's the rationale for recording both the start and the end of a 
procedure? If it is for troubleshooting purposes only, maybe the starting log 
entry can be trace while the ending entry is debug?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -752,6 +712,77 @@ void runOnce() {
 commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 }
 
+private void initializeAndRestorePhase() {
+{
+// only try to initialize the assigned tasks
+// if the state is still in PARTITION_ASSIGNED after the poll call
+final State stateSnapshot = state;
+if (stateSnapshot == State.PARTITIONS_ASSIGNED
+|| stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
+
+log.debug("State is {}; initializing and restoring", 
stateSnapshot);
+
+// transit to restore active is idempotent so we can call it 
multiple times
+changelogReader.enforceRestoreActive();
+
+if (taskManager.tryToCompleteRestoration()) {
+changelogReader.transitToUpdateStandby();
+
+setState(State.RUNNING);
+}
+
+if (log.isDebugEnabled()) {
+log.debug("Initialization and restore call done. State is 
{}", state);
+}
+}
+}
+
+log.debug("Invoking ChangeLogReader#restore");
+// we can always let changelog reader try restoring in order to 
initialize the changelogs;
+// if there's no active restoring or standby updating it would not try 
to fetch any data
+changelogReader.restore();
+}
+
+private long pollPhase() {
+final ConsumerRecords records;
+log.debug("Invoking Consumer#poll");
+
+if (state == State.PARTITIONS_ASSIGNED) {
+// try to fetch some records with zero poll millis
+// to unblock the restoration as soon as possible
+records = pollRequests(Duration.ZERO);
+} else if (state == State.PARTITIONS_REVOKED) {
// try to fetch some records with zero poll millis to unblock
+// other useful work while waiting for the join response
+records = pollRequests(Duration.ZERO);
+} else if (state == State.RUNNING || state == State.STARTING) {
+// try to fetch some records with normal poll time
+// in order to get long polling
+records = pollRequests(pollTime);
+} else if (state == State.PENDING_SHUTDOWN) {
+// we are only here because there's rebalance in progress,
+// just poll with zero to complete it
+records = pollRequests(Duration.ZERO);
+} else {
+// any other state should not happen
+log.error("Unexpected state {} during normal iteration", state);
+throw new StreamsException(logPrefix + "Unexpected state " + state 
+ " during normal iteration");
+}
+
+final long pollLatency = advanceNowAndComputeLatency();
+
+if (log.isDebugEnabled()) {
+log.debug("Consumer#poll completed in {} ms and fetched {} 
records", pollLatency, records.count());
+}
+pollSensor.record(pollLatency, now);
+
+if (!records.isEmpty()) {

Review comment:
   SG.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -752,6 +712,77 @@ void runOnce() {
 commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 }
 
+private void initializeAndRestorePhase() {
+{
+// only try to initialize the assigned tasks
+// if the state is still in PARTITION_ASSIGNED after the poll call
+final State stateSnapshot = state;
+  

[GitHub] [kafka] jonhkr commented on pull request #9256: Fix some Gradle deprecation warnings

2020-09-08 Thread GitBox


jonhkr commented on pull request #9256:
URL: https://github.com/apache/kafka/pull/9256#issuecomment-689072311


   @ijuma Yes, it was.
   I've updated the code to use the `api` configuration provided by the 
`java-library` plugin. This configuration has the same behaviour as the 
deprecated `compile` configuration.
   
   Now the generated poms should look exactly as before.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485130942



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join

Review comment:
   Right, thanks for the clarification! Wouldn't we still want the 
`leftJoin` in that case though? When we reach the `leftJoin` in the 
[code](https://github.com/apache/kafka/pull/9186/commits/e9616c64dfdc33481d0b831f80ecd0385801c761),
 the `mappedKey` is never null but it might not exist in the GlobalKTable (and 
so `value2` is null). If we're doing a `leftJoin`, then we'll want to allow 
these null values? (If not, the `leftJoin` is just the same as the normal 
`join`?)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485126654



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.

Review comment:
   I think that makes sense - but that will mean removing the previous 
logic of "`mappedKey` is null implies key not found in global table"? (Original 
line 62)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192371#comment-17192371
 ] 

Guozhang Wang commented on KAFKA-10455:
---

Thanks for the info [~ableegoldman]. I think we can have two approaches here 
which are not necessarily conflicting with each other (i.e. we can probably do 
both):

1. We a) modify the client to always refresh metadata before re-joining the 
group, to reduce the probability of a rebalance storm, and then b) modify the 
brokers to allow any join request to trigger a rebalance. But this depends on 
the brokers, and hence people who do not upgrade their brokers would not get 
this fix.

2. Based on the fact that only consumers on the same instance as the leader 
thread would trigger rebalances, we can take it as a by-product of the 
multi-threading proposal that each instance would have only one consumer 
instead of one consumer per thread. And with a single consumer per instance, 
we can avoid this issue as well as simplify our two-phase assignment algorithm 
to a single phase.

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---
>
> Key: KAFKA-10455
> URL: https://issues.apache.org/jira/browse/KAFKA-10455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower –  it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow anyone to trigger a rebalance by 
> rejoining the group. But this would presumably require a broker-side change 
> and thus we would still need a workaround for KIP-441 to work with brokers.
>  
> Related issue:
> This also means the Streams workaround for [KAFKA-9821|http://example.com] is 
> not airtight, as we encode the followup rebalance in the member who is 
> supposed to _receive_ a revoked partition, rather than the member who is 
> actually revoking said partition. While the member doing the revoking will be 
> guaranteed to have different userdata, the member receiving the partition may 
> not. Making it the responsibility of the leader to trigger _any_ type of 
> followup rebalance would solve this issue as well.
> Note that other types of followup rebalance (version probing, static 
> membership with host info change) are guaranteed to have a change in the 
> subscription userdata, and will not hit this bug



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-08 Thread GitBox


stanislavkozlovski commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-689043684


   Sorry, I mistook `kafka` for the root logger. I agree it tests the same path



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9268: KAFKA-10442; Add transaction admin APIs for KIP-664

2020-09-08 Thread GitBox


hachikuji opened a new pull request #9268:
URL: https://github.com/apache/kafka/pull/9268


   This patch adds support for the new transactional APIs from KIP-664: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-DescribeTransactions.
 
   
   Note that this does not include support for the `--find-hanging` action. I 
will add this separately.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-08 Thread GitBox


wcarlson5 commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485098403



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -752,6 +712,77 @@ void runOnce() {
 commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 }
 
+private void initializeAndRestorePhase() {
+{
+// only try to initialize the assigned tasks
+// if the state is still in PARTITION_ASSIGNED after the poll call
+final State stateSnapshot = state;
+if (stateSnapshot == State.PARTITIONS_ASSIGNED
+|| stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
+
+log.debug("State is {}; initializing and restoring", 
stateSnapshot);
+
+// transit to restore active is idempotent so we can call it 
multiple times
+changelogReader.enforceRestoreActive();
+
+if (taskManager.tryToCompleteRestoration()) {
+changelogReader.transitToUpdateStandby();
+
+setState(State.RUNNING);
+}
+
+if (log.isDebugEnabled()) {
+log.debug("Initialization and restore call done. State is 
{}", state);
+}
+}
+}
+
+log.debug("Invoking ChangeLogReader#restore");

Review comment:
   Is this necessary given the logs inside restore()? 
   
   Maybe we can include stateSnapshot so we can see whether it's STARTING or 
RUNNING, because we don't see the state unless it enters the initialization 
block. Not sure if this would be useful.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -706,13 +662,17 @@ void runOnce() {
 totalProcessed += processed;
 }
 
+log.debug("TaskManager#process handled {} records; invoking 
TaskManager#punctuate", processed);
+
 final int punctuated = taskManager.punctuate();
 final long punctuateLatency = advanceNowAndComputeLatency();
 totalPunctuateLatency += punctuateLatency;
 if (punctuated > 0) {
 punctuateSensor.record(punctuateLatency / (double) 
punctuated, now);
 }
 
+log.debug("TaskManager#punctuate executed: {}", punctuated);

Review comment:
   trace?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485097186



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join

Review comment:
   Sorry, I think my original comment here was a bit ambiguous & 
confusingly phrased. What I meant was that the _removal_ of the comment seemed 
correct to me, ie we should not make any special exceptions for the left join 
case and should remove the `leftJoin` part of the `if (leftJoin || value2 != 
null) ` check down on line 79





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


mjsax commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485096581



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.
+// This happens for GlobalKTables but never for KTables since 
keyMapper just returns the key.
+// For non-null keys, if {@code keyMapper} returns {@code null} it 
implies there is no match,
 // so ignore unless it is a left join
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-if (key == null || value == null) {
+final K2 mappedKey = keyMapper.apply(key, value);
+if ((key == null && mappedKey == null) || (!leftJoin && mappedKey == 
null) || value == null) {
 LOG.warn(
 "Skipping record due to null key or value. key=[{}] value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
 key, value, context().topic(), context().partition(), 
context().offset()
 );
 droppedRecordsSensor.record();
 } else {
-final K2 mappedKey = keyMapper.apply(key, value);
 final V2 value2 = mappedKey == null ? null : 
getValueOrNull(valueGetter.get(mappedKey));

Review comment:
   At this point, we know that `mappedKey != null`, otherwise, we would 
have dropped the record.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10313) Out of range offset errors leading to offset reset

2020-09-08 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-10313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192358#comment-17192358
 ] 

Michał Łukowicz commented on KAFKA-10313:
-

Hello Team!

Same issue here - Kafka 2.5.0.

Michał

> Out of range offset errors leading to offset reset
> --
>
> Key: KAFKA-10313
> URL: https://issues.apache.org/jira/browse/KAFKA-10313
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.2.2
>Reporter: Varsha Abhinandan
>Priority: Major
>
> Hi,
>   
>  We have been occasionally noticing offset resets happening on the Kafka 
> consumer because of offset out of range error. However, I don't see any 
> errors in the broker logs. No logs related to leader-election, replica lag, 
> Kafka broker pod restarts or anything. (just info logs were enabled in the 
> prod environment).
>   
>  It appeared from the logs that the out of range error was because of the 
> fetch offset being larger than the offset range on the broker. Noticed this 
> happening multiple times on different consumers, stream apps in the prod 
> environment. So, it doesn't seem like an application bug and more like a bug 
> in the KafkaConsumer. Would like to understand the cause for such errors.
>   
>  Also, none of the offset reset options are desirable. Choosing "earliest" 
> creates a sudden huge lag (we have a retention of 24hours) and choosing 
> "latest" leads to data loss (the records produced between the out of range 
> error and when offset reset happens on the consumer). So, wondering if it is 
> better for the Kafka client to separate out 'auto.offset.reset' config for 
> just offset not found. For, out of range error maybe the Kafka client can 
> automatically reset the offset to latest if the fetch offset is higher to 
> prevent data loss. Also, automatically reset it to earliest if the fetch 
> offset is lesser than the start offset. 
>   
>  Following are the logs on the consumer side :
> {noformat}
> [2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 
> ([prd453-19-event-upsert]-bo-pipeline-12)] 
> [o.a.k.c.consumer.internals.Fetcher] [Consumer 
> clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
>  groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range 
> for partition prd453-19-event-upsert-32, resetting offset
> [2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 
> ([prd453-19-event-upsert]-bo-pipeline-12)] 
> [o.a.k.c.consumer.internals.Fetcher] [Consumer 
> clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
>  groupId=bo-indexer-group-prd453-19] Resetting offset for partition 
> prd453-19-event-upsert-32 to offset 453223789.
>   {noformat}
> Broker logs for the partition :
> {noformat}
> [2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable 
> segments with base offsets [452091893] due to retention time 8640ms breach
>  [2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log 
> segment [baseOffset 452091893, size 1073741693] for deletion.
>  [2020-07-17T07:40:12,083Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log 
> start offset to 453223789
>  [2020-07-17T07:41:12,083Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 
> 452091893
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted log 
> /data/kafka/prd453-19-event-upsert-32/000452091893.log.deleted.
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted offset index 
> /data/kafka/prd453-19-event-upsert-32/000452091893.index.deleted.
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted time index 
> /data/kafka/prd453-19-event-upsert-32/000452091893.timeindex.deleted.
>  [2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  
> [kafka.log.ProducerStateManager]  [ProducerStateManager 
> partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 
> 475609786
>  [2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  
> [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] 
> Rolled new log segment at offset 475609786 in 1 ms.{noformat}
>  
> {noformat}
> [2020-07-17T09:05:12,075Z]  [INFO ]  [kafka-scheduler-2]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upser

[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


mjsax commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485096090



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.
+// This happens for GlobalKTables but never for KTables since 
keyMapper just returns the key.
+// For non-null keys, if {@code keyMapper} returns {@code null} it 
implies there is no match,
 // so ignore unless it is a left join
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-if (key == null || value == null) {
+final K2 mappedKey = keyMapper.apply(key, value);
+if ((key == null && mappedKey == null) || (!leftJoin && mappedKey == 
null) || value == null) {

Review comment:
   This condition seems unnecessarily complex. Should it not just be:
   ```
   if (mappedKey == null || value == null) {
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-09-08 Thread GitBox


mjsax commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485095735



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
+// We allow null keys unless {@code keyMapper} returns {@code null} 
and we ignore it as invalid.

Review comment:
   I guess we don't care about the original `key` any longer and only 
consider if `keyMapper` returns `null` or not?
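
   For anyone following along, a usage sketch of the case this PR enables (topic names and record types are made up; Java 16 records for brevity): a KStream-GlobalKTable join where the stream key may be null but the KeyValueMapper derives a non-null lookup key from the value.
   ```java
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.GlobalKTable;
   import org.apache.kafka.streams.kstream.KStream;

   public class NullKeyJoinSketch {
       record Order(String customerId, double amount) {}
       record Customer(String name) {}
       record EnrichedOrder(Order order, Customer customer) {}

       static void build(final StreamsBuilder builder) {
           final KStream<String, Order> orders = builder.stream("orders");          // keys may be null
           final GlobalKTable<String, Customer> customers = builder.globalTable("customers");
           orders.join(customers,
               (orderKey, order) -> order.customerId(),                             // lookup key comes from the value
               (order, customer) -> new EnrichedOrder(order, customer));
       }
   }
   ```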





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-08 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-689020104


   @stanislavkozlovski I'm not sure how what you describe differs from the test as implemented, though it's not exactly the same because the tests have their own `log4j.properties`, which configures `kafka` but not `kafka.controller`. I guess it might be clearer (and less fragile) if the test loggers were `foo` and `foo.bar`.
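
   Something like (assumed, not the actual test config):
   ```properties
   # Configure foo explicitly, leave foo.bar unset, then assert that foo.bar
   # resolves to DEBUG (inherited from foo) rather than the root's INFO.
   log4j.rootLogger=INFO, stdout
   log4j.logger.foo=DEBUG
   ```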



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-08 Thread GitBox


vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485062295



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -612,63 +612,18 @@ void runOnce() {
 final long startMs = time.milliseconds();
 now = startMs;
 
-if (state == State.PARTITIONS_ASSIGNED) {
-// try to fetch some records with zero poll millis
-// to unblock the restoration as soon as possible
-records = pollRequests(Duration.ZERO);
-} else if (state == State.PARTITIONS_REVOKED) {
-// try to fetch som records with zero poll millis to unblock
-// other useful work while waiting for the join response
-records = pollRequests(Duration.ZERO);
-} else if (state == State.RUNNING || state == State.STARTING) {
-// try to fetch some records with normal poll time
-// in order to get long polling
-records = pollRequests(pollTime);
-} else if (state == State.PENDING_SHUTDOWN) {
-// we are only here because there's rebalance in progress,
-// just poll with zero to complete it
-records = pollRequests(Duration.ZERO);
-} else {
-// any other state should not happen
-log.error("Unexpected state {} during normal iteration", state);
-throw new StreamsException(logPrefix + "Unexpected state " + state 
+ " during normal iteration");
-}
-
-final long pollLatency = advanceNowAndComputeLatency();
-
-pollSensor.record(pollLatency, now);
-if (records != null && !records.isEmpty()) {
-pollRecordsSensor.record(records.count(), now);
-taskManager.addRecordsToTasks(records);
-}
+final long pollLatency = pollPhase();

Review comment:
   `runOnce` was too long, according to checkStyle, so I factored out some 
of the execution phases.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -612,63 +612,18 @@ void runOnce() {
 final long startMs = time.milliseconds();
 now = startMs;
 
-if (state == State.PARTITIONS_ASSIGNED) {
-// try to fetch some records with zero poll millis
-// to unblock the restoration as soon as possible
-records = pollRequests(Duration.ZERO);
-} else if (state == State.PARTITIONS_REVOKED) {
-// try to fetch som records with zero poll millis to unblock
-// other useful work while waiting for the join response
-records = pollRequests(Duration.ZERO);
-} else if (state == State.RUNNING || state == State.STARTING) {
-// try to fetch some records with normal poll time
-// in order to get long polling
-records = pollRequests(pollTime);
-} else if (state == State.PENDING_SHUTDOWN) {
-// we are only here because there's rebalance in progress,
-// just poll with zero to complete it
-records = pollRequests(Duration.ZERO);
-} else {
-// any other state should not happen
-log.error("Unexpected state {} during normal iteration", state);
-throw new StreamsException(logPrefix + "Unexpected state " + state 
+ " during normal iteration");
-}
-
-final long pollLatency = advanceNowAndComputeLatency();
-
-pollSensor.record(pollLatency, now);
-if (records != null && !records.isEmpty()) {
-pollRecordsSensor.record(records.count(), now);
-taskManager.addRecordsToTasks(records);
-}
+final long pollLatency = pollPhase();
 
 // Shutdown hook could potentially be triggered and transit the thread 
state to PENDING_SHUTDOWN during #pollRequests().
 // The task manager internal states could be uninitialized if the 
state transition happens during #onPartitionsAssigned().
 // Should only proceed when the thread is still running after 
#pollRequests(), because no external state mutation
 // could affect the task manager state beyond this point within 
#runOnce().
 if (!isRunning()) {
-log.debug("State already transits to {}, skipping the run once 
call after poll request", state);
+log.debug("Thread state is already {}, skipping the run once call 
after poll request", state);

Review comment:
   Just a slight rewording I thought could be clearer.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -752,6 +712,77 @@ void runOnce() {
 commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 }
 
+private void initializeAndRestorePhase() {
+{
+// only try to initialize the assigned tasks
+ 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r485073167



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -328,15 +328,15 @@ private synchronized void cleanRemovedTasks(final long 
cleanupDelayMs,
 if (lock(id)) {
 final long now = time.milliseconds();
 final long lastModifiedMs = taskDir.lastModified();
-if (now > lastModifiedMs + cleanupDelayMs) {
-log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-logPrefix(), dirName, id, now - 
lastModifiedMs, cleanupDelayMs);
-
-Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-} else if (manualUserCall) {
+if (manualUserCall) {

Review comment:
   Honestly it kind of seems like there is enough divergent logic to merit 
splitting this up into separate methods for the manual vs cleanup-delay cases. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10470) zstd decompression with small batches is slow and causes excessive GC

2020-09-08 Thread Robert Wagner (Jira)
Robert Wagner created KAFKA-10470:
-

 Summary: zstd decompression with small batches is slow and causes 
excessive GC
 Key: KAFKA-10470
 URL: https://issues.apache.org/jira/browse/KAFKA-10470
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.5.1
Reporter: Robert Wagner


Similar to KAFKA-5150 but for zstd instead of LZ4, it appears that a large decompression buffer (128 KB) created by zstd-jni per batch is causing a significant performance bottleneck.

The upcoming version of zstd-jni (1.4.5-7) will have a new constructor for ZstdInputStream that allows the client to pass its own buffer. A similar fix to [PR #2967|https://github.com/apache/kafka/pull/2967] could be used to have the ZstdInputStream constructor use a BufferSupplier to reuse the decompression buffer.
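
A rough sketch of the intended reuse, assuming zstd-jni >= 1.4.5-7 where ZstdInputStream accepts a caller-supplied buffer pool (RecyclingBufferPool stands in here for a BufferSupplier-backed pool):
{code:java}
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;

import java.io.IOException;
import java.io.InputStream;

public class ZstdReuseSketch {
    // Wrap the raw batch stream with a pooled decompression buffer instead of
    // letting zstd-jni allocate a fresh 128 KB buffer per batch.
    static InputStream wrapForDecompression(final InputStream rawBatchStream) throws IOException {
        return new ZstdInputStream(rawBatchStream, RecyclingBufferPool.INSTANCE);
    }
}
{code}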



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei opened a new pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-08 Thread GitBox


vvcephei opened a new pull request #9267:
URL: https://github.com/apache/kafka/pull/9267


   Add debug logs to see when Streams calls poll, process, commit, etc. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485053442



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp<Agg> valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[GitHub] [kafka] tombentley opened a new pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-08 Thread GitBox


tombentley opened a new pull request #9266:
URL: https://github.com/apache/kafka/pull/9266


   Previously, the root logger's level was used, ignoring intervening loggers with different levels.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-08 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-688987558


   @ijuma please could you take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10469) describeConfigs() for broker loggers returns incorrect values

2020-09-08 Thread Tom Bentley (Jira)
Tom Bentley created KAFKA-10469:
---

 Summary: describeConfigs() for broker loggers returns incorrect 
values
 Key: KAFKA-10469
 URL: https://issues.apache.org/jira/browse/KAFKA-10469
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Tom Bentley
Assignee: Tom Bentley


{{Log4jController#loggers}} incorrectly uses the root logger's log level for 
any loggers which lack a configured log level of their own. This is incorrect 
because loggers without an explicit level inherit their level from their parent 
logger and this resolved level might be different from the root logger's level. 
This means that the levels reported from {{Admin.describeConfigs}}, which uses 
{{Log4jController#loggers}}, are incorrect. This can be shown by using the 
default {{log4j.properties}} and describing a broker's loggers; it reports

{noformat}
kafka.controller=TRACE
kafka.controller.ControllerChannelManager=INFO
kafka.controller.ControllerEventManager$ControllerEventThread=INFO
kafka.controller.KafkaController=INFO
kafka.controller.RequestSendThread=INFO
kafka.controller.TopicDeletionManager=INFO
kafka.controller.ZkPartitionStateMachine=INFO
kafka.controller.ZkReplicaStateMachine=INFO
{noformat}

The default {{log4j.properties}} does indeed set {{kafka.controller}} to 
{{TRACE}}, but it does not configure the others, so they're actually at 
{{TRACE}} not {{INFO}} as reported.
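
For illustration, a minimal sketch (plain log4j 1.x API; not the actual fix) of resolving a level hierarchically instead of falling back to the root logger -- log4j's own {{Category#getEffectiveLevel}} does the same walk:
{code:java}
import org.apache.log4j.Category;
import org.apache.log4j.Logger;

public class EffectiveLevelSketch {
    static String effectiveLevel(final Logger logger) {
        // Walk up the hierarchy until some ancestor has a configured level.
        for (Category c = logger; c != null; c = c.getParent()) {
            if (c.getLevel() != null) {
                return c.getLevel().toString();
            }
        }
        return "OFF"; // unreachable in practice: the root logger always has a level
    }
}
{code}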







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-08 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r485026583



##
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##
@@ -219,38 +203,38 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
   def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
 assert(watchKeys.nonEmpty, "The watch key list can't be empty")
 
-// The cost of tryComplete() is typically proportional to the number of 
keys. Calling
-// tryComplete() for each key is going to be expensive if there are many 
keys. Instead,
-// we do the check in the following way. Call tryComplete(). If the 
operation is not completed,
-// we just add the operation to all keys. Then we call tryComplete() 
again. At this time, if
-// the operation is still not completed, we are guaranteed that it won't 
miss any future triggering
-// event since the operation is already on the watcher list for all keys. 
This does mean that
-// if the operation is completed (by another thread) between the two 
tryComplete() calls, the
-// operation is unnecessarily added for watch. However, this is a less 
severe issue since the
-// expire reaper will clean it up periodically.
-
-// At this point the only thread that can attempt this operation is this 
current thread
-// Hence it is safe to tryComplete() without a lock
-var isCompletedByMe = operation.tryComplete()
-if (isCompletedByMe)
-  return true
-
-var watchCreated = false
-for(key <- watchKeys) {
-  // If the operation is already completed, stop adding it to the rest of 
the watcher list.
-  if (operation.isCompleted)
-return false
-  watchForOperation(key, operation)
-
-  if (!watchCreated) {
-watchCreated = true
-estimatedTotalOperations.incrementAndGet()
-  }
-}
-
-isCompletedByMe = operation.maybeTryComplete()
-if (isCompletedByMe)
-  return true
+// The cost of tryComplete() is typically proportional to the number of 
keys. Calling tryComplete() for each key is
+// going to be expensive if there are many keys. Instead, we do the check 
in the following way through safeTryCompleteOrElse().
+// If the operation is not completed, we just add the operation to all 
keys. Then we call tryComplete() again. At
+// this time, if the operation is still not completed, we are guaranteed 
that it won't miss any future triggering
+// event since the operation is already on the watcher list for all keys.
+//
+// ==[story about lock]==
+// Through safeTryCompleteOrElse(), we hold the operation's lock while 
adding the operation to watch list and doing
+// the tryComplete() check. This is to avoid a potential deadlock between 
the callers to tryCompleteElseWatch() and
+// checkAndComplete(). For example, the following deadlock can happen if 
the lock is only held for the final tryComplete()
+// 1) thread_a holds readlock of stateLock from TransactionStateManager
+// 2) thread_a is executing tryCompleteElseWatch
+// 3) thread_a adds op to watch list
+// 4) thread_b requires writelock of stateLock from 
TransactionStateManager (blocked by thread_a)
+// 5) thread_c calls checkAndComplete() and holds lock of op
+// 6) thread_c is waiting readlock of stateLock to complete op (blocked by 
thread_b)
+// 7) thread_a is waiting lock of op to call safeTryComplete (blocked by 
thread_c)

Review comment:
   to call safeTryComplete => to call the final tryComplete()
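
   For reference, a hand-wavy Java rendering of the lock discipline described above (the real code is Scala; all names here are assumptions):
   ```java
   abstract class DelayedOpSketch {
       final Object lock = new Object();

       abstract boolean tryComplete();

       // Hold the operation's lock across both the watch-list registration and
       // the final tryComplete(), so a concurrent checkAndComplete() cannot
       // complete the operation in between.
       boolean safeTryCompleteOrElse(final Runnable addToWatchLists) {
           synchronized (lock) {
               if (tryComplete()) {
                   return true;
               }
               addToWatchLists.run();
               return tryComplete();
           }
       }
   }
   ```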

##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##
@@ -119,12 +110,33 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 
   @Test
   def testConcurrentTxnGoodPathSequence(): Unit = {
-verifyConcurrentOperations(createGroupMembers, allOperationsWithTxn)
+verifyConcurrentOperations(createGroupMembers, Seq(
+  new JoinGroupOperation,
+  new SyncGroupOperation,
+  new OffsetFetchOperation,
+  new CommitTxnOffsetsOperation,
+  new CompleteTxnOperation,
+  new HeartbeatOperation,
+  new LeaveGroupOperation
+))
   }
 
   @Test
   def testConcurrentRandomSequence(): Unit = {
-verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn)
+/**
+ * handleTxnCommitOffsets does not complete delayed requests now so it 
causes error if handleTxnCompletion is executed

Review comment:
   causes error => causes an error

##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##
@@ -119,12 +110,33 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 
   @Test
   def testConcurrentTxnGoodPathSequence(): Unit = {
-verifyConcurrentOperations(createGroupMembers, allOper

[jira] [Comment Edited] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"

2020-09-08 Thread Swayam Raina (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192282#comment-17192282
 ] 

Swayam Raina edited comment on KAFKA-10467 at 9/8/20, 3:52 PM:
---

Hi [~showuon],

I was using an internal library which calls this method
{code:java}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback){code}
The simplified wrapper method is responsible for creating ProducerRecord.


was (Author: swayamraina):
Hi [~showuon],

I was using an internal library which calls this method
{code:java}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback){code}
The above method is responsible for creating ProducerRecord.

> kafka-topic --describe fails for topic created by "produce"
> ---
>
> Key: KAFKA-10467
> URL: https://issues.apache.org/jira/browse/KAFKA-10467
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.1
> Environment: MacOS 
>Reporter: Swayam Raina
>Priority: Minor
>
> {code:java}
> > kafka-topics --version
> 2.3.1 (Commit:18a913733fb71c01){code}
>  
> While producing to a topic that does not already exist
> {code:java}
> producer.send(new ProducerRecord<>("does-not-exists", "msg-1"));
> {code}
>  
> broker creates the topic
> {code:java}
> // partition file
> > ls /tmp/kafka-logs/
> does-not-exists-0{code}
>  
> If I try to list the topics, it also shows this new topic
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --list
> does-not-exists-0
> {code}
> Now while trying to describe the topic that was auto-created the following 
> error is thrown
>  
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --topic does-not-exists 
> >--describe
> Error while executing topic command : 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.Error while executing topic 
> command : org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.[2020-09-08 
> 00:21:30,890] ERROR java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3(TopicCommand.scala:228)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3$adapted(TopicCommand.scala:225)
>  at scala.collection.Iterator.foreach(Iterator.scala:941) at 
> scala.collection.Iterator.foreach$(Iterator.scala:941) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:225)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at 
> kafka.admin.TopicCommand.main(TopicCommand.scala)Caused by: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. (kafka.admin.TopicCommand$)
>  
> {code}
> ```
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"

2020-09-08 Thread Swayam Raina (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192282#comment-17192282
 ] 

Swayam Raina commented on KAFKA-10467:
--

Hi [~showuon],

I was using an internal library which calls this method
{code:java}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback){code}
The above method is responsible for creating ProducerRecord.

> kafka-topic --describe fails for topic created by "produce"
> ---
>
> Key: KAFKA-10467
> URL: https://issues.apache.org/jira/browse/KAFKA-10467
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.1
> Environment: MacOS 
>Reporter: Swayam Raina
>Priority: Minor
>
> {code:java}
> > kafka-topics --version
> 2.3.1 (Commit:18a913733fb71c01){code}
>  
> While producing to a topic that does not already exist
> {code:java}
> producer.send(new ProducerRecord<>("does-not-exists", "msg-1"));
> {code}
>  
> broker creates the topic
> {code:java}
> // partition file
> > ls /tmp/kafka-logs/
> does-not-exists-0{code}
>  
> If I try to list the topics, it also shows this new topic
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --list
> does-not-exists-0
> {code}
> Now while trying to describe the topic that was auto-created the following 
> error is thrown
>  
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --topic does-not-exists 
> >--describe
> Error while executing topic command : 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.Error while executing topic 
> command : org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.[2020-09-08 
> 00:21:30,890] ERROR java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3(TopicCommand.scala:228)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3$adapted(TopicCommand.scala:225)
>  at scala.collection.Iterator.foreach(Iterator.scala:941) at 
> scala.collection.Iterator.foreach$(Iterator.scala:941) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:225)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at 
> kafka.admin.TopicCommand.main(TopicCommand.scala)Caused by: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. (kafka.admin.TopicCommand$)
>  
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers

2020-09-08 Thread GitBox


tombentley commented on pull request #9263:
URL: https://github.com/apache/kafka/pull/9263#issuecomment-688933062


   Thanks @ijuma 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley closed pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers

2020-09-08 Thread GitBox


tombentley closed pull request #9263:
URL: https://github.com/apache/kafka/pull/9263


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9256: Fix some Gradle deprecation warnings

2020-09-08 Thread GitBox


ijuma commented on pull request #9256:
URL: https://github.com/apache/kafka/pull/9256#issuecomment-688930298


   Thanks for the PR.
   
   > Dependencies in the compile scope were moved to runtime.
   
   This seems like an issue, no?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mmerdes opened a new pull request #9265: Fix typo in name of output topic

2020-09-08 Thread GitBox


mmerdes opened a new pull request #9265:
URL: https://github.com/apache/kafka/pull/9265


   This change fixes inconsistent naming for the output topic in the 
documentation.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…

2020-09-08 Thread GitBox


ijuma commented on a change in pull request #9182:
URL: https://github.com/apache/kafka/pull/9182#discussion_r484981150



##
File path: core/src/main/scala/kafka/utils/Log4jController.scala
##
@@ -87,9 +87,10 @@ object Log4jController {
 class Log4jController extends Log4jControllerMBean {
 
   def getLoggers: util.List[String] = {
-Log4jController.loggers.map {
+// we replace scala collection by java collection so mbean client is able 
to parse it without scala library.

Review comment:
   Instead of `parse`, we should say `deserialize`.

##
File path: core/src/test/scala/kafka/utils/LoggingTest.scala
##
@@ -26,6 +26,12 @@ import org.junit.Assert.{assertEquals, assertTrue}
 
 class LoggingTest extends Logging {
 
+  @Test
+  def testTypeOfGetLoggers(): Unit = {

Review comment:
   Can we add a comment to the test too since this is not obvious.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…

2020-09-08 Thread GitBox


chia7712 commented on pull request #9182:
URL: https://github.com/apache/kafka/pull/9182#issuecomment-688927747


   > can you please provide a test as well?
   
   done!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…

2020-09-08 Thread GitBox


ijuma commented on pull request #9182:
URL: https://github.com/apache/kafka/pull/9182#issuecomment-688916053


   The change looks good, can you please provide a test as well?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers

2020-09-08 Thread GitBox


ijuma commented on pull request #9263:
URL: https://github.com/apache/kafka/pull/9263#issuecomment-688915561


   This is a duplicate of https://github.com/apache/kafka/pull/9182



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9231: KAFKA-10447: Migrate tools module to JUnit 5 and mockito

2020-09-08 Thread GitBox


ijuma commented on pull request #9231:
URL: https://github.com/apache/kafka/pull/9231#issuecomment-688912021


   Unrelated flaky failures:
   
   ```
   Build / JDK 15 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota
   Build / JDK 8 / kafka.api.PlaintextAdminIntegrationTest.testConsumerGroups
   Build / JDK 8 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 opened a new pull request #9264: KAFKA-5636: Add Sliding Windows documentation

2020-09-08 Thread GitBox


lct45 opened a new pull request #9264:
URL: https://github.com/apache/kafka/pull/9264


   Add necessary documentation for 
[KIP-450](https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL),
 adding sliding window aggregations to KStreams
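
   For reviewers, a minimal usage sketch of the DSL being documented (topic name and durations are placeholders):
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.SlidingWindows;

   public class SlidingWindowSketch {
       static void build(final StreamsBuilder builder) {
           builder.<String, String>stream("events")
                  .groupByKey()
                  // count per key over a 5-minute sliding window with a 1-minute grace period
                  .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(
                          Duration.ofMinutes(5), Duration.ofMinutes(1)))
                  .count();
       }
   }
   ```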
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9260: MINOR: Update scala default version in readme

2020-09-08 Thread GitBox


ijuma merged pull request #9260:
URL: https://github.com/apache/kafka/pull/9260


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers

2020-09-08 Thread GitBox


tombentley commented on pull request #9263:
URL: https://github.com/apache/kafka/pull/9263#issuecomment-688900724


   @mimaison this one is really trivial, grateful if you could take a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley opened a new pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers

2020-09-08 Thread GitBox


tombentley opened a new pull request #9263:
URL: https://github.com/apache/kafka/pull/9263


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10468) Log4jController.getLoggers serialization

2020-09-08 Thread Tom Bentley (Jira)
Tom Bentley created KAFKA-10468:
---

 Summary: Log4jController.getLoggers serialization
 Key: KAFKA-10468
 URL: https://issues.apache.org/jira/browse/KAFKA-10468
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Tom Bentley


{{Log4jController#getLoggers()}} returns a {{java.util.List}} wrapper for a 
Scala {{List}}, which results in a {{ClassNotFoundException}} on any MBean 
client which doesn't have the scala wrapper class on its classpath.
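
A sketch of how this surfaces on a plain-JVM JMX client (connection setup elided; the object name is, to the best of my knowledge, the one Kafka registers for this MBean):
{code:java}
import java.util.List;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

public class LoggersAttributeSketch {
    @SuppressWarnings("unchecked")
    static List<String> loggers(final MBeanServerConnection conn) throws Exception {
        final ObjectName name = new ObjectName("kafka:type=kafka.Log4jController");
        // Deserializing the returned scala.collection wrapper throws
        // ClassNotFoundException unless the Scala library is on the classpath;
        // returning a plain java.util.ArrayList avoids this.
        return (List<String>) conn.getAttribute(name, "Loggers");
    }
}
{code}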



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] felipeazv removed a comment on pull request #7288: KAFKA-7931 : [Proposal] Fix metadata fetch for ephemeral brokers behind a Virtual IP

2020-09-08 Thread GitBox


felipeazv removed a comment on pull request #7288:
URL: https://github.com/apache/kafka/pull/7288#issuecomment-688860769


   Any updates in this PR?
   I am also facing this issue (client 2.5.0).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] felipeazv edited a comment on pull request #7288: KAFKA-7931 : [Proposal] Fix metadata fetch for ephemeral brokers behind a Virtual IP

2020-09-08 Thread GitBox


felipeazv edited a comment on pull request #7288:
URL: https://github.com/apache/kafka/pull/7288#issuecomment-688860769


   Any updates in this PR?
   I am also facing this issue (client 2.5.0).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



