[GitHub] [kafka] dengziming commented on pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478
dengziming commented on pull request #10701: URL: https://github.com/apache/kafka/pull/10701#issuecomment-841594749 Hello @vvcephei, PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming opened a new pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478
dengziming opened a new pull request #10701: URL: https://github.com/apache/kafka/pull/10701 *More detailed description of your change* It seems that #9396 left a TODO unaddressed; this patch fixes it. *Summary of testing strategy (including rationale)* QA ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] zhaohaidao opened a new pull request #10700: KAFKA-12789: Remove Stale comments for meta response handling logic
zhaohaidao opened a new pull request #10700: URL: https://github.com/apache/kafka/pull/10700 According to my understanding, the following comment looks stale. > public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) { ... // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created which means we will get errors and no nodes until it exists if (response.brokers().isEmpty()) { log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId()); this.metadata.failedUpdate(now); } else { this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now); } ... The comment above means we may get errors and no nodes if the topic we want is still in the process of being created. However, every metadata request returns all alive brokers on the server side, as follows: > def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = { ... val brokers = metadataCache.getAliveBrokers ... } I studied the related git commit history and figured out why. 1. This comment was first introduced in KAFKA-642 (e11447650a), when a metadata request returned only the brokers related to the requested topics. 2. KAFKA-1535 (commitId: 4ebcdfd51f) changed the server-side logic so that the metadata response contains all alive brokers rather than just the ones needed for the given topics. 3. However, the comment has been retained until now. So, according to my understanding, this comment is stale and can be removed.
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344958#comment-17344958 ] NEERAJ VAIDYA commented on KAFKA-12776: --- [~ableegoldman], [~ijuma], [~tombentley] I think I know why this issue occurs. Here is my take: this ordering issue mainly happens when multiple requests are handled in parallel by the KafkaProducer and metadata is being fetched at the time each request is processed. For example, consider a web service which accepts HTTP requests on the upstream side. For each request (R1, R2, R3, ..., Rn), if KafkaProducer#send is invoked while the Kafka cluster is down AND metadata is not available, each of R1..Rn blocks in a metadata fetch call. When the cluster is brought back online (within max.block.ms), the first request which succeeds in getting the metadata is pushed onto the producer buffer and eventually to the topic. The only way to control ordering when multiple parallel requests are stuck in a metadata fetch would be to ensure that they are all serviced by a single thread/queue. I would have imagined that this must already be happening, but apparently not. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring boot web-application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. 
> For one of my failure handling test cases, I shutdown the Kafka cluster while > my applications are running. (Note : No messages have been published to the > Kafka cluster before I stop the cluster) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive for example, m2 > then m3 then m1 and then m4. > Why is that ? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time ? > My project code is attached herewith. > I can confirm that I have enabled idempotence. I have also tried with > ```max.in.flight.requests=1``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
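The single-thread/queue idea from the comment above can be sketched as follows. This is a hypothetical application-side wrapper (`SerializedSendSketch` and `runSends` are illustrative names, not anything KafkaProducer provides): if every send is funneled through one executor thread, execution order matches submission order even when the upstream HTTP requests arrive concurrently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SerializedSendSketch {
    // Hypothetical stand-in for the application's send path: each "send"
    // just records the order in which it actually executed.
    public static List<String> runSends(List<String> messages) {
        List<String> sendOrder = new ArrayList<>();
        // A single-threaded executor drains its queue in FIFO order, so
        // submission order is preserved regardless of caller concurrency.
        ExecutorService sendQueue = Executors.newSingleThreadExecutor();
        for (String msg : messages) {
            sendQueue.execute(() -> sendOrder.add(msg));
        }
        sendQueue.shutdown();
        try {
            sendQueue.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sendOrder;
    }

    public static void main(String[] args) {
        // prints [m1, m2, m3, m4]
        System.out.println(runSends(Arrays.asList("m1", "m2", "m3", "m4")));
    }
}
```

Note this only serializes handoff to the producer; it does not address ordering of records already buffered inside the producer itself.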
[jira] [Updated] (KAFKA-12789) Remove Stale comments for meta response handling logic
[ https://issues.apache.org/jira/browse/KAFKA-12789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12789: Description: According to my understanding, the following comment looks stale. {code:java} public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) { ... // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created which means we will get errors and no nodes until it exists if (response.brokers().isEmpty()) { log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId()); this.metadata.failedUpdate(now); } else { this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now); } ... {code} The comment above means we may get errors and no nodes if the topic we want is still in the process of being created. However, every metadata request returns all alive brokers on the server side, as follows: {code:java} def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = { ... val brokers = metadataCache.getAliveBrokers ... } {code} I studied the related git commit history and figured out why. # This comment was first introduced in KAFKA-642 (e11447650a), when a metadata request returned only the brokers related to the requested topics. # KAFKA-1535 (commitId: 4ebcdfd51f) changed the server-side logic so that the metadata response contains all alive brokers rather than just the ones needed for the given topics. # However, the comment has been retained until now. So, according to my understanding, this comment is stale and can be removed. > Remove Stale comments for meta response handling logic > -- > > Key: KAFKA-12789 > URL: https://issues.apache.org/jira/browse/KAFKA-12789 > Project: Kafka > Issue Type: Improvement >Reporter: HaiyuanZhao >Assignee: HaiyuanZhao >Priority: Minor
[jira] [Created] (KAFKA-12789) Remove Stale comments for meta response handling logic
HaiyuanZhao created KAFKA-12789: --- Summary: Remove Stale comments for meta response handling logic Key: KAFKA-12789 URL: https://issues.apache.org/jira/browse/KAFKA-12789 Project: Kafka Issue Type: Improvement Reporter: HaiyuanZhao Assignee: HaiyuanZhao
[GitHub] [kafka] ccding commented on pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas
ccding commented on pull request #10684: URL: https://github.com/apache/kafka/pull/10684#issuecomment-841577947 LGTM. Thanks @kowshik
[GitHub] [kafka] junrao commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
junrao commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r632872729 ## File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList; +import org.apache.kafka.controller.StripedReplicaPlacer.RackList; +import org.apache.kafka.metadata.UsableBroker; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@Timeout(value = 40) +public class StripedReplicaPlacerTest { +/** + * Test that the BrokerList class works as expected. 
+ */ +@Test +public void testBrokerList() { +assertEquals(0, BrokerList.EMPTY.size()); +assertEquals(-1, BrokerList.EMPTY.next(1)); +BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3); +assertEquals(4, brokers.size()); +assertEquals(0, brokers.next(0)); +assertEquals(1, brokers.next(0)); +assertEquals(2, brokers.next(0)); +assertEquals(3, brokers.next(0)); +assertEquals(-1, brokers.next(0)); +assertEquals(-1, brokers.next(0)); +assertEquals(1, brokers.next(1)); +assertEquals(2, brokers.next(1)); +assertEquals(3, brokers.next(1)); +assertEquals(0, brokers.next(1)); +assertEquals(-1, brokers.next(1)); +} + +/** + * Test that we perform striped replica placement as expected, and don't use the + * fenced replica if we don't have to. + */ +@Test +public void testAvoidFencedReplicaIfPossibleOnSingleRack() { +MockRandom random = new MockRandom(); +RackList rackList = new RackList(random, Arrays.asList( +new UsableBroker(3, Optional.empty(), false), +new UsableBroker(1, Optional.empty(), true), +new UsableBroker(0, Optional.empty(), false), +new UsableBroker(4, Optional.empty(), false), +new UsableBroker(2, Optional.empty(), false)).iterator()); +assertEquals(5, rackList.numTotalBrokers()); +assertEquals(4, rackList.numUnfencedBrokers()); +assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); +assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0)); +assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1)); +assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4)); +assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4)); +assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4)); +assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4)); +assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4)); +} + +/** + * Test that we will place on the fenced replica if we need to. 
+ */ +@Test +public void testPlacementOnFencedReplicaOnSingleRack() { +MockRandom random = new MockRandom(); +RackList rackList = new RackList(random, Arrays.asList( +new UsableBroker(3, Optional.empty(), false), +new UsableBroker(1, Optional.empty(), true), +new UsableBroker(2, Optional.empty(), false)).iterator()); +assertEquals(3, rackList.numTotalBrokers()); +assertEquals(2, rackList.numUnfencedBrokers()); +assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); +assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); +assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); +} + +@Test +public void testRackListWithMu
[GitHub] [kafka] dielhennr commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller
dielhennr commented on a change in pull request #10572: URL: https://github.com/apache/kafka/pull/10572#discussion_r632858915 ## File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java ## @@ -139,10 +140,16 @@ public TopicIdPartition next() { * Partitions with no isr members appear in this map under id NO_LEADER. */ private final TimelineHashMap> isrMembers; + +private final Map offlinePartitionCounts; Review comment: This was so that when a topic is removed, any offline partitions for that topic are decremented from the counter.
[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller
cmccabe commented on a change in pull request #10572: URL: https://github.com/apache/kafka/pull/10572#discussion_r632857030 ## File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java ## @@ -139,10 +140,16 @@ public TopicIdPartition next() { * Partitions with no isr members appear in this map under id NO_LEADER. */ private final TimelineHashMap> isrMembers; + +private final Map offlinePartitionCounts; Review comment: We cannot use regular maps here because they will not roll back to the desired state during a snapshot restore. In any case, I don't see why we need this map. It's enough to know how many offline partitions there are, which we already have a count of below.
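The rollback concern in the review comment above can be illustrated with a toy snapshot-capable counter map. This is a deliberately simplified sketch (a hypothetical `SnapshotCounters` class, not Kafka's actual TimelineHashMap API): a plain HashMap mutated after a snapshot has no way to restore the state it had when the snapshot was taken, which is exactly what a snapshot restore requires.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class SnapshotCounters {
    private final Map<String, Integer> counts = new HashMap<>();
    // Stack of saved states; a real timeline collection is far more
    // efficient than copying the whole map, but the contract is the same.
    private final Deque<Map<String, Integer>> snapshots = new ArrayDeque<>();

    public void increment(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    public int get(String key) {
        return counts.getOrDefault(key, 0);
    }

    // Record the current state so it can be restored later.
    public void snapshot() {
        snapshots.push(new HashMap<>(counts));
    }

    // Revert to the most recent snapshot, discarding later changes.
    public void rollback() {
        counts.clear();
        counts.putAll(snapshots.pop());
    }

    public static void main(String[] args) {
        SnapshotCounters offline = new SnapshotCounters();
        offline.increment("topicA");
        offline.snapshot();
        offline.increment("topicA"); // change made after the snapshot...
        offline.rollback();          // ...is discarded on restore
        System.out.println(offline.get("topicA")); // prints 1
    }
}
```

A regular map used for a derived count would keep the post-snapshot value after a restore, leaving the metric out of sync with the restored metadata.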
[GitHub] [kafka] cmccabe opened a new pull request #10699: MINOR: Add support for ZK Authorizer with KRaft
cmccabe opened a new pull request #10699: URL: https://github.com/apache/kafka/pull/10699 This patch adds support for running the ZooKeeper-based kafka.security.authorizer.AclAuthorizer with KRaft clusters. Set the authorizer.class.name config as well as the zookeeper.connect config while also setting the typical KRaft configs (node.id, process.roles, etc.), and the cluster will use KRaft for metadata and ZooKeeper for ACL storage. A system test that exercises the authorizer is included. This patch also changes "Raft" to "KRaft" in several system test files. It also fixes a bug where system test admin clients were unable to connect to a cluster with broker credentials via the SSL security protocol when the broker was using that for inter-broker communication and SASL for client communication. Co-authored-by: Ron Dagostino
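A sketch of the kind of broker configuration the PR describes. The property names for the authorizer, ZooKeeper connection, and KRaft basics are the ones named in the PR text; the values and the quorum voters line are placeholders/assumptions, not taken from the patch.

```properties
# Typical KRaft configs (named in the PR description; values are placeholders)
process.roles=broker,controller
node.id=1
# Usual KRaft companion setting (an assumption; not named in the PR text)
controller.quorum.voters=1@localhost:9093

# ZooKeeper-based ACL storage via the AclAuthorizer
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
zookeeper.connect=localhost:2181
```

With this combination, metadata lives in the KRaft quorum while ACLs continue to be read from and written to ZooKeeper.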
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344914#comment-17344914 ] NEERAJ VAIDYA commented on KAFKA-12776: --- [~tombentley] Yes, this has been reproduced with a KafkaProducer as well. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip
[GitHub] [kafka] rondagostino commented on pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade
rondagostino commented on pull request #10694: URL: https://github.com/apache/kafka/pull/10694#issuecomment-841512146 Full system test output here: https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-05-14--001.1621023792--rondagostino--fix_TestSecurityRollingUpgrade--78762fb8a/report.html Streams test failures are unrelated. The one test failure in `TestSecurityRollingUpgrade` is fixed by the followup commit in this PR (https://github.com/apache/kafka/pull/10694/commits/6e4007fa3a805fde1e9933d4c1a200b472a16137)
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r632806423 ## File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList; +import org.apache.kafka.controller.StripedReplicaPlacer.RackList; +import org.apache.kafka.metadata.UsableBroker; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@Timeout(value = 40) +public class StripedReplicaPlacerTest { +/** + * Test that the BrokerList class works as expected. 
+ */ +@Test +public void testBrokerList() { +assertEquals(0, BrokerList.EMPTY.size()); +assertEquals(-1, BrokerList.EMPTY.next(1)); +BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3); +assertEquals(4, brokers.size()); +assertEquals(0, brokers.next(0)); +assertEquals(1, brokers.next(0)); +assertEquals(2, brokers.next(0)); +assertEquals(3, brokers.next(0)); +assertEquals(-1, brokers.next(0)); +assertEquals(-1, brokers.next(0)); +assertEquals(1, brokers.next(1)); +assertEquals(2, brokers.next(1)); +assertEquals(3, brokers.next(1)); +assertEquals(0, brokers.next(1)); +assertEquals(-1, brokers.next(1)); +} + +/** + * Test that we perform striped replica placement as expected, and don't use the + * fenced replica if we don't have to. + */ +@Test +public void testAvoidFencedReplicaIfPossibleOnSingleRack() { +MockRandom random = new MockRandom(); +RackList rackList = new RackList(random, Arrays.asList( +new UsableBroker(3, Optional.empty(), false), +new UsableBroker(1, Optional.empty(), true), +new UsableBroker(0, Optional.empty(), false), +new UsableBroker(4, Optional.empty(), false), +new UsableBroker(2, Optional.empty(), false)).iterator()); +assertEquals(5, rackList.numTotalBrokers()); +assertEquals(4, rackList.numUnfencedBrokers()); +assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); +assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0)); +assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1)); +assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4)); +assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4)); +assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4)); +assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4)); +assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4)); +} + +/** + * Test that we will place on the fenced replica if we need to. 
+ */ +@Test +public void testPlacementOnFencedReplicaOnSingleRack() { +MockRandom random = new MockRandom(); +RackList rackList = new RackList(random, Arrays.asList( +new UsableBroker(3, Optional.empty(), false), +new UsableBroker(1, Optional.empty(), true), +new UsableBroker(2, Optional.empty(), false)).iterator()); +assertEquals(3, rackList.numTotalBrokers()); +assertEquals(2, rackList.numUnfencedBrokers()); +assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); +assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); +assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); +} + +@Test +public void testRackListWithMultipleRacks() { +MockRandom random = new MockRandom(); +R
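The round-robin behavior these `BrokerList` assertions exercise can be reconstructed as a sketch. This is a Python approximation inferred from the test's expected values; the real implementation is the Java `BrokerList` inside `StripedReplicaPlacer`:

```python
class BrokerList:
    """Round-robin broker iterator, reconstructed from the test's assertions."""

    def __init__(self):
        self._brokers = []
        self._epoch = -1   # last starting offset seen
        self._index = 0    # how many brokers handed out in this cycle

    def add(self, broker_id):
        self._brokers.append(broker_id)
        return self  # allow chaining: BrokerList().add(0).add(1)

    def size(self):
        return len(self._brokers)

    def next(self, epoch):
        """Return brokers one at a time, starting at offset `epoch`.

        After one full cycle (or on an empty list) return -1; changing
        `epoch` restarts the iteration at the new offset.
        """
        if epoch != self._epoch:
            self._epoch = epoch
            self._index = 0
        if self._index >= len(self._brokers):
            return -1
        broker = self._brokers[(epoch + self._index) % len(self._brokers)]
        self._index += 1
        return broker

BrokerList.EMPTY = BrokerList()
```

Running the sequence from the test (`next(0)` six times, then `next(1)` five times) reproduces the expected values, including the -1 sentinel once a full cycle has been handed out.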
[GitHub] [kafka] dejan2609 opened a new pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)
dejan2609 opened a new pull request #10698: URL: https://github.com/apache/kafka/pull/10698 @ijuma please review this. JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-12770 Related PR/comment: https://github.com/apache/kafka/pull/10656#issuecomment-835809154 @romani how to use this (hopefully this will be merged into trunk at some point): - this command uses the project-defined CheckStyle version: `./gradlew checkstyleMain checkstyleTest` - while this one overrides it: `./gradlew clean checkstyleMain checkstyleTest -PcheckstyleVersion=8.41.1`
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r632796785 ## File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.metadata.OptionalStringComparator; +import org.apache.kafka.metadata.UsableBroker; + + +/** + * The striped replica placer. + * + * + * GOALS + * The design of this placer attempts to satisfy a few competing goals. Firstly, we want + * to spread the replicas as evenly as we can across racks. In the simple case where + * broker racks have not been configured, this goal is a no-op, of course. But it is the + * highest priority goal in multi-rack clusters. + * + * Our second goal is to spread the replicas evenly across brokers. 
Since we are placing + * multiple partitions, we try to avoid putting each partition on the same set of + * replicas, even if it does satisfy the rack placement goal. However, we treat the rack + * placement goal as higher priority than this goal-- if you configure 10 brokers in rack + * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one + * broker in rack C. If you were to place a lot of partitions with replication factor 3, + * each partition would try to get a replica there. In general racks are supposed to be + * about the same size -- if they aren't, this is a user error. + * + * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced + * brokers. + * + * + * CONSTRAINTS + * In addition to these goals, we have two constraints. Unlike the goals, these are not + * optional -- they are mandatory. Placement will fail if a constraint cannot be + * satisfied. The first constraint is that we can't place more than one replica on the + * same broker. This imposes an upper limit on replication factor-- for example, a 3-node + * cluster can't have any topics with replication factor 4. This constraint comes from + * Kafka's internal design. + * + * The second constraint is that the leader of each partition must be an unfenced broker. + * This constraint is a bit arbitrary. In theory, we could allow people to create + * new topics even if every broker were fenced. However, this would be confusing for + * users. + * + * + * ALGORITHM + * The StripedReplicaPlacer constructor loads the broker data into rack objects. Each + * rack object contains a sorted list of fenced brokers, and a separate sorted list of + * unfenced brokers. The racks themselves are organized into a sorted list, stored inside + * the top-level RackList object. + * + * The general idea is that we place replicas on to racks in a round-robin fashion. 
So if + * we had racks A, B, C, and D, and we were creating a new partition with replication + * factor 3, our first replica might come from A, our second from B, and our third from C. + * Of course our placement would not be very fair if we always started with rack A. + * Therefore, we generate a random starting offset when the RackList is created. So one + * time we might go B, C, D. Another time we might go C, D, A. And so forth. + * + * Note that each partition we generate advances the starting offset by one. + * So in our 4-rack cluster, with 3 partitions, we might choose these racks: + * + * partition 1: A, B, C + * partition 2: B, C, A + * partition 3: C, A, B + * + * This is what generates the characteristic "striped" pattern of this placer. + * + * So far I haven't said anything about how we choose a replica from within a rack. In + * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2, + * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on. + * Just like with the racks, we add a random sta
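The rack-striping idea described in this comment can be sketched as follows. This is a deliberately simplified Python model that handles only the rack-level round-robin with a random starting offset; it ignores fencing and the per-rack broker rotation of the real placer:

```python
import random

def stripe_partitions(rack_names, num_partitions, replication_factor, rng=None):
    """Pick a rack for each replica of each partition: round-robin over racks,
    with a random starting offset that advances by one per partition."""
    if replication_factor > len(rack_names):
        raise ValueError("more replicas than racks in this simplified model")
    rng = rng or random.Random()
    start = rng.randrange(len(rack_names))  # random start so rack A isn't favored
    placements = []
    for p in range(num_partitions):
        offset = (start + p) % len(rack_names)  # each partition shifts by one
        placements.append([rack_names[(offset + i) % len(rack_names)]
                           for i in range(replication_factor)])
    return placements
```

With racks A, B, C, D and replication factor 3, successive partitions produce the characteristic stripes, e.g. (B, C, D), then (C, D, A), then (D, A, B), depending on the random start.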
[jira] [Updated] (KAFKA-12770) Gradle build: allow the CheckStyle version to be specified via parameter
[ https://issues.apache.org/jira/browse/KAFKA-12770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dejan Stojadinović updated KAFKA-12770: --- Summary: Gradle build: allow the CheckStyle version to be specified via parameter (was: Jenkins build: allow the CheckStyle version to be specified via parameter) > Gradle build: allow the CheckStyle version to be specified via parameter > > > Key: KAFKA-12770 > URL: https://issues.apache.org/jira/browse/KAFKA-12770 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Dejan Stojadinović >Assignee: Dejan Stojadinović >Priority: Minor > > *Prologue:* > [https://github.com/apache/kafka/pull/10656#issuecomment-836074067] > *Rationale:* if we implement this, the CheckStyle team ([~romani] and others) > can add the Kafka project to their regression suite: > [https://github.com/apache/kafka/pull/10656#issuecomment-835809154] > *Related links:* > * [https://github.com/apache/kafka/blob/2.8.0/Jenkinsfile#L28] > * [https://github.com/apache/kafka#common-build-options] > * > [https://docs.gradle.org/7.0.1/dsl/org.gradle.api.plugins.quality.CheckstyleExtension.html#org.gradle.api.plugins.quality.CheckstyleExtension:toolVersion]
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r632796424 ## File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java ## + * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced Review comment: I added some text saying that we want new leaders to be evenly distributed if any one broker is fenced -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman commented on a change in pull request #10690: URL: https://github.com/apache/kafka/pull/10690#discussion_r632788590 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ## @@ -247,9 +248,10 @@ public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer " + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. " + "Note that enabling idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to be less than or equal to 5, " -+ "" + RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + " must be 'all'. If these values " ++ "" + RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + " must be 'all'. If these values " + "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, " -+ "a ConfigException will be thrown."; ++ "a ConfigException will be thrown. With an idempotent producer, setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION ++ " greater than 1 will not break ordering guarantees."; Review comment: will do
[GitHub] [kafka] ableegoldman commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman commented on a change in pull request #10690: URL: https://github.com/apache/kafka/pull/10690#discussion_r632787843 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ## @@ -201,7 +201,8 @@ public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of" -+ " message re-ordering due to retries (i.e., if retries are enabled)."; ++ " message re-ordering due to retries (i.e., if retries are enabled). With an idempotent producer, this" ++ " can be up to 5 and still provide ordering guarantees."; Review comment: @ijuma done
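The constraints this doc string describes (idempotence requires `max.in.flight.requests.per.connection` <= 5, `retries` > 0, and `acks=all`, otherwise a `ConfigException`) can be sketched as a tiny validator. This is a Python sketch of the documented rules, not Kafka's actual config-validation code:

```python
class ConfigException(ValueError):
    """Stand-in for Kafka's ConfigException."""

def validate_idempotent_producer(cfg):
    """Enforce the documented constraints when enable.idempotence=true.

    Defaults mirror the "suitable values will be chosen" behavior: when a
    setting is absent, we assume the compatible default.
    """
    if not cfg.get("enable.idempotence", False):
        return  # no extra constraints without idempotence
    if cfg.get("max.in.flight.requests.per.connection", 5) > 5:
        raise ConfigException("max.in.flight.requests.per.connection must be <= 5")
    if cfg.get("retries", 2147483647) <= 0:
        raise ConfigException("retries must be > 0")
    if cfg.get("acks", "all") != "all":
        raise ConfigException("acks must be 'all'")
```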
[jira] [Closed] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson closed KAFKA-12754. -- > TaskMetadata endOffsets does not update when the offsets are read > - > > Key: KAFKA-12754 > URL: https://issues.apache.org/jira/browse/KAFKA-12754 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Fix For: 3.0.0 > > > The high water mark in StreamTask is not updated optimally. Also it would be > good to have the metadata offsets have an initial value of -1 instead of an > empty map, so that the set of TopicPartitions won't change.
[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344864#comment-17344864 ] Walker Carlson commented on KAFKA-12754: Huh, I guess I was wrong. Sorry!
[jira] [Updated] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-12754: --- Affects Version/s: (was: 2.8.0)
[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344863#comment-17344863 ] A. Sophie Blee-Goldman commented on KAFKA-12754: [~wcarlson5] are you sure? I tried to cherrypick this just now and pretty much everything had merge conflicts. It doesn't look like the feature is in 2.8 after all, for example TaskMetadata doesn't have any of the new APIs like committedOffsets(), endOffsets(), or timeCurrentIdlingStarted()
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r632770796 ## File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java ##
[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on a change in pull request #10494: URL: https://github.com/apache/kafka/pull/10494#discussion_r632769701 ## File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java ## +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); +assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); +assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); Review comment: Because broker 1 is fenced, we don't place a replica there until we need to (there are no more replicas remaining). So it will always be the last / least preferred replica.
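The preference described here — fenced brokers are used only as a last resort, and (per the placer's stated constraints) the leader must be unfenced — can be sketched like this. It is a simplified Python model; `choose_replicas` and its argument shapes are invented for illustration and ignore the round-robin rotation of the real code:

```python
def choose_replicas(unfenced, fenced, replication_factor):
    """Pick replicas preferring unfenced brokers; fenced brokers sort last.

    Simplified model: the first replica (the leader) must be unfenced, and a
    fenced broker is chosen only when no unfenced broker remains.
    """
    pool = list(unfenced) + list(fenced)  # fenced brokers are least preferred
    if replication_factor > len(pool) or replication_factor <= 0:
        raise ValueError("invalid replication factor")
    if not unfenced:
        raise ValueError("leader must be an unfenced broker")
    return pool[:replication_factor]
```

With unfenced brokers [3, 2] and fenced broker 1, replication factor 3 yields [3, 2, 1] (the fenced broker last), while replication factor 2 avoids broker 1 entirely, mirroring the assertions in the quoted test.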
[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344857#comment-17344857 ] Walker Carlson commented on KAFKA-12754: [~ableegoldman] I think picking back to 2.8 would be good
[jira] [Updated] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-12754: --- Affects Version/s: 2.8.0 > TaskMetadata endOffsets does not update when the offsets are read > - > > Key: KAFKA-12754 > URL: https://issues.apache.org/jira/browse/KAFKA-12754 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Fix For: 3.0.0 > > > The high water mark in StreamTask is not updated optimally. Also it would be > good to have the metadata offsets have a initial value of -1 instead of an > empty map that way the set of TopicPartitions won't change. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12788) Improve KRaft replica placement
Colin McCabe created KAFKA-12788: Summary: Improve KRaft replica placement Key: KAFKA-12788 URL: https://issues.apache.org/jira/browse/KAFKA-12788 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe Implement the existing Kafka replica placement algorithm for KRaft. This also means implementing rack awareness. Previously, we just chose replicas randomly in a non-rack-aware fashion. Also, allow replicas to be placed on fenced brokers if there are no other choices. This was specified in KIP-631 but previously not implemented. -- This message was sent by Atlassian Jira (v8.3.4#803005)
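As a rough illustration of the rack-aware "striped" placement the ticket describes (the names and data layout here are hypothetical, not the actual KRaft placement code): successive replicas rotate across racks, and each rack advances through its own broker list, so no rack receives a second replica until every rack has one.

```java
import java.util.ArrayList;
import java.util.List;

public class StripedRackPlacement {
    // racks: ordered broker-id lists, one per rack; offsets: next broker index per rack.
    static List<Integer> place(List<List<Integer>> racks, int[] offsets,
                               int startRack, int replicationFactor) {
        List<Integer> replicas = new ArrayList<>();
        int rack = startRack;
        while (replicas.size() < replicationFactor) {
            List<Integer> brokers = racks.get(rack);
            replicas.add(brokers.get(offsets[rack] % brokers.size()));
            offsets[rack]++;                  // this rack's next replica uses its next broker
            rack = (rack + 1) % racks.size(); // stripe: the next replica goes to the next rack
        }
        return replicas;
    }

    public static void main(String[] args) {
        // Two racks: rack 0 holds brokers {0, 1}, rack 1 holds {2, 3}; RF = 3.
        List<List<Integer>> racks = List.of(List.of(0, 1), List.of(2, 3));
        System.out.println(place(racks, new int[2], 0, 3)); // prints [0, 2, 1]
    }
}
```

With three replicas over two racks, the third replica necessarily reuses rack 0, but only after each rack already holds one replica.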
[GitHub] [kafka] cmccabe commented on pull request #10494: KAFKA-12788: improve KRaft replica placement
cmccabe commented on pull request #10494: URL: https://github.com/apache/kafka/pull/10494#issuecomment-841470406 > @cmccabe : Thanks for the PR. A few comments below. Also, we probably want to associate a jira with the PR since it's a bit large. Fair. I created KAFKA-12788 for this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12471) Implement createPartitions in KIP-500 mode
[ https://issues.apache.org/jira/browse/KAFKA-12471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12471. -- Fix Version/s: 3.0.0 Resolution: Fixed > Implement createPartitions in KIP-500 mode > -- > > Key: KAFKA-12471 > URL: https://issues.apache.org/jira/browse/KAFKA-12471 > Project: Kafka > Issue Type: New Feature >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12334) Add the KIP-631 metadata shell
[ https://issues.apache.org/jira/browse/KAFKA-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12334. -- Fix Version/s: 2.8.0 Resolution: Fixed Added in 2.8 > Add the KIP-631 metadata shell > -- > > Key: KAFKA-12334 > URL: https://issues.apache.org/jira/browse/KAFKA-12334 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > Fix For: 2.8.0 > > > interactively examine the metadata stored in a KIP-500 cluster. > It can read the metadata from the controllers directly, by connecting to > them, or from a metadata snapshot on disk. In the former case, the > quorum voters must be specified by passing the --controllers flag; in > the latter case, the snapshot file should be specified via --snapshot. > The metadata tool works by replaying the log and storing the state into > in-memory nodes. These nodes are presented in a fashion similar to > filesystem directories. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12275) KIP-500: Remove controllerOnly restriction from the DecommissionBroker API
[ https://issues.apache.org/jira/browse/KAFKA-12275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12275. -- Fix Version/s: 2.8.0 Assignee: Colin McCabe Resolution: Fixed DecomissionBroker was replaced by UnregisterBroker, and this is handled on brokers, not just controllers. > KIP-500: Remove controllerOnly restriction from the DecommissionBroker API > -- > > Key: KAFKA-12275 > URL: https://issues.apache.org/jira/browse/KAFKA-12275 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Alok Nikhil >Assignee: Colin McCabe >Priority: Minor > Labels: kip-500 > Fix For: 2.8.0 > > > We need to fix this to forward the request to the active controller. > Considering this is a KIP-500 only API and we don't have an implementation > yet, we are terminating the connection instead for now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe merged pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders
cmccabe merged pull request #10688: URL: https://github.com/apache/kafka/pull/10688
[jira] [Created] (KAFKA-12787) Configure and integrate controller snapshot with the RaftClient
Jose Armando Garcia Sancio created KAFKA-12787: -- Summary: Configure and integrate controller snapshot with the RaftClient Key: KAFKA-12787 URL: https://issues.apache.org/jira/browse/KAFKA-12787 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12647) Implement loading snapshot in the broker
[ https://issues.apache.org/jira/browse/KAFKA-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio reassigned KAFKA-12647: -- Assignee: Colin McCabe (was: Jose Armando Garcia Sancio) > Implement loading snapshot in the broker > > > Key: KAFKA-12647 > URL: https://issues.apache.org/jira/browse/KAFKA-12647 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Assignee: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12754. Fix Version/s: 3.0.0 Resolution: Fixed > TaskMetadata endOffsets does not update when the offsets are read > - > > Key: KAFKA-12754 > URL: https://issues.apache.org/jira/browse/KAFKA-12754 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Fix For: 3.0.0 > > > The high water mark in StreamTask is not updated optimally. Also it would be > good to have the metadata offsets have a initial value of -1 instead of an > empty map that way the set of TopicPartitions won't change. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read
[ https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344835#comment-17344835 ] A. Sophie Blee-Goldman commented on KAFKA-12754: [~wcarlson5] was this feature released in 2.8 and needs this fix to be cherrypicked back, or is it just in 3.0? > TaskMetadata endOffsets does not update when the offsets are read > - > > Key: KAFKA-12754 > URL: https://issues.apache.org/jira/browse/KAFKA-12754 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Fix For: 3.0.0 > > > The high water mark in StreamTask is not updated optimally. Also it would be > good to have the metadata offsets have a initial value of -1 instead of an > empty map that way the set of TopicPartitions won't change. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman merged pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
ableegoldman merged pull request #10634: URL: https://github.com/apache/kafka/pull/10634
[GitHub] [kafka] ableegoldman commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
ableegoldman commented on pull request #10634: URL: https://github.com/apache/kafka/pull/10634#issuecomment-841450435 Looks like just the usual flaky test failures in the build: `RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()` and `KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest
rondagostino commented on pull request #10697: URL: https://github.com/apache/kafka/pull/10697#issuecomment-841448877 Tested locally to confirm that the added size is the correct one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest
rondagostino opened a new pull request #10697: URL: https://github.com/apache/kafka/pull/10697 The StreamsNamedRepartitionTopicTest system test did not have the `@cluster` annotation and was therefore taking up the entire cluster. For example, we see this in the log output: `kafkatest.tests.streams.streams_named_repartition_topic_test.StreamsNamedRepartitionTopicTest.test_upgrade_topology_with_named_repartition_topic is using entire cluster. It's possible this test has no associated cluster metadata.` This PR adds the missing annotation.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.2) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 commented on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841421148 Test result on Jenkins shows no failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders
cmccabe commented on pull request #10688: URL: https://github.com/apache/kafka/pull/10688#issuecomment-841410862 > Generally, I don't like that we need to explicitly extract the timeout from the request and pass it into the controller call. It would nice if this could be generalized or automated somehow. However, something like this is not needed for this minor fix. If we were starting all over again, we could give all RPC requests a timeout, located in the request header itself. But that isn't really the case today -- some requests don't have timeouts, some do, and we have to extract it from the specific request schema. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request #10696: KAFKA-12777 Refactor and cleanup AutoTopicCreationManager
mumrah opened a new pull request #10696: URL: https://github.com/apache/kafka/pull/10696 Rather than using multiple optional class members to determine if we are in ZK or KRaft mode, use inheritance. The factory method in AutoTopicCreationManager now makes the decision about which mode we're in and provides only the needed dependencies to the concrete classes (no optionals).

```scala
object AutoTopicCreationManager {
  def apply(
    config: KafkaConfig,
    channelManager: BrokerToControllerChannelManager,
    metadataSupport: MetadataSupport,
    groupCoordinator: GroupCoordinator,
    txnCoordinator: TransactionCoordinator,
  ): AutoTopicCreationManager = {
    metadataSupport match {
      case zk: ZkSupport => new ZkAutoTopicCreationManager(config, zk, groupCoordinator, txnCoordinator)
      case _: RaftSupport => new DefaultAutoTopicCreationManager(config, channelManager, groupCoordinator, txnCoordinator)
    }
  }
}
```

This also adds some error handling to the response handler when running in KRaft mode (KAFKA-12777).
[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders
cmccabe commented on a change in pull request #10688: URL: https://github.com/apache/kafka/pull/10688#discussion_r632697059

## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch,
 List records = new ArrayList<>();
 ElectLeadersResponseData response = new ElectLeadersResponseData();
-for (TopicPartitions topic : request.topicPartitions()) {
-    ReplicaElectionResult topicResults =
-        new ReplicaElectionResult().setTopic(topic.topic());
-    response.replicaElectionResults().add(topicResults);
-    for (int partitionId : topic.partitions()) {
-        ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
-        topicResults.partitionResult().add(new PartitionResult().
-            setPartitionId(partitionId).
-            setErrorCode(error.error().code()).
-            setErrorMessage(error.message()));
+if (request.topicPartitions() == null) {
+    // If topicPartitions is null, we try to elect a new leader for every partition.
+    // There are some obvious issues with this wire protocol. For example, what
+    // if we have too many partitions to fit the results in a single RPC? Or what
+    // if we generate too many records to fit in a single batch? This behavior

Review comment: Thinking about this more, it doesn't seem necessary to do this atomically. We can just do it non-atomically. It's not a logically indivisible operation like creating a topic.
[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders
cmccabe commented on a change in pull request #10688: URL: https://github.com/apache/kafka/pull/10688#discussion_r632696250

## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
 public void close() throws InterruptedException {
     queue.close();
 }
+
+// VisibleForTesting
+CountDownLatch pause() {
+    final CountDownLatch latch = new CountDownLatch(1);
+    appendControlEvent("pause", () -> {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            log.info("Interrupted while waiting for unpause.", e);
+        }
+    });
+    return latch;
+}
+
+// VisibleForTesting
+Time time() {
+    return time;
+}

Review comment: We have several helper classes for creating quorum controllers, so getting access to the time parameter that way would be difficult. Anyway, this is package-private, so it really only applies to unit tests specifically.
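The `pause()` trick in the diff above can be reproduced with plain JDK classes: submit a task to a single-threaded queue that blocks on a `CountDownLatch`, so no later event can run until a test thread counts the latch down. A minimal self-contained sketch (illustrative names, not the QuorumController API):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PausableQueue {
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    // Park the (single) queue thread on a latch; events appended afterwards
    // cannot start until someone counts the latch down.
    CountDownLatch pause() {
        CountDownLatch latch = new CountDownLatch(1);
        queue.submit(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return latch;
    }

    Future<?> append(Runnable event) {
        return queue.submit(event);
    }

    void close() {
        queue.shutdown();
    }

    public static void main(String[] args) throws Exception {
        PausableQueue q = new PausableQueue();
        CountDownLatch latch = q.pause();
        Future<?> event = q.append(() -> { }); // queued behind the paused task
        latch.countDown();                     // unpause: the event may now run
        event.get(5, TimeUnit.SECONDS);        // completes once the queue resumes
        q.close();
    }
}
```

While the latch is held, a test can inject state knowing no event is concurrently mutating it, which is why pausing beats sleeping in unit tests.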
[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders
cmccabe commented on a change in pull request #10688: URL: https://github.com/apache/kafka/pull/10688#discussion_r632695450

## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext,
 @Override
 public CompletableFuture alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
-    CompletableFuture future = new CompletableFuture<>();
-    future.completeExceptionally(new UnsupportedOperationException());
-    return future;
+    if (request.topics().isEmpty()) {
+        return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
+    }
+    return appendWriteEvent("alterPartitionReassignments",
+        time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+        () -> {
+            throw new UnsupportedOperationException();
+        });
 }

 @Override
 public CompletableFuture listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
-    CompletableFuture future = new CompletableFuture<>();
-    future.completeExceptionally(new UnsupportedOperationException());
-    return future;
+    return appendReadEvent("listPartitionReassignments",
+        time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+        () -> {
+            throw new UnsupportedOperationException();
+        });

Review comment: agreed
[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders
cmccabe commented on a change in pull request #10688: URL: https://github.com/apache/kafka/pull/10688#discussion_r632695254

## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext,
 @Override
 public CompletableFuture electLeaders(ElectLeadersRequestData request) {
-    return appendWriteEvent("electLeaders", request.timeoutMs(),
+    // If topicPartitions is null, we will try to trigger a new leader election on
+    // all partitions (!). But if it's empty, there is nothing to do.
+    if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
+        return CompletableFuture.completedFuture(new ElectLeadersResponseData());
+    }
+    return appendWriteEvent("electLeaders",
+        time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),

Review comment: This is a slightly different fix, although sort of related.
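The deadline arithmetic in this diff, `time.nanoseconds() + NANOSECONDS.convert(timeoutMs, MILLISECONDS)`, is easy to check in isolation. This sketch uses plain longs rather than Kafka's `Time` interface (the class and method names are mine, not Kafka's):

```java
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class Deadline {
    // Turn a per-request timeout (ms) into an absolute monotonic deadline (ns).
    static long deadlineNs(long nowNs, int timeoutMs) {
        return nowNs + NANOSECONDS.convert(timeoutMs, MILLISECONDS);
    }

    // Compare by subtraction so the check still behaves if the clock wraps around.
    static boolean expired(long nowNs, long deadlineNs) {
        return nowNs - deadlineNs >= 0;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        long deadline = deadlineNs(now, 30_000);         // a 30-second request timeout
        System.out.println(expired(now, deadline));      // prints false
        System.out.println(expired(deadline, deadline)); // prints true: exactly due
    }
}
```

Converting the relative `timeoutMs` into an absolute deadline once, at enqueue time, lets the event queue later compare the deadline against the current clock without re-reading the request.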
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10664: KAFKA-12749: Changelog topic config on suppressed KTable lost
wcarlson5 commented on a change in pull request #10664: URL: https://github.com/apache/kafka/pull/10664#discussion_r632673138

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java

@@ -46,13 +48,13 @@ public void bufferBuilderShouldBeConsistent() {
 assertThat(
     "keys alone should be set",
     maxRecords(2L),
-    is(new EagerBufferConfigImpl(2L, MAX_VALUE))
+    is(new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.emptyMap()))

Review comment: @vichu Can you add some tests that don't just use an empty map?
[GitHub] [kafka] cmccabe opened a new pull request #10695: KAFKA-12783: Remove the deprecated ZK-based partition reassignment API
cmccabe opened a new pull request #10695: URL: https://github.com/apache/kafka/pull/10695
[GitHub] [kafka] rondagostino opened a new pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade
rondagostino opened a new pull request #10694: URL: https://github.com/apache/kafka/pull/10694 SecurityConfig for a Kafka cluster in a system test is cached due to https://github.com/apache/kafka/pull/8917, but we mutate the security config during some system tests, and those mutations were not being passed through after-the-fact. These system tests were not testing what they were supposed to be testing. This patch passes through the potential changes so that we again test what we are supposed to be testing. Also, since we became very specific about which SASL mechanisms to enable when updating the system tests for KRaft, we need to explicitly indicate to the SecurityConfig any additional SASL mechanisms that we want to enable. This was always necessary once we made the KRaft changes, but it was not apparent due to the above bug (where mutations were not being passed through). This patch provides a way to pass additional SASL mechanisms to the SecurityConfig by adding an option `sasl_mechanism` to KafkaListener -- this is what gets passed into the SecurityConfig when we enable a new security protocol in the middle of a system test.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-3968) fsync() is not called on parent directory when new FileMessageSet is flushed to disk
[ https://issues.apache.org/jira/browse/KAFKA-3968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344701#comment-17344701 ] Jun Rao commented on KAFKA-3968: Merged the reworked PR [https://github.com/apache/kafka/pull/10680] to trunk. > fsync() is not called on parent directory when new FileMessageSet is flushed > to disk > > > Key: KAFKA-3968 > URL: https://issues.apache.org/jira/browse/KAFKA-3968 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Linux, ext4 filesystem >Reporter: Andrey Neporada >Assignee: Cong Ding >Priority: Major > Labels: reliability > Fix For: 3.0.0 > > > Kafka does not call fsync() on directory when new log segment is created and > flushed to disk. > The problem is that following sequence of calls doesn't guarantee file > durability: > fd = open("log", O_RDWR | O_CREATE); // suppose open creates "log" > write(fd); > fsync(fd); > If system crashes after fsync() but before parent directory have been flushed > to disk, the log file can disappear. > This is true at least for ext4 on Linux. > Proposed solution is to flush directory when flush() is called for the first > time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
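The proposed fix can be sketched with plain NIO: after fsync-ing the new segment file itself, open its parent directory and call `force(true)`, which issues fsync() on the directory file descriptor on Linux. This is an illustrative sketch, not Kafka's actual code, and opening a directory channel may fail on platforms such as Windows:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushDir {
    // fsync the directory containing `file`, so the directory entry for a
    // newly created file survives a crash (the ext4 behavior described above).
    static void flushParentDir(Path file) throws IOException {
        Path parent = file.toAbsolutePath().getParent();
        try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
            dir.force(true); // fsync() on the directory file descriptor
        }
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("segment", ".log");
        try (FileChannel ch = FileChannel.open(log, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)));
            ch.force(true); // step 1: the file contents are durable
        }
        flushParentDir(log); // step 2: the directory entry is durable too
        Files.deleteIfExists(log);
    }
}
```

Without step 2, a crash after `fsync(fd)` can still lose the file on ext4, because the directory entry naming it was never flushed.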
[GitHub] [kafka] junrao merged pull request #10680: Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created
junrao merged pull request #10680: URL: https://github.com/apache/kafka/pull/10680
[jira] [Updated] (KAFKA-12761) Consumer offsets are deleted 7 days after last offset commit instead of EMPTY status
[ https://issues.apache.org/jira/browse/KAFKA-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12761: Component/s: (was: streams) > Consumer offsets are deleted 7 days after last offset commit instead of EMPTY > status > > > Key: KAFKA-12761 > URL: https://issues.apache.org/jira/browse/KAFKA-12761 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.5.0, 2.7.0 >Reporter: Tomasz Kaszuba >Priority: Major > Attachments: log.txt > > > If I understand correctly the following > [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets] > consumer offsets should only be cleared based on having an Empty status: > {{Empty}}: The field {{current_state_timestamp}} is set to when group last > transitioned to this state. If the group stays in this for > {{offsets.retention.minutes}}, the following offset cleanup scheduled task > will remove all offsets in the group (as explained above). > After a week of not consuming any new messages BUT still connected to the > consumer group I had the consumer offsets deleted on restart of the k8s pod. 
> {noformat} > 2021-05-06 10:10:04.684 INFO 1 --- [ncurred-pattern] > o.a.k.c.c.internals.ConsumerCoordinator : [Consumer > clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-86c84635-4c96-4941-b440-5ecd4584d3fd-StreamThread-1-consumer, > groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] Found no > committed offset for partition ieb.publish.baseline_pc.incurred_pattern-0 > {noformat} > I looked at what is happening in the the system topic __consumer_offsets and > I see the following: > {noformat} > 17138150 2021-04-27 07:14:50 > [ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::OffsetAndMetadata(offset=646, > leaderEpoch=Optional.empty, metadata=AQAAAXkOMJr2, > commitTimestamp=1619500490253, expireTimestamp=None) > 53670252 2021-05-03 17:44:11 > ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern, > generation=13, protocolType=Some(consumer), currentState=Stable, > members=Map(ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer-50603fe4-10f7-432e-b306-115329e82b38 > -> > MemberMetadata(memberId=ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer-50603fe4-10f7-432e-b306-115329e82b38, > groupInstanceId=Some(null), > clientId=ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer, > clientHost=/172.23.194.239, sessionTimeoutMs=1, > rebalanceTimeoutMs=30, supportedProtocols=List(stream), ))) > 65226775 2021-05-06 11:56:13 > ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern, > generation=14, protocolType=Some(consumer), currentState=Empty, > members=Map()) > 65226793 2021-05-06 12:10:00 > [ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::NULL > 65226795 2021-05-06 12:10:03 > 
ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern, > generation=15, protocolType=Some(consumer), currentState=Stable, > members=Map(ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer-efb312c3-9c24-4088-a5e0-563a3d52c944 > -> > MemberMetadata(memberId=ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer-efb312c3-9c24-4088-a5e0-563a3d52c944, > groupInstanceId=Some(null), > clientId=ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer, > clientHost=/172.23.193.184, sessionTimeoutMs=1, > rebalanceTimeoutMs=30, supportedProtocols=List(stream), ))) > 65226809 2021-05-06 12:10:09 > [ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::OffsetAndMetadata(offset=2, > leaderEpoch=Optional.empty, metadata=AQAAAXlBJ/Sy, > commitTimestamp=1620295809338, expireTimestamp=None) {noformat} > As you can see the last commited offset was on the 27th of April but the > group still had status "Stable" on the 3rd of May. It transitioned to "Empty" > on the 6th of May when the pod was restarted. Following this you can see the > tombstone message set to delete the offsets which corresponds to the streams > logs. (UTC+2). > For me it looks like the cleanup only took the last commit timestamp into > consideration and not the Stable status. Am I misunderstanding how this > should work? > The
[GitHub] [kafka] wcarlson5 commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
wcarlson5 commented on pull request #10634: URL: https://github.com/apache/kafka/pull/10634#issuecomment-841342581 @ableegoldman I think it's ready :)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
wcarlson5 commented on a change in pull request #10634: URL: https://github.com/apache/kafka/pull/10634#discussion_r632637377

## File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java

@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class TaskMetadataIntegrationTest {
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
+
+    @BeforeClass
+    public 
static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} +public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30); + +@Rule +public TestName testName = new TestName(); + +private String inputTopic; +private static StreamsBuilder builder; +private static Properties properties; +private static String appId = "TaskMetadataTest_"; +private AtomicBoolean process; +private AtomicBoolean commit; + +@Before +public void setup() { +final String testId = safeUniqueTestName(getClass(), testName); +appId = appId + testId; +inputTopic = "input" + testId; +IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); + +builder = new StreamsBuilder(); + +process = new AtomicBoolean(true); +commit = new AtomicBoolean(true); + +final KStream stream = builder.stream(inputTopic); +stream.process(PauseProcessor::new); + +properties = mkObjectProperties( +mkMap( +mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), +mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), +mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2), +mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), +mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1L) +) +); +} + +@Test +public void shouldReportCorrectCommittedOffsetInformati
[GitHub] [kafka] mumrah commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders
mumrah commented on a change in pull request #10688: URL: https://github.com/apache/kafka/pull/10688#discussion_r632616511 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext, @Override public CompletableFuture electLeaders(ElectLeadersRequestData request) { -return appendWriteEvent("electLeaders", request.timeoutMs(), +// If topicPartitions is null, we will try to trigger a new leader election on +// all partitions (!). But if it's empty, there is nothing to do. +if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) { +return CompletableFuture.completedFuture(new ElectLeadersResponseData()); +} +return appendWriteEvent("electLeaders", +time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS), Review comment: Is this related to timeouts, or is it just a different fix? ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List records = new ArrayList<>(); ElectLeadersResponseData response = new ElectLeadersResponseData(); -for (TopicPartitions topic : request.topicPartitions()) { -ReplicaElectionResult topicResults = -new ReplicaElectionResult().setTopic(topic.topic()); -response.replicaElectionResults().add(topicResults); -for (int partitionId : topic.partitions()) { -ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records); -topicResults.partitionResult().add(new PartitionResult(). -setPartitionId(partitionId). -setErrorCode(error.error().code()). -setErrorMessage(error.message())); +if (request.topicPartitions() == null) { +// If topicPartitions is null, we try to elect a new leader for every partition. +// There are some obvious issues with this wire protocol. For example, what +// if we have too many partitions to fit the results in a single RPC? 
Or what +// if we generate too many records to fit in a single batch? This behavior Review comment: Do we need all the records in a single batch, or could we create a batch for each topic (including its partitions)? ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -1165,4 +1192,22 @@ public long curClaimEpoch() { public void close() throws InterruptedException { queue.close(); } + +// VisibleForTesting +CountDownLatch pause() { +final CountDownLatch latch = new CountDownLatch(1); +appendControlEvent("pause", () -> { +try { +latch.await(); +} catch (InterruptedException e) { +log.info("Interrupted while waiting for unpause.", e); +} +}); +return latch; +} + +// VisibleForTesting +Time time() { +return time; +} Review comment: Do we need this? Can't we just use the Time we pass into the constructor in tests? Not a big deal really, just wondering ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext, @Override public CompletableFuture alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) { -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new UnsupportedOperationException()); -return future; +if (request.topics().isEmpty()) { +return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData()); +} +return appendWriteEvent("alterPartitionReassignments", +time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS), +() -> { +throw new UnsupportedOperationException(); +}); } @Override public CompletableFuture listPartitionReassignments(ListPartitionReassignmentsRequestData request) { -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new UnsupportedOperationException()); -return future; +return appendReadEvent("listPartitionReassignments", +time.nanoseconds() + 
NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS), +() -> { +throw new UnsupportedOperationException(); +}); Review comment: nit: we can do without the curly braces here and above. However, these will soon be repl
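The null-vs-empty distinction discussed above can be shown with a small standalone sketch. This uses plain Java rather than the controller types, and the `ALL`/`EMPTY`/`LISTED` markers and method name are invented for illustration:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ElectLeadersSketch {
    static final String ALL_PARTITIONS = "ALL";
    static final String NOTHING_TO_DO = "EMPTY";
    static final String LISTED = "LISTED";

    // Mirrors the wire-protocol convention: a null partition list means
    // "elect leaders for every partition", while an empty list means
    // "there is nothing to do", so the empty case can short-circuit with
    // an already-completed future instead of queueing a controller event.
    static CompletableFuture<String> electLeaders(List<String> topicPartitions) {
        if (topicPartitions != null && topicPartitions.isEmpty()) {
            return CompletableFuture.completedFuture(NOTHING_TO_DO);
        }
        String scope = (topicPartitions == null) ? ALL_PARTITIONS : LISTED;
        return CompletableFuture.completedFuture(scope);
    }
}
```

In the real `QuorumController` the non-empty path would call `appendWriteEvent` with a deadline of `time.nanoseconds()` plus the request timeout converted to nanoseconds; the sketch only captures the short-circuit semantics.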
[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes
lbradstreet commented on a change in pull request #10620: URL: https://github.com/apache/kafka/pull/10620#discussion_r632614924 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -710,8 +710,11 @@ private boolean appendsInProgress() { */ public void awaitFlushCompletion() throws InterruptedException { try { -for (ProducerBatch batch : this.incomplete.copyAll()) -batch.produceFuture.await(); +// Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush. +// We must be careful not to hold a reference to the ProducerBatch(s) so that garbage +// collection can occur on the contents. Review comment: Done.
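The idea behind this fix — copy out only the small completion futures so the batches themselves become collectible — can be sketched with plain Java. The `Batch` class and `awaitFlush` method below are illustrative stand-ins, not the real `RecordAccumulator` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FlushSketch {
    // Stand-in for ProducerBatch: a (potentially large) payload plus a
    // future that completes when the broker acknowledges the batch.
    static class Batch {
        final byte[] payload = new byte[1024];
        final CompletableFuture<Void> produceFuture = new CompletableFuture<>();
    }

    // Copying the futures up front means the wait loop holds no reference
    // to any Batch, so each batch (and its payload) can be garbage-collected
    // as soon as it completes, rather than surviving until the whole flush
    // finishes.
    static void awaitFlush(List<Batch> incomplete) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(incomplete.size());
        for (Batch b : incomplete) {
            futures.add(b.produceFuture);   // keep the small future...
        }
        incomplete = null;                  // ...not the batch and its payload
        for (CompletableFuture<Void> f : futures) {
            f.join();
        }
    }
}
```

The real `awaitFlushCompletion` does the same thing with `ProduceRequestResult.await()` in place of `CompletableFuture.join()`.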
[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.2) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 commented on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841303016 Bumped/force-pushed (I took the liberty of skipping local testing on my end just to save some time).
[GitHub] [kafka] ijuma commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
ijuma commented on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841294495 Let's bump it.
[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 edited a comment on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841291321 FYI/note: the Gradle 7.0.2 hotfix was released a few hours ago (it solves one corner case on Alpine Linux): https://github.com/gradle/gradle/releases/tag/v7.0.2 Let me know if you want me to bump Gradle to 7.0.2 (but it also makes sense to skip this, because the issue affects only one Linux distro). I would bump it, though... please have your say @ijuma
[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 edited a comment on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841291321 FYI/note: the Gradle 7.0.2 hotfix was released a few hours ago (it solves one corner case on Alpine Linux): https://github.com/gradle/gradle/releases/tag/v7.0.2 Let me know if you want me to bump Gradle to 7.0.2 (but it also makes sense to skip this, since the change affects only one Linux distro). I would bump it, though... please have your say @ijuma
[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 commented on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841291321 FYI/note: the Gradle 7.0.2 hotfix was released a few hours ago (it solves one corner case on Alpine Linux): https://github.com/gradle/gradle/releases/tag/v7.0.2 Let me know if you want me to bump Gradle (but it also makes sense to skip this). Please have your say @ijuma
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
kpatelatwork commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r632541366 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ## @@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) { } private void commit(WorkerSourceTask workerTask) { +if (!workerTask.shouldCommitOffsets()) { Review comment: makes sense
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344595#comment-17344595 ] Tom Bentley commented on KAFKA-12776: - [~neeraj.vaidya] can you reproduce this without using Spring, just the KafkaProducer? > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring boot web-application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. > For one of my failure handling test cases, I shutdown the Kafka cluster while > my applications are running. (Note : No messages have been published to the > Kafka cluster before I stop the cluster) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive for example, m2 > then m3 then m1 and then m4. > Why is that ? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time ? > My project code is attached herewith. > I can confirm that I have enabled idempotence. I have also tried with > ```max.in.flight.requests=1``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ijuma commented on a change in pull request #10690: URL: https://github.com/apache/kafka/pull/10690#discussion_r632510173 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ## @@ -247,9 +248,10 @@ public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer " + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. " + "Note that enabling idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to be less than or equal to 5, " -+ "" + RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + " must be 'all'. If these values " ++ "" + RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + " must be 'all'. If these values " + "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, " -+ "a ConfigException will be thrown."; ++ "a ConfigException will be thrown. With an idempotent producer, setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION ++ " greater than 1 will not break ordering guarantees."; Review comment: Instead of an additional sentence at the end, could we modify the original sentence to also mention ordering? ## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ## @@ -247,9 +248,10 @@ public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer " + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. " + "Note that enabling idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to be less than or equal to 5, " -+ "" + RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + " must be 'all'. 
If these values " ++ "" + RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + " must be 'all'. If these values " + "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, " -+ "a ConfigException will be thrown."; ++ "a ConfigException will be thrown. With an idempotent producer, setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION ++ " greater than 1 will not break ordering guarantees."; Review comment: Instead of an additional sentence at the end, could we modify the first sentence to also mention ordering?
[GitHub] [kafka] ijuma commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ijuma commented on a change in pull request #10690: URL: https://github.com/apache/kafka/pull/10690#discussion_r632509174 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ## @@ -201,7 +201,8 @@ public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of" ++ " message re-ordering due to retries (i.e., if retries are enabled). With an idempotent producer, this" ++ " can be up to 5 and still provide ordering guarantees."; Review comment: The `enable.idempotence` will be true by default in 3.0, so we should perhaps have the "up to 5" thing first.
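The combination these doc changes describe — idempotence enabled, with in-flight requests above 1 but at most 5, which still preserves ordering — can be written out as a minimal producer-properties fragment. The keys are the standard producer config names; the broker address is a placeholder:

```java
import java.util.Properties;

public class IdempotentProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // Idempotence requires acks=all, retries > 0, and at most 5 in-flight
        // requests per connection; within that bound, ordering is preserved
        // even with more than one request in flight.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }
}
```

These properties would then be passed to a `KafkaProducer` constructor as usual; if any of the three dependent settings is explicitly set to an incompatible value, the producer throws a `ConfigException` at construction time.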
[jira] [Comment Edited] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344517#comment-17344517 ] NEERAJ VAIDYA edited comment on KAFKA-12776 at 5/14/21, 12:51 PM: -- Hi Tom The topic has just 1 partition. And just clarify, all messages have the same key. Yet they get reordered. was (Author: neeraj.vaidya): Hi Tom The topic has just 1 partition. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring boot web-application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. > For one of my failure handling test cases, I shutdown the Kafka cluster while > my applications are running. (Note : No messages have been published to the > Kafka cluster before I stop the cluster) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive for example, m2 > then m3 then m1 and then m4. > Why is that ? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time ? > My project code is attached herewith. > I can confirm that I have enabled idempotence. 
I have also tried with > ```max.in.flight.requests=1```
[jira] [Comment Edited] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344517#comment-17344517 ] NEERAJ VAIDYA edited comment on KAFKA-12776 at 5/14/21, 12:51 PM: -- Hi [~tombentley] The topic has just 1 partition. And just clarify, all messages have the same key. Yet they get reordered. was (Author: neeraj.vaidya): Hi Tom The topic has just 1 partition. And just clarify, all messages have the same key. Yet they get reordered. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring boot web-application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. > For one of my failure handling test cases, I shutdown the Kafka cluster while > my applications are running. (Note : No messages have been published to the > Kafka cluster before I stop the cluster) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive for example, m2 > then m3 then m1 and then m4. > Why is that ? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time ? > My project code is attached herewith. 
> I can confirm that I have enabled idempotence. I have also tried with > ```max.in.flight.requests=1```
[jira] [Created] (KAFKA-12786) Getting SslTransportLayerTest error
Sibelle created KAFKA-12786: --- Summary: Getting SslTransportLayerTest error Key: KAFKA-12786 URL: https://issues.apache.org/jira/browse/KAFKA-12786 Project: Kafka Issue Type: Bug Components: unit tests Environment: Ubuntu 20.04 Reporter: Sibelle Attachments: Error.png SaslAuthenticatorTest > testRepeatedValidSaslPlainOverSsl() PASSED org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1] failed, log available in /kafka/clients/build/reports/testOutput/org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1].test.stdout SslTransportLayerTest > [1] tlsProtocol=TLSv1.2, useInlinePem=false FAILED org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Metric not updated failed-authentication-total expected:<1.0> but was:<0.0> ==> expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:320) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:317) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:301) at org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:196) at org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:155) at org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(SslTransportLayerTest.java:644)
[GitHub] [kafka] AndroideRob commented on a change in pull request #10591: Fix minor bugs in the existing documentation
AndroideRob commented on a change in pull request #10591: URL: https://github.com/apache/kafka/pull/10591#discussion_r632489696 ## File path: docs/ops.html ## @@ -78,7 +78,7 @@ auto.leader.rebalance.enable=true You can also set this to false, but you will then need to manually restore leadership to the restored replicas by running the command: -> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port +> bin/kafka-leader-election.sh --bootstrap-server broker_host:port Review comment: @showuon hey, do you have any feedback on this? thanks!
[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file
jlprat commented on pull request #10693: URL: https://github.com/apache/kafka/pull/10693#issuecomment-841197433 Failures were https://issues.apache.org/jira/browse/KAFKA-9009 and https://issues.apache.org/jira/browse/KAFKA-12629
[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j
[ https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344537#comment-17344537 ] Jørgen commented on KAFKA-12774: It actually works as expected when throwing from selectKey 👍 So it seems like a real cornercase. Here is the log when throwing from selectKey (formatted as json as expected): {code:java} {"instant":{"epochSecond":1620990806,"nanoOfSecond":307831000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"INFO","loggerName":"org.example.kafka.KafkaConfig","message":"Peeked key=key1 value=value1","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.307+0200"} {"instant":{"epochSecond":1620990806,"nanoOfSecond":309238000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"ERROR","loggerName":"org.apache.kafka.streams.processor.internals.TaskManager","message":"stream-thread [sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1] Failed to process stream task 0_0 due to the following error:","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught in process. 
taskId=0_0, processor=KSTREAM-SOURCE-00, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.ka
[GitHub] [kafka] jlprat commented on a change in pull request #10693: KAFKA-12625: Fix the NOTICE file
jlprat commented on a change in pull request #10693: URL: https://github.com/apache/kafka/pull/10693#discussion_r632410626 ## File path: LICENSE-binary ## @@ -243,7 +243,6 @@ netty-handler-4.1.59.Final netty-resolver-4.1.59.Final netty-transport-4.1.59.Final netty-transport-native-epoll-4.1.59.Final -netty-transport-native-epoll-4.1.59.Final Review comment: I removed this line from LICENSE-binary as it was a duplicate one -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10651: MINOR: Kafka Streams code samples formating unification
jlprat commented on pull request #10651: URL: https://github.com/apache/kafka/pull/10651#issuecomment-841165081 @cadonna Would you like me to split this PR into several ones? One per file, for example?
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344517#comment-17344517 ] NEERAJ VAIDYA commented on KAFKA-12776: --- Hi Tom The topic has just 1 partition. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring boot web-application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. > For one of my failure handling test cases, I shutdown the Kafka cluster while > my applications are running. (Note : No messages have been published to the > Kafka cluster before I stop the cluster) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive for example, m2 > then m3 then m1 and then m4. > Why is that ? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time ? > My project code is attached herewith. > I can confirm that I have enabled idempotence. I have also tried with > ```max.in.flight.requests=1``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10201) Update codebase to use more inclusive terms
[ https://issues.apache.org/jira/browse/KAFKA-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344516#comment-17344516 ] Omnia Ibrahim commented on KAFKA-10201: --- [~xvrl] if you still need help with the remaining items I can pick up one of them. > Update codebase to use more inclusive terms > --- > > Key: KAFKA-10201 > URL: https://issues.apache.org/jira/browse/KAFKA-10201 > Project: Kafka > Issue Type: Improvement >Reporter: Xavier Léauté >Priority: Major > Fix For: 3.0.0 > > > see the corresponding KIP > https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase
[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file
jlprat commented on pull request #10693: URL: https://github.com/apache/kafka/pull/10693#issuecomment-841123185 I would highly appreciate any hint on how to proceed for the dependencies that do not include a NOTICE file in their jar files but might include them in their source code repositories.
[GitHub] [kafka] jlprat opened a new pull request #10693: KAFKA-12625: Fix the NOTICE file
jlprat opened a new pull request #10693: URL: https://github.com/apache/kafka/pull/10693 Adds a new NOTICE-binary file and packages it in the binary release. This follows up on the https://github.com/apache/kafka/pull/10474 pull request, where LICENSE was fixed. As in that PR, I do not know if this is correct, and I would add the same disclaimer @vvcephei did: "Please make no assumption that I know what I'm doing and let me know if anything seems wrong." I went through all jar files within the distribution file and copied the content of any existing NOTICE file. Notes:
* For the cases where several dependencies include the same NOTICE, I added it only once.
* There are cases where the jar included in Kafka's distribution file doesn't contain any NOTICE file, hence nothing was copied. These are:
  * activation-1.1.1.jar
  * argparse4j-0.7.0.jar
  * jackson-annotations-2.10.5.jar
  * jackson-dataformat-csv-2.10.5.jar
  * jackson-datatype-jdk8-2.10.5.jar
  * jackson-jaxrs-base-2.10.5.jar
  * jackson-module-scala_2.13-2.10.5.jar
  * jakarta.validation-api-2.0.2.jar
  * javassist-3.27.0-GA.jar
  * javax.servlet-api-3.1.0.jar
  * javax.ws.rs-api-2.1.1.jar
  * jaxb-api-2.3.0.jar
  * jline-3.12.1.jar
  * jopt-simple-5.0.4.jar
  * lz4-java-1.7.1.jar
  * metrics-core-2.2.0.jar
  * netty-buffer-4.1.62.Final.jar
  * netty-codec-4.1.62.Final.jar
  * netty-common-4.1.62.Final.jar
  * netty-handler-4.1.62.Final.jar
  * netty-resolver-4.1.62.Final.jar
  * netty-transport-4.1.62.Final.jar
  * netty-transport-native-epoll-4.1.62.Final.jar
  * netty-transport-native-unix-common-4.1.62.Final.jar
  * reflections-0.9.12.jar
  * rocksdbjni-6.19.3.jar
  * scala-collection-compat_2.13-2.3.0.jar
  * scala-java8-compat_2.13-0.9.1.jar
  * scala-logging_2.13-3.9.2.jar
  * slf4j-api-1.7.30.jar
  * slf4j-log4j12-1.7.30.jar
  * snappy-java-1.1.8.1.jar
  * zstd-jni-1.4.9-1.jar
* For some of those _NOTICE-missing-in-jar_ dependencies, there were other dependencies from the same project that included a NOTICE file, for example, the jackson ones.
* For the Netty project, I manually copied their NOTICE file from GitHub.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-12784) ssl kafka failed
Trofimov created KAFKA-12784: Summary: ssl kafka failed Key: KAFKA-12784 URL: https://issues.apache.org/jira/browse/KAFKA-12784 Project: Kafka Issue Type: Task Components: config, consumer, KafkaConnect Affects Versions: 2.8.0 Reporter: Trofimov Fix For: 2.8.0
*Kafka version:* kafka_2.13-2.8.0
I have a problem with SSL in Kafka. I can't figure out how ssl.endpoint.identification.algorithm= works, because everything works fine for me if this parameter is empty. If I set it to https, I get "_no subject alternative dns name matching_" problems with the brokers.
*My dns name on server 1:*
[root@zeus1 /home/trofimov-im]# nslookup IP_ADDR
IP_ADDR.in-addr.arpa name = zeus1.bbk.strf.ru.
I removed the unnecessary certs. *Certs in truststore:*
Keystore type: jks
Keystore provider: SUN
Your keystore contains 7 entries
Alias name: caroot
Creation date: May 11, 2021
Entry type: trustedCertEntry
Owner: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Issuer: CN=Root CA, O=bbk, C=RU
*** ***
Alias name: zeus1.cert
Creation date: May 11, 2021
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=zeus1.bbk.strf.ru, OU=SCS, O=BBK of Russia, L=Moscow, ST=Moscow, C=RU
Issuer: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Serial number: 1d0007b167a6fd474142f6b79f0007b167
Valid from: Tue Apr 27 19:33:52 MSK 2021 until: Mon Nov 20 14:19:00 MSK 2023
Certificate fingerprints: MD5: 85:E5:4F:30:A6:A1:0E:A0:8B:7E:70:1C:2B:01:65:BA SHA1: 84:20:E8:0E:8E:24:EB:E4:93:92:7B:D1:61:3B:75:A9:D8:83:12:DE SHA256: E6:3D:4E:BD:93:22:B5:4E:28:5A:78:F6:B8:53:1B:BF:6C:39:3D:FC:EB:CF:F8:62:FC:DA:9B:BE:59:4E:F6:EE
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
#8: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [ DNSName: scs-kafka.bbk.strf.ru DNSName: *.scs-kafka.bbk.strf.ru DNSName: scs-kafka DNSName: *.scs-kafka DNSName: zeus1.bbk.strf.ru DNSName: *.zeus1.bbk.strf.ru DNSName: zeus1 DNSName: *.zeus1 ]
*** ***
Alias name: zeus2.cert
Creation date: May 11, 2021
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=zeus2.bbk.strf.ru, OU=SCS, O=BBK of Russia, L=Moscow, ST=Moscow, C=RU
Issuer: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Serial number: 1d0007b169e5e4f88b66d2e1ce0007b169
Valid from: Tue Apr 27 19:35:28 MSK 2021 until: Mon Nov 20 14:19:00 MSK 2023
Certificate fingerprints: MD5: 98:19:39:A9:DF:73:61:EB:17:30:BB:40:75:16:CE:0A SHA1: 81:0E:77:60:31:77:FC:5A:5C:E3:5F:45:F5:97:C6:84:F0:7B:DB:B5 SHA256: 8D:89:2D:B0:AA:9B:8E:95:D0:54:42:E9:E2:6D:67:FC:7A:6E:F4:50:58:76:F4:F7:0E:F5:D6:F7:A8:C1:5D:51
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
#8: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [ DNSName: scs-kafka.bbk.strf.ru DNSName: *.scs-kafka.bbk.strf.ru DNSName: scs-kafka DNSName: *.scs-kafka DNSName: zeus2.bbk.strf.ru DNSName: *.zeus2.bbk.strf.ru DNSName: zeus2 DNSName: *.zeus2 ]
*** ***
*The keystore is the same.*
*The configuration is like this:*
ssl.keystore.location=/home/kafka/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/home/kafka/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.endpoint.identification.algorithm=
*What's wrong, where to dig?*
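The behavior described in the issue hinges on how hostname verification works: with ssl.endpoint.identification.algorithm=https, the client checks the hostname it dialed against the broker certificate's SubjectAlternativeName entries, so clients must connect using a SAN-covered name (e.g. zeus1.bbk.strf.ru), not a bare IP. A minimal client-side sketch of matching settings; the hostname, paths, and passwords are placeholders taken from the values quoted in the issue:

```java
import java.util.Properties;

public class SslClientConfig {
    // Illustrative client SSL settings for a broker whose certificate lists
    // zeus1.bbk.strf.ru in its SubjectAlternativeName. With endpoint
    // identification set to "https", the dialed hostname must appear in the
    // SAN list, so bootstrap.servers uses the DNS name, not the IP address.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "zeus1.bbk.strf.ru:9093"); // SAN-covered name
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/home/kafka/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "password");
        props.put("ssl.endpoint.identification.algorithm", "https");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("bootstrap.servers"));
    }
}
```

The broker side must also advertise a SAN-covered name (via advertised.listeners) for inter-broker SSL connections to pass the same check.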
[jira] [Updated] (KAFKA-6987) Reimplement KafkaFuture with CompletableFuture
[ https://issues.apache.org/jira/browse/KAFKA-6987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley updated KAFKA-6987: --- Fix Version/s: 3.0.0 > Reimplement KafkaFuture with CompletableFuture > -- > > Key: KAFKA-6987 > URL: https://issues.apache.org/jira/browse/KAFKA-6987 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.0.0 >Reporter: Andras Beni >Assignee: Tom Bentley >Priority: Minor > Fix For: 3.0.0 > > > KafkaFuture documentation states: > {{This will eventually become a thin shim on top of Java 8's > CompletableFuture.}} > With Java 7 support dropped in 2.0, it is time to get rid of custom code.
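The "thin shim" idea from the KafkaFuture javadoc can be sketched as delegation to a CompletableFuture. This is an illustrative toy, not the actual KAFKA-6987 implementation; the class name ShimFuture and its method set are invented for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

// Hypothetical sketch of a "thin shim": the public API stays narrow, while
// all future machinery is delegated to java.util.concurrent.CompletableFuture.
public class ShimFuture<T> {
    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    public boolean complete(T value) {
        return delegate.complete(value);
    }

    public T get() throws InterruptedException, ExecutionException {
        return delegate.get();
    }

    // Chaining is forwarded to the delegate; the result is re-wrapped so
    // callers never see the underlying CompletableFuture.
    public <R> ShimFuture<R> thenApply(Function<T, R> fn) {
        ShimFuture<R> result = new ShimFuture<>();
        delegate.thenApply(fn).whenComplete((value, error) -> {
            if (error != null) {
                result.delegate.completeExceptionally(error);
            } else {
                result.delegate.complete(value);
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        ShimFuture<Integer> f = new ShimFuture<>();
        ShimFuture<Integer> doubled = f.thenApply(x -> x * 2);
        f.complete(21);
        System.out.println(doubled.get()); // prints 42
    }
}
```

The point of the shim is API stability: existing KafkaFuture callers keep compiling while the custom completion logic is deleted.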
[GitHub] [kafka] mimaison commented on pull request #10660: MINOR: Updating files with release 2.7.1
mimaison commented on pull request #10660: URL: https://github.com/apache/kafka/pull/10660#issuecomment-841103731 Thanks @mjsax, I can see the artifacts now. I've rekicked the build.
[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores
cadonna commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r632371505 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set changelogTopics = new HashSet<>(); -for (final StateStore stateStore : globalStateStores) { -globalStoreNames.add(stateStore.name()); +for (final StateStore stateStore : topology.globalStateStores()) { final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); changelogTopics.add(sourceTopic); stateStore.init((StateStoreContext) globalProcessorContext, stateStore); Review comment: On second thought, it might also be relevant for production code since we can now restart the stream thread after a fatal error. This is not yet possible for a global stream thread, but it might become possible in the future.
[GitHub] [kafka] ableegoldman commented on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman commented on pull request #10690: URL: https://github.com/apache/kafka/pull/10690#issuecomment-841084469 Wow, all checks actually passed! That's the second completely green build I've seen in 24 hours, must be my lucky day 😄
[GitHub] [kafka] ableegoldman commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman commented on a change in pull request #10690: URL: https://github.com/apache/kafka/pull/10690#discussion_r632355592 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ## @@ -247,9 +248,10 @@ public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer " + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. " + "Note that enabling idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to be less than or equal to 5, " -+ "" + RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + " must be 'all'. If these values " ++ "" + RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + " must be 'all'. If these values " + "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, " -+ "a ConfigException will be thrown."; ++ "a ConfigException will be thrown. With an idempotent producer, setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION ++ " greater than 1 will not break ordering guarantees."; Review comment: That requirement is covered a few sentences before this one; it might be cut off by GitHub. I think it's clear that by "greater than 1", we just mean "between 1 and 5, the maximum allowable limit as stated above"
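For context, the constraints discussed in this review thread (acks=all, retries > 0, max.in.flight.requests.per.connection <= 5) can be written out as a producer configuration. A minimal sketch; the bootstrap address is a placeholder:

```java
import java.util.Properties;

public class IdempotentProducerConfig {
    // Builds producer settings consistent with the idempotence requirements
    // documented in ProducerConfig: acks=all, retries > 0, and at most 5
    // in-flight requests per connection.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        // Any value from 1 to 5 preserves ordering once idempotence is on.
        props.put("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("enable.idempotence")); // prints true
    }
}
```

If incompatible values are set explicitly (for example max.in.flight.requests.per.connection=6 with idempotence enabled), the producer constructor throws a ConfigException rather than silently degrading the ordering guarantee.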
[jira] [Assigned] (KAFKA-12625) Fix the NOTICE file
[ https://issues.apache.org/jira/browse/KAFKA-12625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat reassigned KAFKA-12625: -- Assignee: Josep Prat > Fix the NOTICE file > --- > > Key: KAFKA-12625 > URL: https://issues.apache.org/jira/browse/KAFKA-12625 > Project: Kafka > Issue Type: Task >Reporter: John Roesler >Assignee: Josep Prat >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > In https://issues.apache.org/jira/browse/KAFKA-12602, we fixed the license > file, and in the comments, Justin noted that we really should fix the NOTICE > file as well. > Basically, we need to look through each of the packaged dependencies and > transmit each of their NOTICEs (for Apache2 deps) or otherwise, any copyright > notices they assert. > It would be good to consider automating a check for this as well (see > https://issues.apache.org/jira/browse/KAFKA-12622)
[jira] [Commented] (KAFKA-12625) Fix the NOTICE file
[ https://issues.apache.org/jira/browse/KAFKA-12625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344377#comment-17344377 ] Josep Prat commented on KAFKA-12625: [~vvcephei] As I see neither an assignee nor any PR on GitHub for this, I would like to take this task. > Fix the NOTICE file > --- > > Key: KAFKA-12625 > URL: https://issues.apache.org/jira/browse/KAFKA-12625 > Project: Kafka > Issue Type: Task >Reporter: John Roesler >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > In https://issues.apache.org/jira/browse/KAFKA-12602, we fixed the license > file, and in the comments, Justin noted that we really should fix the NOTICE > file as well. > Basically, we need to look through each of the packaged dependencies and > transmit each of their NOTICEs (for Apache2 deps) or otherwise, any copyright > notices they assert. > It would be good to consider automating a check for this as well (see > https://issues.apache.org/jira/browse/KAFKA-12622)
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344376#comment-17344376 ] Tom Bentley commented on KAFKA-12776: - [~neeraj.vaidya] how many partitions does the topic have? I don't see it stated so far. Kafka only guarantees ordering within a partition, so if records are written to multiple partitions you will only observe order being preserved for records sent to the same partition. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring boot web-application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. > For one of my failure handling test cases, I shutdown the Kafka cluster while > my applications are running. (Note : No messages have been published to the > Kafka cluster before I stop the cluster) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive for example, m2 > then m3 then m1 and then m4. > Why is that ? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time ? > My project code is attached herewith. > I can confirm that I have enabled idempotence. 
I have also tried with > ```max.in.flight.requests=1```
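The partition-level ordering guarantee described in the comment above can be illustrated with a toy hash-based partitioner. This is not Kafka's actual DefaultPartitioner (which uses a murmur2 hash of the serialized key); the class and hash function here are invented for the example:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionOrdering {
    // Illustrative only: maps a record key to a partition the way a
    // hash-based partitioner would. All records sharing a key land on the
    // same partition, so their relative order is preserved there; records
    // with different keys may land on different partitions, where no
    // cross-partition ordering is guaranteed.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        Map<Integer, List<String>> byPartition = new HashMap<>();
        for (String msg : new String[] {"order-42:m1", "order-42:m2", "order-7:m1", "order-42:m3"}) {
            String key = msg.split(":")[0];
            byPartition.computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                       .add(msg);
        }
        // Within each partition, messages for a given key stay in send order.
        System.out.println(byPartition);
    }
}
```

With a single-partition topic, as in this issue, every record maps to partition 0, so the partitioner cannot explain the reordering the reporter observed.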
[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 edited a comment on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841055689 PR rebased, commits are squashed into one; tests on Jenkins are looking good: it seems that only one unrelated test failed.
[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 commented on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-841055689 It seems that only one unrelated test failed.