[jira] [Issue Comment Deleted] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10304: --- Comment: was deleted (was: https://github.com/apache/kafka/pull/9224) > Revisit and improve the tests of MirrorMaker 2 > -- > > Key: KAFKA-10304 > URL: https://issues.apache.org/jira/browse/KAFKA-10304 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.8.0 > > > In a different MM2 change https://github.com/apache/kafka/pull/9029, some > concerns on tests were raised. It may be a good time to revisit and refactor > the tests, possibly in the following way: > (1) are 100 messages good enough for integration tests? > (2) what about the broker failure in the middle of integration tests? > (3) other validations to check (e.g. topic config sync) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192664#comment-17192664 ] Ning Zhang commented on KAFKA-10304: Hi [~mimaison] [~ryannedolan], this is mostly a refactoring PR for the MM2 integration tests. The purpose is to: (1) address the concerns raised in the previous PR (https://github.com/apache/kafka/pull/9029), and (2) prepare for future development (e.g. by extracting common functions). I think the current PR (https://github.com/apache/kafka/pull/9224) is just a starting point, and I would very much appreciate your feedback on what else to test and how to get closer to real-world scenarios.
[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10304: --- Description: In a different MM2 change https://github.com/apache/kafka/pull/9029, some concerns on tests were raised. It may be a good time to revisit and refactor the tests, possibly in the following way: (1) are 100 messages good enough for integration tests? (2) what about the broker failure in the middle of integration tests? (3) other validations to check (e.g. topic config sync) was: due to the quick development of Kafka MM 2, unit and integration tests of MirrorMaker 2 were made just for covering each individual feature and some of them are simply copy-n-paste from the existing tests with small tweaks. It may be a good time to revisit and improve the tests, possibly in the following way: (1) are 100 messages good enough for integration tests? (2) what about the failure in the middle of integration tests? (3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset sync..) beyond the mirrored message in integration tests?
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r485363204 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java ## @@ -396,27 +330,67 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio try (Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( "group.id", "consumer-group-1"), "test-topic-2")) { // we need to wait for consuming all the records for MM2 replicating the expected offsets -waitForConsumingAllRecords(consumer1); +waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED); } // create a consumer at backup cluster with same consumer group Id to consume old and new topic consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2"); -waitForConsumerGroupOffsetSync(consumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), "consumer-group-1"); +waitForConsumerGroupOffsetSync(backup, consumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), +"consumer-group-1", NUM_RECORDS_PRODUCED); records = consumer.poll(Duration.ofMillis(500)); // similar reasoning as above, no more records to consume by the same consumer group at backup cluster assertEquals("consumer record size is not zero", 0, records.count()); consumer.close(); - } + +@Test +public void testWithBrokerRestart() throws InterruptedException { +// test with a higher number of records +int numRecords = NUM_RECORDS_PRODUCED * 100; + +produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), numRecords); -private void deleteAllTopics(EmbeddedKafkaCluster cluster) { -Admin client = cluster.createAdminClient(); -try { -client.deleteTopics(client.listTopics().names().get()); -} catch (Throwable e) { -} +// one way replication from primary to backup 
+mm2Props.put("backup->primary.enabled", "false"); +mm2Config = new MirrorMakerConfig(mm2Props); + +waitUntilMirrorMakerIsRunning(backup, SOURCE_CONNECTOR, mm2Config, "primary", "backup"); + +// have to sleep a little for MM to be ready for the following the kafka broker restart +Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + +// restart kafka broker at backup cluster +restartKafkaBroker(backup); + +Consumer consumer = backup.kafka().createConsumerAndSubscribeTo( +Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1"); +// verify the consumption equals to produce +waitForConsumingAllRecords(consumer, numRecords); +consumer.commitAsync(); + +// produce another set of records +produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), numRecords); +// restart kafka broker at primary cluster +restartKafkaBroker(primary); +// verify the consumption equals to produce +waitForConsumingAllRecords(consumer, numRecords); + +consumer.close(); +} + +void createTopics() { +// to verify topic config will be sync-ed across clusters +Map topicConfig = new HashMap<>(); +topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); +// create these topics before starting the connectors so we don't need to wait for discovery +primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, topicConfig); Review comment: When creating the `test-topic-1` topic on the primary cluster, add a topic config. Later on, we check whether the config is synced from the primary to the backup cluster. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
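The new `testWithBrokerRestart` uses a fixed `Thread.sleep` before restarting the broker. By contrast, `waitForConsumingAllRecords` waits until an expected record count is reached. Below is a minimal, stdlib-only sketch of the poll-until-true pattern that helpers such as `org.apache.kafka.test.TestUtils.waitForCondition` are built around. The class `PollUntil` and its parameters are hypothetical and are not part of Kafka.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper in the spirit of org.apache.kafka.test.TestUtils.waitForCondition;
// it is NOT that method's implementation.
final class PollUntil {

    private PollUntil() { }

    /**
     * Re-evaluates {@code condition} every {@code pollIntervalMs} milliseconds until it returns
     * true, and fails with an AssertionError once {@code timeoutMs} has elapsed without success.
     */
    static void waitUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs, String message) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + timeoutMs + " ms: " + message);
            }
            try {
                Thread.sleep(pollIntervalMs); // back off briefly before re-checking
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
        // Condition met: return immediately instead of sleeping for a fixed period.
    }
}
```

In a test, a call could look like `PollUntil.waitUntil(() -> consumedCount.get() >= numRecords, 60_000L, 100L, "not all records consumed")`, where `consumedCount` is a hypothetical counter that the consuming loop updates. The wait then ends as soon as the condition holds rather than after a guessed delay.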
[GitHub] [kafka] cmccabe opened a new pull request #9269: MINOR: add ImplicitLinkedHashCollection#moveToEnd
cmccabe opened a new pull request #9269: URL: https://github.com/apache/kafka/pull/9269 Add ImplicitLinkedHashCollection#moveToEnd. Refactor ImplicitLinkedHashCollectionIterator to be a little bit more robust against concurrent modifications to the map.
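To illustrate what a `moveToEnd` operation does to iteration order, here is a sketch built on `java.util.LinkedHashMap`. It is only a stand-in: `MoveToEndDemo` is hypothetical and does not reflect how `ImplicitLinkedHashCollection` is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Illustration only: uses java.util.LinkedHashMap, NOT Kafka's ImplicitLinkedHashCollection.
final class MoveToEndDemo<K, V> {
    // Insertion-ordered map: iteration follows the order in which keys were (re)inserted.
    private final LinkedHashMap<K, V> map = new LinkedHashMap<>();

    void put(K key, V value) {
        map.put(key, value);
    }

    /** Moves an existing key to the tail of the iteration order; returns false if it is absent. */
    boolean moveToEnd(K key) {
        if (!map.containsKey(key)) {
            return false;
        }
        V value = map.remove(key); // unlink the entry from its current position
        map.put(key, value);       // re-insert it, which places it at the tail
        return true;
    }

    List<K> keysInOrder() {
        return new ArrayList<>(map.keySet());
    }
}
```

The remove-then-put step matters because `LinkedHashMap.put` on a key that is already present keeps that key's original position in insertion order.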
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r485362458 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java ## @@ -207,13 +173,16 @@ public void testReplication() throws InterruptedException { mm2Config = new MirrorMakerConfig(mm2Props); -waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup"); +waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, "primary", "backup"); -waitUntilMirrorMakerIsRunning(primary, mm2Config, "backup", "primary"); +waitUntilMirrorMakerIsRunning(primary, CONNECTOR_LIST, mm2Config, "backup", "primary"); MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary")); MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup")); - + +assertEquals("topic config was not synced", TopicConfig.CLEANUP_POLICY_COMPACT, +getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG)); Review comment: Add a check for topic config sync, since the topic created on the primary cluster has a `cleanup.policy` config.
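The assertion above checks a single key. If several topic configs are expected to be replicated, one option is to compare them in bulk. The sketch below assumes that the configs of the source topic and its remote copy have already been fetched into plain maps, for example from the `Config` entries returned by `Admin#describeConfigs`. `TopicConfigSyncCheck.unsyncedKeys` is a hypothetical helper, not an existing test utility.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares already-fetched topic configs from two clusters.
final class TopicConfigSyncCheck {

    private TopicConfigSyncCheck() { }

    /**
     * Returns the keys from {@code keysToCheck} whose values differ between the source and
     * target topic configs. A key missing on one side counts as a difference.
     */
    static Set<String> unsyncedKeys(Map<String, String> sourceConfig,
                                    Map<String, String> targetConfig,
                                    Set<String> keysToCheck) {
        Set<String> unsynced = new TreeSet<>(); // sorted, so assertion messages are stable
        for (String key : keysToCheck) {
            if (!Objects.equals(sourceConfig.get(key), targetConfig.get(key))) {
                unsynced.add(key);
            }
        }
        return unsynced;
    }
}
```

MM2 does not necessarily replicate every topic config, so `keysToCheck` should list only the configs you expect to be synced, such as `cleanup.policy` in this test.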
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r485361003 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java ## @@ -32,4 +73,166 @@ } return props; } + +/* + * launch the connectors on kafka connect cluster, then check if they are running + */ +public static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster, List connectorClasses, Review comment: This is mostly copied from `MirrorConnectorsIntegrationTest`.
[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"
[ https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192630#comment-17192630 ] Swayam Raina commented on KAFKA-10467: -- [~huxi_2b], the topic does get created by the broker and exists (see the description). > kafka-topic --describe fails for topic created by "produce" > --- > > Key: KAFKA-10467 > URL: https://issues.apache.org/jira/browse/KAFKA-10467 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.3.1 > Environment: MacOS >Reporter: Swayam Raina >Priority: Minor > > {code:java} > > kafka-topics --version > 2.3.1 (Commit:18a913733fb71c01){code} > > While producing to a topic that does not already exist > {code:java} > producer.send(new ProducerRecord<>("does-not-exists", "msg-1")); > {code} > > the broker creates the topic > {code:java} > // partition file > > ls /tmp/kafka-logs/ > does-not-exists-0{code} > > If I try to list the topics, it also shows this new topic > {code:java} > > kafka-topics --bootstrap-server localhost:9092 --list > does-not-exists-0 > {code} > Now, while trying to describe the topic that was auto-created, the following > error is thrown > > {code:java} > > kafka-topics --bootstrap-server localhost:9092 --topic does-not-exists > >--describe > Error while executing topic command : > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. Error while executing topic > command : org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request. [2020-09-08 > 00:21:30,890] ERROR java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request.
at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3(TopicCommand.scala:228) > at > kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3$adapted(TopicCommand.scala:225) > at scala.collection.Iterator.foreach(Iterator.scala:941) at > scala.collection.Iterator.foreach$(Iterator.scala:941) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:225) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at > kafka.admin.TopicCommand.main(TopicCommand.scala)Caused by: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. (kafka.admin.TopicCommand$) > > {code}
[GitHub] [kafka] guozhangwang commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
guozhangwang commented on pull request #9102: URL: https://github.com/apache/kafka/pull/9102#issuecomment-689304026 I took a quick look and it LGTM. @abbccdda, if your pass is good, please feel free to merge.
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r485337705 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java ## @@ -32,4 +71,141 @@ } return props; } + +public static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster, List mirrorClasses, Review comment: This is a simple move from `MirrorConnectorsIntegrationTest`, generalized over the connector classes.
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r485337316 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java ## @@ -16,12 +16,51 @@ */ package org.apache.kafka.connect.mirror; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; + +import static org.apache.kafka.connect.mirror.TestUtils.NUM_WORKERS; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS; +import static org.junit.Assert.assertTrue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestUtils { Review comment: I propose making `TestUtils` the central place for common functions used by the integration tests.
[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout
guozhangwang commented on a change in pull request #8834: URL: https://github.com/apache/kafka/pull/8834#discussion_r485326873 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -652,10 +644,10 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut } else if (error == Errors.MEMBER_ID_REQUIRED) { // Broker requires a concrete member id to be allowed to join the group. Update member id // and send another join group request in next cycle. +String memberId = joinResponse.data().memberId(); +log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId); synchronized (AbstractCoordinator.this) { -AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, -joinResponse.data().memberId(), null); -AbstractCoordinator.this.resetStateAndRejoin(); Review comment: Yes, this is redundant: we are raising this error, and `resetStateAndRejoin()` will still be executed by the handler anyway.
[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout
guozhangwang commented on a change in pull request #8834: URL: https://github.com/apache/kafka/pull/8834#discussion_r485324085 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) { resetJoinGroupFuture(); needsJoinPrepare = true; } else { -log.info("Generation data was cleared by heartbeat thread. Initiating rejoin."); +log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " + + "the rebalance callback is triggered, marking this rebalance as failed and retry", + generation, state); resetStateAndRejoin(); Review comment: That's a good question. Having thought about it, I can change the caller of `resetGeneration` (the only place where the heartbeat thread can reset the generation) and move `state = MemberState.UNJOINED;` into the callee, so that the two are always changed together. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -433,7 +440,7 @@ boolean joinGroupIfNeeded(final Timer timer) { generationSnapshot = this.generation; } -if (generationSnapshot != Generation.NO_GENERATION) { +if (generationSnapshot != Generation.NO_GENERATION && state == MemberState.STABLE) { Review comment: Ack, will do.
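The first comment proposes resetting the generation and the member state together in one place. A simplified, hypothetical model can sketch this idea. `MemberSketch`, its `int` generation and its trimmed-down state enum are assumptions made for illustration. The real `AbstractCoordinator` uses a `Generation` object and more member states.

```java
// Simplified, hypothetical model of the idea discussed above; NOT AbstractCoordinator's code.
final class MemberSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    static final int NO_GENERATION = -1;

    private int generation = NO_GENERATION;
    private MemberState state = MemberState.UNJOINED;

    synchronized void onJoinComplete(int newGeneration) {
        generation = newGeneration;
        state = MemberState.STABLE;
    }

    // Both fields change inside the same synchronized method, so another thread can never
    // observe a cleared generation while the state still reads STABLE (or the reverse).
    synchronized void resetGeneration() {
        generation = NO_GENERATION;
        state = MemberState.UNJOINED;
    }

    // Mirrors the reviewed condition: treat the rebalance as completed only if the member
    // still holds a generation AND is in the STABLE state.
    synchronized boolean rebalanceStillValid() {
        return generation != NO_GENERATION && state == MemberState.STABLE;
    }
}
```

Because the reset happens in one method, any code path that clears the generation also marks the member as unjoined. The combined check therefore cannot pass on a half-reset member.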
[GitHub] [kafka] ableegoldman commented on a change in pull request #9264: KAFKA-5636: Add Sliding Windows documentation
ableegoldman commented on a change in pull request #9264: URL: https://github.com/apache/kafka/pull/9264#discussion_r485314713 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3286,13 +3306,33 @@ KTable-KTable Foreign-Key Sliding time windows Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows -are used only for join operations, and can be specified through the -JoinWindows class. -A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are +are used for join operations, specified by using the +JoinWindows class, and windowed aggregations, specified by using the code class="docutils literal">SlidingWindows class. Review comment: ```suggestion JoinWindows class, and windowed aggregations, specified by using the SlidingWindows class. ``` ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3286,13 +3306,33 @@ KTable-KTable Foreign-Key Sliding time windows Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows -are used only for join operations, and can be specified through the -JoinWindows class. -A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are +are used for join operations, specified by using the +JoinWindows class, and windowed aggregations, specified by using the code class="docutils literal">SlidingWindows class. +A sliding window models a fixed-size window that slides continuously over the time axis. In this model, two data records are said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is -within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. In -contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are -both inclusive. 
+within the window size. As a sliding window moves along the time axis, records may fall into multiple snapshots of +the sliding window, but each unique combination of records appears only in one sliding window snapshot. +The following code defines a sliding window with a time difference of 10 minutes and a grace period of 30 minutes: +import org.apache.kafka.streams.kstream.SlidingWindows; + +// A sliding time window with a time difference of 10 minutes and grace period of 30 minutes +Duration timeDifferenceMs = Duration.ofMinutes(10); +Duration gracePeriodMs = Duration.ofMinutes(30); +SlidingWindows.withTimeDifferenceAndGrace(timeDifferenceMs,gracePeriodMs); + + + Note + Sliding windows require that you set a grace period, as shown above. For time windows and session windows, + setting the grace period is optional and defaults to 24 hours. + + + + This diagram shows windowing a stream of data records with sliding windows. The overlap of + the sliding window snapshots varies depending on the record times. In this diagram, the time numbers represent miliseconds. For example, + t=5 means “at the five milisecond mark”. Review comment: I rendered the site locally and something weird is going on here. It seems you can't use a literal `"`. Looking at what the other captions do, it might need to be `“`.
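The diff describes a membership rule: two records can fall into the same sliding window if their timestamps differ by at most the time difference, and both bounds are inclusive. A few lines of plain Java can check this rule. This is a simplified sketch, not the Kafka Streams implementation behind `SlidingWindows.withTimeDifferenceAndGrace`. `SlidingWindowSketch` and its methods are hypothetical, and timestamps are plain milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the sliding-window membership rule; NOT Kafka Streams code.
final class SlidingWindowSketch {

    private SlidingWindowSketch() { }

    /** Two records can share a window iff their timestamps differ by at most timeDifferenceMs. */
    static boolean canShareWindow(long t1, long t2, long timeDifferenceMs) {
        return Math.abs(t1 - t2) <= timeDifferenceMs;
    }

    /** Timestamps inside the window [end - timeDifferenceMs, end], with both bounds inclusive. */
    static List<Long> windowEndingAt(List<Long> timestamps, long end, long timeDifferenceMs) {
        List<Long> members = new ArrayList<>();
        for (long t : timestamps) {
            if (t >= end - timeDifferenceMs && t <= end) {
                members.add(t);
            }
        }
        return members;
    }
}
```

For records at t=0, 5 and 12 with a 10 ms time difference, the window ending at 12 holds {5, 12}. The window ending at 5 holds {0, 5}. The records at 0 and 12 never share a window, because they are 12 ms apart.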
[GitHub] [kafka] ableegoldman commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
ableegoldman commented on pull request #9239: URL: https://github.com/apache/kafka/pull/9239#issuecomment-689272887 I think this is ready for @vvcephei to review, once the early records PR is merged and this one is rebased
[GitHub] [kafka] ableegoldman commented on pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on pull request #9157: URL: https://github.com/apache/kafka/pull/9157#issuecomment-689272607 Not sure what the deal is with that, but I ran it locally myself and everything passed. Can we merge this, @mjsax?
[GitHub] [kafka] ableegoldman commented on pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on pull request #9157: URL: https://github.com/apache/kafka/pull/9157#issuecomment-689272398 Looks like the three Java builds all passed, but the CI build failed due to ``` [2020-09-09T02:35:12.324Z] + mkdir test-streams-archetype [2020-09-09T02:35:12.324Z] mkdir: cannot create directory 'test-streams-archetype': File exists [2020-09-09T02:35:12.325Z] + echo Could not create test directory for stream quickstart archetype [2020-09-09T02:35:12.325Z] Could not create test directory for stream quickstart archetype ``` I guess there was some issue with cleaning up or running this twice...?
[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"
[ https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192585#comment-17192585 ] huxihx commented on KAFKA-10467: It seems the problem is caused by another issue, since it should have thrown `Topic 'does-not-exists' does not exist as expected`. Could you manually create a topic using TopicCommand and then describe it to see if everything works?
[GitHub] [kafka] chia7712 commented on pull request #9257: KAFKA-10463 the necessary utilities in Dockerfile should include git
chia7712 commented on pull request #9257: URL: https://github.com/apache/kafka/pull/9257#issuecomment-689265404 @ijuma Could you take a look? I want to run system tests on different JDKs. I set `jdk_version` to change the base image of the Kafka Dockerfile. Without this patch, it is impossible to quickly switch to other popular JDK images (for example, openjdk:11 and azul/zulu-openjdk:11), since `git` is not installed in them.
[GitHub] [kafka] chia7712 commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…
chia7712 commented on pull request #9182: URL: https://github.com/apache/kafka/pull/9182#issuecomment-689256679 ``` Execution failed for task ':connect:mirror:integrationTest'. 01:19:16 > Process 'Gradle Test Executor 48' finished with non-zero exit value 1 01:19:16 This problem might be caused by incorrect test process configuration. 01:19:16 Please refer to the test execution section in the User Manual at ``` The error is unrelated to this PR.
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-689256048 ``` Build / JDK 15 / kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] ``` ``` Module: kafkatest.tests.connect.connect_distributed_test Class: ConnectDistributedTest Method: test_bounce Arguments: { "clean": true, "connect_protocol": "sessioned" } ``` On my local machine, they are also flaky on the trunk branch.
[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon commented on pull request #9062: URL: https://github.com/apache/kafka/pull/9062#issuecomment-689247748 @omkreddy, any comments on this PR? Thanks.
[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter
showuon commented on pull request #9104: URL: https://github.com/apache/kafka/pull/9104#issuecomment-689247469 @kkonstantine, could you review this PR? Thanks.
[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir
showuon commented on pull request #9178: URL: https://github.com/apache/kafka/pull/9178#issuecomment-689247342 @tombentley @ijuma, could you help review this PR? Thank you.
[GitHub] [kafka] showuon commented on pull request #9179: KAFKA-10390: Remove ignore case option when grep process info to be more specific
showuon commented on pull request #9179: URL: https://github.com/apache/kafka/pull/9179#issuecomment-689244826 @cmccabe @lbradstreet, any other comments on this PR? Thank you.
[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly
showuon commented on pull request #9202: URL: https://github.com/apache/kafka/pull/9202#issuecomment-689244512 @ijuma, could you help review this PR again? Thanks.
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-689241869 @ijuma @hachikuji @rajinisivaram: I think this PR is ready to be merged. Any further comments from you?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r485266653 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java ## @@ -246,4 +238,31 @@ private void assertOutputKeyValueTimestamp(final TestOutputTopic testRecord = new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp); assertThat(nonWindowedRecord, equalTo(testRecord)); } + +private void assertOutputKeyValueNotOrdered(final Set> results) { Review comment: This makes it sound like you want to assert that the output is not ordered, which I don't think is the point here? Also, since you're only calling this from one place and are asserting a specific output that corresponds to a specific test, I would just inline this check in the test instead of moving it out to a new method ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } -assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList( -// processing A@500 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), -// processing A@999 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), -// processing A@600 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L), -// processing B@500 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 
500L), -// processing B@600 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L), -// processing B@700 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L), -// processing C@501 -new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L), -// processing first A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), -// processing second A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), -// processing first B@1000 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L), -// processing second B@1000 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L), 
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L), -// processing C@600 -new KeyValueTimestamp<>(new Windowed<>("3", new Ti
[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on pull request #9121: URL: https://github.com/apache/kafka/pull/9121#issuecomment-689196185 @mjsax, could you review this PR again? Thanks.
[jira] [Resolved] (KAFKA-5636) Add Sliding-Window support for Aggregations
[ https://issues.apache.org/jira/browse/KAFKA-5636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leah Thomas resolved KAFKA-5636. Resolution: Fixed > Add Sliding-Window support for Aggregations > --- > > Key: KAFKA-5636 > URL: https://issues.apache.org/jira/browse/KAFKA-5636 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Michael G. Noll >Assignee: Leah Thomas >Priority: Major > Labels: needs-kip > Fix For: 2.7.0 > > > We support three windowing types for aggregations in the DSL right now: > * Tumbling windows > * Hopping windows (note: some stream processing tools call these "sliding > windows") > * Session windows > Some users have expressed the need for sliding windows. While we do use > sliding windows for joins, we do not yet support sliding window aggregations > in the DSL.
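As a rough illustration of the window types listed above, the sketch below maps a record timestamp to windows. The class and method names are hypothetical, not Kafka Streams APIs. The hopping-window part assumes window starts are aligned to multiples of the advance, and the sliding-window part only mirrors the inclusive `[t - timeDifference, t]` windows in the KIP-450 test expectations quoted elsewhere in this archive (e.g. A@500 lands in `TimeWindow(0L, 500L)`). Early records are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; these are NOT Kafka Streams APIs.
final class WindowAssignmentSketch {

    // Start times of the hopping windows [start, start + sizeMs) that contain `timestamp`.
    // Starts are aligned to multiples of advanceMs; tumbling windows are the special
    // case advanceMs == sizeMs, which yields exactly one window per record.
    static List<Long> hoppingWindowStarts(final long timestamp, final long sizeMs, final long advanceMs) {
        final List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp.
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Sliding windows are defined by the records themselves: a record at `timestamp`
    // gets a "left" window [timestamp - timeDifferenceMs, timestamp] with an inclusive end.
    // Records earlier than timeDifferenceMs need special handling and are rejected here.
    static long[] slidingLeftWindow(final long timestamp, final long timeDifferenceMs) {
        if (timestamp < timeDifferenceMs) {
            throw new IllegalArgumentException("early record: timestamp < timeDifferenceMs");
        }
        return new long[] {timestamp - timeDifferenceMs, timestamp};
    }
}
```

For example, with size 10 and advance 5, a record at 7 falls into the hopping windows starting at 0 and 5, while tumbling windows of size 10 place a record at 12 only in the window starting at 10.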
[jira] [Updated] (KAFKA-5636) Add Sliding-Window support for Aggregations
[ https://issues.apache.org/jira/browse/KAFKA-5636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leah Thomas updated KAFKA-5636: --- Fix Version/s: 2.7.0
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485249087 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -439,6 +442,185 @@ public void testJoin() { } } +@SuppressWarnings("unchecked") Review comment: It's probably some weird Java thing where it lazily types the generics and doesn't force the cast until you put it in the map. (I just made that up, but @vvcephei would probably know)
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485248486 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -439,6 +442,185 @@ public void testJoin() { } } +@SuppressWarnings("unchecked") Review comment: Yeah, I do find it odd that there are no warnings earlier; it's something about the Map that's triggering them.
[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
JoelWee commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485244413 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,22 +58,23 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, +// We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid. Review comment: Yep, makes sense. I have updated it now. Just noting here that this means we're changing a [test](https://github.com/apache/kafka/pull/9186/files#diff-e3715715832b244da2d8787362b0c570R230) we previously had.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485239998 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -439,6 +442,185 @@ public void testJoin() { } } +@SuppressWarnings("unchecked") Review comment: That seems weird to me. Guessing it's ultimately due to the `supplier.theCapturedProcessor().processed()` we loop over. But then wouldn't we get the warning a bit earlier? 🤷♀️
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485235880 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -439,6 +442,185 @@ public void testJoin() { } } +@SuppressWarnings("unchecked") Review comment: Without it, there are warnings with the transition from `K, V` to `Long, ValueAndTimestamp` when adding and updating the hash map that holds the results.
[GitHub] [kafka] ableegoldman commented on pull request #9264: KAFKA-5636: Add Sliding Windows documentation
ableegoldman commented on pull request #9264: URL: https://github.com/apache/kafka/pull/9264#issuecomment-689173489 cc @guozhangwang for secondary review & merge
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192513#comment-17192513 ] Sophie Blee-Goldman commented on KAFKA-10134: - Hey [~davispw], it looks like you might have run into a few distinct issues: the rebalancing problems, the "insufficient bytes available" IllegalStateException, and the "Active task 3_0 should have been suspended" IllegalStateException. The rebalancing seems to point to this issue, as the full fix did not make it into 2.6.0 in time. It would be great if you could test out the patch and see if that helps (building from [pull/8834|https://github.com/apache/kafka/pull/8834] specifically, which is not yet merged). The patch I linked also includes a fix for KAFKA-10122, another cause of unnecessary rebalances. For the two IllegalStateException issues, could you open separate tickets? They seem unrelated to this, and to each other, but definitely merit a closer look. Any logs you have from the time of the exceptions would help a lot. Thanks! > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.5.2, 2.6.1 > > Attachments: consumer3.log.2020-08-20.log, > consumer5.log.2020-07-22.log > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance, as our tasks are long-running tasks. > But after the upgrade, when we kill an instance to let a rebalance happen > while there is some load (some are long-running tasks >30s), the CPU > goes sky-high. It reads ~700% in our metrics, so several threads > must be in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance.
This is reproducible with both the > new CooperativeStickyAssignor and the old eager rebalance protocol. The > difference is that with the old eager rebalance protocol, the high > CPU usage drops after the rebalance is done. But when using the cooperative > one, the consumer threads seem to be stuck on something and can't > finish the rebalance, so the high CPU usage doesn't drop until we stop our > load. Also, a small load without long-running tasks won't cause continuous > high CPU usage, as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code, we found that the clients appear to be in a loop > trying to find the coordinator. > I also tried the old rebalance protocol on the new version; the issue still > exists, but the CPU goes back to normal when the rebalance is done. > I also tried the same on 2.4.1, which does not seem to have this issue, so it > seems related to something that changed between 2.4.1 and 2.5.0.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485222996 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean leftWinAlreadyCreated = false; boolean rightWinAlreadyCreated = false; -// keep the left type window closest to the record -Window latestLeftTypeWindow = null; +Long previousRecordTimestamp = null; + try ( final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( key, key, -timestamp - 2 * windows.timeDifferenceMs(), +Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()), // to catch the current record's right window, if it exists, without more calls to the store -timestamp + 1) +inputRecordTimestamp + 1) ) { while (iterator.hasNext()) { -final KeyValue, ValueAndTimestamp> next = iterator.next(); -windowStartTimes.add(next.key.window().start()); -final long startTime = next.key.window().start(); +final KeyValue, ValueAndTimestamp> windowBeingProcessed = iterator.next(); +final long startTime = windowBeingProcessed.key.window().start(); +windowStartTimes.add(startTime); final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp(); -if (endTime < timestamp) { -leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} -} else if (endTime == timestamp) { +if (endTime < inputRecordTimestamp) { +leftWinAgg = windowBeingProcessed.value; +previousRecordTimestamp = windowMaxRecordTimestamp; +} else if (endTime == inputRecordTimestamp) { leftWinAlreadyCreated = true; -putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else if (endTime > timestamp && startTime <= timestamp) { -rightWinAgg = next.value; -putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp); -} else { +if (windowMaxRecordTimestamp < inputRecordTimestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (endTime > inputRecordTimestamp && startTime <= inputRecordTimestamp) { +rightWinAgg = windowBeingProcessed.value; + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (startTime == inputRecordTimestamp + 1) { rightWinAlreadyCreated = true; +} else { +throw new IllegalStateException("Unexpected window found when processing sliding windows"); } } } //create right window for previous record -if (latestLeftTypeWindow != null) { -final long rightWinStart = latestLeftTypeWindow.end() + 1; -if (!windowStartTimes.contains(rightWinStart)) { -final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); -putAndForward(window, valueAndTime, key, value, closeTime, timestamp); +if (previousRecordTimestamp != null) { +final long previousRightWinStart = previousRecordTimestamp + 1; +if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) { +final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs()); +final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp); +
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485221931 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean leftWinAlreadyCreated = false; boolean rightWinAlreadyCreated = false; -// keep the left type window closest to the record -Window latestLeftTypeWindow = null; +Long previousRecordTimestamp = null; + try ( final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( key, key, -timestamp - 2 * windows.timeDifferenceMs(), +Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()), // to catch the current record's right window, if it exists, without more calls to the store -timestamp + 1) +inputRecordTimestamp + 1) ) { while (iterator.hasNext()) { -final KeyValue, ValueAndTimestamp> next = iterator.next(); -windowStartTimes.add(next.key.window().start()); -final long startTime = next.key.window().start(); +final KeyValue, ValueAndTimestamp> windowBeingProcessed = iterator.next(); +final long startTime = windowBeingProcessed.key.window().start(); +windowStartTimes.add(startTime); final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp(); -if (endTime < timestamp) { -leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} -} else if (endTime == timestamp) { +if (endTime < inputRecordTimestamp) { +leftWinAgg = windowBeingProcessed.value; +previousRecordTimestamp = windowMaxRecordTimestamp; +} else if (endTime == inputRecordTimestamp) { leftWinAlreadyCreated = true; -putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else if (endTime > timestamp && startTime <= timestamp) { -rightWinAgg = next.value; -putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp); -} else { +if (windowMaxRecordTimestamp < inputRecordTimestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (endTime > inputRecordTimestamp && startTime <= inputRecordTimestamp) { +rightWinAgg = windowBeingProcessed.value; + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (startTime == inputRecordTimestamp + 1) { rightWinAlreadyCreated = true; +} else { +throw new IllegalStateException("Unexpected window found when processing sliding windows"); } } } //create right window for previous record -if (latestLeftTypeWindow != null) { -final long rightWinStart = latestLeftTypeWindow.end() + 1; -if (!windowStartTimes.contains(rightWinStart)) { -final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); -putAndForward(window, valueAndTime, key, value, closeTime, timestamp); +if (previousRecordTimestamp != null) { +final long previousRightWinStart = previousRecordTimestamp + 1; +if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) { +final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs()); +final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp); +
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485221052 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean leftWinAlreadyCreated = false; boolean rightWinAlreadyCreated = false; -// keep the left type window closest to the record -Window latestLeftTypeWindow = null; +Long previousRecordTimestamp = null; + try ( final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( key, key, -timestamp - 2 * windows.timeDifferenceMs(), +Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()), // to catch the current record's right window, if it exists, without more calls to the store -timestamp + 1) +inputRecordTimestamp + 1) ) { while (iterator.hasNext()) { -final KeyValue, ValueAndTimestamp> next = iterator.next(); -windowStartTimes.add(next.key.window().start()); -final long startTime = next.key.window().start(); +final KeyValue, ValueAndTimestamp> windowBeingProcessed = iterator.next(); +final long startTime = windowBeingProcessed.key.window().start(); +windowStartTimes.add(startTime); final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp(); -if (endTime < timestamp) { -leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} -} else if (endTime == timestamp) { +if (endTime < inputRecordTimestamp) { +leftWinAgg = windowBeingProcessed.value; +previousRecordTimestamp = windowMaxRecordTimestamp; +} else if (endTime == inputRecordTimestamp) { leftWinAlreadyCreated = true; -putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else if (endTime > timestamp && startTime <= timestamp) { -rightWinAgg = next.value; -putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp); -} else { +if (windowMaxRecordTimestamp < inputRecordTimestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (endTime > inputRecordTimestamp && startTime <= inputRecordTimestamp) { +rightWinAgg = windowBeingProcessed.value; + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (startTime == inputRecordTimestamp + 1) { rightWinAlreadyCreated = true; +} else { +throw new IllegalStateException("Unexpected window found when processing sliding windows"); Review comment: nit: log an error and include the relevant info (e.g. `windowStart` and `inputRecordTimestamp` at least). Same for the IllegalStateException in `processEarly`.
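The branch structure in the diff above can be sketched as a stand-alone function that also follows the nit by putting `windowStart` and `inputRecordTimestamp` into the exception message. This is a hypothetical sketch: the class, enum, and method names are made up, the window end is computed as start + `timeDifferenceMs` as in the diff, and logging is left out to keep it self-contained.

```java
// Hypothetical sketch mirroring the branches in processInOrder; NOT Kafka Streams code.
final class SlidingWindowClassifier {

    enum Position { ENDS_BEFORE_RECORD, ENDS_AT_RECORD, CONTAINS_RECORD, STARTS_RIGHT_AFTER_RECORD }

    static Position classify(final long windowStart,
                             final long inputRecordTimestamp,
                             final long timeDifferenceMs) {
        final long windowEnd = windowStart + timeDifferenceMs;
        if (windowEnd < inputRecordTimestamp) {
            // An earlier window; its aggregate feeds the new record's left window.
            return Position.ENDS_BEFORE_RECORD;
        } else if (windowEnd == inputRecordTimestamp) {
            // The record's own left window already exists.
            return Position.ENDS_AT_RECORD;
        } else if (windowEnd > inputRecordTimestamp && windowStart <= inputRecordTimestamp) {
            // A window the record falls into, which must be updated.
            return Position.CONTAINS_RECORD;
        } else if (windowStart == inputRecordTimestamp + 1) {
            // The record's right window already exists.
            return Position.STARTS_RIGHT_AFTER_RECORD;
        }
        // Include the relevant values, as suggested in the review comment.
        throw new IllegalStateException(
            "Unexpected window found when processing sliding windows: windowStart=" + windowStart
                + ", inputRecordTimestamp=" + inputRecordTimestamp
                + ", timeDifferenceMs=" + timeDifferenceMs);
    }
}
```

With `timeDifferenceMs = 500` and a record at 600, a window starting at 700 matches no branch, so the exception message carries `windowStart=700` and `inputRecordTimestamp=600` instead of only the generic text.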
[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure
[ https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192504#comment-17192504 ] Rohit Shekhar commented on KAFKA-10471: --- +1, [~junrao]'s suggestion will detect unclean shutdowns more reliably. [~dhruvilshah], I agree with your comment too; I'm just not sure about its cost at log load time, so could we also do the sanity check lazily? > TimeIndex handling may cause data loss in certain back to back failure > -- > > Key: KAFKA-10471 > URL: https://issues.apache.org/jira/browse/KAFKA-10471 > Project: Kafka > Issue Type: Bug > Components: core, log >Reporter: Rohit Shekhar >Priority: Critical > > # The active segment for log A goes through a clean shutdown: the time index is trimmed to the > latest filled entry and the clean shutdown marker is set. > # The broker restarts and loads logs; there is no recovery because of the clean shutdown marker, and > log A resumes with the previous active segment as current. It also resizes > the TimeIndex to the max. > # Before all the logs are loaded, the broker has a hard shutdown, leaving the clean > shutdown marker in place. > # The broker restarts; log A skips recovery due to the presence of the clean > shutdown marker, but the TimeIndex assumes the file resized by the > previous instance is completely full (it assumes the file is either newly created or > full of valid entries). > # The first append to the active segment results in a roll, and the TimeIndex > is rolled with the timestamp value of the last valid entry (0). > # The segment's largest timestamp is 0 (this can cause premature deletion of > data due to retention).
[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
ableegoldman commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485218625 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,22 +58,23 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, +// We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid. Review comment: Well, if `mappedKey` is null then there can't be a match in the global table since we can't do a lookup with a null key. I think what @mjsax means here (correct me if wrong) is just that we could phrase it a bit differently to say something like ``` // If the mappedKey is null, we ignore it as invalid. This should never happen for KTables // since keyMapper just returns the key, but for GlobalKTables a non-null key can result // in a null mappedKey. There can't be a match for a null mappedKey, so we drop it ``` ...or something. Thoughts?
[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure
[ https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192503#comment-17192503 ] Jun Rao commented on KAFKA-10471: - [~dhruvilshah]: I agree that the sanity check in the index file is useful to catch bugs like this. For this particular bug, the root cause is that the clean shutdown file doesn't reflect the actual on-disk state correctly. So, it would be useful to fix that directly too.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
ableegoldman commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485217278 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,23 +58,22 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, -// so ignore unless it is a left join Review comment: Ah yeah sorry, I'm getting things mixed up here... this comment refers to the case where the mappedKey is null, while the condition I cited below now only applies when the value is null. Your reasoning sounds correct; we should still process the record in that case if it's a left join. But we should also remove this comment, since if the mappedKey is null we drop the record, regardless of whether it's a left join or any other join.
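The per-record decision this thread converges on can be sketched as follows. This is a simplified, hypothetical model, not the actual `KStreamKTableJoinProcessor` code. A `BiFunction` stands in for the join's key mapper, a `Map` stands in for the GlobalKTable lookup, and `GlobalTableJoinSketch` and `process` are illustrative names.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical, simplified model of the null handling discussed in PR #9186;
// it is not the actual KStreamKTableJoinProcessor implementation.
final class GlobalTableJoinSketch {

    static <K, V, GK, GV, R> Optional<R> process(
            final K key,                                 // may be null for a GlobalKTable join
            final V value,
            final BiFunction<K, V, GK> keyMapper,        // stand-in for the join's key mapper
            final Map<GK, GV> globalTable,               // stand-in for the GlobalKTable lookup
            final BiFunction<V, GV, R> joiner,
            final boolean leftJoin) {
        final GK mappedKey = keyMapper.apply(key, value);
        if (mappedKey == null) {
            // No lookup is possible with a null mappedKey, so the record is dropped
            // for join and leftJoin alike.
            return Optional.empty();
        }
        final GV value2 = globalTable.get(mappedKey);
        if (value2 == null && !leftJoin) {
            // Inner join: no matching entry in the global table, nothing to emit.
            return Optional.empty();
        }
        // leftJoin still emits when there is no match (value2 == null).
        return Optional.ofNullable(joiner.apply(value, value2));
    }
}
```

Without the `leftJoin` branch for a missing `value2`, a left join would behave exactly like an inner join, which is the point JoelWee raises later in the thread.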
[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure
[ https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192494#comment-17192494 ] Dhruvil Shah commented on KAFKA-10471: -- It may be nice to use the time index sanity check to catch issues where an index file is not in the state we expect. We used to perform a sanity check on the indices of all segments, but with the changes to load segments lazily, we dropped those checks. If we could safely reintroduce them, that may be sufficient to catch and fix such cases. We are taking a similar approach in https://issues.apache.org/jira/browse/KAFKA-10207, so that might help here too.
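A check of the kind Dhruvil describes would flag the state in step 4 of the report, where a resized index is treated as full and its zero-filled tail is read back as real entries. The sketch below is hypothetical: it models the index as two parallel arrays rather than Kafka's memory-mapped `TimeIndex`, and the two invariants it enforces are assumptions chosen for illustration.

```java
// Hypothetical in-memory model of a time index as parallel arrays of
// (timestamp, offset) entries; not Kafka's TimeIndex class.
final class TimeIndexSanitySketch {

    static void sanityCheck(final long baseOffset, final long[] timestamps, final long[] offsets) {
        if (timestamps.length != offsets.length) {
            throw new IllegalStateException("Mismatched entry arrays");
        }
        for (int i = 0; i < timestamps.length; i++) {
            // Every indexed offset must belong to this segment.
            if (offsets[i] < baseOffset) {
                throw new IllegalStateException("Entry " + i + " has offset " + offsets[i]
                    + " below base offset " + baseOffset);
            }
            // Timestamps in a time index should never go backwards.
            if (i > 0 && timestamps[i] < timestamps[i - 1]) {
                throw new IllegalStateException("Entry " + i + " has timestamp " + timestamps[i]
                    + " smaller than previous entry " + timestamps[i - 1]);
            }
        }
    }
}
```

A zero-filled tail fails at least one of the two checks: the offset check when the segment's base offset is above 0, and the timestamp check otherwise.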
[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure
[ https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192492#comment-17192492 ] Jun Rao commented on KAFKA-10471: - [~rshekhar]: Thanks for reporting this. This is a very good finding. If the TimeIndex doesn't start in a clean state, it can cause multiple things to go wrong afterward. One way to fix this issue is to check the presence of the clean shutdown file in LogManager.loadLogs() before loading each individual log. We then delete the clean shutdown file and pass a clean shutdown flag to Log and use that in Log.recoverLog(). That way, in step 4 above, the TimeIndex will be rebuilt properly.
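The ordering Jun proposes can be modelled in a few lines: the marker is read once and deleted before any log is loaded, and the resulting flag drives recovery. Everything here is hypothetical. `CleanShutdownSketch` and `loadLogs` are illustrative names, and the `crashAfter` parameter simulates the hard shutdown in step 3. The `.kafka_cleanshutdown` file name is used only for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed LogManager.loadLogs() ordering; not Kafka code.
final class CleanShutdownSketch {

    static final String CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"; // name for illustration

    // Returns, per log, whether it must go through recovery.
    // crashAfter >= 0 simulates a hard shutdown before loading log number crashAfter.
    static List<Boolean> loadLogs(final Path logDir, final int numLogs, final int crashAfter) throws IOException {
        // Read and delete the marker up front, so a crash during loading cannot
        // leave a stale "clean" marker behind for the next restart.
        final boolean hadCleanShutdown = Files.deleteIfExists(logDir.resolve(CLEAN_SHUTDOWN_FILE));
        final List<Boolean> needsRecovery = new ArrayList<>();
        for (int i = 0; i < numLogs; i++) {
            if (i == crashAfter) {
                throw new IOException("simulated hard shutdown while loading log " + i);
            }
            // The flag is passed down to each log instead of re-checking the file.
            needsRecovery.add(!hadCleanShutdown);
        }
        return needsRecovery;
    }
}
```

With this ordering, the restart in step 4 of the report sees no marker and recovers every log, which rebuilds the TimeIndex.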
[jira] [Commented] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context
[ https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192491#comment-17192491 ] Matthias J. Sax commented on KAFKA-10448: - Thanks, feel free to start a discussion on the dev mailing list directly. > Preserve Source Partition in Kafka Streams from context > --- > > Key: KAFKA-10448 > URL: https://issues.apache.org/jira/browse/KAFKA-10448 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: satya >Priority: Minor > Labels: needs-kip > > Currently, Kafka Streams sink nodes use the default partitioner or allow a > custom partitioner, which has to be based on the key/value. I am looking > for an enhancement of the sink node to ensure the source partition is > preserved instead of deriving the partition again from the key/value. One of > our use cases has producers with custom partitioners that we don't have > access to, as it is a third-party application. Simply preserving the > partition through context.partition() would be helpful.
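One way to picture the requested behaviour, under stated assumptions: the value is tagged upstream with the partition it was read from (for example via `context.partition()`), and a partitioner returns that partition unchanged. `SourcePartitionSketch` and `Tagged` are hypothetical. The method only mirrors the parameter shape of Kafka Streams' `StreamPartitioner#partition(topic, key, value, numPartitions)`, where, per its Javadoc, returning `null` falls back to default partitioning.

```java
// Hypothetical sketch of "preserve the source partition"; not an existing Kafka Streams API.
final class SourcePartitionSketch {

    // Value tagged upstream with the partition it was read from.
    static final class Tagged<V> {
        final V value;
        final int sourcePartition;

        Tagged(final V value, final int sourcePartition) {
            this.value = value;
            this.sourcePartition = sourcePartition;
        }
    }

    // Same parameter shape as StreamPartitioner#partition(topic, key, value, numPartitions).
    static <K, V> Integer partition(final String topic, final K key, final Tagged<V> value, final int numPartitions) {
        if (value.sourcePartition < 0 || value.sourcePartition >= numPartitions) {
            // The sink topic cannot hold this partition number; fall back to default partitioning.
            return null;
        }
        return value.sourcePartition;
    }
}
```

This only works when the sink topic has at least as many partitions as the source topic, which is one of the details a KIP discussion would need to settle.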
[jira] [Updated] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure
[ https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohit Shekhar updated KAFKA-10471: -- Description: # Active segment for log A going clean shutdown - trim the time index to the latest fill value, set the clean shutdown marker. # Broker restarts, loading logs - no recovery due to clean shutdown marker, log A recovers with the previous active segment as current. It also resized the TimeIndex to the max. # Before all the log loads, the broker had a hard shutdown causing a clean shutdown marker left as is. # Broker restarts, log A skips recovery due to the presence of a clean shutdown marker but the TimeIndex file assumes the resized file from the previous instance is all full (it assumes either file is newly created or is full with valid value). # The first append to the active segment will result in roll and TimeIndex will be rolled with the timestamp value of the last valid entry (0) # Segment's largest timestamp gives 0 (this can cause premature deletion of data due to retention. was: # Active segment for log A going clean shutdown - trim the time index to the latest fill value, set the clean shutdown marker. # Broker restarts, loading logs - no recovery due to clean shutdown marker, log A recovers with the previous active segment as current. It also resized the TimeIndex to the max. # Before all the log loads, the broker had a hard shutdown causing a clean shutdown marker left as is. # Broker restarts, log A skips recovery due to the presence of a clean shutdown marker but the TimeIndex file assumes the resized file from the previous instance is all full (it assumes either file is newly created or is full with valid value). # The first append to the active segment will result in roll and TimeIndex will be rolled with the timestamp value of the last valid entry (0) # Segment.the largest timestamp gives 0 (this can cause premature deletion of data due to retention. 
[jira] [Created] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure
Rohit Shekhar created KAFKA-10471: - Summary: TimeIndex handling may cause data loss in certain back to back failure Key: KAFKA-10471 URL: https://issues.apache.org/jira/browse/KAFKA-10471 Project: Kafka Issue Type: Bug Components: core, log Reporter: Rohit Shekhar # Active segment for log A going clean shutdown - trim the time index to the latest fill value, set the clean shutdown marker. # Broker restarts, loading logs - no recovery due to clean shutdown marker, log A recovers with the previous active segment as current. It also resized the TimeIndex to the max. # Before all the log loads, the broker had a hard shutdown causing a clean shutdown marker left as is. # Broker restarts, log A skips recovery due to the presence of a clean shutdown marker but the TimeIndex file assumes the resized file from the previous instance is all full (it assumes either file is newly created or is full with valid value). # The first append to the active segment will result in roll and TimeIndex will be rolled with the timestamp value of the last valid entry (0) # Segment.the largest timestamp gives 0 (this can cause premature deletion of data due to retention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485129599 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -117,25 +117,44 @@ public void process(final K key, final V value) { return; } -final long timestamp = context().timestamp(); -//don't process records that don't fall within a full sliding window -if (timestamp < windows.timeDifferenceMs()) { +final long inputRecordTimestamp = context().timestamp(); +observedStreamTime = Math.max(observedStreamTime, inputRecordTimestamp); +final long closeTime = observedStreamTime - windows.gracePeriodMs(); + +if (inputRecordTimestamp + 1 + windows.timeDifferenceMs() <= closeTime) { Review comment: nit: `1` -> `1L` ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean leftWinAlreadyCreated = false; boolean rightWinAlreadyCreated = false; -// keep the left type window closest to the record -Window latestLeftTypeWindow = null; +Long previousRecordTimestamp = null; + try ( final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( key, key, -timestamp - 2 * windows.timeDifferenceMs(), +Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()), // to catch the current record's right window, if it exists, without more calls to the store -timestamp + 1) +inputRecordTimestamp + 1) ) { while (iterator.hasNext()) { -final KeyValue, ValueAndTimestamp> next = iterator.next(); -windowStartTimes.add(next.key.window().start()); -final long startTime = next.key.window().start(); +final KeyValue, ValueAndTimestamp> windowBeingProcessed = iterator.next(); +final long startTime = windowBeingProcessed.key.window().start(); +windowStartTimes.add(startTime); final long endTime = startTime + windows.timeDifferenceMs(); +final 
long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp(); -if (endTime < timestamp) { -leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} -} else if (endTime == timestamp) { +if (endTime < inputRecordTimestamp) { +leftWinAgg = windowBeingProcessed.value; +previousRecordTimestamp = windowMaxRecordTimestamp; +} else if (endTime == inputRecordTimestamp) { leftWinAlreadyCreated = true; -putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else if (endTime > timestamp && startTime <= timestamp) { -rightWinAgg = next.value; -putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else { +if (windowMaxRecordTimestamp < inputRecordTimestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (endTime > inputRecordTimestamp && startTime <= inputRecordTimestamp) { +rightWinAgg = windowBeingProcessed.value; + updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp); +} else if (startTime == inputRecordTimestamp + 1) { rightWinAlreadyCreated = true; +} else { +throw new IllegalStateException("Unexpected window found when processing sliding windows"); } } } //create right window for previous record -if (latestLeftTypeWindow != null) { -final long rightWinStart = latestLeftTypeWindow.end() + 1; -if (!windowStartTimes.contains(rightWinStart)) { -final TimeWindow window = new TimeWindo
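The early-record arithmetic in this diff can be restated as a few pure functions. `SlidingWindowBoundsSketch` and its method names are hypothetical; the formulas are taken from the diff (`closeTime`, the drop condition, and the `windowStore.fetch` bounds).

```java
// Hypothetical helper restating the arithmetic from the KIP-450 diff; not Kafka code.
final class SlidingWindowBoundsSketch {

    static long closeTime(final long observedStreamTime, final long gracePeriodMs) {
        return observedStreamTime - gracePeriodMs;
    }

    // Drop the record once the end of its would-be right window,
    // inputRecordTimestamp + 1 + timeDifferenceMs, is at or before closeTime.
    static boolean isTooLate(final long inputRecordTimestamp, final long timeDifferenceMs, final long closeTime) {
        // 1L makes the long arithmetic explicit; with a long left operand an int
        // literal would be promoted anyway, so the `1` -> `1L` nit is about style.
        return inputRecordTimestamp + 1L + timeDifferenceMs <= closeTime;
    }

    // Lower fetch bound, clamped at 0 so early records never query negative times.
    static long fetchFrom(final long inputRecordTimestamp, final long timeDifferenceMs) {
        return Math.max(0L, inputRecordTimestamp - 2 * timeDifferenceMs);
    }

    // Upper fetch bound, one past the record to catch its right window if it exists.
    static long fetchTo(final long inputRecordTimestamp) {
        return inputRecordTimestamp + 1L;
    }
}
```

For example, with a grace period of 0 and observed stream time 100, a record at 89 with a 10 ms time difference is dropped (89 + 1 + 10 = 100), while a record at 90 is still processed.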
[GitHub] [kafka] lct45 commented on pull request #9264: KAFKA-5636: Add Sliding Windows documentation
lct45 commented on pull request #9264: URL: https://github.com/apache/kafka/pull/9264#issuecomment-689115428 @ableegoldman for review
[GitHub] [kafka] ableegoldman commented on pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on pull request #9157: URL: https://github.com/apache/kafka/pull/9157#issuecomment-689114712 One unrelated test failure: `kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs`
[jira] [Resolved] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0
[ https://issues.apache.org/jira/browse/KAFKA-10432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10432. - Fix Version/s: 2.6.1 Resolution: Fixed > LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0 > - > > Key: KAFKA-10432 > URL: https://issues.apache.org/jira/browse/KAFKA-10432 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Lucas Bradstreet >Priority: Major > Fix For: 2.6.1 > > > I added some functionality to the system tests to compare epoch cache > lineages ([https://github.com/apache/kafka/pull/9213]), and I found a bug in > leader epoch cache recovery. > The test hard kills a broker and the cache hasn't been flushed yet, and then > it starts up and goes through log recovery. After recovery there is > divergence in the epoch caches for epoch 0: > {noformat} > AssertionError: leader epochs for output-topic-1 didn't match > [{0: 9393L, 2: 9441L, 4: 42656L}, > {0: 0L, 2: 9441L, 4: 42656L}, > {0: 0L, 2: 9441L, 4: 42656L}] > > > {noformat} > The cache is supposed to include the offset for epoch 0 but in recovery it > skips it > [https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364] > due to > [https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392]. > Then it stamps the epoch with a later offset when fetching from the leader. > I'm not sure why the recovery code includes the condition > `batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and > he believes it may have been intended to avoid assigning negative epochs but > is not sure why it was added. None of the tests fail with this check removed. 
> {noformat}
> leaderEpochCache.foreach { cache =>
>   if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
>     cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
> }
> {noformat}
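The effect of the `batch.partitionLeaderEpoch > 0` guard can be reproduced with a small Java rendering of the quoted Scala snippet. This is a hypothetical sketch, not the code from the merged fix: `allowEpochZero` switches the guard to `>= 0` purely for comparison. The epochs and the offsets 9441 and 42656 come from the report, while the other batch offsets are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical Java rendering of the recovery guard quoted above (the original is Scala
// in LogSegment.scala); not the code from the merged fix.
final class EpochCacheRecoverySketch {

    static Map<Integer, Long> recover(final int[] batchEpochs, final long[] batchBaseOffsets,
                                      final boolean allowEpochZero) {
        final Map<Integer, Long> cache = new LinkedHashMap<>();
        Integer latestEpoch = null; // null plays the role of latestEpoch == None
        for (int i = 0; i < batchEpochs.length; i++) {
            final int epoch = batchEpochs[i];
            // The quoted guard uses "> 0", which skips epoch 0; allowEpochZero uses ">= 0".
            final boolean validEpoch = allowEpochZero ? epoch >= 0 : epoch > 0;
            // cache.latestEpoch.forall(epoch > _): true if the cache is empty or the epoch is newer.
            if (validEpoch && (latestEpoch == null || epoch > latestEpoch)) {
                cache.put(epoch, batchBaseOffsets[i]);
                latestEpoch = epoch;
            }
        }
        return cache;
    }
}
```

With the `> 0` guard, epoch 0 never reaches the cache during recovery, so it is later stamped with whatever offset the follower fetches first. This matches the `{0: 9393L, ...}` divergence in the assertion output.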
[GitHub] [kafka] hachikuji merged pull request #9219: KAFKA-10432: LeaderEpochCache is incorrectly recovered for leader epoch 0
hachikuji merged pull request #9219: URL: https://github.com/apache/kafka/pull/9219
[GitHub] [kafka] guozhangwang commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread
guozhangwang commented on a change in pull request #9267: URL: https://github.com/apache/kafka/pull/9267#discussion_r485132348 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -706,13 +662,17 @@ void runOnce() { totalProcessed += processed; } +log.debug("TaskManager#process handled {} records; invoking TaskManager#punctuate", processed); Review comment: Nit: I'd suggest we do not expose internal class names in log entries, e.g. here we can say "Processed {} records with {} iterations, invoking punctuation now", ditto below. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -689,6 +644,7 @@ void runOnce() { * 6. Otherwise, increment N. */ do { +log.debug("Invoking TaskManager#process with {} iterations.", numIterations); Review comment: What's the rationale for logging both the start and the end of a procedure? If it is only for troubleshooting, maybe the starting log entry can be at trace level while the ending entry is at debug level?
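The suggested split (start of a step at trace, end at debug, no internal class names) can be sketched as below. `ProcessLoggingSketch` and `logProcessCall` are hypothetical names. `java.util.logging` is used only to keep the example dependency-free; Kafka itself logs through SLF4J, where `FINEST` and `FINE` correspond roughly to TRACE and DEBUG.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical sketch of the suggested level split; not StreamThread code.
final class ProcessLoggingSketch {

    static final Logger LOG = Logger.getLogger(ProcessLoggingSketch.class.getName());

    static void logProcessCall(final int numIterations, final int processed) {
        // Start of the step: only useful when troubleshooting, so TRACE-like.
        LOG.log(Level.FINEST, "Processing records with {0} iterations", numIterations);
        // End of the step: DEBUG-like, worded without internal class names.
        LOG.log(Level.FINE, "Processed {0} records with {1} iterations, invoking punctuation now",
                new Object[] {processed, numIterations});
    }

    // Collects published records so the effect of the logger level can be inspected.
    static final class CollectingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(final LogRecord record) {
            if (isLoggable(record)) {
                records.add(record);
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }
}
```

With the logger at debug level, only the end-of-step entry is emitted, so normal debug output stays at one line per step.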
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -752,6 +712,77 @@ void runOnce() { commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); } +private void initializeAndRestorePhase() { +{ +// only try to initialize the assigned tasks +// if the state is still in PARTITION_ASSIGNED after the poll call +final State stateSnapshot = state; +if (stateSnapshot == State.PARTITIONS_ASSIGNED +|| stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) { + +log.debug("State is {}; initializing and restoring", stateSnapshot); + +// transit to restore active is idempotent so we can call it multiple times +changelogReader.enforceRestoreActive(); + +if (taskManager.tryToCompleteRestoration()) { +changelogReader.transitToUpdateStandby(); + +setState(State.RUNNING); +} + +if (log.isDebugEnabled()) { +log.debug("Initialization and restore call done. State is {}", state); +} +} +} + +log.debug("Invoking ChangeLogReader#restore"); +// we can always let changelog reader try restoring in order to initialize the changelogs; +// if there's no active restoring or standby updating it would not try to fetch any data +changelogReader.restore(); +} + +private long pollPhase() { +final ConsumerRecords records; +log.debug("Invoking Consumer#poll"); + +if (state == State.PARTITIONS_ASSIGNED) { +// try to fetch some records with zero poll millis +// to unblock the restoration as soon as possible +records = pollRequests(Duration.ZERO); +} else if (state == State.PARTITIONS_REVOKED) { +// try to fetch som records with zero poll millis to unblock +// other useful work while waiting for the join response +records = pollRequests(Duration.ZERO); +} else if (state == State.RUNNING || state == State.STARTING) { +// try to fetch some records with normal poll time +// in order to get long polling +records = pollRequests(pollTime); +} else if (state == State.PENDING_SHUTDOWN) { +// we are only here because 
there's rebalance in progress, +// just poll with zero to complete it +records = pollRequests(Duration.ZERO); +} else { +// any other state should not happen +log.error("Unexpected state {} during normal iteration", state); +throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration"); +} + +final long pollLatency = advanceNowAndComputeLatency(); + +if (log.isDebugEnabled()) { +log.debug("Consumer#poll completed in {} ms and fetched {} records", pollLatency, records.count()); +} +pollSensor.record(pollLatency, now); + +if (!records.isEmpty()) { Review comment: SG. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -752,6 +712,77 @@ void runOnce() { commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); } +private void initializeAndRestorePhase() { +{ +// only try to initialize the assigned tasks +// if the state is still in PARTITION_ASSIGNED after the poll call +final State stateSnapshot = state; +
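The state-to-timeout mapping in `pollPhase()` is easier to see when it is pulled out into a pure function. This is a hypothetical sketch: the `State` enum mirrors the thread states named in the diff, and `DEAD` is included only to exercise the error branch. `IllegalStateException` replaces the diff's `StreamsException` so that the example has no Kafka dependency.

```java
import java.time.Duration;

// Hypothetical restatement of the poll-timeout choice in pollPhase(); not StreamThread code.
final class PollTimeoutSketch {

    enum State { STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }

    static Duration pollTimeout(final State state, final Duration pollTime) {
        switch (state) {
            case PARTITIONS_ASSIGNED:   // unblock restoration as soon as possible
            case PARTITIONS_REVOKED:    // unblock other work while waiting for the join response
            case PENDING_SHUTDOWN:      // only polling to complete an in-progress rebalance
                return Duration.ZERO;
            case RUNNING:
            case STARTING:
                return pollTime;        // normal long polling
            default:
                throw new IllegalStateException("Unexpected state " + state + " during normal iteration");
        }
    }
}
```

Three of the four branches in the diff collapse into a single zero-timeout case, which keeps the "long poll only when running or starting" rule in one place.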
[GitHub] [kafka] jonhkr commented on pull request #9256: Fix some Gradle deprecation warnings
jonhkr commented on pull request #9256: URL: https://github.com/apache/kafka/pull/9256#issuecomment-689072311 @ijuma Yes, it was. I've updated the code to use the `api` configuration provided by the `java-library` plugin. This configuration behaves the same as the deprecated `compile` configuration, so the generated POMs should now look exactly as before.
[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
JoelWee commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485130942 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,23 +58,22 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, -// so ignore unless it is a left join Review comment: Right, thanks for the clarification! Wouldn't we still want the `leftJoin` in that case, though? When we reach the `leftJoin` in the [code](https://github.com/apache/kafka/pull/9186/commits/e9616c64dfdc33481d0b831f80ecd0385801c761), the `mappedKey` is never null, but it might not exist in the GlobalKTable (and so `value2` is null). If we're doing a `leftJoin`, then we'll want to allow these null values? (If not, the `leftJoin` is just the same as the normal `join`?)
[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
JoelWee commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485126654 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,22 +58,23 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, +// We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid. Review comment: I think that makes sense - but that will mean removing the previous logic of "`mappedKey` is null implies key not found in global table"? (Original line 62)
[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members
[ https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192371#comment-17192371 ] Guozhang Wang commented on KAFKA-10455: --- Thanks for the info [~ableegoldman]. I think we can have two approaches here which are not necessarily conflicting with each other (i.e. we can probably do both): 1. We a) modify the client to always refresh metadata before re-joining the group to reduce the probability of a rebalance storm, and then b) modify brokers to just allow any join request to trigger a rebalance. But this depends on brokers, and hence people who do not upgrade their brokers would not get this fix. 2. Based on the fact that only consumers from the same instance of the leader thread would trigger rebalances, we can take it as a by-product of the multi-threading proposal such that each instance would only have one consumer instead of one consumer per thread. And with a single consumer per instance, we can avoid this issue as well as simplify our two-phase assignment algorithm to a single phase. > Probing rebalances are not guaranteed to be triggered by non-leader members > --- > > Key: KAFKA-10455 > URL: https://issues.apache.org/jira/browse/KAFKA-10455 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Priority: Major > > Apparently, if a consumer rejoins the group with the same subscription > userdata that it previously sent, it will not trigger a rebalance. The one > exception here is that the group leader will always trigger a rebalance when > it rejoins the group. > This has implications for KIP-441, where we rely on asking an arbitrary > thread to enforce the followup probing rebalances. Technically we do ask a > thread living on the same instance as the leader, so the odds that the leader > will be chosen aren't completely abysmal, but for any multithreaded > application they are still at best only 50%. 
> Of course in general the userdata will have changed within a span of 10 > minutes, so the actual likelihood of hitting this is much lower – it can > only happen if the member's task offset sums remained unchanged. > Realistically, this probably requires that the member only have > fully-restored active tasks (encoded with the constant sentinel -2) and that > no tasks be added or removed. > > One solution would be to make sure the leader is responsible for the probing > rebalance. To do this, we would need to somehow expose the memberId of the > thread's main consumer to the partition assignor. I'm actually not sure if > that's currently possible to figure out or not. If not, we could just assign > the probing rebalance to every thread on the leader's instance. This > shouldn't result in multiple followup rebalances as the rebalance schedule > will be updated/reset on the first followup rebalance. > Another solution would be to make sure the userdata is always different. We > could encode an extra bit that flip-flops, but then we'd have to persist the > latest value somewhere/somehow. Alternatively we could just encode the next > probing rebalance time in the subscription userdata, since that is guaranteed > to always be different from the previous rebalance. This might get tricky > though, and certainly wastes space in the subscription userdata. Also, this > would only solve the problem for KIP-441 probing rebalances, meaning we'd > have to individually ensure the userdata has changed for every type of > followup rebalance (see related issue below). So the first proposal, > requiring the leader trigger the rebalance, would be preferable. > Note that, imho, we should just allow anyone to trigger a rebalance by > rejoining the group. But this would presumably require a broker-side change > and thus we would still need a workaround for KIP-441 to work with brokers. 
> > Related issue: > This also means the Streams workaround for [KAFKA-9821|http://example.com] is > not airtight, as we encode the followup rebalance in the member who is > supposed to _receive_ a revoked partition, rather than the member who is > actually revoking said partition. While the member doing the revoking will be > guaranteed to have different userdata, the member receiving the partition may > not. Making it the responsibility of the leader to trigger _any_ type of > followup rebalance would solve this issue as well. > Note that other types of followup rebalance (version probing, static > membership with host info change) are guaranteed to have a change in the > subscription userdata, and will not hit this bug
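The "always make the userdata differ" option can be sketched as follows. The byte layout is invented for illustration (it is not the Streams `SubscriptionInfo` format), and `rejoinTriggersRebalance` only models the broker behavior described in the ticket: a non-leader rejoin with identical userdata does not trigger a rebalance, the leader's always does.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration only; neither the layout nor the methods exist in Kafka.
final class UserdataSketch {
    // Encodes a task offset sum and, optionally, the next probing rebalance time.
    static byte[] encode(long taskOffsetSum, Long nextProbingRebalanceMs) {
        final int size = Long.BYTES + (nextProbingRebalanceMs == null ? 0 : Long.BYTES);
        final ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putLong(taskOffsetSum);
        if (nextProbingRebalanceMs != null) {
            buf.putLong(nextProbingRebalanceMs);
        }
        return buf.array();
    }

    // Models the behavior from the ticket: only the leader, or a member whose
    // userdata changed, triggers a rebalance when it rejoins.
    static boolean rejoinTriggersRebalance(byte[] previous, byte[] current, boolean isLeader) {
        return isLeader || !Arrays.equals(previous, current);
    }
}
```

A member whose only tasks are fully restored (offset sum encoded as the sentinel -2) produces byte-identical userdata on every rejoin, so its probing rebalance is silently skipped unless it is the leader. Appending the next probing time makes consecutive encodings differ, at the cost of eight extra bytes per member, which is the space overhead the ticket mentions.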
[GitHub] [kafka] stanislavkozlovski commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
stanislavkozlovski commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-689043684 Sorry, I mistook `kafka` for the root logger. I agree it tests the same path.
[GitHub] [kafka] hachikuji opened a new pull request #9268: KAFKA-10442; Add transaction admin APIs for KIP-664
hachikuji opened a new pull request #9268: URL: https://github.com/apache/kafka/pull/9268 This patch adds support for the new transactional APIs from KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-DescribeTransactions. Note that this does not include support for the `--find-hanging` action. I will add this separately. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread
wcarlson5 commented on a change in pull request #9267: URL: https://github.com/apache/kafka/pull/9267#discussion_r485098403 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -752,6 +712,77 @@ void runOnce() { commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); } +private void initializeAndRestorePhase() { +{ +// only try to initialize the assigned tasks +// if the state is still in PARTITION_ASSIGNED after the poll call +final State stateSnapshot = state; +if (stateSnapshot == State.PARTITIONS_ASSIGNED +|| stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) { + +log.debug("State is {}; initializing and restoring", stateSnapshot); + +// transit to restore active is idempotent so we can call it multiple times +changelogReader.enforceRestoreActive(); + +if (taskManager.tryToCompleteRestoration()) { +changelogReader.transitToUpdateStandby(); + +setState(State.RUNNING); +} + +if (log.isDebugEnabled()) { +log.debug("Initialization and restore call done. State is {}", state); +} +} +} + +log.debug("Invoking ChangeLogReader#restore"); Review comment: Is this necessary, given the logs inside restore()? Maybe it could include snapshotState so we can see whether it's STARTING or RUNNING, because we don't see the state unless it enters the initialization. Not sure if this would be useful. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -706,13 +662,17 @@ void runOnce() { totalProcessed += processed; } +log.debug("TaskManager#process handled {} records; invoking TaskManager#punctuate", processed); + final int punctuated = taskManager.punctuate(); final long punctuateLatency = advanceNowAndComputeLatency(); totalPunctuateLatency += punctuateLatency; if (punctuated > 0) { punctuateSensor.record(punctuateLatency / (double) punctuated, now); } +log.debug("TaskManager#punctuate executed: {}", punctuated); Review comment: trace?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
ableegoldman commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485097186 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,23 +58,22 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, -// so ignore unless it is a left join Review comment: Sorry, I think my original comment here was a bit ambiguous and confusingly phrased. What I meant was that the _removal_ of the comment seemed correct to me, i.e. we should not make any special exceptions for the left join case and should remove the `leftJoin` part of the `if (leftJoin || value2 != null)` check down on line 79.
[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
mjsax commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485096581 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,22 +58,23 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, +// We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid. +// This happens for GlobalKTables but never for KTables since keyMapper just returns the key. +// For non-null keys, if {@code keyMapper} returns {@code null} it implies there is no match, // so ignore unless it is a left join // // we also ignore the record if value is null, because in a key-value data model a null-value indicates // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored -if (key == null || value == null) { +final K2 mappedKey = keyMapper.apply(key, value); +if ((key == null && mappedKey == null) || (!leftJoin && mappedKey == null) || value == null) { LOG.warn( "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", key, value, context().topic(), context().partition(), context().offset() ); droppedRecordsSensor.record(); } else { -final K2 mappedKey = keyMapper.apply(key, value); final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey)); Review comment: At this point, we know that `mappedKey != null`; otherwise, we would have dropped the record.
[jira] [Commented] (KAFKA-10313) Out of range offset errors leading to offset reset
[ https://issues.apache.org/jira/browse/KAFKA-10313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192358#comment-17192358 ] Michał Łukowicz commented on KAFKA-10313: - Hello Team! Same issue here - Kafka 2.5.0. Michał > Out of range offset errors leading to offset reset > -- > > Key: KAFKA-10313 > URL: https://issues.apache.org/jira/browse/KAFKA-10313 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.2.2 >Reporter: Varsha Abhinandan >Priority: Major > > Hi, > > We have been occasionally noticing offset resets happening on the Kafka > consumer because of offset out of range error. However, I don't see any > errors in the broker logs. No logs related to leader-election, replica lag, > Kafka broker pod restarts or anything. (just info logs were enabled in the > prod environment). > > It appeared from the logs that the out of range error was because of the > fetch offset being larger than the offset range on the broker. Noticed this > happening multiple times on different consumers, stream apps in the prod > environment. So, it doesn't seem like an application bug and more like a bug > in the KafkaConsumer. Would like to understand the cause for such errors. > > Also, none of the offset reset options are desirable. Choosing "earliest" > creates a sudden huge lag (we have a retention of 24hours) and choosing > "latest" leads to data loss (the records produced between the out of range > error and when offset reset happens on the consumer). So, wondering if it is > better for the Kafka client to separate out 'auto.offset.reset' config for > just offset not found. For, out of range error maybe the Kafka client can > automatically reset the offset to latest if the fetch offset is higher to > prevent data loss. Also, automatically reset it to earliest if the fetch > offset is lesser than the start offset. 
> > Following are the logs on the consumer side : > {noformat} > [2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 > ([prd453-19-event-upsert]-bo-pipeline-12)] > [o.a.k.c.consumer.internals.Fetcher] [Consumer > clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, > groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range > for partition prd453-19-event-upsert-32, resetting offset > [2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 > ([prd453-19-event-upsert]-bo-pipeline-12)] > [o.a.k.c.consumer.internals.Fetcher] [Consumer > clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, > groupId=bo-indexer-group-prd453-19] Resetting offset for partition > prd453-19-event-upsert-32 to offset 453223789. > {noformat} > Broker logs for the partition : > {noformat} > [2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable > segments with base offsets [452091893] due to retention time 8640ms breach > [2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log > segment [baseOffset 452091893, size 1073741693] for deletion. > [2020-07-17T07:40:12,083Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log > start offset to 453223789 > [2020-07-17T07:41:12,083Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] > [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment > 452091893 > [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] > [kafka.log.LogSegment] Deleted log > /data/kafka/prd453-19-event-upsert-32/000452091893.log.deleted. 
> [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] > [kafka.log.LogSegment] Deleted offset index > /data/kafka/prd453-19-event-upsert-32/000452091893.index.deleted. > [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] > [kafka.log.LogSegment] Deleted time index > /data/kafka/prd453-19-event-upsert-32/000452091893.timeindex.deleted. > [2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] > [kafka.log.ProducerStateManager] [ProducerStateManager > partition=prd453-19-event-upsert-32] Writing producer snapshot at offset > 475609786 > [2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] > [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] > Rolled new log segment at offset 475609786 in 1 ms.{noformat} > > {noformat} > [2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] > [Log partition=prd453-19-event-upser
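The reporter's proposed policy (reset to latest when the fetch offset is past the log end, to earliest when it is before the log start) can be sketched as a small decision function. This is not an existing consumer config or API; `OutOfRangeResetSketch` and `resetTarget` are hypothetical names, and the offsets in the usage note are made up.

```java
// Hypothetical sketch of the reset policy proposed in KAFKA-10313; not Kafka consumer code.
final class OutOfRangeResetSketch {
    // Returns the offset to resume from after an out-of-range fetch.
    static long resetTarget(long fetchOffset, long logStartOffset, long logEndOffset) {
        if (fetchOffset > logEndOffset) {
            // Ahead of the log: jump to the latest offset so newly produced records are not skipped.
            return logEndOffset;
        }
        if (fetchOffset < logStartOffset) {
            // Behind retention: jump to the earliest offset still on the broker.
            return logStartOffset;
        }
        // Fetching at or below the log end offset and at or above the log start is in range.
        throw new IllegalArgumentException("offset " + fetchOffset + " is within range");
    }
}
```

For a partition with log start 100 and log end 200, a fetch at 250 resumes at 200 and a fetch at 50 resumes at 100, whereas a single `auto.offset.reset` value applies the same direction to both cases.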
[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
mjsax commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485096090 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,22 +58,23 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, +// We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid. +// This happens for GlobalKTables but never for KTables since keyMapper just returns the key. +// For non-null keys, if {@code keyMapper} returns {@code null} it implies there is no match, // so ignore unless it is a left join // // we also ignore the record if value is null, because in a key-value data model a null-value indicates // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored -if (key == null || value == null) { +final K2 mappedKey = keyMapper.apply(key, value); +if ((key == null && mappedKey == null) || (!leftJoin && mappedKey == null) || value == null) { Review comment: This condition seems unnecessarily complex. Should it not just be: ``` if (mappedKey == null || value == null) { ```
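To see exactly where the PR's condition and the suggested one diverge, a brute-force comparison over every null/non-null combination helps. `DropConditions` is a hypothetical helper written for this thread, not Kafka code; both predicates are copied from the diff and the review comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Kafka code) comparing two versions of the "skip record" check.
final class DropConditions {
    // The condition as written in the PR diff.
    static boolean prCondition(Object key, Object mappedKey, Object value, boolean leftJoin) {
        return (key == null && mappedKey == null) || (!leftJoin && mappedKey == null) || value == null;
    }

    // The simplified condition suggested in review.
    static boolean simplified(Object mappedKey, Object value) {
        return mappedKey == null || value == null;
    }

    // Tries every null / non-null combination and lists the cases where the two checks disagree.
    static List<String> differences() {
        final Object present = new Object();
        final boolean[] both = {true, false};
        final List<String> out = new ArrayList<>();
        for (boolean keyNull : both) {
            for (boolean mappedNull : both) {
                for (boolean valueNull : both) {
                    for (boolean leftJoin : both) {
                        final Object key = keyNull ? null : present;
                        final Object mappedKey = mappedNull ? null : present;
                        final Object value = valueNull ? null : present;
                        if (prCondition(key, mappedKey, value, leftJoin) != simplified(mappedKey, value)) {
                            out.add("key=" + (keyNull ? "null" : "set")
                                + " mappedKey=" + (mappedNull ? "null" : "set")
                                + " value=" + (valueNull ? "null" : "set")
                                + " leftJoin=" + leftJoin);
                        }
                    }
                }
            }
        }
        return out;
    }
}
```

The only disagreement is `key=set mappedKey=null value=set leftJoin=true`: the PR's check lets that record into the `else` branch, where `mappedKey` is still null and the left join proceeds with a null table value, while the simplified check drops it.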
[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
mjsax commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r485095735 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,22 +58,23 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, +// We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid. Review comment: I guess we don't care about the original `key` any longer and only consider if `keyMapper` returns `null` or not?
[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-689020104 @stanislavkozlovski I'm not sure how what you describe differs from the implemented test, though it's not exactly the same because the tests have their own `log4j.properties`, which configures `kafka` but not `kafka.controller`. I guess it might be clearer (and less fragile) if the test loggers were `foo` and `foo.bar`.
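Hierarchical resolution means a logger without an explicit level inherits the level of its nearest configured ancestor in the dotted name (`foo.bar` falls back to `foo`, then to the root). The sketch below models that general log4j rule with a plain map; `LoggerLevels` is a hypothetical class, not the code from this PR, and it uses the `foo`/`foo.bar` names suggested above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of hierarchical logger-level resolution; not the PR's implementation.
final class LoggerLevels {
    static final String ROOT = "root";
    private final Map<String, String> configured = new HashMap<>();

    LoggerLevels(String rootLevel) {
        configured.put(ROOT, rootLevel);
    }

    void setLevel(String logger, String level) {
        configured.put(logger, level);
    }

    // Walks up the dotted hierarchy ("foo.bar.baz" -> "foo.bar" -> "foo") until a level is found.
    String effectiveLevel(String logger) {
        String name = logger;
        while (true) {
            final String level = configured.get(name);
            if (level != null) {
                return level;
            }
            final int dot = name.lastIndexOf('.');
            if (dot < 0) {
                return configured.get(ROOT);   // no configured ancestor: inherit from root
            }
            name = name.substring(0, dot);
        }
    }
}
```

Because the walk splits on dots, a logger named `foobar` does not inherit from `foo`, and using names that the test `log4j.properties` never configures keeps the assertions independent of that file.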
[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread
vvcephei commented on a change in pull request #9267: URL: https://github.com/apache/kafka/pull/9267#discussion_r485062295 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -612,63 +612,18 @@ void runOnce() { final long startMs = time.milliseconds(); now = startMs; -if (state == State.PARTITIONS_ASSIGNED) { -// try to fetch some records with zero poll millis -// to unblock the restoration as soon as possible -records = pollRequests(Duration.ZERO); -} else if (state == State.PARTITIONS_REVOKED) { -// try to fetch som records with zero poll millis to unblock -// other useful work while waiting for the join response -records = pollRequests(Duration.ZERO); -} else if (state == State.RUNNING || state == State.STARTING) { -// try to fetch some records with normal poll time -// in order to get long polling -records = pollRequests(pollTime); -} else if (state == State.PENDING_SHUTDOWN) { -// we are only here because there's rebalance in progress, -// just poll with zero to complete it -records = pollRequests(Duration.ZERO); -} else { -// any other state should not happen -log.error("Unexpected state {} during normal iteration", state); -throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration"); -} - -final long pollLatency = advanceNowAndComputeLatency(); - -pollSensor.record(pollLatency, now); -if (records != null && !records.isEmpty()) { -pollRecordsSensor.record(records.count(), now); -taskManager.addRecordsToTasks(records); -} +final long pollLatency = pollPhase(); Review comment: `runOnce` was too long, according to checkStyle, so I factored out some of the execution phases. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -612,63 +612,18 @@ void runOnce() { final long startMs = time.milliseconds(); now = startMs; -if (state == State.PARTITIONS_ASSIGNED) { -// try to fetch some records with zero poll millis -// to unblock the restoration as soon as possible -records = pollRequests(Duration.ZERO); -} else if (state == State.PARTITIONS_REVOKED) { -// try to fetch som records with zero poll millis to unblock -// other useful work while waiting for the join response -records = pollRequests(Duration.ZERO); -} else if (state == State.RUNNING || state == State.STARTING) { -// try to fetch some records with normal poll time -// in order to get long polling -records = pollRequests(pollTime); -} else if (state == State.PENDING_SHUTDOWN) { -// we are only here because there's rebalance in progress, -// just poll with zero to complete it -records = pollRequests(Duration.ZERO); -} else { -// any other state should not happen -log.error("Unexpected state {} during normal iteration", state); -throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration"); -} - -final long pollLatency = advanceNowAndComputeLatency(); - -pollSensor.record(pollLatency, now); -if (records != null && !records.isEmpty()) { -pollRecordsSensor.record(records.count(), now); -taskManager.addRecordsToTasks(records); -} +final long pollLatency = pollPhase(); // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation // could affect the task manager state beyond this point within #runOnce(). 
if (!isRunning()) { -log.debug("State already transits to {}, skipping the run once call after poll request", state); +log.debug("Thread state is already {}, skipping the run once call after poll request", state); Review comment: Just a slight rewording I thought could be clearer. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -752,6 +712,77 @@ void runOnce() { commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); } +private void initializeAndRestorePhase() { +{ +// only try to initialize the assigned tasks +
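The state-dependent poll timeout removed from `runOnce()` above can be restated as a small pure function, which is roughly what a `pollPhase()` helper has to decide first. This is a sketch only: the `State` enum is a local stand-in for `StreamThread.State`, and `IllegalStateException` replaces the `StreamsException` used in the real code so the block compiles without Kafka on the classpath.

```java
import java.time.Duration;

// Hypothetical restatement of the removed poll-timeout logic; not the StreamThread source.
final class PollTimeoutSketch {
    // Local stand-in for StreamThread.State.
    enum State { CREATED, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }

    static Duration pollTimeout(State state, Duration pollTime) {
        switch (state) {
            case PARTITIONS_ASSIGNED:   // unblock restoration as soon as possible
            case PARTITIONS_REVOKED:    // keep doing useful work while waiting for the join response
            case PENDING_SHUTDOWN:      // only polling to complete an in-progress rebalance
                return Duration.ZERO;
            case RUNNING:
            case STARTING:
                return pollTime;        // normal long polling
            default:
                throw new IllegalStateException("Unexpected state " + state + " during normal iteration");
        }
    }
}
```

Keeping the mapping in one switch makes it easy to log the chosen timeout alongside the state at debug level, which is the kind of visibility this PR is after.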
[GitHub] [kafka] ableegoldman commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
ableegoldman commented on a change in pull request #9262: URL: https://github.com/apache/kafka/pull/9262#discussion_r485073167 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -328,15 +328,15 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs, if (lock(id)) { final long now = time.milliseconds(); final long lastModifiedMs = taskDir.lastModified(); -if (now > lastModifiedMs + cleanupDelayMs) { -log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", -logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); - -Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); -} else if (manualUserCall) { +if (manualUserCall) { Review comment: Honestly it kind of seems like there is enough divergent logic to merit splitting this up into separate methods for the manual vs cleanup-delay cases.
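The split suggested above could look roughly like this: one method for the explicit user call, which deletes unconditionally and so has no reason to mention the delay, and one for the age-based background cleaner. `TaskDirCleanupSketch` is hypothetical and returns the log line it would emit (or `null` to keep the directory) instead of touching the file system.

```java
// Hypothetical sketch of separate manual vs cleanup-delay paths; not the StateDirectory code.
final class TaskDirCleanupSketch {
    // Manual path: deletion is unconditional, so the message says nothing about the delay.
    static String manualCleanup(String dirName) {
        return "Deleting state directory " + dirName + " on user request";
    }

    // Background path: delete only directories idle for longer than the cleanup delay.
    // Returns null when the directory should be kept.
    static String backgroundCleanup(String dirName, long nowMs, long lastModifiedMs, long cleanupDelayMs) {
        final long elapsedMs = nowMs - lastModifiedMs;
        if (elapsedMs <= cleanupDelayMs) {
            return null;
        }
        return "Deleting obsolete state directory " + dirName + " as " + elapsedMs
            + "ms has elapsed (cleanup delay is " + cleanupDelayMs + "ms)";
    }
}
```

With separate methods, the "obsolete ... cleanup delay" wording can only appear on the path where the delay actually decided the outcome, which is the misleading log message this PR fixes.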
[jira] [Created] (KAFKA-10470) zstd decompression with small batches is slow and causes excessive GC
Robert Wagner created KAFKA-10470: - Summary: zstd decompression with small batches is slow and causes excessive GC Key: KAFKA-10470 URL: https://issues.apache.org/jira/browse/KAFKA-10470 Project: Kafka Issue Type: Bug Affects Versions: 2.5.1 Reporter: Robert Wagner Similar to KAFKA-5150 but for zstd instead of LZ4, it appears that a large decompression buffer (128 KB) created by zstd-jni per batch is causing a significant performance bottleneck. The upcoming version of zstd-jni (1.4.5-7) will have a new constructor for ZstdInputStream that allows the client to pass its own buffer. A fix similar to [PR #2967|https://github.com/apache/kafka/pull/2967] could be used to have the ZstdConstructor use a BufferSupplier to re-use the decompression buffer.
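The buffer-reuse idea can be sketched as a tiny supplier that hands back a cached 128 KB buffer instead of allocating one per batch. `ReusingBufferSupplier` is a hypothetical class written for illustration; it is not Kafka's `BufferSupplier` and does not use the zstd-jni API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical buffer cache illustrating per-batch buffer reuse; not Kafka or zstd-jni code.
final class ReusingBufferSupplier {
    static final int DECOMPRESSION_BUFFER_SIZE = 128 * 1024;
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    int allocations = 0;   // counts fresh allocations, to show how few are needed

    ByteBuffer get() {
        final ByteBuffer cached = free.pollFirst();
        if (cached == null) {
            allocations++;
            return ByteBuffer.allocate(DECOMPRESSION_BUFFER_SIZE);
        }
        cached.clear();    // reset position/limit before handing it out again
        return cached;
    }

    void release(ByteBuffer buffer) {
        free.addFirst(buffer);
    }
}
```

Decompressing 1,000 small batches in sequence through one supplier allocates a single 128 KB buffer rather than 1,000 of them, which is where the GC savings come from. The deque is not thread-safe, so this sketch assumes one supplier per decompressing thread.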
[GitHub] [kafka] vvcephei opened a new pull request #9267: MINOR: Add debug logs for StreamThread
vvcephei opened a new pull request #9267: URL: https://github.com/apache/kafka/pull/9267 Add debug logs to see when Streams calls poll, process, commit, etc. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r485053442 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right window for ne
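The window bounds described in the `processEarly` Javadoc can be illustrated with a small, self-contained Java sketch. `SlidingWindowBounds` is a hypothetical helper, not part of Kafka Streams; it assumes a record counts as early when `timestamp < timeDifferenceMs`, and it reuses the `[timestamp - timeDifferenceMs, timestamp]` left-window and `[timestamp + 1, timestamp + 1 + timeDifferenceMs]` right-window bounds that appear in the diff.

```java
// Hypothetical helper illustrating the sliding-window bounds discussed in the
// review; not the KStreamSlidingWindowAggregate implementation.
final class SlidingWindowBounds {
    // A record is treated as "early" if its left window [t - d, t] would
    // start before 0 (assumption: early means timestamp < timeDifferenceMs).
    static boolean isEarly(long timestamp, long timeDifferenceMs) {
        return timestamp < timeDifferenceMs;
    }

    // Returns {start, end} of the window the record's left-window aggregate
    // lands in. Negative start times are unsupported, so early records all
    // share the combined [0, timeDifferenceMs] window.
    static long[] leftWindowFor(long timestamp, long timeDifferenceMs) {
        if (isEarly(timestamp, timeDifferenceMs)) {
            return new long[] {0L, timeDifferenceMs};
        }
        return new long[] {timestamp - timeDifferenceMs, timestamp};
    }

    // The record's own right window starts just after it.
    static long[] rightWindowFor(long timestamp, long timeDifferenceMs) {
        return new long[] {timestamp + 1, timestamp + 1 + timeDifferenceMs};
    }
}
```

For example, with `timeDifferenceMs = 10`, a record at timestamp 3 is aggregated into the combined `[0, 10]` window, while its own right window is `[4, 14]`; a record at timestamp 25 gets the ordinary left window `[15, 25]`.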
[GitHub] [kafka] tombentley opened a new pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley opened a new pull request #9266: URL: https://github.com/apache/kafka/pull/9266 Previously, the root logger's level was used, ignoring intervening loggers with different levels.
[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-688987558 @ijuma please could you take a look?
[jira] [Created] (KAFKA-10469) describeConfigs() for broker loggers returns incorrect values
Tom Bentley created KAFKA-10469: --- Summary: describeConfigs() for broker loggers returns incorrect values Key: KAFKA-10469 URL: https://issues.apache.org/jira/browse/KAFKA-10469 Project: Kafka Issue Type: Bug Components: core Reporter: Tom Bentley Assignee: Tom Bentley {{Log4jController#loggers}} incorrectly uses the root logger's log level for any loggers which lack a configured log level of their own. This is incorrect because loggers without an explicit level inherit their level from their parent logger, and this resolved level might be different from the root logger's level. This means that the levels reported from {{Admin.describeConfigs}}, which uses {{Log4jController#loggers}}, are incorrect. This can be shown by using the default {{log4j.properties}} and describing a broker's loggers; it reports: {noformat} kafka.controller=TRACE kafka.controller.ControllerChannelManager=INFO kafka.controller.ControllerEventManager$ControllerEventThread=INFO kafka.controller.KafkaController=INFO kafka.controller.RequestSendThread=INFO kafka.controller.TopicDeletionManager=INFO kafka.controller.ZkPartitionStateMachine=INFO kafka.controller.ZkReplicaStateMachine=INFO {noformat} The default {{log4j.properties}} does indeed set {{kafka.controller}} to {{TRACE}}, but it does not configure the others, so they're actually at {{TRACE}}, not {{INFO}} as reported.
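Hierarchical resolution, as the issue describes it, means walking up the dotted logger name until a configured level is found, and only then falling back to the root logger. The Java sketch below is a hypothetical, library-free illustration of that rule using a plain `Map` of configured levels (log4j 1.x performs a similar walk in `Category#getEffectiveLevel()`); it is not the code from PR #9266, and the `"root"` key is an assumption of the sketch.

```java
import java.util.Map;

// Hypothetical illustration of hierarchical logger-level resolution.
final class LoggerLevels {
    // Key under which the root logger's level is stored (sketch assumption).
    static final String ROOT = "root";

    // Walks from the logger up through its dotted ancestors and returns the
    // first explicitly configured level, falling back to the root level
    // (which may be null if the root level is not configured either).
    static String effectiveLevel(String loggerName, Map<String, String> configured) {
        String name = loggerName;
        while (true) {
            String level = configured.get(name);
            if (level != null) {
                return level;
            }
            int lastDot = name.lastIndexOf('.');
            if (lastDot < 0) {
                return configured.get(ROOT);
            }
            // Move to the parent logger, e.g. "a.b.C" -> "a.b".
            name = name.substring(0, lastDot);
        }
    }
}
```

With `root=INFO` and `kafka.controller=TRACE`, `kafka.controller.KafkaController` resolves to `TRACE`, whereas a root-only fallback reports `INFO`, which is the discrepancy shown in the output above.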
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r485026583 ## File path: core/src/main/scala/kafka/server/DelayedOperation.scala ## @@ -219,38 +203,38 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { assert(watchKeys.nonEmpty, "The watch key list can't be empty") -// The cost of tryComplete() is typically proportional to the number of keys. Calling -// tryComplete() for each key is going to be expensive if there are many keys. Instead, -// we do the check in the following way. Call tryComplete(). If the operation is not completed, -// we just add the operation to all keys. Then we call tryComplete() again. At this time, if -// the operation is still not completed, we are guaranteed that it won't miss any future triggering -// event since the operation is already on the watcher list for all keys. This does mean that -// if the operation is completed (by another thread) between the two tryComplete() calls, the -// operation is unnecessarily added for watch. However, this is a less severe issue since the -// expire reaper will clean it up periodically. - -// At this point the only thread that can attempt this operation is this current thread -// Hence it is safe to tryComplete() without a lock -var isCompletedByMe = operation.tryComplete() -if (isCompletedByMe) - return true - -var watchCreated = false -for(key <- watchKeys) { - // If the operation is already completed, stop adding it to the rest of the watcher list. - if (operation.isCompleted) -return false - watchForOperation(key, operation) - - if (!watchCreated) { -watchCreated = true -estimatedTotalOperations.incrementAndGet() - } -} - -isCompletedByMe = operation.maybeTryComplete() -if (isCompletedByMe) - return true +// The cost of tryComplete() is typically proportional to the number of keys. 
Calling tryComplete() for each key is +// going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse(). +// If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At +// this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering +// event since the operation is already on the watcher list for all keys. +// +// ==[story about lock]== +// Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing +// the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and +// checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete() +// 1) thread_a holds readlock of stateLock from TransactionStateManager +// 2) thread_a is executing tryCompleteElseWatch +// 3) thread_a adds op to watch list +// 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a) +// 5) thread_c calls checkAndComplete() and holds lock of op +// 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b) +// 7) thread_a is waiting lock of op to call safeTryComplete (blocked by thread_c) Review comment: to call safeTryComplete => to call the final tryComplete() ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ## @@ -119,12 +110,33 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest @Test def testConcurrentTxnGoodPathSequence(): Unit = { -verifyConcurrentOperations(createGroupMembers, allOperationsWithTxn) +verifyConcurrentOperations(createGroupMembers, Seq( + new JoinGroupOperation, + new SyncGroupOperation, + new OffsetFetchOperation, + new CommitTxnOffsetsOperation, + new CompleteTxnOperation, + new HeartbeatOperation, + new 
LeaveGroupOperation +)) } @Test def testConcurrentRandomSequence(): Unit = { -verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn) +/** + * handleTxnCommitOffsets does not complete delayed requests now so it causes error if handleTxnCompletion is executed Review comment: causes error => causes an error ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ## @@ -119,12 +110,33 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest @Test def testConcurrentTxnGoodPathSequence(): Unit = { -verifyConcurrentOperations(createGroupMembers, allOper
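The locking pattern described in the new comment (hold the operation's lock while adding it to the watch lists and running the final check) can be sketched in Java. `SketchDelayedOperation` and its `safeTryCompleteOrElse(Runnable)` are hypothetical simplifications of the Scala code under review, assuming a single `ReentrantLock` per operation; they are not the actual `DelayedOperation` API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified delayed operation illustrating the locking pattern.
abstract class SketchDelayedOperation {
    private final ReentrantLock lock = new ReentrantLock();

    // Returns true if the operation could be completed now.
    abstract boolean tryComplete();

    // Runs the first check, the registration step (e.g. adding the operation
    // to the watch lists), and the final check while holding the operation's
    // lock, so any other thread that also takes this lock cannot interleave.
    boolean safeTryCompleteOrElse(Runnable addToWatchLists) {
        lock.lock();
        try {
            if (tryComplete()) {
                return true;
            }
            addToWatchLists.run();
            return tryComplete();
        } finally {
            lock.unlock();
        }
    }
}
```

In the deadlock scenario above, step 7 can no longer occur: the thread in `tryCompleteElseWatch()` already holds the operation's lock before the operation becomes visible on any watch list, so it never has to wait for that lock after another thread has picked the operation up.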
[jira] [Comment Edited] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"
[ https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192282#comment-17192282 ] Swayam Raina edited comment on KAFKA-10467 at 9/8/20, 3:52 PM: --- Hi [~showuon], I was using an internal library which calls this method {code:java} public Future send(ProducerRecord record, Callback callback){code} The simplified wrapper method is responsible for creating ProducerRecord. was (Author: swayamraina): Hi [~showuon], I was using an internal library which calls this method {code:java} public Future send(ProducerRecord record, Callback callback){code} The above method is responsible for creating ProducerRecord. > kafka-topic --describe fails for topic created by "produce" > --- > > Key: KAFKA-10467 > URL: https://issues.apache.org/jira/browse/KAFKA-10467 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.3.1 > Environment: MacOS >Reporter: Swayam Raina >Priority: Minor > > {code:java} > > kafka-topics --version > 2.3.1 (Commit:18a913733fb71c01){code} > > While producing to a topic that does not already exists > {code:java} > producer.send("does-not-exists", "msg-1") > {code} > > broker creates the topic > {code:java} > // partition file > > ls /tmp/kafka-logs/ > does-not-exists-0{code} > > If I try to list the topics, it shows also shows this new topic > {code:java} > > kafka-topics --bootstrap-server localhost:9092 --list > does-not-exists-0 > {code} > Now while trying to describe the topic that was auto-created the following > error is thrown > > {code:java} > > kafka-topics --bootstrap-server localhost:9092 --topic does-not-exists > >--describe > Error while executing topic command : > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request.Error while executing topic > command : org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when 
processing the request.[2020-09-08 > 00:21:30,890] ERROR java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3(TopicCommand.scala:228) > at > kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3$adapted(TopicCommand.scala:225) > at scala.collection.Iterator.foreach(Iterator.scala:941) at > scala.collection.Iterator.foreach$(Iterator.scala:941) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:225) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at > kafka.admin.TopicCommand.main(TopicCommand.scala)Caused by: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. (kafka.admin.TopicCommand$) > > {code} > ``` >
[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"
[ https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192282#comment-17192282 ] Swayam Raina commented on KAFKA-10467: -- Hi [~showuon], I was using an internal library which calls this method {code:java} public Future send(ProducerRecord record, Callback callback){code} The above method is responsible for creating ProducerRecord.
[GitHub] [kafka] tombentley commented on pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers
tombentley commented on pull request #9263: URL: https://github.com/apache/kafka/pull/9263#issuecomment-688933062 Thanks @ijuma
[GitHub] [kafka] tombentley closed pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers
tombentley closed pull request #9263: URL: https://github.com/apache/kafka/pull/9263
[GitHub] [kafka] ijuma commented on pull request #9256: Fix some Gradle deprecation warnings
ijuma commented on pull request #9256: URL: https://github.com/apache/kafka/pull/9256#issuecomment-688930298 Thanks for the PR. > Dependencies in the compile scope were moved to runtime. This seems like an issue, no?
[GitHub] [kafka] mmerdes opened a new pull request #9265: Fix typo in name of output topic
mmerdes opened a new pull request #9265: URL: https://github.com/apache/kafka/pull/9265 This change fixes inconsistent naming for the output topic in the documentation.
[GitHub] [kafka] ijuma commented on a change in pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…
ijuma commented on a change in pull request #9182: URL: https://github.com/apache/kafka/pull/9182#discussion_r484981150 ## File path: core/src/main/scala/kafka/utils/Log4jController.scala ## @@ -87,9 +87,10 @@ object Log4jController { class Log4jController extends Log4jControllerMBean { def getLoggers: util.List[String] = { -Log4jController.loggers.map { +// we replace scala collection by java collection so mbean client is able to parse it without scala library. Review comment: Instead of `parse`, we should say `deserialize`. ## File path: core/src/test/scala/kafka/utils/LoggingTest.scala ## @@ -26,6 +26,12 @@ import org.junit.Assert.{assertEquals, assertTrue} class LoggingTest extends Logging { + @Test + def testTypeOfGetLoggers(): Unit = { Review comment: Can we add a comment to the test too since this is not obvious.
[GitHub] [kafka] chia7712 commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…
chia7712 commented on pull request #9182: URL: https://github.com/apache/kafka/pull/9182#issuecomment-688927747 > can you please provide a test as well? done!
[GitHub] [kafka] ijuma commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…
ijuma commented on pull request #9182: URL: https://github.com/apache/kafka/pull/9182#issuecomment-688916053 The change looks good, can you please provide a test as well?
[GitHub] [kafka] ijuma commented on pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers
ijuma commented on pull request #9263: URL: https://github.com/apache/kafka/pull/9263#issuecomment-688915561 This is a duplicate of https://github.com/apache/kafka/pull/9182
[GitHub] [kafka] ijuma commented on pull request #9231: KAFKA-10447: Migrate tools module to JUnit 5 and mockito
ijuma commented on pull request #9231: URL: https://github.com/apache/kafka/pull/9231#issuecomment-688912021 Unrelated flaky failures: ``` Build / JDK 15 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota Build / JDK 8 / kafka.api.PlaintextAdminIntegrationTest.testConsumerGroups Build / JDK 8 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] ```
[GitHub] [kafka] lct45 opened a new pull request #9264: KAFKA-5636: Add Sliding Windows documentation
lct45 opened a new pull request #9264: URL: https://github.com/apache/kafka/pull/9264 Add necessary documentation for [KIP-450](https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL), adding sliding window aggregations to KStreams ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ijuma merged pull request #9260: MINOR: Update scala default version in readme
ijuma merged pull request #9260: URL: https://github.com/apache/kafka/pull/9260
[GitHub] [kafka] tombentley commented on pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers
tombentley commented on pull request #9263: URL: https://github.com/apache/kafka/pull/9263#issuecomment-688900724 @mimaison this one is really trivial, grateful if you could take a look.
[GitHub] [kafka] tombentley opened a new pull request #9263: KAFKA-10468: Fix CNFE on deserialization of Log4jController#getLoggers
tombentley opened a new pull request #9263: URL: https://github.com/apache/kafka/pull/9263
[jira] [Created] (KAFKA-10468) Log4jController.getLoggers serialization
Tom Bentley created KAFKA-10468: --- Summary: Log4jController.getLoggers serialization Key: KAFKA-10468 URL: https://issues.apache.org/jira/browse/KAFKA-10468 Project: Kafka Issue Type: Bug Components: core Reporter: Tom Bentley {{Log4jController#getLoggers()}} returns a {{java.util.List}} wrapper for a Scala {{List}}, which results in a {{ClassNotFoundException}} on any MBean client which doesn't have the Scala wrapper class on its classpath.
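The fix discussed for this issue (return a plain Java collection from the MBean) can be illustrated with a self-contained Java sketch. `LoggerListSketch` is hypothetical: it copies the entries into a `java.util.ArrayList` and round-trips the result through Java serialization, roughly what a remote JMX client using the RMI connector does, to show that only JDK classes are needed to read it back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the Log4jController code from PR #9182 or #9263.
final class LoggerListSketch {
    // Copies the entries into a plain java.util.ArrayList before returning
    // them, so a client only needs JDK classes to deserialize the result.
    static List<String> getLoggers(Iterable<String> loggerEntries) {
        List<String> result = new ArrayList<>();
        for (String entry : loggerEntries) {
            result.add(entry);
        }
        return result;
    }

    // Serializes and deserializes a value with standard Java serialization,
    // approximating what happens to an MBean result sent to a remote client.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

The same round trip applied to a list whose runtime class comes from the Scala library would require that class on the client's classpath, which is the `ClassNotFoundException` reported above.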
[GitHub] [kafka] felipeazv removed a comment on pull request #7288: KAFKA-7931 : [Proposal] Fix metadata fetch for ephemeral brokers behind a Virtual IP
felipeazv removed a comment on pull request #7288: URL: https://github.com/apache/kafka/pull/7288#issuecomment-688860769 Any updates in this PR? I am also facing this issue (client 2.5.0).
[GitHub] [kafka] felipeazv edited a comment on pull request #7288: KAFKA-7931 : [Proposal] Fix metadata fetch for ephemeral brokers behind a Virtual IP
felipeazv edited a comment on pull request #7288: URL: https://github.com/apache/kafka/pull/7288#issuecomment-688860769 Any updates in this PR? I am also facing this issue (client 2.5.0).