[GitHub] [kafka] dengziming commented on pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-05-14 Thread GitBox


dengziming commented on pull request #10701:
URL: https://github.com/apache/kafka/pull/10701#issuecomment-841594749


   Hello @vvcephei, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-05-14 Thread GitBox


dengziming opened a new pull request #10701:
URL: https://github.com/apache/kafka/pull/10701


   *More detailed description of your change*
   It seems that #9396 left out a TODO; this PR fixes it.
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] zhaohaidao opened a new pull request #10700: KAFKA-12789: Remove Stale comments for meta response handling logic

2021-05-14 Thread GitBox


zhaohaidao opened a new pull request #10700:
URL: https://github.com/apache/kafka/pull/10700


   According to my understanding, the following comment looks stale.
   
   > public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
   >     ...
   >     // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
   >     // created which means we will get errors and no nodes until it exists
   >     if (response.brokers().isEmpty()) {
   >         log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
   >         this.metadata.failedUpdate(now);
   >     } else {
   >         this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
   >     }
   >     ...
   
   The comment above says we may get errors and no nodes if the topic we want is still in the process of being created.
   However, on the server side every metadata request returns all alive brokers, as shown below:
   
   > def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
   >     ...
   >     val brokers = metadataCache.getAliveBrokers
   >     ...
   > }
   
   I studied the related git commit history and figured out why.
   
   1. The comment was first introduced in KAFKA-642 (e11447650a), when a metadata request only needed the brokers related to the requested topics.
   2. KAFKA-1535 (commitId: 4ebcdfd51f) changed the server-side logic so that the metadata response contains all alive brokers rather than just the ones needed for the given topics.
   3. However, the comment has been retained until now.
   
   So, according to my understanding, the comment is stale and can be removed.






[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-14 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344958#comment-17344958
 ] 

NEERAJ VAIDYA commented on KAFKA-12776:
---

[~ableegoldman], [~ijuma], [~tombentley]

I think I kind of know why this issue occurs. Here is my take:

This ordering issue mainly happens when multiple requests are being handled in parallel by the KafkaProducer and metadata is being fetched at the time those requests are processed.

For example, consider a web service which accepts HTTP requests on the upstream side. For each request (R1, R2, R3, ..., Rn), if KafkaProducer#send is invoked while the Kafka cluster is down and metadata is not available, each of R1..Rn will be blocked in a metadata fetch call.

When the cluster is brought back online (within max.block.ms), the first request that succeeds in getting the metadata is pushed onto the producer buffer first and eventually reaches the topic.

The only way to control ordering among multiple parallel requests stuck in a metadata fetch would be to ensure that they are all serviced by a single thread/queue? I would have imagined that this must already be happening, but apparently not.
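
A minimal sketch of the race (illustrative only; the class name, topic name, and thread-pool size are made up and not taken from the attached project):
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ParallelSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // One thread per upstream HTTP request, as in the web-service scenario above.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String msg : new String[]{"m1", "m2", "m3", "m4"}) {
            // While the cluster is down, every send() blocks (up to max.block.ms) waiting
            // for metadata. Once metadata arrives, whichever thread wins the race appends
            // first, so the append order need not match the submission order m1..m4.
            pool.submit(() -> producer.send(new ProducerRecord<>("topic-A", msg)));
        }
        pool.shutdown();
    }
}
{code}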

 

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA).
> My application is basically a Spring Boot web application which accepts JSON
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shut down the Kafka cluster while
> my applications are running. (Note: no messages have been published to the
> Kafka cluster before I stop the cluster.)
> When the producer application tries to write messages to TA, it cannot,
> because the cluster is down, and hence (I assume) it buffers the messages. Let's
> say it receives 4 messages m1, m2, m3, m4 in increasing time order (i.e. m1 is
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered
> messages to the topic, but they are not in order. I receive, for example, m2,
> then m3, then m1, and then m4.
> Why is that? Is it because the buffering in the producer is multi-threaded,
> with each thread producing to the topic at the same time?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12789) Remove Stale comments for meta response handling logic

2021-05-14 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12789:

Description: 
According to my understanding, the following comment looks stale.
{code:java}
public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    ...
    // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (response.brokers().isEmpty()) {
        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
        this.metadata.failedUpdate(now);
    } else {
        this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
    }
    ...
{code}
The comment above says we may get errors and no nodes if the topic we want is still in the process of being created.
 However, on the server side every metadata request returns all alive brokers, as shown below:
{code:java}
  def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
    ...
    val brokers = metadataCache.getAliveBrokers
    ...
  }
{code}
I studied the related git commit history and figured out why.
 # The comment was first introduced in KAFKA-642 (e11447650a), when a metadata request only needed the brokers related to the requested topics.
 # KAFKA-1535 (commitId: 4ebcdfd51f) changed the server-side logic so that the metadata response contains all alive brokers rather than just the ones needed for the given topics.
 # However, the comment has been retained until now.

So, according to my understanding, the comment is stale and can be removed.

  was:
According to my understanding, the following paragraph looks like a stale 
comments.
{code:java}
public void handleSuccessfulResponse(RequestHeader requestHeader, long now, 
MetadataResponse response) {
...
// Don't update the cluster if there are no valid nodes...the topic 
we want may still be in the process of being
// created which means we will get errors and no nodes until it 
exists
if (response.brokers().isEmpty()) {
log.trace("Ignoring empty metadata response with correlation id 
{}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);
} else {
this.metadata.update(inProgress.requestVersion, response, 
inProgress.isPartialUpdate, now);
}
...
{code}
The comments above mean we will may get errors and no nodes if the topic we 
want may still be in the process of being created.
 However, every meta request will return all brokers from the logic of the 
server side, just as followed
{code:java}
  def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
...
val brokers = metadataCache.getAliveBrokers
...
  }
{code}
I studied the related git commit history and figured out why.
 # This comments are first introduced in KAFKA-642. which means meta request 
only need brokers related to the topics we want.
 # KAFKA-1535 changed the server side logic. which has the metadata response 
contain all alive brokers rather than just the ones needed for the given topics.
 # However, this comments are retained till now.

So According to my understanding, this comments looks like a stale one and can 
be removed.


> Remove Stale comments for meta response handling logic
> --
>
> Key: KAFKA-12789
> URL: https://issues.apache.org/jira/browse/KAFKA-12789
> Project: Kafka
>  Issue Type: Improvement
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the following paragraph looks like a stale 
> comments.
> {code:java}
> public void handleSuccessfulResponse(RequestHeader requestHeader, long now, 
> MetadataResponse response) {
> ...
> // Don't update the cluster if there are no valid nodes...the 
> topic we want may still be in the process of being
> // created which means we will get errors and no nodes until it 
> exists
> if (response.brokers().isEmpty()) {
> log.trace("Ignoring empty metadata response with correlation 
> id {}.", requestHeader.correlationId());
> this.metadata.failedUpdate(now);
> } else {
> this.metadata.update(inProgress.requestVersion, response, 
> inProgress.isPartialUpdate, now);
> }
> ...
> {code}
> The comments above mean we will may get errors and no nodes if the topic we 
> want may still be 

[jira] [Updated] (KAFKA-12789) Remove Stale comments for meta response handling logic

2021-05-14 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12789:

Description: 
According to my understanding, the following comment looks stale.
{code:java}
public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    ...
    // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (response.brokers().isEmpty()) {
        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
        this.metadata.failedUpdate(now);
    } else {
        this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
    }
    ...
{code}
The comment above says we may get errors and no nodes if the topic we want is still in the process of being created.
 However, on the server side every metadata request returns all alive brokers, as shown below:
{code:java}
  def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
    ...
    val brokers = metadataCache.getAliveBrokers
    ...
  }
{code}
I studied the related git commit history and figured out why.
 # The comment was first introduced in KAFKA-642, when a metadata request only needed the brokers related to the requested topics.
 # KAFKA-1535 changed the server-side logic so that the metadata response contains all alive brokers rather than just the ones needed for the given topics.
 # However, the comment has been retained until now.

So, according to my understanding, the comment is stale and can be removed.

  was:
According to my understanding, the following paragraph looks like a stale 
comments.
{code:java}
public void handleSuccessfulResponse(RequestHeader requestHeader, long now, 
MetadataResponse response) {
...
// Don't update the cluster if there are no valid nodes...the topic 
we want may still be in the process of being
// created which means we will get errors and no nodes until it 
exists
if (response.brokers().isEmpty()) {
log.trace("Ignoring empty metadata response with correlation id 
{}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);
} else {
this.metadata.update(inProgress.requestVersion, response, 
inProgress.isPartialUpdate, now);
}
...
{code}
The comments above mean we will may get errors and no nodes if the topic we 
want may still be in the process of being created.
 However, every meta request will return all brokers from the logic of the 
server side, just as followed
{code:java}
  def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
...
val brokers = metadataCache.getAliveBrokers
...
  }
{code}
I studied the related git commit history and figured out why.
 # This comments are first introduced in KAFKA-642. which means meta request 
only need brokers related to the topics we want.
 # Kafka-1535 changed the server side logic. which has the metadata response 
contain all alive brokers rather than just the ones needed for the given topics.
 # However, this comments are retained till now.

So According to my understanding, this comments looks like a stale one and can 
be removed.


> Remove Stale comments for meta response handling logic
> --
>
> Key: KAFKA-12789
> URL: https://issues.apache.org/jira/browse/KAFKA-12789
> Project: Kafka
>  Issue Type: Improvement
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the following paragraph looks like a stale 
> comments.
> {code:java}
> public void handleSuccessfulResponse(RequestHeader requestHeader, long now, 
> MetadataResponse response) {
> ...
> // Don't update the cluster if there are no valid nodes...the 
> topic we want may still be in the process of being
> // created which means we will get errors and no nodes until it 
> exists
> if (response.brokers().isEmpty()) {
> log.trace("Ignoring empty metadata response with correlation 
> id {}.", requestHeader.correlationId());
> this.metadata.failedUpdate(now);
> } else {
> this.metadata.update(inProgress.requestVersion, response, 
> inProgress.isPartialUpdate, now);
> }
> ...
> {code}
> The comments above mean we will may get errors and no nodes if the topic we 
> want may still be in the process of being created.
>  H

[jira] [Updated] (KAFKA-12789) Remove Stale comments for meta response handling logic

2021-05-14 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12789:

Description: 
According to my understanding, the following comment looks stale.
{code:java}
public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    ...
    // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (response.brokers().isEmpty()) {
        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
        this.metadata.failedUpdate(now);
    } else {
        this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
    }
    ...
{code}
The comment above says we may get errors and no nodes if the topic we want is still in the process of being created.
 However, on the server side every metadata request returns all alive brokers, as shown below:
{code:java}
  def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
    ...
    val brokers = metadataCache.getAliveBrokers
    ...
  }
{code}
I studied the related git commit history and figured out why.
 # The comment was first introduced in KAFKA-642, when a metadata request only needed the brokers related to the requested topics.
 # KAFKA-1535 changed the server-side logic so that the metadata response contains all alive brokers rather than just the ones needed for the given topics.
 # However, the comment has been retained until now.

So, according to my understanding, the comment is stale and can be removed.

  was:
According to my understanding, the following paragraph looks like a stale 
comments?
~subscript text~
/ /Don't update the cluster if there are no valid nodes...the topic we want may 
still be in the process of being,
// created which means we will get errors and no nodes until it exists



> Remove Stale comments for meta response handling logic
> --
>
> Key: KAFKA-12789
> URL: https://issues.apache.org/jira/browse/KAFKA-12789
> Project: Kafka
>  Issue Type: Improvement
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the following paragraph looks like a stale 
> comments.
> {code:java}
> public void handleSuccessfulResponse(RequestHeader requestHeader, long now, 
> MetadataResponse response) {
> ...
> // Don't update the cluster if there are no valid nodes...the 
> topic we want may still be in the process of being
> // created which means we will get errors and no nodes until it 
> exists
> if (response.brokers().isEmpty()) {
> log.trace("Ignoring empty metadata response with correlation 
> id {}.", requestHeader.correlationId());
> this.metadata.failedUpdate(now);
> } else {
> this.metadata.update(inProgress.requestVersion, response, 
> inProgress.isPartialUpdate, now);
> }
> ...
> {code}
> The comments above mean we will may get errors and no nodes if the topic we 
> want may still be in the process of being created.
>  However, every meta request will return all brokers from the logic of the 
> server side, just as followed
> {code:java}
>   def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
> ...
> val brokers = metadataCache.getAliveBrokers
> ...
>   }
> {code}
> I studied the related git commit history and figured out why.
>  # This comments are first introduced in KAFKA-642. which means meta request 
> only need brokers related to the topics we want.
>  # Kafka-1535 changed the server side logic. which has the metadata response 
> contain all alive brokers rather than just the ones needed for the given 
> topics.
>  # However, this comments are retained till now.
> So According to my understanding, this comments looks like a stale one and 
> can be removed.





[jira] [Updated] (KAFKA-12789) Remove Stale comments for meta response handling logic

2021-05-14 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12789:

Description: 
According to my understanding, the following comment looks stale:
// Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists


> Remove Stale comments for meta response handling logic
> --
>
> Key: KAFKA-12789
> URL: https://issues.apache.org/jira/browse/KAFKA-12789
> Project: Kafka
>  Issue Type: Improvement
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the following paragraph looks like a stale 
> comments?
> ~subscript text~
> / /Don't update the cluster if there are no valid nodes...the topic we want 
> may still be in the process of being,
> // created which means we will get errors and no nodes until it exists





[jira] [Created] (KAFKA-12789) Remove Stale comments for meta response handling logic

2021-05-14 Thread HaiyuanZhao (Jira)
HaiyuanZhao created KAFKA-12789:
---

 Summary: Remove Stale comments for meta response handling logic
 Key: KAFKA-12789
 URL: https://issues.apache.org/jira/browse/KAFKA-12789
 Project: Kafka
  Issue Type: Improvement
Reporter: HaiyuanZhao
Assignee: HaiyuanZhao








[GitHub] [kafka] ccding commented on pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-14 Thread GitBox


ccding commented on pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#issuecomment-841577947


   LGTM
   
   Thanks @kowshik 






[GitHub] [kafka] junrao commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-14 Thread GitBox


junrao commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r632872729



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+/**
+ * Test that the BrokerList class works as expected.
+ */
+@Test
+public void testBrokerList() {
+assertEquals(0, BrokerList.EMPTY.size());
+assertEquals(-1, BrokerList.EMPTY.next(1));
+BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+assertEquals(4, brokers.size());
+assertEquals(0, brokers.next(0));
+assertEquals(1, brokers.next(0));
+assertEquals(2, brokers.next(0));
+assertEquals(3, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(1, brokers.next(1));
+assertEquals(2, brokers.next(1));
+assertEquals(3, brokers.next(1));
+assertEquals(0, brokers.next(1));
+assertEquals(-1, brokers.next(1));
+}
+
+/**
+ * Test that we perform striped replica placement as expected, and don't 
use the
+ * fenced replica if we don't have to.
+ */
+@Test
+public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(0, Optional.empty(), false),
+new UsableBroker(4, Optional.empty(), false),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(5, rackList.numTotalBrokers());
+assertEquals(4, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(0));
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(-1));
+assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+}
+
+/**
+ * Test that we will place on the fenced replica if we need to.
+ */
+@Test
+public void testPlacementOnFencedReplicaOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(3, rackList.numTotalBrokers());
+assertEquals(2, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+}
+
+@Test
+public void testRackListWithMu

[GitHub] [kafka] dielhennr commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-14 Thread GitBox


dielhennr commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r632858915



##
File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
##
@@ -139,10 +140,16 @@ public TopicIdPartition next() {
  * Partitions with no isr members appear in this map under id NO_LEADER.
  */
 private final TimelineHashMap> 
isrMembers;
+
+private final Map offlinePartitionCounts;

Review comment:
   This was so that when a topic is removed, any offline partitions for 
that topic are decremented from the counter.
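
   A rough sketch of the bookkeeping being described (hypothetical names, not the actual BrokersToIsrs fields):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    // Keep a per-topic offline count so that removing a topic can subtract exactly
    // that topic's remaining contribution from the global offline-partition gauge.
    class OfflinePartitionBookkeeping {
        private final Map<UUID, Integer> offlineByTopic = new HashMap<>();
        private int totalOffline = 0;

        void partitionWentOffline(UUID topicId) {
            offlineByTopic.merge(topicId, 1, Integer::sum);
            totalOffline++;
        }

        // Assumes a matching partitionWentOffline() call happened earlier.
        void partitionCameOnline(UUID topicId) {
            offlineByTopic.computeIfPresent(topicId, (id, count) -> count - 1);
            totalOffline--;
        }

        void topicRemoved(UUID topicId) {
            Integer count = offlineByTopic.remove(topicId);
            if (count != null) {
                totalOffline -= count;   // decrement by whatever the topic still contributed
            }
        }

        int offlinePartitionCount() {
            return totalOffline;
        }
    }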








[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r632857030



##
File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
##
@@ -139,10 +140,16 @@ public TopicIdPartition next() {
  * Partitions with no isr members appear in this map under id NO_LEADER.
  */
 private final TimelineHashMap> 
isrMembers;
+
+private final Map offlinePartitionCounts;

Review comment:
   We cannot use regular maps here because they will not roll back to the 
desired state during a snapshot restore.
   
   In any case, I don't see why we need this map.  It's enough to know how many 
offline partitions there are, which we already have a count of below.
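
   As a toy illustration of the snapshot/rollback concern (not the actual timeline classes): a plain field or HashMap keeps whatever mutations happened after a snapshot was taken, whereas controller state must revert on a snapshot restore.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Toy snapshot-aware counter: mutations made after takeSnapshot() can be undone,
    // which a plain int or HashMap cannot do on its own.
    class SnapshotAwareCounter {
        private int value = 0;
        private final Deque<Integer> snapshots = new ArrayDeque<>();

        void increment() { value++; }
        void decrement() { value--; }

        void takeSnapshot() { snapshots.push(value); }

        // Revert to the most recent snapshot, as a snapshot restore would.
        void revertToLastSnapshot() {
            if (!snapshots.isEmpty()) {
                value = snapshots.pop();
            }
        }

        int get() { return value; }
    }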








[GitHub] [kafka] cmccabe opened a new pull request #10699: MINOR: Add support for ZK Authorizer with KRaft

2021-05-14 Thread GitBox


cmccabe opened a new pull request #10699:
URL: https://github.com/apache/kafka/pull/10699


   This patch adds support for running the ZooKeeper-based 
kafka.security.authorizer.AclAuthorizer with KRaft clusters. Set the 
authorizer.class.name config as well as the zookeeper.connect config while also 
setting the typical KRaft configs (node.id, process.roles, etc.), and the 
cluster will use KRaft for metadata and ZooKeeper for ACL storage. A system 
test that exercises the authorizer is included.
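
   For illustration, a broker configuration along those lines might look like the following (hostnames, ports, and ids are placeholders, not taken from the patch):

    # KRaft metadata quorum
    process.roles=broker
    node.id=1
    controller.quorum.voters=1000@controller1:9093
    # ZooKeeper-backed ACL storage
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    zookeeper.connect=zk1:2181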
   
   This patch also changes "Raft" to "KRaft" in several system test files. It 
also fixes a bug where system test admin clients were unable to connect to a 
cluster with broker credentials via the SSL security protocol when the broker 
was using that for inter-broker communication and SASL for client communication.
   
   Co-authored-by: Ron Dagostino 






[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-14 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344914#comment-17344914
 ] 

NEERAJ VAIDYA commented on KAFKA-12776:
---

[~tombentley]

Yes, this has been reproduced with a KafkaProducer as well.

 

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA).
> My application is basically a Spring Boot web application which accepts JSON
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shut down the Kafka cluster while
> my applications are running. (Note: no messages have been published to the
> Kafka cluster before I stop the cluster.)
> When the producer application tries to write messages to TA, it cannot,
> because the cluster is down, and hence (I assume) it buffers the messages. Let's
> say it receives 4 messages m1, m2, m3, m4 in increasing time order (i.e. m1 is
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered
> messages to the topic, but they are not in order. I receive, for example, m2,
> then m3, then m1, and then m4.
> Why is that? Is it because the buffering in the producer is multi-threaded,
> with each thread producing to the topic at the same time?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with
> ```max.in.flight.requests=1```





[GitHub] [kafka] rondagostino commented on pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade

2021-05-14 Thread GitBox


rondagostino commented on pull request #10694:
URL: https://github.com/apache/kafka/pull/10694#issuecomment-841512146


   Full system test output here: 
https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-05-14--001.1621023792--rondagostino--fix_TestSecurityRollingUpgrade--78762fb8a/report.html
   
   Streams test failures are unrelated.  The one test failure in 
`TestSecurityRollingUpgrade` is fixed by the followup commit in this PR 
(https://github.com/apache/kafka/pull/10694/commits/6e4007fa3a805fde1e9933d4c1a200b472a16137)






[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r632806423



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+/**
+ * Test that the BrokerList class works as expected.
+ */
+@Test
+public void testBrokerList() {
+assertEquals(0, BrokerList.EMPTY.size());
+assertEquals(-1, BrokerList.EMPTY.next(1));
+BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+assertEquals(4, brokers.size());
+assertEquals(0, brokers.next(0));
+assertEquals(1, brokers.next(0));
+assertEquals(2, brokers.next(0));
+assertEquals(3, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(1, brokers.next(1));
+assertEquals(2, brokers.next(1));
+assertEquals(3, brokers.next(1));
+assertEquals(0, brokers.next(1));
+assertEquals(-1, brokers.next(1));
+}
+
+/**
+ * Test that we perform striped replica placement as expected, and don't 
use the
+ * fenced replica if we don't have to.
+ */
+@Test
+public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(0, Optional.empty(), false),
+new UsableBroker(4, Optional.empty(), false),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(5, rackList.numTotalBrokers());
+assertEquals(4, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(0));
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(-1));
+assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+}
+
+/**
+ * Test that we will place on the fenced replica if we need to.
+ */
+@Test
+public void testPlacementOnFencedReplicaOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(3, rackList.numTotalBrokers());
+assertEquals(2, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+}
+
+@Test
+public void testRackListWithMultipleRacks() {
+MockRandom random = new MockRandom();
+R

[GitHub] [kafka] dejan2609 opened a new pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)

2021-05-14 Thread GitBox


dejan2609 opened a new pull request #10698:
URL: https://github.com/apache/kafka/pull/10698


   @ijuma please review this.
   
   JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-12770
   
   Related PR/comment: 
https://github.com/apache/kafka/pull/10656#issuecomment-835809154
   
   @romani how to use this (hopefully this will be merged into trunk at some 
point):
   - this command uses project-defined CheckStyle version: `./gradlew 
checkstyleMain checkstyleTest` 
   - while this one overrides it: `./gradlew clean checkstyleMain 
checkstyleTest -PcheckstyleVersion=8.41.1`
   
   
   
   
   
   
   






[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r632796785



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  
Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple 
case where
+ * broker racks have not been configured, this goal is a no-op, of course.  
But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we 
are placing
+ * multiple partitions, we try to avoid putting each partition on the same set 
of
+ * replicas, even if it does satisfy the rack placement goal.  However, we 
treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 
brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions 
on that one
+ * broker in rack C.  If you were to place a lot of partitions with 
replication factor 3,
+ * each partition would try to get a replica there.  In general racks are 
supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than 
on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, 
these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot 
be
+ * satisfied.  The first constraint is that we can't place more than one 
replica on the
+ * same broker.  This imposes an upper limit on replication factor-- for 
example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint 
comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an 
unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to 
create
+ * new topics even if every broker were fenced.  However, this would be 
confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack 
objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted 
list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, 
stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin 
fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with 
replication
+ * factor 3, our first replica might come from A, our second from B, and our 
third from C.
+ * Of course our placement would not be very fair if we always started with 
rack A.
+ * Therefore, we generate a random starting offset when the RackList is 
created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a 
rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had replica 
A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, 
and so on.
+ * Just like with the racks, we add a random sta
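
A stripped-down sketch of the round-robin striping described in the comment above (illustrative only; not the actual RackList/StripedReplicaPlacer code):

    import java.util.ArrayList;
    import java.util.List;

    class StripingSketch {
        // Each partition starts one rack later than the previous one, producing the
        // "striped" pattern A,B,C / B,C,A / C,A,B described above. startOffset stands
        // in for the random starting offset chosen when the rack list is created.
        static List<List<String>> place(List<String> racks, int partitions,
                                        int replicationFactor, int startOffset) {
            List<List<String>> assignments = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                List<String> replicas = new ArrayList<>();
                for (int r = 0; r < replicationFactor; r++) {
                    replicas.add(racks.get((startOffset + p + r) % racks.size()));
                }
                assignments.add(replicas);
            }
            return assignments;
        }
    }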

[jira] [Updated] (KAFKA-12770) Gradle build: allow the CheckStyle version to be specified via parameter

2021-05-14 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dejan Stojadinović updated KAFKA-12770:
---
Summary: Gradle build: allow the CheckStyle version to be specified via 
parameter  (was: Jenkins build: allow the CheckStyle version to be specified 
via parameter)

> Gradle build: allow the CheckStyle version to be specified via parameter
> 
>
> Key: KAFKA-12770
> URL: https://issues.apache.org/jira/browse/KAFKA-12770
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Dejan Stojadinović
>Priority: Minor
>
>   ^*(i) Prologue*: 
> [https://github.com/apache/kafka/pull/10656#issuecomment-836074067]^
> *(on) Rationale:* if we implement this CheckStyle team ([~romani] and others) 
> can add Kafka project to their regression suite: 
> [https://github.com/apache/kafka/pull/10656#issuecomment-835809154] 
> *Related links:*
>  * [https://github.com/apache/kafka/blob/2.8.0/Jenkinsfile#L28]
>  * [https://github.com/apache/kafka#common-build-options]
>  * 
> [https://docs.gradle.org/7.0.1/dsl/org.gradle.api.plugins.quality.CheckstyleExtension.html#org.gradle.api.plugins.quality.CheckstyleExtension:toolVersion]
>   





[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r632796424



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  
Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple 
case where
+ * broker racks have not been configured, this goal is a no-op, of course.  
But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we 
are placing
+ * multiple partitions, we try to avoid putting each partition on the same set 
of
+ * replicas, even if it does satisfy the rack placement goal.  However, we 
treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 
brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions 
on that one
+ * broker in rack C.  If you were to place a lot of partitions with 
replication factor 3,
+ * each partition would try to get a replica there.  In general racks are 
supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than 
on fenced

Review comment:
   I added some text saying that we want new leaders to be evenly 
distributed if any one broker is fenced








[GitHub] [kafka] ableegoldman commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-14 Thread GitBox


ableegoldman commented on a change in pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#discussion_r632788590



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -247,9 +248,10 @@
 public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', 
the producer will ensure that exactly one copy of each message is written in 
the stream. If 'false', producer "
 + "retries due to 
broker failures, etc., may write duplicates of the retried message in the 
stream. "
 + "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5, "
-+ "" + 
RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + 
" must be 'all'. If these values "
++ "" + 
RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + 
" must be 'all'. If these values "
 + "are not explicitly 
set by the user, suitable values will be chosen. If incompatible values are 
set, "
-+ "a 
ConfigException will be thrown.";
++ "a 
ConfigException will be thrown. With an idempotent producer, 
setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
++ " greater 
than 1 will not break ordering guarantees.";

Review comment:
   will do
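
   For reference, a small sketch of producer settings consistent with the documented guarantee (values are illustrative): idempotence enabled, acks=all, retries > 0, and max.in.flight.requests.per.connection above 1 but at most 5 still preserves ordering.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class IdempotentProducerProps {
        public static Properties build() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
            // With idempotence enabled, up to 5 in-flight requests still keep ordering.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
            return props;
        }
    }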








[GitHub] [kafka] ableegoldman commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-14 Thread GitBox


ableegoldman commented on a change in pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#discussion_r632787843



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -201,7 +201,8 @@
 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 
"max.in.flight.requests.per.connection";
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = 
"The maximum number of unacknowledged requests the client will send on a single 
connection before blocking."
 + 
" Note that if this setting is set to be greater than 1 and there are failed 
sends, there is a risk of"
-+ 
" message re-ordering due to retries (i.e., if retries are enabled).";
++ 
" message re-ordering due to retries (i.e., if retries are enabled). With an 
idempotent producer, this"
++ 
" can be up to 5 and still provide ordering guarantees.";

Review comment:
   @ijuma done








[jira] [Closed] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-12754.
--

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also, it would be
> good to have the metadata offsets have an initial value of -1 instead of an
> empty map, so that the set of TopicPartitions won't change.





[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344864#comment-17344864
 ] 

Walker Carlson commented on KAFKA-12754:


Huh, I guess I was wrong. Sorry!

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also, it would be
> good to have the metadata offsets have an initial value of -1 instead of an
> empty map, so that the set of TopicPartitions won't change.





[jira] [Updated] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-12754:
---
Affects Version/s: (was: 2.8.0)

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also, it would be
> good to have the metadata offsets have an initial value of -1 instead of an
> empty map, so that the set of TopicPartitions won't change.





[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344863#comment-17344863
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12754:


[~wcarlson5] are you sure? I tried to cherrypick this just now and pretty much 
everything had merge conflicts. It doesn't look like the feature is in 2.8 
after all, for example TaskMetadata doesn't have any of the new APIs like 
committedOffsets(), endOffsets(), or timeCurrentIdlingStarted()

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also, it would be
> good to have the metadata offsets have an initial value of -1 instead of an
> empty map, so that the set of TopicPartitions won't change.





[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r632770796



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+/**
+ * Test that the BrokerList class works as expected.
+ */
+@Test
+public void testBrokerList() {
+assertEquals(0, BrokerList.EMPTY.size());
+assertEquals(-1, BrokerList.EMPTY.next(1));
+BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+assertEquals(4, brokers.size());
+assertEquals(0, brokers.next(0));
+assertEquals(1, brokers.next(0));
+assertEquals(2, brokers.next(0));
+assertEquals(3, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(1, brokers.next(1));
+assertEquals(2, brokers.next(1));
+assertEquals(3, brokers.next(1));
+assertEquals(0, brokers.next(1));
+assertEquals(-1, brokers.next(1));
+}
+
+/**
+ * Test that we perform striped replica placement as expected, and don't 
use the
+ * fenced replica if we don't have to.
+ */
+@Test
+public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(0, Optional.empty(), false),
+new UsableBroker(4, Optional.empty(), false),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(5, rackList.numTotalBrokers());
+assertEquals(4, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(0));
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(-1));
+assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+}
+
+/**
+ * Test that we will place on the fenced replica if we need to.
+ */
+@Test
+public void testPlacementOnFencedReplicaOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(3, rackList.numTotalBrokers());
+assertEquals(2, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+}
+
+@Test
+public void testRackListWithMultipleRacks() {
+MockRandom random = new MockRandom();
+R

[GitHub] [kafka] cmccabe commented on a change in pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r632769701



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+/**
+ * Test that the BrokerList class works as expected.
+ */
+@Test
+public void testBrokerList() {
+assertEquals(0, BrokerList.EMPTY.size());
+assertEquals(-1, BrokerList.EMPTY.next(1));
+BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+assertEquals(4, brokers.size());
+assertEquals(0, brokers.next(0));
+assertEquals(1, brokers.next(0));
+assertEquals(2, brokers.next(0));
+assertEquals(3, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(-1, brokers.next(0));
+assertEquals(1, brokers.next(1));
+assertEquals(2, brokers.next(1));
+assertEquals(3, brokers.next(1));
+assertEquals(0, brokers.next(1));
+assertEquals(-1, brokers.next(1));
+}
+
+/**
+ * Test that we perform striped replica placement as expected, and don't 
use the
+ * fenced replica if we don't have to.
+ */
+@Test
+public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(0, Optional.empty(), false),
+new UsableBroker(4, Optional.empty(), false),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(5, rackList.numTotalBrokers());
+assertEquals(4, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(0));
+assertThrows(InvalidReplicationFactorException.class, () -> 
rackList.place(-1));
+assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+}
+
+/**
+ * Test that we will place on the fenced replica if we need to.
+ */
+@Test
+public void testPlacementOnFencedReplicaOnSingleRack() {
+MockRandom random = new MockRandom();
+RackList rackList = new RackList(random, Arrays.asList(
+new UsableBroker(3, Optional.empty(), false),
+new UsableBroker(1, Optional.empty(), true),
+new UsableBroker(2, Optional.empty(), false)).iterator());
+assertEquals(3, rackList.numTotalBrokers());
+assertEquals(2, rackList.numUnfencedBrokers());
+assertEquals(Collections.singletonList(Optional.empty()), 
rackList.rackNames());
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));

Review comment:
   Because broker 1 is fenced, we don't place a replica there until we need 
to (there are no more replicas remaining).  So it will always be the last / 
least preferred rep

[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344857#comment-17344857
 ] 

Walker Carlson commented on KAFKA-12754:


[~ableegoldman] I think cherry-picking this back to 2.8 would be good

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also it would be 
> good to have the metadata offsets have an initial value of -1 instead of an 
> empty map, so that the set of TopicPartitions won't change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-12754:
---
Affects Version/s: 2.8.0

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also it would be 
> good to have the metadata offsets have an initial value of -1 instead of an 
> empty map, so that the set of TopicPartitions won't change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12788) Improve KRaft replica placement

2021-05-14 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12788:


 Summary: Improve KRaft replica placement
 Key: KAFKA-12788
 URL: https://issues.apache.org/jira/browse/KAFKA-12788
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


Implement the existing Kafka replica placement algorithm for KRaft.
This also means implementing rack awareness. Previously, we just chose
replicas randomly in a non-rack-aware fashion. Also, allow replicas to
be placed on fenced brokers if there are no other choices. This was
specified in KIP-631 but previously not implemented.
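
As a rough, single-rack illustration of the striping idea (a hand-written sketch with made-up names, not the StripedReplicaPlacer added by the associated PR): replicas are assigned round-robin starting from a rotating offset, unfenced brokers are used first, and fenced brokers are only used when nothing else remains.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: place `replicationFactor` replicas by striping across brokers.
final class ToyPlacer {
    static List<Integer> place(final List<Integer> unfenced,
                               final List<Integer> fenced,
                               final int startOffset,
                               final int replicationFactor) {
        if (replicationFactor <= 0 || replicationFactor > unfenced.size() + fenced.size()) {
            throw new IllegalArgumentException("invalid replication factor " + replicationFactor);
        }
        final List<Integer> placement = new ArrayList<>(replicationFactor);
        // Stripe across the unfenced brokers first, rotating the start offset per partition.
        for (int i = 0; i < unfenced.size() && placement.size() < replicationFactor; i++) {
            placement.add(unfenced.get((startOffset + i) % unfenced.size()));
        }
        // Fall back to fenced brokers only if replicas are still needed.
        for (int i = 0; placement.size() < replicationFactor; i++) {
            placement.add(fenced.get(i));
        }
        return placement;
    }
}
```

Rotating the start offset for each successive partition is what spreads leadership evenly, which is the behaviour the StripedReplicaPlacerTest excerpts earlier in this digest assert with repeated rackList.place(...) calls.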



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10494: KAFKA-12788: improve KRaft replica placement

2021-05-14 Thread GitBox


cmccabe commented on pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#issuecomment-841470406


   > @cmccabe : Thanks for the PR. A few comments below. Also, we probably want 
to associate a jira with the PR since it's a bit large.
   
   Fair.  I created KAFKA-12788 for this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12471) Implement createPartitions in KIP-500 mode

2021-05-14 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12471.
--
Fix Version/s: 3.0.0
   Resolution: Fixed

> Implement createPartitions in KIP-500 mode
> --
>
> Key: KAFKA-12471
> URL: https://issues.apache.org/jira/browse/KAFKA-12471
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12334) Add the KIP-631 metadata shell

2021-05-14 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12334.
--
Fix Version/s: 2.8.0
   Resolution: Fixed

Added in 2.8

> Add the KIP-631 metadata shell
> --
>
> Key: KAFKA-12334
> URL: https://issues.apache.org/jira/browse/KAFKA-12334
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> The metadata shell is a tool to interactively examine the metadata stored in a KIP-500 cluster.
> It can read the metadata from the controllers directly, by connecting to
> them, or from a metadata snapshot on disk. In the former case, the
> quorum voters must be specified by passing the --controllers flag; in
> the latter case, the snapshot file should be specified via --snapshot.
> The metadata tool works by replaying the log and storing the state into
> in-memory nodes. These nodes are presented in a fashion similar to
> filesystem directories.
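
For example, assuming the launcher script name from the 2.8 distribution (`bin/kafka-metadata-shell.sh`), the tool would be started with either `--snapshot <path to the metadata log or snapshot file>` to read state from disk, or `--controllers <quorum voter list>` to connect to a live quorum.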



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12275) KIP-500: Remove controllerOnly restriction from the DecommissionBroker API

2021-05-14 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12275.
--
Fix Version/s: 2.8.0
 Assignee: Colin McCabe
   Resolution: Fixed

DecommissionBroker was replaced by UnregisterBroker, and this is handled on 
brokers, not just controllers.

> KIP-500: Remove controllerOnly restriction from the DecommissionBroker API
> --
>
> Key: KAFKA-12275
> URL: https://issues.apache.org/jira/browse/KAFKA-12275
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Alok Nikhil
>Assignee: Colin McCabe
>Priority: Minor
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> We need to fix this to forward the request to the active controller.
> Considering this is a KIP-500 only API and we don't have an implementation 
> yet, we are terminating the connection instead for now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

2021-05-14 Thread GitBox


cmccabe merged pull request #10688:
URL: https://github.com/apache/kafka/pull/10688


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12787) Configure and integrate controller snapshot with the RaftClient

2021-05-14 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12787:
--

 Summary: Configure and integrate controller snapshot with the 
RaftClient
 Key: KAFKA-12787
 URL: https://issues.apache.org/jira/browse/KAFKA-12787
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12647) Implement loading snapshot in the broker

2021-05-14 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-12647:
--

Assignee: Colin McCabe  (was: Jose Armando Garcia Sancio)

> Implement loading snapshot in the broker
> 
>
> Key: KAFKA-12647
> URL: https://issues.apache.org/jira/browse/KAFKA-12647
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12754.

Fix Version/s: 3.0.0
   Resolution: Fixed

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also it would be 
> good to have the metadata offsets have an initial value of -1 instead of an 
> empty map, so that the set of TopicPartitions won't change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344835#comment-17344835
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12754:


[~wcarlson5] was this feature released in 2.8 and needs this fix to be 
cherrypicked back, or is it just in 3.0?

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also it would be 
> good to have the metadata offsets have an initial value of -1 instead of an 
> empty map, so that the set of TopicPartitions won't change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-14 Thread GitBox


ableegoldman merged pull request #10634:
URL: https://github.com/apache/kafka/pull/10634


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-14 Thread GitBox


ableegoldman commented on pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#issuecomment-841450435


   Looks like just the usual flaky test failures in the build: 
`RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()` 
and 
`KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable()`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest

2021-05-14 Thread GitBox


rondagostino commented on pull request #10697:
URL: https://github.com/apache/kafka/pull/10697#issuecomment-841448877


   Tested locally to confirm that the added size is the correct one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #10697: MINOR: Add @cluster annotation to StreamsNamedRepartitionTopicTest

2021-05-14 Thread GitBox


rondagostino opened a new pull request #10697:
URL: https://github.com/apache/kafka/pull/10697


   The StreamsNamedRepartitionTopicTest system test did not have the 
`@cluster` annotation and was therefore taking up the entire cluster.  For 
example, we see this in the log output:
   
   
`kafkatest.tests.streams.streams_named_repartition_topic_test.StreamsNamedRepartitionTopicTest.test_upgrade_topology_with_named_repartition_topic
 is using entire cluster. It's possible this test has no associated cluster 
metadata.`
   
   This PR adds the missing annotation.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.2) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841421148


   Test results on Jenkins show no failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

2021-05-14 Thread GitBox


cmccabe commented on pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#issuecomment-841410862


   > Generally, I don't like that we need to explicitly extract the timeout 
from the request and pass it into the controller call. It would be nice if this 
could be generalized or automated somehow. However, something like this is not 
needed for this minor fix.
   
   If we were starting all over again, we could give all RPC requests a 
timeout, located in the request header itself.  But that isn't really the case 
today -- some requests don't have timeouts, some do, and we have to extract it 
from the specific request schema.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #10696: KAFKA-12777 Refactor and cleanup AutoTopicCreationManager

2021-05-14 Thread GitBox


mumrah opened a new pull request #10696:
URL: https://github.com/apache/kafka/pull/10696


   Rather than using multiple optional class members to determine if we are in 
ZK or KRaft mode, use inheritance. The factory method in 
AutoTopicCreationManager now makes the decision about which mode we're in and 
provides only the needed dependencies to the concrete classes (no optionals).
   
   ```scala
   object AutoTopicCreationManager {
 def apply(
   config: KafkaConfig,
   channelManager: BrokerToControllerChannelManager,
   metadataSupport: MetadataSupport,
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator,
 ): AutoTopicCreationManager = {
   metadataSupport match {
 case zk: ZkSupport => new ZkAutoTopicCreationManager(config, zk, 
groupCoordinator, txnCoordinator)
 case _: RaftSupport => new DefaultAutoTopicCreationManager(config, 
channelManager, groupCoordinator, txnCoordinator)
   }
 }
   }
   ```
   
   This also adds some error handling to the response handler when running in 
KRaft mode (KAFKA-12777).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632697059



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, 
List records = new ArrayList<>();
 ElectLeadersResponseData response = new ElectLeadersResponseData();
-for (TopicPartitions topic : request.topicPartitions()) {
-ReplicaElectionResult topicResults =
-new ReplicaElectionResult().setTopic(topic.topic());
-response.replicaElectionResults().add(topicResults);
-for (int partitionId : topic.partitions()) {
-ApiError error = electLeader(topic.topic(), partitionId, 
uncleanOk, records);
-topicResults.partitionResult().add(new PartitionResult().
-setPartitionId(partitionId).
-setErrorCode(error.error().code()).
-setErrorMessage(error.message()));
+if (request.topicPartitions() == null) {
+// If topicPartitions is null, we try to elect a new leader for 
every partition.
+// There are some obvious issues with this wire protocol.  For 
example, what
+// if we have too many partitions to fit the results in a single 
RPC?  Or what
+// if we generate too many records to fit in a single batch?  This 
behavior

Review comment:
   Thinking about this more, it doesn't seem necessary to do this 
atomically.  We can just do it non-atomically.  It's not a logically 
indivisible operation like creating a topic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632696250



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
 public void close() throws InterruptedException {
 queue.close();
 }
+
+// VisibleForTesting
+CountDownLatch pause() {
+final CountDownLatch latch = new CountDownLatch(1);
+appendControlEvent("pause", () -> {
+try {
+latch.await();
+} catch (InterruptedException e) {
+log.info("Interrupted while waiting for unpause.", e);
+}
+});
+return latch;
+}
+
+// VisibleForTesting
+Time time() {
+return time;
+}

Review comment:
   We have several helper classes for creating quorum controllers, so 
getting access to the time parameter that way would be difficult.  Anyway, this 
is package-private, so it really only applies to unit tests specifically.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632695450



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext,
 @Override
 public CompletableFuture
 alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
-CompletableFuture future = 
new CompletableFuture<>();
-future.completeExceptionally(new UnsupportedOperationException());
-return future;
+if (request.topics().isEmpty()) {
+return CompletableFuture.completedFuture(new 
AlterPartitionReassignmentsResponseData());
+}
+return appendWriteEvent("alterPartitionReassignments",
+time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),
+() -> {
+throw new UnsupportedOperationException();
+});
 }
 
 @Override
 public CompletableFuture
 listPartitionReassignments(ListPartitionReassignmentsRequestData 
request) {
-CompletableFuture future = new 
CompletableFuture<>();
-future.completeExceptionally(new UnsupportedOperationException());
-return future;
+return appendReadEvent("listPartitionReassignments",
+time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),
+() -> {
+throw new UnsupportedOperationException();
+});

Review comment:
   agreed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

2021-05-14 Thread GitBox


cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632695254



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext,
 @Override
 public CompletableFuture
 electLeaders(ElectLeadersRequestData request) {
-return appendWriteEvent("electLeaders", request.timeoutMs(),
+// If topicPartitions is null, we will try to trigger a new leader 
election on
+// all partitions (!).  But if it's empty, there is nothing to do.
+if (request.topicPartitions() != null && 
request.topicPartitions().isEmpty()) {
+return CompletableFuture.completedFuture(new 
ElectLeadersResponseData());
+}
+return appendWriteEvent("electLeaders",
+time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),

Review comment:
   This is a slightly different fix, although sort of related.
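
For context, the null-versus-empty distinction mirrors what the Admin client sends: a null partition set asks the controller to run an election for every partition, while an empty set is a no-op. A hedged sketch using the public Admin API (not code from this PR; the broker address is a placeholder):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class ElectLeadersSemantics {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // null partition set: trigger a preferred-leader election on ALL partitions
            admin.electLeaders(ElectionType.PREFERRED, null).partitions().get();
            // empty set: nothing to elect, so the controller can answer immediately
            admin.electLeaders(ElectionType.PREFERRED,
                Collections.<TopicPartition>emptySet()).partitions().get();
        }
    }
}
```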




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10664: KAFKA-12749: Changelog topic config on suppressed KTable lost

2021-05-14 Thread GitBox


wcarlson5 commented on a change in pull request #10664:
URL: https://github.com/apache/kafka/pull/10664#discussion_r632673138



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
##
@@ -46,13 +48,13 @@ public void bufferBuilderShouldBeConsistent() {
 assertThat(
 "keys alone should be set",
 maxRecords(2L),
-is(new EagerBufferConfigImpl(2L, MAX_VALUE))
+is(new EagerBufferConfigImpl(2L, MAX_VALUE, 
Collections.emptyMap()))

Review comment:
   @vichu Can you add some tests that don't just use an empty map?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #10695: KAFKA-12783: Remove the deprecated ZK-based partition reassignment API

2021-05-14 Thread GitBox


cmccabe opened a new pull request #10695:
URL: https://github.com/apache/kafka/pull/10695


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #10694: MINOR: fix system test TestSecurityRollingUpgrade

2021-05-14 Thread GitBox


rondagostino opened a new pull request #10694:
URL: https://github.com/apache/kafka/pull/10694


   SecurityConfig for a Kafka cluster in a system test is cached due to 
https://github.com/apache/kafka/pull/8917, but we mutate the security config 
during some system tests, and those mutations were not being passed through 
after-the-fact.  These system tests were not testing what they were supposed to 
be testing.  This patch passes through the potential changes so that we again 
test what we are supposed to be testing.
   
   Also, since we became very specific about what SASL mechanisms to enable 
when updating the system tests for KRaft, we need to explicitly 
indicate to the SecurityConfig any additional SASL mechanisms that we want to 
enable.  This was always necessary once we made the KRaft changes, but it was 
not apparent due to the above bug (where mutations were not being passed 
through).  This patch provides a way to pass additional SASL mechanisms to the 
SecurityConfig by adding an option `sasl_mechanism` to KafkaListener -- this is 
what gets passed into the SecurityConfig when we enable a new security protocol 
in the middle of a system test.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3968) fsync() is not called on parent directory when new FileMessageSet is flushed to disk

2021-05-14 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344701#comment-17344701
 ] 

Jun Rao commented on KAFKA-3968:


Merged the reworked PR [https://github.com/apache/kafka/pull/10680] to trunk.

> fsync() is not called on parent directory when new FileMessageSet is flushed 
> to disk
> 
>
> Key: KAFKA-3968
> URL: https://issues.apache.org/jira/browse/KAFKA-3968
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Linux, ext4 filesystem
>Reporter: Andrey Neporada
>Assignee: Cong Ding
>Priority: Major
>  Labels: reliability
> Fix For: 3.0.0
>
>
> Kafka does not call fsync() on the directory when a new log segment is created 
> and flushed to disk.
> The problem is that the following sequence of calls doesn't guarantee file 
> durability: 
> fd = open("log", O_RDWR | O_CREAT); // suppose open creates "log"
> write(fd);
> fsync(fd);
> If the system crashes after fsync() but before the parent directory has been 
> flushed to disk, the log file can disappear.
> This is true at least for ext4 on Linux.
> The proposed solution is to flush the directory when flush() is called for the 
> first time.
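
For illustration, a minimal Java sketch of the general idiom (not the code merged in #10680): flush the segment file itself, then open its parent directory as a channel and force() it so the new directory entry also survives a crash. Opening a directory this way works on Linux; some platforms reject it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public final class DurableSegmentWrite {
    public static void writeDurably(final Path segment, final byte[] bytes) throws IOException {
        // 1. Create, write and fsync the segment file itself.
        try (FileChannel ch = FileChannel.open(segment,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap(bytes));
            ch.force(true);
        }
        // 2. fsync the parent directory so the directory entry for the new file is durable.
        try (FileChannel dir = FileChannel.open(segment.toAbsolutePath().getParent(),
                StandardOpenOption.READ)) {
            dir.force(true);
        }
    }

    public static void main(String[] args) throws IOException {
        final Path segment = Paths.get("/tmp/kafka-logs-example/topic-0/00000000000000000000.log");
        Files.createDirectories(segment.getParent());
        writeDurably(segment, "hello".getBytes());
    }
}
```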



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #10680: Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created

2021-05-14 Thread GitBox


junrao merged pull request #10680:
URL: https://github.com/apache/kafka/pull/10680


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12761) Consumer offsets are deleted 7 days after last offset commit instead of EMPTY status

2021-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12761:

Component/s: (was: streams)

> Consumer offsets are deleted 7 days after last offset commit instead of EMPTY 
> status
> 
>
> Key: KAFKA-12761
> URL: https://issues.apache.org/jira/browse/KAFKA-12761
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0, 2.7.0
>Reporter: Tomasz Kaszuba
>Priority: Major
> Attachments: log.txt
>
>
> If I understand the following 
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]
>  correctly, consumer offsets should only be cleared once the group has an Empty status: 
> {{Empty}}: The field {{current_state_timestamp}} is set to when group last 
> transitioned to this state. If the group stays in this for 
> {{offsets.retention.minutes}}, the following offset cleanup scheduled task 
> will remove all offsets in the group (as explained above).
> After a week of not consuming any new messages BUT still connected to the 
> consumer group I had the consumer offsets deleted on restart of the k8s pod.
> {noformat}
> 2021-05-06 10:10:04.684  INFO 1 --- [ncurred-pattern] 
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
> clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-86c84635-4c96-4941-b440-5ecd4584d3fd-StreamThread-1-consumer,
>  groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] Found no 
> committed offset for partition ieb.publish.baseline_pc.incurred_pattern-0 
> {noformat}
> I looked at what is happening in the the system topic __consumer_offsets and 
> I see the following:
> {noformat}
> 17138150 2021-04-27 07:14:50 
> [ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::OffsetAndMetadata(offset=646,
>  leaderEpoch=Optional.empty, metadata=AQAAAXkOMJr2, 
> commitTimestamp=1619500490253, expireTimestamp=None)
> 53670252 2021-05-03 17:44:11 
> ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern,
>  generation=13, protocolType=Some(consumer), currentState=Stable, 
> members=Map(ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer-50603fe4-10f7-432e-b306-115329e82b38
>  -> 
> MemberMetadata(memberId=ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer-50603fe4-10f7-432e-b306-115329e82b38,
>  groupInstanceId=Some(null), 
> clientId=ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer,
>  clientHost=/172.23.194.239, sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, supportedProtocols=List(stream), )))
> 65226775 2021-05-06 11:56:13 
> ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern,
>  generation=14, protocolType=Some(consumer), currentState=Empty, 
> members=Map())
> 65226793 2021-05-06 12:10:00 
> [ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::NULL
> 65226795 2021-05-06 12:10:03 
> ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern,
>  generation=15, protocolType=Some(consumer), currentState=Stable, 
> members=Map(ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer-efb312c3-9c24-4088-a5e0-563a3d52c944
>  -> 
> MemberMetadata(memberId=ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer-efb312c3-9c24-4088-a5e0-563a3d52c944,
>  groupInstanceId=Some(null), 
> clientId=ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer,
>  clientHost=/172.23.193.184, sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, supportedProtocols=List(stream), )))
> 65226809 2021-05-06 12:10:09 
> [ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::OffsetAndMetadata(offset=2,
>  leaderEpoch=Optional.empty, metadata=AQAAAXlBJ/Sy, 
> commitTimestamp=1620295809338, expireTimestamp=None) {noformat}
> As you can see the last commited offset was on the 27th of April but the 
> group still had status "Stable" on the 3rd of May. It transitioned to "Empty" 
> on the 6th of May when the pod was restarted. Following this you can see the 
> tombstone message set to delete the offsets which corresponds to the streams 
> logs. (UTC+2).
> For me it looks like the cleanup only took the last commit timestamp into 
> consideration and not the Stable status. Am I misunderstanding how this 
> should work?
> The 

[GitHub] [kafka] wcarlson5 commented on pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-14 Thread GitBox


wcarlson5 commented on pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#issuecomment-841342581


   @ableegoldman I think it's ready :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-14 Thread GitBox


wcarlson5 commented on a change in pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#discussion_r632637377



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class TaskMetadataIntegrationTest {
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
+
+@Rule
+public TestName testName = new TestName();
+
+private String inputTopic;
+private static StreamsBuilder builder;
+private static Properties properties;
+private static String appId = "TaskMetadataTest_";
+private AtomicBoolean process;
+private AtomicBoolean commit;
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = appId + testId;
+inputTopic = "input" + testId;
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+
+process = new AtomicBoolean(true);
+commit = new AtomicBoolean(true);
+
+final KStream stream = builder.stream(inputTopic);
+stream.process(PauseProcessor::new);
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class),
+
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class),
+mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1L)
+)
+);
+}
+
+@Test
+public void shouldReportCorrectCommittedOffsetInformati

[GitHub] [kafka] mumrah commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

2021-05-14 Thread GitBox


mumrah commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632616511



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext,
 @Override
 public CompletableFuture
 electLeaders(ElectLeadersRequestData request) {
-return appendWriteEvent("electLeaders", request.timeoutMs(),
+// If topicPartitions is null, we will try to trigger a new leader 
election on
+// all partitions (!).  But if it's empty, there is nothing to do.
+if (request.topicPartitions() != null && 
request.topicPartitions().isEmpty()) {
+return CompletableFuture.completedFuture(new 
ElectLeadersResponseData());
+}
+return appendWriteEvent("electLeaders",
+time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),

Review comment:
   Is this related to timeouts, or is it just a different fix?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, 
List records = new ArrayList<>();
 ElectLeadersResponseData response = new ElectLeadersResponseData();
-for (TopicPartitions topic : request.topicPartitions()) {
-ReplicaElectionResult topicResults =
-new ReplicaElectionResult().setTopic(topic.topic());
-response.replicaElectionResults().add(topicResults);
-for (int partitionId : topic.partitions()) {
-ApiError error = electLeader(topic.topic(), partitionId, 
uncleanOk, records);
-topicResults.partitionResult().add(new PartitionResult().
-setPartitionId(partitionId).
-setErrorCode(error.error().code()).
-setErrorMessage(error.message()));
+if (request.topicPartitions() == null) {
+// If topicPartitions is null, we try to elect a new leader for 
every partition.
+// There are some obvious issues with this wire protocol.  For 
example, what
+// if we have too many partitions to fit the results in a single 
RPC?  Or what
+// if we generate too many records to fit in a single batch?  This 
behavior

Review comment:
   Do we need all the records in a single batch, or could we create a batch 
for each topic (including its partitions)?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
 public void close() throws InterruptedException {
 queue.close();
 }
+
+// VisibleForTesting
+CountDownLatch pause() {
+final CountDownLatch latch = new CountDownLatch(1);
+appendControlEvent("pause", () -> {
+try {
+latch.await();
+} catch (InterruptedException e) {
+log.info("Interrupted while waiting for unpause.", e);
+}
+});
+return latch;
+}
+
+// VisibleForTesting
+Time time() {
+return time;
+}

Review comment:
   Do we need this? Can't we just use the Time we pass into the constructor 
in tests? Not a big deal really, just wondering

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext,
 @Override
 public CompletableFuture
 alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
-CompletableFuture future = 
new CompletableFuture<>();
-future.completeExceptionally(new UnsupportedOperationException());
-return future;
+if (request.topics().isEmpty()) {
+return CompletableFuture.completedFuture(new 
AlterPartitionReassignmentsResponseData());
+}
+return appendWriteEvent("alterPartitionReassignments",
+time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),
+() -> {
+throw new UnsupportedOperationException();
+});
 }
 
 @Override
 public CompletableFuture
 listPartitionReassignments(ListPartitionReassignmentsRequestData 
request) {
-CompletableFuture future = new 
CompletableFuture<>();
-future.completeExceptionally(new UnsupportedOperationException());
-return future;
+return appendReadEvent("listPartitionReassignments",
+time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),
+() -> {
+throw new UnsupportedOperationException();
+});

Review comment:
   nit: we can do without the curly braces here and above. However, these 
will soon be repl

[GitHub] [kafka] lbradstreet commented on a change in pull request #10620: KAFKA-12736: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

2021-05-14 Thread GitBox


lbradstreet commented on a change in pull request #10620:
URL: https://github.com/apache/kafka/pull/10620#discussion_r632614924



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -710,8 +710,11 @@ private boolean appendsInProgress() {
  */
 public void awaitFlushCompletion() throws InterruptedException {
 try {
-for (ProducerBatch batch : this.incomplete.copyAll())
-batch.produceFuture.await();
+// Obtain a copy of all of the incomplete ProduceRequestResult(s) at 
the time of the flush.
+// We must be careful not to hold a reference to the 
ProducerBatch(es) so that garbage
+// collection can occur on the contents.

Review comment:
   Done.
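
As a side note, the pattern under review is simply "snapshot the lightweight completion handles, not the heavyweight batches, before blocking". A generic hedged sketch of that idea (names are illustrative, not the producer's real types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

final class FlushWaitExample {
    static final class Batch {
        final byte[] payload = new byte[1 << 20];            // large: should become garbage once sent
        final CountDownLatch done = new CountDownLatch(1);   // small completion handle
    }

    // Copy only the latches; dropping the Batch references lets the payloads be
    // collected as soon as the sender is finished with them, even while we wait.
    static void awaitAll(final ConcurrentLinkedQueue<Batch> incomplete) throws InterruptedException {
        final List<CountDownLatch> latches = new ArrayList<>();
        for (final Batch batch : incomplete) {
            latches.add(batch.done);
        }
        for (final CountDownLatch latch : latches) {
            latch.await();
        }
    }
}
```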




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.2) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841303016


   Bumped/force-pushed (I took the liberty to skip local testing on my end 
just to save some time).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


ijuma commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841294495


   Let's bump it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841291321


   FYI/note: the Gradle 7.0.2 hotfix was released a few hours ago (it solves one 
corner case on Alpine Linux): 
https://github.com/gradle/gradle/releases/tag/v7.0.2
   
   Let me know if you want me to bump Gradle to 7.0.2 (but it also makes sense 
to skip this, because the issue affects only one Linux distro). 
   I would bump it, though... please have your say @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841291321


   FYI/note: the Gradle 7.0.2 hotfix was released a few hours ago (it solves one 
corner case on Alpine Linux): 
https://github.com/gradle/gradle/releases/tag/v7.0.2
   
   Let me know if you want me to bump Gradle to 7.0.2 (but it also makes sense 
to skip this, as the change affects only one Linux distro). 
   I would bump it, though... please have your say @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841291321


   FYI/note: the Gradle 7.0.2 hotfix was released a few hours ago (it solves one 
corner case on Alpine Linux): 
https://github.com/gradle/gradle/releases/tag/v7.0.2
   
   Let me know if you want me to bump Gradle (but it also makes sense to skip 
this). Please have your say @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-05-14 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r632541366



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
 }
 
 private void commit(WorkerSourceTask workerTask) {
+if (!workerTask.shouldCommitOffsets()) {

Review comment:
   makes sense




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-14 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344595#comment-17344595
 ] 

Tom Bentley commented on KAFKA-12776:
-

[~neeraj.vaidya] can you reproduce this without using Spring, just the 
KafkaProducer?
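
A minimal plain-KafkaProducer reproduction sketch (illustrative only; the 
bootstrap address, topic name and key are assumptions, not taken from the 
attached project):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderingRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key, single-partition topic: the records should be appended
            // in send order once the cluster becomes reachable again.
            for (int i = 1; i <= 4; i++) {
                producer.send(new ProducerRecord<>("topic-A", "key", "m" + i),
                        (metadata, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();
                            } else {
                                System.out.println("acked at offset " + metadata.offset());
                            }
                        });
            }
            producer.flush();
        }
    }
}
{code}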

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring boot web-application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-14 Thread GitBox


ijuma commented on a change in pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#discussion_r632510173



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -247,9 +248,10 @@
 public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', 
the producer will ensure that exactly one copy of each message is written in 
the stream. If 'false', producer "
 + "retries due to 
broker failures, etc., may write duplicates of the retried message in the 
stream. "
 + "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5, "
-+ "" + 
RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + 
" must be 'all'. If these values "
++ "" + 
RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + 
" must be 'all'. If these values "
 + "are not explicitly 
set by the user, suitable values will be chosen. If incompatible values are 
set, "
-+ "a 
ConfigException will be thrown.";
++ "a 
ConfigException will be thrown. With an idempotent producer, 
setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
++ " greater 
than 1 will not break ordering guarantees.";

Review comment:
   Instead of an additional sentence at the end, could we modify the 
original sentence to also mention ordering?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -247,9 +248,10 @@
 public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', 
the producer will ensure that exactly one copy of each message is written in 
the stream. If 'false', producer "
 + "retries due to 
broker failures, etc., may write duplicates of the retried message in the 
stream. "
 + "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5, "
-+ "" + 
RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + 
" must be 'all'. If these values "
++ "" + 
RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + 
" must be 'all'. If these values "
 + "are not explicitly 
set by the user, suitable values will be chosen. If incompatible values are 
set, "
-+ "a 
ConfigException will be thrown.";
++ "a 
ConfigException will be thrown. With an idempotent producer, 
setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
++ " greater 
than 1 will not break ordering guarantees.";

Review comment:
   Instead of an additional sentence at the end, could we modify the first 
sentence to also mention ordering?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-14 Thread GitBox


ijuma commented on a change in pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#discussion_r632509174



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -201,7 +201,8 @@
 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 
"max.in.flight.requests.per.connection";
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = 
"The maximum number of unacknowledged requests the client will send on a single 
connection before blocking."
 + 
" Note that if this setting is set to be greater than 1 and there are failed 
sends, there is a risk of"
-+ 
" message re-ordering due to retries (i.e., if retries are enabled).";
++ 
" message re-ordering due to retries (i.e., if retries are enabled). With an 
idempotent producer, this"
++ 
" can be up to 5 and still provide ordering guarantees.";

Review comment:
   The `enable.idempotence` will be true by default in 3.0, so we should 
perhaps have the "up to 5" thing first.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-14 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344517#comment-17344517
 ] 

NEERAJ VAIDYA edited comment on KAFKA-12776 at 5/14/21, 12:51 PM:
--

Hi Tom

The topic has just 1 partition.

And just to clarify, all messages have the same key. Yet they get reordered.


was (Author: neeraj.vaidya):
Hi Tom

The topic has just 1 partition.

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring boot web-application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-14 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344517#comment-17344517
 ] 

NEERAJ VAIDYA edited comment on KAFKA-12776 at 5/14/21, 12:51 PM:
--

Hi [~tombentley]

The topic has just 1 partition.

And just to clarify, all messages have the same key. Yet they get reordered.


was (Author: neeraj.vaidya):
Hi Tom

The topic has just 1 partition.

And just clarify, all messages have the same key. Yet they get reordered.

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring boot web-application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12786) Getting SslTransportLayerTest error

2021-05-14 Thread Sibelle (Jira)
Sibelle created KAFKA-12786:
---

 Summary: Getting SslTransportLayerTest error 
 Key: KAFKA-12786
 URL: https://issues.apache.org/jira/browse/KAFKA-12786
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
 Environment: Ubuntu 20.04
Reporter: Sibelle
 Attachments: Error.png

SaslAuthenticatorTest > testRepeatedValidSaslPlainOverSsl() PASSED
org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1]
 failed, log available in 
/kafka/clients/build/reports/testOutput/org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1].test.stdout

SslTransportLayerTest > [1] tlsProtocol=TLSv1.2, useInlinePem=false FAILED
org.opentest4j.AssertionFailedError: Condition not met within timeout 
15000. Metric not updated failed-authentication-total expected:<1.0> but 
was:<0.0> ==> expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:320)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:317)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:301)
at 
org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:196)
at 
org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:155)
at 
org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(SslTransportLayerTest.java:644)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] AndroideRob commented on a change in pull request #10591: Fix minor bugs in the existing documentation

2021-05-14 Thread GitBox


AndroideRob commented on a change in pull request #10591:
URL: https://github.com/apache/kafka/pull/10591#discussion_r632489696



##
File path: docs/ops.html
##
@@ -78,7 +78,7 @@   
auto.leader.rebalance.enable=true
 You can also set this to false, but you will then need to manually restore 
leadership to the restored replicas by running the command:
-> 
bin/kafka-preferred-replica-election.sh --bootstrap-server 
broker_host:port
+> 
bin/kafka-leader-election.sh --bootstrap-server broker_host:port

Review comment:
   @showuon hey, do you have any feedback on this? thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-14 Thread GitBox


jlprat commented on pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#issuecomment-841197433


   Failures were https://issues.apache.org/jira/browse/KAFKA-9009 and 
https://issues.apache.org/jira/browse/KAFKA-12629


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-05-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344537#comment-17344537
 ] 

Jørgen commented on KAFKA-12774:


It actually works as expected when throwing from selectKey 👍
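
A minimal sketch of this kind of setup, for reference (illustrative only; the 
topic names, application id and bootstrap address are assumptions): a selectKey 
that throws, with a KIP-671 uncaught exception handler that logs through slf4j 
so the output goes through the configured log4j layout:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UncaughtHandlerLoggingDemo {
    private static final Logger log = LoggerFactory.getLogger(UncaughtHandlerLoggingDemo.class);

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("foo")
               .peek((key, value) -> log.info("Peeked key={} value={}", key, value))
               .<String>selectKey((key, value) -> {
                   throw new RuntimeException("test-exception");
               })
               .to("bar");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uncaught-handler-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // The cast picks the KIP-671 overload (2.8+) rather than the older
        // Thread.UncaughtExceptionHandler one; logging here goes through slf4j
        // and therefore through whatever log4j configuration is in place.
        streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) exception -> {
            log.error("Uncaught exception in stream thread", exception);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        streams.start();
    }
}
{code}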

So it seems like a real corner case. Here is the log when throwing from 
selectKey (formatted as JSON, as expected):
{code:java}
{"instant":{"epochSecond":1620990806,"nanoOfSecond":307831000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"INFO","loggerName":"org.example.kafka.KafkaConfig","message":"Peeked
 key=key1 
value=value1","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.307+0200"}
{"instant":{"epochSecond":1620990806,"nanoOfSecond":309238000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"ERROR","loggerName":"org.apache.kafka.streams.processor.internals.TaskManager","message":"stream-thread
 [sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1] Failed 
to process stream task 0_0 due to the following 
error:","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught 
in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception
 caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.ka

[GitHub] [kafka] jlprat commented on a change in pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-14 Thread GitBox


jlprat commented on a change in pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#discussion_r632410626



##
File path: LICENSE-binary
##
@@ -243,7 +243,6 @@ netty-handler-4.1.59.Final
 netty-resolver-4.1.59.Final
 netty-transport-4.1.59.Final
 netty-transport-native-epoll-4.1.59.Final
-netty-transport-native-epoll-4.1.59.Final

Review comment:
   I removed this line from LICENSE-binary as it was a duplicate.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10651: MINOR: Kafka Streams code samples formating unification

2021-05-14 Thread GitBox


jlprat commented on pull request #10651:
URL: https://github.com/apache/kafka/pull/10651#issuecomment-841165081


   @cadonna Would you like me to split this PR into several ones? One per file, 
for example?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-14 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344517#comment-17344517
 ] 

NEERAJ VAIDYA commented on KAFKA-12776:
---

Hi Tom

The topic has just 1 partition.

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring boot web-application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10201) Update codebase to use more inclusive terms

2021-05-14 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344516#comment-17344516
 ] 

Omnia Ibrahim commented on KAFKA-10201:
---

[~xvrl] if you still need help with the remaining items I can pick up one of 
them. 

> Update codebase to use more inclusive terms
> ---
>
> Key: KAFKA-10201
> URL: https://issues.apache.org/jira/browse/KAFKA-10201
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xavier Léauté
>Priority: Major
> Fix For: 3.0.0
>
>
> see the corresponding KIP 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on a change in pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-14 Thread GitBox


jlprat commented on a change in pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#discussion_r632410626



##
File path: LICENSE-binary
##
@@ -243,7 +243,6 @@ netty-handler-4.1.59.Final
 netty-resolver-4.1.59.Final
 netty-transport-4.1.59.Final
 netty-transport-native-epoll-4.1.59.Final
-netty-transport-native-epoll-4.1.59.Final

Review comment:
   I removed this line from LICENSE-binary as it was a duplicate.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-14 Thread GitBox


jlprat commented on pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#issuecomment-841123185


   I would highly appreciate any hint on how to proceed for the dependencies 
that do not include a NOTICE file in their jar files but might include one in 
their source code repositories.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat opened a new pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-14 Thread GitBox


jlprat opened a new pull request #10693:
URL: https://github.com/apache/kafka/pull/10693


   Adds a new NOTICE-binary file and packages it in the binary release.
   This follows up on the https://github.com/apache/kafka/pull/10474 pull, where 
the LICENSE was fixed.
   
   Similarly as in that PR, I do not know if this is correct, and I would add 
the same disclaimer @vvcephei did: "Please make no assumption that I know what 
I'm doing and let me know if anything seems wrong."
   
   I went through all jar files within the distribution file and copied the 
content of any existing NOTICE file.
   
   Notes:
 * For the cases where several dependencies included the same NOTICE, I 
added it only once.
   * There are cases where the jar included in Kafka's distribution file 
doesn't contain any NOTICE file, hence nothing was copied. These are:
 * activation-1.1.1.jar
 * argparse4j-0.7.0.jar
 * jackson-annotations-2.10.5.jar
 * jackson-dataformat-csv-2.10.5.jar
 * jackson-datatype-jdk8-2.10.5.jar
 * jackson-jaxrs-base-2.10.5.jar
 * jackson-module-scala_2.13-2.10.5.jar
 * jakarta.validation-api-2.0.2.jar
 * javassist-3.27.0-GA.jar
 * javax.servlet-api-3.1.0.jar
 * javax.ws.rs-api-2.1.1.jar
 * jaxb-api-2.3.0.jar
 * jline-3.12.1.jar
 * jopt-simple-5.0.4.jar
 * lz4-java-1.7.1.jar
 * metrics-core-2.2.0.jar
 * netty-buffer-4.1.62.Final.jar
 * netty-codec-4.1.62.Final.jar
 * netty-common-4.1.62.Final.jar
 * netty-handler-4.1.62.Final.jar
 * netty-resolver-4.1.62.Final.jar
 * netty-transport-4.1.62.Final.jar
 * netty-transport-native-epoll-4.1.62.Final.jar
 * netty-transport-native-unix-common-4.1.62.Final.jar
 * reflections-0.9.12.jar
 * rocksdbjni-6.19.3.jar
 * scala-collection-compat_2.13-2.3.0.jar
 * scala-java8-compat_2.13-0.9.1.jar
 * scala-logging_2.13-3.9.2.jar
 * slf4j-api-1.7.30.jar
 * slf4j-log4j12-1.7.30.jar
 * snappy-java-1.1.8.1.jar
 * zstd-jni-1.4.9-1.jar
   * For some of those _NOTICE-missing-in-jar_ dependencies, there were other 
dependencies from the same project that included a NOTICE file, for example, 
the jackson ones.
   * For the Netty project, I manually copied their NOTICE file from GitHub
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12784) ssl kafka failed

2021-05-14 Thread Trofimov (Jira)
Trofimov created KAFKA-12784:


 Summary: ssl kafka failed
 Key: KAFKA-12784
 URL: https://issues.apache.org/jira/browse/KAFKA-12784
 Project: Kafka
  Issue Type: Task
  Components: config, consumer, KafkaConnect
Affects Versions: 2.8.0
Reporter: Trofimov
 Fix For: 2.8.0


*Kafka version:* kafka_2.13-2.8.0

I have a problem with SSL in Kafka. I can't figure out how 
ssl.endpoint.identification.algorithm works, because everything works fine for 
me if this parameter is empty.

 

If I set it to https, I get "_no subject alternative dns name 
matching_" errors from the brokers.

 

*DNS name of server 1:*

 

[root@zeus1 /home/trofimov-im]#  nslookup IP_ADDR

IP_ADDR.in-addr.arpa  name = zeus1.bbk.strf.ru.

 

I removed the unnecessary entries.

*cert in truststore:*

 

Keystore type: jks
Keystore provider: SUN

Your keystore contains 7 entries

Alias name: caroot
Creation date: May 11, 2021
Entry type: trustedCertEntry

Owner: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Issuer: CN=Root CA, O=bbk, C=RU

 

***
***


Alias name: zeus1.cert
Creation date: May 11, 2021
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=zeus1.bbk.strf.ru, OU=SCS, O=BBK of Russia, L=Moscow, ST=Moscow, C=RU
Issuer: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Serial number: 1d0007b167a6fd474142f6b79f0007b167
Valid from: Tue Apr 27 19:33:52 MSK 2021 until: Mon Nov 20 14:19:00 MSK 2023
Certificate fingerprints:
 MD5: 85:E5:4F:30:A6:A1:0E:A0:8B:7E:70:1C:2B:01:65:BA
 SHA1: 84:20:E8:0E:8E:24:EB:E4:93:92:7B:D1:61:3B:75:A9:D8:83:12:DE
 SHA256: 
E6:3D:4E:BD:93:22:B5:4E:28:5A:78:F6:B8:53:1B:BF:6C:39:3D:FC:EB:CF:F8:62:FC:DA:9B:BE:59:4E:F6:EE
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3


#8: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
 DNSName: scs-kafka.bbk.strf.ru
 DNSName: *.scs-kafka.bbk.strf.ru
 DNSName: scs-kafka
 DNSName: *.scs-kafka
 DNSName: zeus1.bbk.strf.ru
 DNSName: *.zeus1.bbk.strf.ru
 DNSName: zeus1
 DNSName: *.zeus1
]

 

***
***


Alias name: zeus2.cert
Creation date: May 11, 2021
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=zeus2.bbk.strf.ru, OU=SCS, O=BBK of Russia, L=Moscow, ST=Moscow, C=RU
Issuer: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Serial number: 1d0007b169e5e4f88b66d2e1ce0007b169
Valid from: Tue Apr 27 19:35:28 MSK 2021 until: Mon Nov 20 14:19:00 MSK 2023
Certificate fingerprints:
 MD5: 98:19:39:A9:DF:73:61:EB:17:30:BB:40:75:16:CE:0A
 SHA1: 81:0E:77:60:31:77:FC:5A:5C:E3:5F:45:F5:97:C6:84:F0:7B:DB:B5
 SHA256: 
8D:89:2D:B0:AA:9B:8E:95:D0:54:42:E9:E2:6D:67:FC:7A:6E:F4:50:58:76:F4:F7:0E:F5:D6:F7:A8:C1:5D:51
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

 

#8: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
 DNSName: scs-kafka.bbk.strf.ru
 DNSName: *.scs-kafka.bbk.strf.ru
 DNSName: scs-kafka
 DNSName: *.scs-kafka
 DNSName: zeus2.bbk.strf.ru
 DNSName: *.zeus2.bbk.strf.ru
 DNSName: zeus2
 DNSName: *.zeus2
]

 

***
***

 

*keystore is the same*

*The configuration is like this:* 

 

ssl.keystore.location=/home/kafka/kafka.server.keystore.jks

ssl.keystore.password=password

ssl.key.password= password

 

ssl.truststore.location=/home/kafka/kafka.server.truststore.jks

ssl.truststore.password= password

 

ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

ssl.keystore.type=JKS

ssl.truststore.type=JKS

 

security.inter.broker.protocol=SSL

ssl.client.auth=required

ssl.endpoint.identification.algorithm=

 

*What's wrong, where to dig?*
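
For what it's worth: with ssl.endpoint.identification.algorithm=https the 
client verifies the host name it connects to (from bootstrap.servers and the 
brokers' advertised.listeners) against the SubjectAlternativeName entries in 
the broker certificate, so the brokers need to be addressed by one of the DNS 
names above (e.g. zeus1.bbk.strf.ru) rather than by IP. A hedged client-side 
sketch under that assumption (the port is assumed, the truststore path and 
password are reused from the config above; since ssl.client.auth=required, a 
client keystore would also be needed but is omitted here):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SslClientSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Connect using a DNS name that appears in the certificate SANs,
        // not a bare IP, so that https host name verification can pass.
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "zeus1.bbk.strf.ru:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/kafka/kafka.server.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ssl-check");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.listTopics(); // forces a metadata request over the TLS connection
        }
    }
}
{code}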

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6987) Reimplement KafkaFuture with CompletableFuture

2021-05-14 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley updated KAFKA-6987:
---
Fix Version/s: 3.0.0

> Reimplement KafkaFuture with CompletableFuture
> --
>
> Key: KAFKA-6987
> URL: https://issues.apache.org/jira/browse/KAFKA-6987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Andras Beni
>Assignee: Tom Bentley
>Priority: Minor
> Fix For: 3.0.0
>
>
> KafkaFuture documentation states:
> {{This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.}}
> With Java 7 support dropped in 2.0, it is time to get rid of custom code.
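
A hedged sketch of what such a thin shim could look like (illustrative only; 
this is neither the actual KafkaFuture implementation nor the final public 
API):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

// Illustrative shim: the public surface simply delegates to an internal
// CompletableFuture instead of maintaining custom waiter/chaining code.
public class ShimFuture<T> {
    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    public boolean complete(T value) {
        return delegate.complete(value);
    }

    public boolean completeExceptionally(Throwable throwable) {
        return delegate.completeExceptionally(throwable);
    }

    public T get() throws InterruptedException, ExecutionException {
        return delegate.get();
    }

    // Expose the standard Java 8 API to callers; returning the delegate keeps
    // the sketch short (a real shim would likely return a defensive view).
    public CompletionStage<T> toCompletionStage() {
        return delegate;
    }
}
{code}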



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #10660: MINOR: Updating files with release 2.7.1

2021-05-14 Thread GitBox


mimaison commented on pull request #10660:
URL: https://github.com/apache/kafka/pull/10660#issuecomment-841103731


   Thanks @mjsax, I can see the artifacts now. I've rekicked the build


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

2021-05-14 Thread GitBox


cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r632371505



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
 }
 
 final Set changelogTopics = new HashSet<>();
-for (final StateStore stateStore : globalStateStores) {
-globalStoreNames.add(stateStore.name());
+for (final StateStore stateStore : topology.globalStateStores()) {
 final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
 changelogTopics.add(sourceTopic);
 stateStore.init((StateStoreContext) globalProcessorContext, 
stateStore);

Review comment:
   On second thought, it might also be relevant for production code, since we 
can now restart a stream thread after a fatal error. This is not yet 
possible for a global stream thread, but it might become possible in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-14 Thread GitBox


ableegoldman commented on pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#issuecomment-841084469


   Wow, all checks actually passed! That's the second completely green build 
I've seen in 24 hours, must be my lucky day 😄 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-14 Thread GitBox


ableegoldman commented on a change in pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#discussion_r632355592



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -247,9 +248,10 @@
 public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', 
the producer will ensure that exactly one copy of each message is written in 
the stream. If 'false', producer "
 + "retries due to 
broker failures, etc., may write duplicates of the retried message in the 
stream. "
 + "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5, "
-+ "" + 
RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + 
" must be 'all'. If these values "
++ "" + 
RETRIES_CONFIG + " to be greater than 0, and " + ACKS_CONFIG + 
" must be 'all'. If these values "
 + "are not explicitly 
set by the user, suitable values will be chosen. If incompatible values are 
set, "
-+ "a 
ConfigException will be thrown.";
++ "a 
ConfigException will be thrown. With an idempotent producer, 
setting the " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
++ " greater 
than 1 will not break ordering guarantees.";

Review comment:
   That requirement is covered a few sentences before this one; it might be 
cut off by GitHub. I think it's clear that by "greater than 1" we just mean 
"between 1 and 5, the maximum allowable limit as stated above".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12625) Fix the NOTICE file

2021-05-14 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat reassigned KAFKA-12625:
--

Assignee: Josep Prat

> Fix the NOTICE file
> ---
>
> Key: KAFKA-12625
> URL: https://issues.apache.org/jira/browse/KAFKA-12625
> Project: Kafka
>  Issue Type: Task
>Reporter: John Roesler
>Assignee: Josep Prat
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12602, we fixed the license 
> file, and in the comments, Justin noted that we really should fix the NOTICE 
> file as well.
> Basically, we need to look through each of the packaged dependencies and 
> transmit each of their NOTICEs (for Apache2 deps) or otherwise, any copyright 
> notices they assert.
> It would be good to consider automating a check for this as well (see 
> https://issues.apache.org/jira/browse/KAFKA-12622)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12625) Fix the NOTICE file

2021-05-14 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344377#comment-17344377
 ] 

Josep Prat commented on KAFKA-12625:


[~vvcephei] As I see neither an assignee nor any PR on GitHub for this, I would like 
to take this task.

> Fix the NOTICE file
> ---
>
> Key: KAFKA-12625
> URL: https://issues.apache.org/jira/browse/KAFKA-12625
> Project: Kafka
>  Issue Type: Task
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12602, we fixed the license 
> file, and in the comments, Justin noted that we really should fix the NOTICE 
> file as well.
> Basically, we need to look through each of the packaged dependencies and 
> transmit each of their NOTICEs (for Apache2 deps) or otherwise, any copyright 
> notices they assert.
> It would be good to consider automating a check for this as well (see 
> https://issues.apache.org/jira/browse/KAFKA-12622)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-14 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344376#comment-17344376
 ] 

Tom Bentley commented on KAFKA-12776:
-

[~neeraj.vaidya] how many partitions does the topic have? I don't see it stated 
so far. Kafka only guarantees ordering within a partition, so if records are 
written to multiple partitions you will only observe order being preserved for 
records sent to the same partition.

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring boot web-application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841055689


   PR rebased, commits are squashed into one; tests on Jenkins are looking 
good: it seems that only one unrelated test failed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-14 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-841055689


   It seems that only one unrelated test failed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org