[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619989241



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {

Review comment:
   Thank you, I updated the PR as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619988848



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId +
+                " whose offset is larger than the high-watermark: " + highWatermarkOpt +
+                ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                "size of records between the latest snapshot and the high-watermark when creating snapshot");
+        }
+        int leaderEpoch = quorum().epoch();
+        if (snapshotId.epoch > leaderEpoch) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId +
+                " whose epoch is larger than the current leader epoch: " + leaderEpoch);
+        }

Review comment:
   Thanks, I had previously mistaken the quorum epoch for the leader epoch cache.
   I updated the PR. One thing in the Jira I'm not sure about:
   
   > 2. The epoch of the snapshot is equal to the quorum epoch.
   
   I think requiring the snapshotId's epoch <= quorum epoch should be fine?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12535) Consider Revising Document Anchors for Properties

2021-04-25 Thread Minoru Tomioka (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Minoru Tomioka reassigned KAFKA-12535:
--

Assignee: (was: Minoru Tomioka)

> Consider Revising Document Anchors for Properties
> -
>
> Key: KAFKA-12535
> URL: https://issues.apache.org/jira/browse/KAFKA-12535
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 2.7.0
>Reporter: Gary Russell
>Priority: Minor
>
> Anchors for ToC entries work fine:
> https://kafka.apache.org/documentation/#producerconfigs
> With the section title appearing below the "floating" banner. However, 
> anchors for properties, e.g. 
> https://kafka.apache.org/documentation/#producerconfigs_max.block.ms don't 
> render properly; the first part of the property description is "hidden" under 
> the floating banner.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] socutes opened a new pull request #10596: KAFKA-12715: ACL authentication, Host field support IP network segment

2021-04-25 Thread GitBox


socutes opened a new pull request #10596:
URL: https://github.com/apache/kafka/pull/10596


   At present, ACL authentication's Host field only supports exact matching of the 
source IP; we hope the Host field can also support matching against an IP network 
segment (CIDR).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Modify the Host matching logic of the AclAuthorizer's function 
matchingAclExists
   - [ ] Add IP segment validation test cases
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
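The change described above boils down to prefix-matching an address against a network block. A minimal, self-contained sketch of CIDR matching (all class and method names here are hypothetical, not Kafka's actual AclAuthorizer API):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

/**
 * Hypothetical sketch of CIDR ("IP network segment") matching for an ACL
 * host field, as the PR proposes. Names are illustrative only.
 */
public class CidrMatch {

    /** Returns true if address falls inside the CIDR block, e.g. "10.1.0.0/16". */
    public static boolean matches(String cidr, String address) {
        try {
            String[] parts = cidr.split("/");
            byte[] network = InetAddress.getByName(parts[0]).getAddress();
            byte[] host = InetAddress.getByName(address).getAddress();
            int prefixLen = Integer.parseInt(parts[1]);
            if (network.length != host.length) {
                return false; // IPv4 vs IPv6 mismatch
            }
            int fullBytes = prefixLen / 8;
            int remainderBits = prefixLen % 8;
            // All whole bytes covered by the prefix must match exactly.
            if (!Arrays.equals(Arrays.copyOf(network, fullBytes), Arrays.copyOf(host, fullBytes))) {
                return false;
            }
            if (remainderBits == 0) {
                return true;
            }
            // Compare only the leading bits of the next byte.
            int mask = 0xFF << (8 - remainderBits);
            return (network[fullBytes] & mask) == (host[fullBytes] & mask);
        } catch (UnknownHostException | NumberFormatException | ArrayIndexOutOfBoundsException e) {
            return false; // malformed CIDR or address
        }
    }
}
```

A real implementation would also have to decide how a plain host entry (no `/prefix`) falls back to exact matching, and how the wildcard host interacts with CIDR entries.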




[GitHub] [kafka] socutes closed pull request #10594: Merge pull request #1 from apache/trunk

2021-04-25 Thread GitBox


socutes closed pull request #10594:
URL: https://github.com/apache/kafka/pull/10594


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-25 Thread GitBox


mjsax commented on pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#issuecomment-826417134


   Seems there is some checkstyle error:
   ```
   [2021-04-22T23:02:56.789Z] [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10548/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java:35:8:
 Unused import - java.util.Objects. [UnusedImports]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r619899204



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+        Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
   KafkaStreams runtime _always_ "wraps" any store with a corresponding 
`MeteredXxxStore` (cf 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49)
 -- those `MeteredXxxStores` do the translation from objects to bytes (ie they 
use the serdes) and also track state store metrics. (Note that stores provided 
to the runtime always have type `<Bytes, byte[]>` while they are exposed to 
`Processors` as `<K, V>` types.)
   
   Thus, when you call `context.stateStore(...)` you always get a 
`MeteredXxxStore` object -- of course, those details are hidden behind the 
interface type.
   
   This architecture allows us to unify code and separate concerns. In fact, it 
also allows us to add/remove more "layers": we can insert a "caching layer" 
(cf. 
https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html)
 and a "change logging layer" (both are inserted by default).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
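The wrapping mjsax describes is essentially the decorator pattern: each layer implements the store interface, adds one concern (metrics, caching, change logging), and delegates to an inner store. A minimal generic sketch under that assumption (all names here are illustrative, not the actual Kafka Streams classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Minimal decorator-style sketch of Streams' store layering; the class
 *  names are hypothetical, not the real Kafka Streams internals. */
public class StoreLayers {

    interface KVStore<K, V> {
        void put(K key, V value);
        V get(K key);
    }

    /** Innermost layer: the actual storage. */
    static final class InMemoryStore<K, V> implements KVStore<K, V> {
        private final Map<K, V> map = new HashMap<>();
        @Override public void put(K key, V value) { map.put(key, value); }
        @Override public V get(K key) { return map.get(key); }
    }

    /** Outer layer: validates arguments and records a simple metric before
     *  delegating, mirroring what a MeteredXxxStore does. */
    static final class MeteringStore<K, V> implements KVStore<K, V> {
        private final KVStore<K, V> inner;
        long puts = 0;
        MeteringStore(KVStore<K, V> inner) { this.inner = inner; }
        @Override public void put(K key, V value) {
            Objects.requireNonNull(key, "key cannot be null");
            puts++;                 // track a simple metric
            inner.put(key, value);  // delegate to the wrapped layer
        }
        @Override public V get(K key) { return inner.get(key); }
    }
}
```

Because every layer shares one interface, extra layers (caching, change logging) can be inserted or removed without the caller noticing, which is exactly the flexibility the comment highlights.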




[jira] [Comment Edited] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-25 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu edited comment on KAFKA-12713 at 4/25/21, 11:16 PM:
-

The idea is:

0. Add waitTimeMs in Request()

1. In the DelayedFetch delayed-operation class, add code to track the actual 
wait time.

2. In processResponseCallback() of handleFetchRequest, add an additional 
waitTimeMs parameter passed in from DelayedFetch; it will set request.waitTimeMs.

3. In the updateRequestMetrics() function, if waitTimeMs is non-zero, deduct 
it from RemoteTime and TotalTime.

Let me know if you have any suggestions or feedback. I'd like to propose a KIP 
for this change.


was (Author: mingaliu):
The idea I am trying right now is:
 1. Add waitTimeMS in FetchResponse.
 2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, then it will fill in waitTimeMS 
in FetchResponse.
 3. In the updateRequestMetrics() function, we will special-case the Fetch 
response and remove waitTimeMS from RemoteTime and TotalTime.

Let me know if you have any suggestions or feedback. I'd like to propose a KIP 
for this change.

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster 
> performance. With ACK=ALL, produce latency is affected primarily by 
> broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch 
> latency, because the fetch sometimes needs to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly affects the 
> real P50, P99, etc.
> I'd like to propose a KIP to track the real fetch latency for both 
> broker followers and consumers. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-25 Thread GitBox


ijuma commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-826400303


   @dejan2609 can you please rebase on trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-25 Thread GitBox


ijuma commented on a change in pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#discussion_r619889058



##
File path: build.gradle
##
@@ -1491,13 +1491,14 @@ project(':streams') {
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
-from (configurations.testRuntime) {

Review comment:
   @vvcephei do you know why we're copying test dependencies here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-25 Thread GitBox


ijuma commented on a change in pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#discussion_r619888968



##
File path: build.gradle
##
@@ -1491,13 +1491,14 @@ project(':streams') {
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
-from (configurations.testRuntime) {
-  include('slf4j-log4j12*')
-  include('log4j*jar')
-  include('*hamcrest*')
+from (configurations.testCompileClasspath) {
+  include('jackson*')
+  include('slf4j-api*')
 }
 from (configurations.runtimeClasspath) {
-  exclude('kafka-clients*')
+  include('connect*')
+  include('*java*')
+  include('*jni*')

Review comment:
   Why did we switch from exclude to include?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619882204



##
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
 
     context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
 }
 
+@Test
+public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+    int localId = 0;
+    int otherNodeId = localId + 1;
+    Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+    int epoch = 2;
+
+    List<String> appendRecords = Arrays.asList("a", "b", "c");
+    OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+    RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        .appendToLog(epoch, appendRecords)
+        .withAppendLingerMs(1)
+        .build();
+
+    context.becomeLeader();
+    int currentEpoch = context.currentEpoch();
+
+    // When creating snapshot:
+    // 1. high watermark cannot be empty
+    assertEquals(OptionalLong.empty(), context.client.highWatermark());
+    assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+    // 2. high watermark must be larger than the snapshotId's endOffset
+    advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
+    assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+    OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch);
+    assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2));
+
+    // 3. the current leader epoch must not be smaller than the snapshotId's epoch
+    OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1);
+    assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
+}
+
+private void advanceHighWatermark(RaftClientTestContext context,

Review comment:
   Thanks for cleaning up the code duplication.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619882084



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {

Review comment:
   You are correct. I think when I created the Jira I overlooked that both the 
snapshot id's end offset and the high-watermark are exclusive values. I updated 
the Jira's description.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId +
+                " whose offset is larger than the high-watermark: " + highWatermarkOpt +
+                ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                "size of records between the latest snapshot and the high-watermark when creating snapshot");
+        }
+        int leaderEpoch = quorum().epoch();
+        if (snapshotId.epoch > leaderEpoch) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId +
+                " whose epoch is larger than the current leader epoch: " + leaderEpoch);
+        }

Review comment:
   From the Jira:
   > The end offset and epoch of the snapshot is valid based on the leader 
epoch cache.
   
   How about also validating against the leader epoch cache? See 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L124.
 This is important because both the snapshot and the leader epoch cache are 
used to validate offsets. See 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L85.
 The term "leader epoch cache" comes from the variable name `leaderEpochCache` used in 
`kafka.log.Log`.

##
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
 
     context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
 }
 
+@Test
+public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+    int localId = 0;
+    int otherNodeId = localId + 1;
+    Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+    int epoch = 2;
+
+    List<String> appendRecords = Arrays.asList("a", "b", "c");
+    OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+    RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        .appendToLog(epoch, appendRecords)
+        .withAppendLingerMs(1)
+        .build();
+
+    context.becomeLeader();
+    int currentEpoch = context.currentEpoch();
+
+    // When creating snapshot:
+    // 1. high watermark cannot be empty
+    assertEquals(OptionalLong.empty(), context.client.highWatermark());
+    assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+    // 2. high watermark must be larger than the snapshotId's endOffset
+    advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
+    assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+    OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch);
+    assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2));
+
+    // 3. the current leader epoch must not be smaller than the snapshotId's epoch
+    OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1);
+    assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
+}
+
+private void advanceHighWatermark(RaftClientTestContext context,

Review comment:
   Thanks for cleaning up the code duplication.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
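The validation rules discussed in this thread (the high-watermark must be present and strictly greater than the snapshot's exclusive end offset; the snapshot epoch may not exceed the quorum epoch) can be sketched as follows. The class, field, and exception names here are illustrative, not the actual KafkaRaftClient code:

```java
import java.util.Optional;

/** Hypothetical sketch of the snapshot-id checks discussed above; names are
 *  illustrative, not the real KafkaRaftClient internals. Both the snapshot
 *  end offset and the high-watermark are exclusive, hence ">=" is invalid. */
public class SnapshotIds {

    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;
        OffsetAndEpoch(long offset, int epoch) { this.offset = offset; this.epoch = epoch; }
        @Override public String toString() {
            return "OffsetAndEpoch(offset=" + offset + ", epoch=" + epoch + ")";
        }
    }

    static void validateSnapshotId(OffsetAndEpoch snapshotId,
                                   Optional<Long> highWatermark,
                                   int quorumEpoch) {
        // Rule 1: high-watermark must exist and exceed the snapshot end offset.
        if (!highWatermark.isPresent() || highWatermark.get() <= snapshotId.offset) {
            throw new IllegalArgumentException("Snapshot id " + snapshotId +
                " has an end offset at or beyond the high-watermark " + highWatermark);
        }
        // Rule 2: snapshot epoch may not exceed the quorum epoch.
        if (snapshotId.epoch > quorumEpoch) {
            throw new IllegalArgumentException("Snapshot id " + snapshotId +
                " has an epoch larger than the quorum epoch " + quorumEpoch);
        }
    }
}
```

The third rule from the Jira, checking the end offset and epoch against the leader epoch cache, would need access to the replicated log and is omitted from this sketch.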




[jira] [Updated] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-10800:
---
Description: 
When the state machine attempts to create a snapshot writer we should validate 
that the following is true:
 # The end offset of the snapshot is less than or equal to the high-watermark.
 # The epoch of the snapshot is equal to the quorum epoch.
 # The end offset and epoch of the snapshot is valid based on the leader epoch 
cache.

Note that this validation should not be performed when the raft client creates 
the snapshot writer because in that case the local log is out of date and the 
follower should trust the snapshot id sent by the partition leader.

  was:
When the state machine attempts to create a snapshot writer we should validate 
that the following is true:
 # The end offset and epoch of the snapshot is less than the high-watermark.
 # The end offset and epoch of the snapshot is valid based on the leader epoch 
cache.

Note that this validation should not be performed when the raft client creates 
the snapshot writer because in that case the local log is out of date and the 
follower should trust the snapshot id sent by the partition leader.


> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset of the snapshot is less than or equal to the high-watermark.
>  # The epoch of the snapshot is equal to the quorum epoch.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r619878083



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##
@@ -126,10 +129,10 @@ public void setup() {
 clientSupplier.setCluster(cluster);
 streamsProducer = new StreamsProducer(
 config,
-"threadId",
+processId + "-StreamThread-1",

Review comment:
   This PR does not change `StreamsProducer` so this parsing should have 
happened for `_beta` already -- what do I miss?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r619877817



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##
@@ -111,6 +113,7 @@
 
 private final StringSerializer stringSerializer = new StringSerializer();
 private final ByteArraySerializer byteArraySerializer = new 
ByteArraySerializer();
+private final UUID processId = UUID.randomUUID();

Review comment:
   > only for eos-v2 for some reason
   
   That is weird -- if `StreamsProducer` requires the `processID` it should 
have required it for `_beta` already? Would be good to understand -- maybe we 
unmasked a bug?








[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r619877425



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -603,7 +606,7 @@ boolean runLoop() {
 log.error("Shutting down because the Kafka cluster seems 
to be on a too old version. " +
   "Setting {}=\"{}\" requires broker version 2.5 
or higher.",
   StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
-  EXACTLY_ONCE_BETA);
+  StreamsConfig.EXACTLY_ONCE_V2);

Review comment:
   SGTM -- if the required code changes to get the actual value are too 
much, I am fine with hard-coding the value, too.








[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r619876369



##
File path: docs/streams/upgrade-guide.html
##
@@ -93,6 +95,12 @@ Upgrade Guide and API Changes
 
 
 Streams API changes in 3.0.0
+
+  The StreamsConfig.EXACTLY_ONCE and 
StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and 
a new StreamsConfig.EXACTLY_ONCE_V2 config has been
+  introduced. This is the same feature as eos-beta, but renamed to 
highlight its production-readiness. Users of exactly-once semantics should plan 
to migrate to the eos-v2 config and prepare for the removal of the deprecated 
configs in 4.0 or after at least a year
+  from the release of 3.0, whichever comes last. Note that eos-v2 requires 
broker version 2.5 or higher, like eos-beta, so users should begin to upgrade 
their kafka cluster if necessary. See

Review comment:
   Fair enough.








[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r619876217



##
File path: docs/streams/upgrade-guide.html
##
@@ -93,6 +95,12 @@ Upgrade Guide and API Changes
 
 
 Streams API changes in 3.0.0
+
+  The StreamsConfig.EXACTLY_ONCE and 
StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and 
a new StreamsConfig.EXACTLY_ONCE_V2 config has been

Review comment:
   Well, we do deprecate `StreamsConfig.EXACTLY_ONCE`, too, but users might 
just do `config.put("processing.guarantee", "exactly_once");` (or have a config 
file with `"exactly_once"` in it). To me, the main change is that the config 
value itself is deprecated, and the deprecation of the variable is just a "side effect".








[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r619875764



##
File path: docs/streams/upgrade-guide.html
##
@@ -93,6 +95,12 @@ Upgrade Guide and API Changes
 
 
 Streams API changes in 3.0.0
+
+  The StreamsConfig.EXACTLY_ONCE and 
StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and 
a new StreamsConfig.EXACTLY_ONCE_V2 config has been
+  introduced. This is the same feature as eos-beta, but renamed to 
highlight its production-readiness. Users of exactly-once semantics should plan 
to migrate to the eos-v2 config and prepare for the removal of the deprecated 
configs in 4.0 or after at least a year

Review comment:
   I would be very bold about it:
   ```
   We deprecated processing.guarantee configuration value 
"exactly_once"
   (for EOS version 1) in favor of the improved EOS version 2, formerly 
configured via
   "exactly_once_beta. To avoid the confusion about the term 
"beta" in the config value
   (it was never meant to imply it's not production ready), we furthermore 
renamed
   "exactly_once_beta" to "exactly_once_v2". 
   ```
   
   Or something similar.








[jira] [Commented] (KAFKA-12453) Guidance on whether a topology is eligible for optimisation

2021-04-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331635#comment-17331635
 ] 

Matthias J. Sax commented on KAFKA-12453:
-

Thanks for asking. In general 3rd parties hold the copyright. But as a 
Confluent employee, I can tell you that you don't need to worry about it for 
this case – take whatever is helpful.

For testing, check out 
[https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
 and 
[https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]
 .

> Guidance on whether a topology is eligible for optimisation
> ---
>
> Key: KAFKA-12453
> URL: https://issues.apache.org/jira/browse/KAFKA-12453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick O'Keeffe
>Priority: Major
>
> Since the introduction of KStream.toTable() in Kafka 2.6.x, the decision 
> about whether a topology is eligible for optimisation is no longer a simple 
> one, and is related to whether toTable() operations are preceded by key 
> changing operators.
> This decision requires expert level knowledge, and there are serious 
> implications associated with getting it wrong in terms of fault tolerance
> Some ideas spring to mind around how to guide developers to make the correct 
> decision:
>  # Topology.describe() could indicate whether this topology is eligible for 
> optimisation
>  # Topologies could be automatically optimised - note this may have an impact 
> at deployment time, in that an application reset may be required. The 
> developer would need to be made aware of this and adjust the deployment plan 
> accordingly
>  
>  





[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-25 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-826381116


   @ijuma So, now I have something to show: I managed to perform some 
reverse-engineering and got exactly the same content in the 
`/streams/build/dependent-libs-${version.scala}` folder (that folder is filled 
with dependencies by the Gradle task `streams:copyDependantLibs`). _Mental 
note: it would be interesting to fully understand this task's background and usage._
   
   Also: Gradle version is bumped (from **6.8.3** to **7.0**).
   
   Pitfall: `./gradlew clean streams:copyDependantLibs` execution times are 
significantly higher compared to trunk... will try to sort that out, if 
possible.
   
   |git branch | Gradle 6.8.3 | Gradle 7.0 |
   | ---|   --: | --: | 
   | trunk | 16 seconds | **_not applicable_** |
   | this PR | 1 minute 55 seconds | 2 minutes 10 seconds |
   
   
   






[GitHub] [kafka] wenbingshen opened a new pull request #10595: MINOR: Add some metrics names

2021-04-25 Thread GitBox


wenbingshen opened a new pull request #10595:
URL: https://github.com/apache/kafka/pull/10595


   As the title.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Comment Edited] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-25 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu edited comment on KAFKA-12713 at 4/25/21, 5:33 PM:


The idea I am trying right now is:
 1. Add waitTimeMS in FetchResponse.
 2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, then it will fill the waitTimeMS 
in FetchResponse.
 3. In updateRequestMetrics() function, we will special-process the Fetch 
response, and remove the waitTimeMS out of RemoteTime and TotalTime.

Let me know if you have any suggestions/feedback. I'd like to propose a KIP for 
this change.
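A minimal sketch of step 3 above -- subtracting the purgatory wait from the recorded times -- assuming hypothetical names (`waitTimeMs` stands in for the proposed FetchResponse field; the clamp to zero guards against skewed measurements). This is an illustration, not the actual metrics code:

```java
public class FetchLatencyAdjustment {
    /** Clamp-subtract the purgatory wait from a recorded time, never going negative. */
    static long adjusted(long recordedMs, long waitTimeMs) {
        return Math.max(0, recordedMs - waitTimeMs);
    }

    public static void main(String[] args) {
        long totalTimeMs = 510;   // total request time as measured today
        long remoteTimeMs = 505;  // time attributed to the remote/purgatory path
        long waitTimeMs = 500;    // wait reported in the (proposed) FetchResponse field

        // The "real" latencies exclude the intentional replica.fetch.wait.max.ms wait.
        System.out.println("real total=" + adjusted(totalTimeMs, waitTimeMs)
                + " real remote=" + adjusted(remoteTimeMs, waitTimeMs));
    }
}
```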


was (Author: mingaliu):
The idea is like this,  we can:
1. Add waitTimeMS in FetchResponse.
2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, then it will fill the waitTimeMS 
in FetchResponse.
3. In updateRequestMetrics() function, we will special-process the Fetch 
response, and remove the waitTimeMS out of RemoteTime and TotalTime.

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster 
> performance. With ACK=ALL, produce latency is affected primarily by 
> broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes needs to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly affects the 
> real P50, P99, etc. 
> I'd like to propose a KIP to track the real fetch latency for both 
> broker followers and consumers. 
>  





[jira] [Commented] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-25 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu commented on KAFKA-12713:
--

The idea is like this; we can:
1. Add waitTimeMS in FetchResponse.
2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, then it will fill the waitTimeMS 
in FetchResponse.
3. In updateRequestMetrics() function, we will special-process the Fetch 
response, and remove the waitTimeMS out of RemoteTime and TotalTime.

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster 
> performance. With ACK=ALL, produce latency is affected primarily by 
> broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes needs to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly affects the 
> real P50, P99, etc. 
> I'd like to propose a KIP to track the real fetch latency for both 
> broker followers and consumers. 
>  





[GitHub] [kafka] socutes opened a new pull request #10594: Merge pull request #1 from apache/trunk

2021-04-25 Thread GitBox


socutes opened a new pull request #10594:
URL: https://github.com/apache/kafka/pull/10594


   merge
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-12715) ACL authentication, Host field support IP network segment

2021-04-25 Thread xu lobo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331526#comment-17331526
 ] 

xu lobo commented on KAFKA-12715:
-

I could add this feature and would like to hear from the community.

> ACL authentication, Host field support IP network segment
> -
>
> Key: KAFKA-12715
> URL: https://issues.apache.org/jira/browse/KAFKA-12715
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: xu lobo
>Priority: Major
> Attachments: image-2021-04-25-21-46-48-859.png
>
>
> At present, for ACL authentication the Host field only supports exact matching 
> of a source IP; we hope the Host field can also support matching an IP network 
> segment.
> !image-2021-04-25-21-46-48-859.png!





[jira] [Created] (KAFKA-12715) ACL authentication, Host field support IP network segment

2021-04-25 Thread xu lobo (Jira)
xu lobo created KAFKA-12715:
---

 Summary: ACL authentication, Host field support IP network segment
 Key: KAFKA-12715
 URL: https://issues.apache.org/jira/browse/KAFKA-12715
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: xu lobo
 Attachments: image-2021-04-25-21-46-48-859.png

At present, for ACL authentication the Host field only supports exact matching of 
a source IP; we hope the Host field can also support matching an IP network 
segment.

!image-2021-04-25-21-46-48-859.png!
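For illustration, a segment match could be a CIDR prefix check like the sketch below. This is a hypothetical, IPv4-only sketch, not Kafka's actual ACL code:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class CidrMatch {
    /** Returns true if the IPv4 address falls inside the CIDR block, e.g. "10.0.0.0/24". */
    static boolean matches(String address, String cidr) throws UnknownHostException {
        String[] parts = cidr.split("/");
        int prefixLen = Integer.parseInt(parts[1]);
        int addr = toInt(InetAddress.getByName(address).getAddress());
        int net  = toInt(InetAddress.getByName(parts[0]).getAddress());
        // Network mask: the top prefixLen bits set; /0 matches everything.
        int mask = prefixLen == 0 ? 0 : -1 << (32 - prefixLen);
        return (addr & mask) == (net & mask);
    }

    private static int toInt(byte[] b) {
        return ((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16) | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF);
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(matches("192.168.1.42", "192.168.1.0/24"));  // inside the /24
        System.out.println(matches("192.168.2.42", "192.168.1.0/24"));  // outside the /24
    }
}
```

(`InetAddress.getByName` does no DNS lookup when given an IP literal, so the check stays local.)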





[jira] [Resolved] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread

2021-04-25 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12702.

Fix Version/s: 2.8.1
   3.0.0
   Resolution: Fixed

> Unhandled exception caught in InterBrokerSendThread
> ---
>
> Key: KAFKA-12702
> URL: https://issues.apache.org/jira/browse/KAFKA-12702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
> Attachments: afterFixing.png, beforeFixing.png, 
> image-2021-04-21-17-12-28-471.png
>
>
> In kraft mode, if listeners and advertised.listeners are not configured with 
> host addresses, the host parameter value of Listener in 
> BrokerRegistrationRequestData will be null. When the broker is started, a 
> null pointer exception will be thrown, causing startup failure.
> A feasible solution is to replace the empty host of endPoint in 
> advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in 
> Broker Server when building networkListeners.
> The following is the debug log:
> before fixing:
> [2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread 
> org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending 
> BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGIS
> TRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node 
> 2: BrokerRegistrationRequestData(brokerId=2, 
> clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, 
> listeners=[Listener(n
> ame='PLAINTEXT', {color:#FF}host=null,{color} port=9092, 
> securityProtocol=0)], features=[], rack=null)
> [2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 76) 
> [broker-2-to-controller-send-thread]: unhandled exception caught in 
> InterBrokerSendThread
> java.lang.NullPointerException
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515)
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
>  at 
> org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501)
>  at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
>  at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown 
> Source)
>  at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>  at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>  at 
> kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
>  at 
> kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
>  at 
> kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> [2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 66) 
> [broker-2-to-controller-send-thread]: Stopped
> after fixing:
> [2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 
> name=heartbeat org.apache.kafka.clients.NetworkClient 512) 
> [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending 
> BROKER_REGISTRATI
> ON request with header RequestHeader(apiKey=BROKER_REGISTRATION, 
> apiVersion=0, clientId=2, correlationId=0) and timeout 3 to node 2: 
> BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', 
> inc
> arnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', 
> host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)
>  
>  





[GitHub] [kafka] chia7712 commented on pull request #10586: KAFKA-12702: Fix NPE in networkListeners from BrokerServer in 2.8

2021-04-25 Thread GitBox


chia7712 commented on pull request #10586:
URL: https://github.com/apache/kafka/pull/10586#issuecomment-826324896


   @wenbingshen thanks for this backport!






[GitHub] [kafka] chia7712 merged pull request #10586: KAFKA-12702: Fix NPE in networkListeners from BrokerServer in 2.8

2021-04-25 Thread GitBox


chia7712 merged pull request #10586:
URL: https://github.com/apache/kafka/pull/10586


   






[GitHub] [kafka] chia7712 merged pull request #10558: KAFKA-12684: Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand

2021-04-25 Thread GitBox


chia7712 merged pull request #10558:
URL: https://github.com/apache/kafka/pull/10558


   






[jira] [Resolved] (KAFKA-12684) The valid partition list is incorrectly replaced by the successfully elected partition list

2021-04-25 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12684.

Resolution: Fixed

> The valid partition list is incorrectly replaced by the successfully elected 
> partition list
> ---
>
> Key: KAFKA-12684
> URL: https://issues.apache.org/jira/browse/KAFKA-12684
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 2.6.0, 2.7.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Minor
> Fix For: 3.0.0
>
> Attachments: election-preferred-leader.png, non-preferred-leader.png
>
>
> When using the kafka-election-tool for preferred replica election, if there 
> are partitions in the elected list that are in the preferred replica, the 
> list of partitions already in the preferred replica will be replaced by the 
> successfully elected partition list.
>  





[GitHub] [kafka] chia7712 commented on pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users

2021-04-25 Thread GitBox


chia7712 commented on pull request #9627:
URL: https://github.com/apache/kafka/pull/9627#issuecomment-826320025


   @showuon thanks for this patch!






[GitHub] [kafka] chia7712 merged pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users

2021-04-25 Thread GitBox


chia7712 merged pull request #9627:
URL: https://github.com/apache/kafka/pull/9627


   






[jira] [Created] (KAFKA-12714) Kafka 2.8 server not starting on Windows OS

2021-04-25 Thread Rajni Jain (Jira)
Rajni Jain created KAFKA-12714:
--

 Summary: Kafka 2.8 server not starting on Windows OS
 Key: KAFKA-12714
 URL: https://issues.apache.org/jira/browse/KAFKA-12714
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.8.0
Reporter: Rajni Jain


On Windows OS, the server does not start because of the error below.

On Ubuntu, everything works as expected.

kafka-server-start.bat config\kraft\server.properties
[2021-04-25 16:50:15,831] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
[2021-04-25 16:50:16,151] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
[2021-04-25 16:50:16,254] DEBUG Initializing task scheduler. 
(kafka.utils.KafkaScheduler)
[2021-04-25 16:50:16,320] INFO [Log partition=@metadata-0, 
dir=C:\tmp\kraft-combined-logs] Recovering unflushed segment 0 (kafka.log.Log)
[2021-04-25 16:50:16,335] INFO [Log partition=@metadata-0, 
dir=C:\tmp\kraft-combined-logs] Loading producer state till offset 0 with 
message format version 2 (kafka.log.Log)
[2021-04-25 16:50:16,335] DEBUG Loaded index file 
C:\tmp\kraft-combined-logs\@metadata-0\.index with 
maxEntries = 1310720, maxIndexSize = 10485760, entries = 1310720, lastOffset = 
0, file position = 10485760 (kafka.log.OffsetIndex)
[2021-04-25 16:50:16,335] DEBUG Truncated index 
C:\tmp\kraft-combined-logs\@metadata-0\.index to 0 entries; 
position is now 0 and last offset is now 0 (kafka.log.OffsetIndex)
[2021-04-25 16:50:16,335] DEBUG Index 
C:\tmp\kraft-combined-logs\@metadata-0\.index was not 
resized because it already has size 10485760 (kafka.log.AbstractIndex)
[2021-04-25 16:50:16,351] DEBUG Loaded index file 
C:\tmp\kraft-combined-logs\@metadata-0\.timeindex with 
maxEntries = 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = 
TimestampOffset(0,0), file position = 10485756 (kafka.log.TimeIndex)
[2021-04-25 16:50:16,351] DEBUG Truncated index 
C:\tmp\kraft-combined-logs\@metadata-0\.timeindex to 0 
entries; position is now 0 and last entry is now TimestampOffset(-1,0) 
(kafka.log.TimeIndex)
[2021-04-25 16:50:16,351] DEBUG Index 
C:\tmp\kraft-combined-logs\@metadata-0\.timeindex was not 
resized because it already has size 10485756 (kafka.log.AbstractIndex)
[2021-04-25 16:50:16,351] DEBUG Resized 
C:\tmp\kraft-combined-logs\@metadata-0\.index to 0, 
position is 0 and limit is 0 (kafka.log.AbstractIndex)
[2021-04-25 16:50:16,351] DEBUG Resized 
C:\tmp\kraft-combined-logs\@metadata-0\.timeindex to 0, 
position is 0 and limit is 0 (kafka.log.AbstractIndex)
[2021-04-25 16:50:16,351] DEBUG Resized 
C:\tmp\kraft-combined-logs\@metadata-0\.index to 10485760, 
position is 0 and limit is 10485760 (kafka.log.AbstractIndex)
[2021-04-25 16:50:16,351] DEBUG Resized 
C:\tmp\kraft-combined-logs\@metadata-0\.timeindex to 
10485756, position is 0 and limit is 10485756 (kafka.log.AbstractIndex)
[2021-04-25 16:50:16,367] INFO [Log partition=@metadata-0, 
dir=C:\tmp\kraft-combined-logs] Loading producer state till offset 0 with 
message format version 2 (kafka.log.Log)
[2021-04-25 16:50:16,373] DEBUG Scheduling task PeriodicProducerExpirationCheck 
with initial delay 2147483647 ms and period 2147483647 ms. 
(kafka.utils.KafkaScheduler)
[2021-04-25 16:50:16,420] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2021-04-25 16:50:16,590] ERROR Exiting Kafka due to fatal exception 
(kafka.Kafka$)
java.nio.file.FileSystemException: 
C:\tmp\kraft-combined-logs\@metadata-0\quorum-state.tmp -> 
C:\tmp\kraft-combined-logs\@metadata-0\quorum-state: The process cannot access 
the file because it is being used by another process.

at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
 at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
 at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
 at 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:288)
 at java.base/java.nio.file.Files.move(Files.java:1421)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:904)
 at 
org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:148)
 at 
org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:124)
 at org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:449)
 at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:202)
 at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:351)
 at 
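The failing call in the stack trace is `Utils.atomicMoveWithFallback`, which tries an atomic rename and falls back to a plain move. A simplified sketch of that pattern using only `java.nio` (an illustration, not Kafka's actual implementation) shows why Windows can still fail: the fallback move also needs exclusive access to the target file:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMoveSketch {
    /** Try an atomic rename first; if the filesystem refuses, fall back to a plain replace. */
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            // Fallback: non-atomic move. On Windows this still throws if another
            // process holds the target open -- the FileSystemException seen above.
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("quorum");
        Path tmp = dir.resolve("quorum-state.tmp");
        Path dst = dir.resolve("quorum-state");
        Files.writeString(tmp, "{\"leaderId\":2}");
        atomicMoveWithFallback(tmp, dst);   // tmp -> final name, as in writeElectionStateToFile
        System.out.println(Files.readString(dst));
    }
}
```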

[GitHub] [kafka] tang7526 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance

2021-04-25 Thread GitBox


tang7526 commented on a change in pull request #10588:
URL: https://github.com/apache/kafka/pull/10588#discussion_r619798137



##
File path: tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
##
@@ -190,6 +166,51 @@ public static void main(String[] args) throws Exception {
 
 }
 
+public KafkaProducer createKafkaProducer(Properties props) 
{
+return new KafkaProducer<>(props);
+}
+
+public static Properties readProps(List producerProps, String 
producerConfig, String transactionalId,
+boolean transactionsEnabled) throws IOException {
+Properties props = new Properties();
+if (producerConfig != null) {
+props.putAll(Utils.loadProps(producerConfig));
+}
+if (producerProps != null)
+for (String prop : producerProps) {
+String[] pieces = prop.split("=");
+if (pieces.length != 2)
+throw new IllegalArgumentException("Invalid property: " + 
prop);
+props.put(pieces[0], pieces[1]);
+}
+
+props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+if (transactionsEnabled)
+props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+return props;
+}
+
+public static List readPayloadFile(String payloadFilePath, String 
payloadDelimiter) throws IOException {
+List payloadByteList = new ArrayList<>();
+if (payloadFilePath != null) {
+Path path = Paths.get(payloadFilePath);
+System.out.println("Reading payloads from: " + 
path.toAbsolutePath());
+if (Files.notExists(path) || Files.size(path) == 0)  {
+throw new  IllegalArgumentException("File does not exist or 
empty file provided.");

Review comment:
   Thank you for your opinion. I have removed that space.
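One subtlety in the `readProps` code above: `prop.split("=")` rejects any value that itself contains `=` (for example `sasl.jaas.config`). Limiting the split to the first `=` avoids that; a hedged sketch of just the pair-parsing part, not part of this PR:

```java
import java.util.Properties;

public class PropParse {
    /** Parse "key=value" pairs, allowing '=' inside the value. */
    static Properties parse(String... pairs) {
        Properties props = new Properties();
        for (String pair : pairs) {
            String[] pieces = pair.split("=", 2);  // split on the FIRST '=' only
            if (pieces.length != 2)
                throw new IllegalArgumentException("Invalid property: " + pair);
            props.put(pieces[0], pieces[1]);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = parse("acks=all",
                "sasl.jaas.config=org.example.Module required user=\"u\";");
        System.out.println(p.getProperty("acks"));
        System.out.println(p.getProperty("sasl.jaas.config"));
    }
}
```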








[GitHub] [kafka] tang7526 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance

2021-04-25 Thread GitBox


tang7526 commented on a change in pull request #10588:
URL: https://github.com/apache/kafka/pull/10588#discussion_r619798057



##
File path: build.gradle
##
@@ -1485,7 +1485,7 @@ project(':tools') {
 testImplementation libs.junitJupiter
 testImplementation project(':clients').sourceSets.test.output
testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
-
+testImplementation libs.easymock

Review comment:
   Thank you for the suggestion. I have rewritten it with Mockito.








[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread Haoran Xuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331498#comment-17331498
 ] 

Haoran Xuan commented on KAFKA-10800:
-

[~jagsancio] The PR [GitHub Pull Request 
#10593|https://github.com/apache/kafka/pull/10593] is ready for review now, 
thank you!

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset and epoch of the snapshot is less than the high-watermark.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.
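
A minimal sketch of the two checks the description calls for, with hypothetical names and simplified types (the real KafkaRaftClient uses its own `OffsetAndEpoch` and leader epoch cache):

```java
import java.util.OptionalLong;

public class SnapshotIdValidation {
    // Hypothetical simplified check mirroring the two rules above: the snapshot's
    // end offset must be below a known high watermark, and its epoch must not
    // exceed the latest epoch known locally.
    static boolean isValid(long endOffset, int epoch, OptionalLong highWatermark, int latestEpoch) {
        if (!highWatermark.isPresent() || highWatermark.getAsLong() <= endOffset)
            return false; // rule 1: end offset is not below the high watermark
        return epoch <= latestEpoch; // rule 2: epoch must be known locally
    }

    public static void main(String[] args) {
        System.out.println(isValid(3, 2, OptionalLong.empty(), 2)); // false: no high watermark yet
        System.out.println(isValid(5, 2, OptionalLong.of(10), 2)); // true
        System.out.println(isValid(5, 4, OptionalLong.of(10), 2)); // false: epoch ahead of the local log
    }
}
```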



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


feyman2016 commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-826304130


   @jsancio Could you please help to review? Thanks!
   Verified locally; all the failed tests appear unrelated. See the failed tests in 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10593/2/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed
   






[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619795184



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
 );
 }
 
+private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {

Review comment:
   Conceptually, `snapshotId.offset == highWatermarkOpt.get().offset` is 
OK, because the record at `snapshotId.offset` is not included in the snapshot, 
but I'm not sure whether there are other restrictions, since the Jira 
description says: `The end offset and epoch of the snapshot is less than 
the high-watermark`. Please kindly advise @jsancio 
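
   For illustration, a tiny sketch of the boundary case being discussed, assuming the snapshot's end offset is exclusive (a hypothetical helper, not KafkaRaftClient code):

```java
import java.util.OptionalLong;

public class BoundaryCheck {
    // With an exclusive end offset, a snapshot ending exactly at the high
    // watermark covers only committed records, so accepting equality is
    // conceptually safe; the strict check in the diff above still rejects it.
    static boolean strictlyBelowHighWatermark(long snapshotEndOffset, OptionalLong highWatermark) {
        return highWatermark.isPresent() && snapshotEndOffset < highWatermark.getAsLong();
    }

    public static void main(String[] args) {
        System.out.println(strictlyBelowHighWatermark(9, OptionalLong.of(10)));  // true
        System.out.println(strictlyBelowHighWatermark(10, OptionalLong.of(10))); // false: the debated case
    }
}
```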








[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619793903



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
 context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
 }
 
+@Test
+public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+int localId = 0;
+int otherNodeId = localId + 1;
+Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+int epoch = 2;
+
+List<String> appendRecords = Arrays.asList("a", "b", "c");
+OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+.appendToLog(epoch, appendRecords)
+.withAppendLingerMs(1)
+.build();
+
+context.becomeLeader();
+int currentEpoch = context.currentEpoch();
+
+// When creating a snapshot:
+// 1. the high watermark cannot be empty
+assertEquals(OptionalLong.empty(), context.client.highWatermark());
+assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+// 2. the high watermark must be larger than the snapshotId's endOffset
+advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
+assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch);
+assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2));
+
+// 3. the current leader epoch (from the epoch cache) must be larger than the snapshotId's epoch
+OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1);
+assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
+}
+
+private void advanceHighWatermark(RaftClientTestContext context,

Review comment:
   Extracted the functionality to avoid duplication.








[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619793844



##
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
##
@@ -38,7 +38,7 @@
 
 @Test
 public void testWritingSnapshot() throws IOException {
-OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
+OffsetAndEpoch id = new OffsetAndEpoch(0L, 1);

Review comment:
   The high watermark here is 1, so we need to make the snapshotId's endOffset < 1.








[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-25 Thread GitBox


feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619793709



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##
@@ -96,7 +96,7 @@ public BatchBuilder(
 }
 
 /**
- * Append a record to this patch. The caller must first verify there is room for the batch
+ * Append a record to this batch. The caller must first verify there is room for the batch

Review comment:
   Side fix



