[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619989241 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) { ); } +private void validateSnapshotId(OffsetAndEpoch snapshotId) { +Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark(); +if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { Review comment: Thank you, I updated the PR as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619988848 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) { ); } +private void validateSnapshotId(OffsetAndEpoch snapshotId) { +Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark(); +if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { +throw new KafkaException("Trying to create snapshot with snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " + +highWatermarkOpt + ". This likely indicates a bug in the caller, since there should be a minimum " + +"number of records between the latest snapshot and the high-watermark when creating a snapshot"); +} +int leaderEpoch = quorum().epoch(); +if (snapshotId.epoch > leaderEpoch) { +throw new KafkaException("Trying to create snapshot with snapshotId: " + snapshotId + " whose epoch is" + +" larger than the current leader epoch: " + leaderEpoch); +} Review comment: Thanks, previously I mistakenly thought the quorum epoch was the leader epoch cache~ Updated the PR; in the jira, one thing I'm not sure about is that: > 2. The epoch of the snapshot is equal to the quorum epoch. I think the snapshotId's epoch <= quorum epoch should be fine?
[jira] [Assigned] (KAFKA-12535) Consider Revising Document Anchors for Properties
[ https://issues.apache.org/jira/browse/KAFKA-12535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Minoru Tomioka reassigned KAFKA-12535: Assignee: (was: Minoru Tomioka) > Consider Revising Document Anchors for Properties > Key: KAFKA-12535 > URL: https://issues.apache.org/jira/browse/KAFKA-12535 > Project: Kafka > Issue Type: Improvement > Components: documentation > Affects Versions: 2.7.0 > Reporter: Gary Russell > Priority: Minor > Anchors for ToC entries work fine: https://kafka.apache.org/documentation/#producerconfigs with the section title appearing below the "floating" banner. However, anchors for properties, e.g. https://kafka.apache.org/documentation/#producerconfigs_max.block.ms, don't render properly; the first part of the property description is "hidden" under the floating banner. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] socutes opened a new pull request #10596: KAFKA-12715: ACL authentication, Host field support IP network segment
socutes opened a new pull request #10596: URL: https://github.com/apache/kafka/pull/10596 At present, for ACL authentication the Host field only supports exact matching of the source IP, so we hope the Host field can also support matching an IP network segment. ### Committer Checklist (excluded from commit message) - [ ] Modify the Host matching logic of the AclAuthorizer's matchingAclExists function - [ ] Add IP segment validation test cases
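One possible approach to the network-segment matching described in this PR is CIDR-style prefix matching on the resolved address bytes. The sketch below is hypothetical (it is not the actual `AclAuthorizer` change; the class and method names are invented for illustration):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

// Hypothetical sketch: CIDR matching on raw address bytes, works for IPv4 and IPv6.
public class CidrMatcher {
    // Returns true if `address` falls inside the CIDR block, e.g. "192.168.1.0/24".
    public static boolean matches(String cidr, String address) {
        try {
            String[] parts = cidr.split("/");
            byte[] network = InetAddress.getByName(parts[0]).getAddress();
            byte[] candidate = InetAddress.getByName(address).getAddress();
            int prefixLen = Integer.parseInt(parts[1]);
            if (network.length != candidate.length) {
                return false; // IPv4 vs IPv6 mismatch
            }
            int fullBytes = prefixLen / 8;
            int remainderBits = prefixLen % 8;
            // Compare the whole bytes covered by the prefix.
            if (!Arrays.equals(Arrays.copyOf(network, fullBytes),
                               Arrays.copyOf(candidate, fullBytes))) {
                return false;
            }
            if (remainderBits == 0) {
                return true;
            }
            // Compare the remaining high-order bits of the next byte.
            int mask = (0xFF << (8 - remainderBits)) & 0xFF;
            return (network[fullBytes] & mask) == (candidate[fullBytes] & mask);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("unparseable address", e);
        }
    }
}
```

For example, `matches("192.168.1.0/24", "192.168.1.42")` is true while `matches("192.168.1.0/24", "192.168.2.1")` is false; exact-host matching remains a special case with a /32 (or /128) prefix.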
[GitHub] [kafka] socutes closed pull request #10594: Merge pull request #1 from apache/trunk
socutes closed pull request #10594: URL: https://github.com/apache/kafka/pull/10594
[GitHub] [kafka] mjsax commented on pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key
mjsax commented on pull request #10548: URL: https://github.com/apache/kafka/pull/10548#issuecomment-826417134 Seems there is a checkstyle error: ``` [2021-04-22T23:02:56.789Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10548/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java:35:8: Unused import - java.util.Objects. [UnusedImports] ```
[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key
mjsax commented on a change in pull request #10548: URL: https://github.com/apache/kafka/pull/10548#discussion_r619899204 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -234,13 +247,15 @@ public V delete(final K key) { @Override public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) { - +Objects.requireNonNull(prefix, "key cannot be null"); Review comment: The KafkaStreams runtime _always_ "wraps" any store with a corresponding `MeteredXxxStore` (cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49) -- those `MeteredXxxStores` do the translation from objects to bytes (i.e., they use the serdes) and also track state store metrics. (Note that stores provided to the runtime always have type `<Bytes, byte[]>`, while they are exposed to `Processors` as `<K, V>` types.) Thus, when you call `context.stateStore(...)` you always get a `MeteredXxxStore` object -- of course, those details are hidden behind the interface type. This architecture allows us to unify code and separate concerns. In fact, it also allows us to add/remove more "layers": we can insert a "caching layer" (cf. https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html) and a "change logging layer" (both are inserted by default).
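The layering described above can be illustrated with a minimal decorator sketch. These are not the actual Kafka Streams classes -- the interface and class names below are invented for illustration of the pattern (an outer "metered" layer handles validation and metrics, then delegates to an inner store):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical store interface; the real one is org.apache.kafka.streams.state.KeyValueStore.
interface SimpleStore {
    void put(String key, String value);
    String get(String key);
}

// Innermost layer: the actual storage engine.
class InnerStore implements SimpleStore {
    private final Map<String, String> map = new HashMap<>();
    public void put(String key, String value) { map.put(key, value); }
    public String get(String key) { return map.get(key); }
}

// Outer layer: validates input and tracks metrics before delegating,
// analogous to what a MeteredXxxStore does.
class MeteredStore implements SimpleStore {
    private final SimpleStore inner;
    int puts = 0; // stand-in for real metrics

    MeteredStore(SimpleStore inner) { this.inner = inner; }

    public void put(String key, String value) {
        Objects.requireNonNull(key, "key cannot be null"); // validation lives in the outer layer
        puts++;                                            // metrics live in the outer layer
        inner.put(key, value);
    }

    public String get(String key) { return inner.get(key); }
}
```

Because every layer implements the same interface, extra layers (caching, change logging) can be inserted or removed without the caller noticing -- the caller only ever sees the interface type.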
[jira] [Comment Edited] (KAFKA-12713) Report "REAL" broker/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590 ] Ming Liu edited comment on KAFKA-12713 at 4/25/21, 11:16 PM: The idea is: 0. Add waitTimeMs in Request() 1. In the DelayedFetch delayed-operation class, add code to track the actual wait time. 2. In processResponseCallback() of handleFetchRequest, add an additional waitTimeMs parameter invoked from DelayedFetch; it will set request.waitTimeMs. 3. In the updateRequestMetrics() function, if waitTimeMs is non-zero, deduct it from RemoteTime and TotalTime. Let me know of any suggestions/feedback. I'd like to propose a KIP for that change. was (Author: mingaliu): The idea I am trying right now is: 1. Add waitTimeMs in FetchResponse. 2. If the fetch has to wait in purgatory due to either replica.fetch.wait.max.ms or fetch.min.bytes, it will fill in waitTimeMs in FetchResponse. 3. In the updateRequestMetrics() function, we will special-case the Fetch response and remove waitTimeMs from RemoteTime and TotalTime. Let me know of any suggestions/feedback. I'd like to propose a KIP for that change. > Report "REAL" broker/consumer fetch latency > Key: KAFKA-12713 > URL: https://issues.apache.org/jira/browse/KAFKA-12713 > Project: Kafka > Issue Type: Bug > Reporter: Ming Liu > Priority: Major > The fetch latency is an important metric to monitor for cluster performance. With ACK=ALL, the produce latency is affected primarily by broker fetch latency. > However, the currently reported fetch latency doesn't reflect the true fetch latency, because the fetch sometimes needs to stay in purgatory and wait for replica.fetch.wait.max.ms when data is not available. This greatly affects the real P50, P99, etc. > I'd like to propose a KIP to be able to track the real fetch latency for both broker followers and consumers.
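The core of the proposed adjustment is simple arithmetic: deduct the purgatory wait time from the recorded total time before reporting. A hypothetical sketch (invented names; the actual change would live in the broker's request-metrics code):

```java
// Hypothetical sketch of the proposed metric adjustment: the reported fetch
// latency excludes time the request voluntarily waited in purgatory
// (replica.fetch.wait.max.ms / fetch.min.bytes).
class FetchTimeAdjustment {
    static long adjusted(long totalTimeMs, long waitTimeMs) {
        // Clamp at zero so a wait that somehow exceeds the total never
        // produces a negative latency.
        return Math.max(0, totalTimeMs - waitTimeMs);
    }
}
```

For example, a fetch that took 600 ms end-to-end but spent 500 ms idling in purgatory would report 100 ms of "real" latency, which is what P50/P99 percentiles should reflect.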
[GitHub] [kafka] ijuma commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
ijuma commented on pull request #10466: URL: https://github.com/apache/kafka/pull/10466#issuecomment-826400303 @dejan2609 can you please rebase on trunk?
[GitHub] [kafka] ijuma commented on a change in pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
ijuma commented on a change in pull request #10466: URL: https://github.com/apache/kafka/pull/10466#discussion_r619889058 ## File path: build.gradle ## @@ -1491,13 +1491,14 @@ project(':streams') { } tasks.create(name: "copyDependantLibs", type: Copy) { -from (configurations.testRuntime) { Review comment: @vvcephei do you know why we're copying test dependencies here?
[GitHub] [kafka] ijuma commented on a change in pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
ijuma commented on a change in pull request #10466: URL: https://github.com/apache/kafka/pull/10466#discussion_r619888968 ## File path: build.gradle ## @@ -1491,13 +1491,14 @@ project(':streams') { } tasks.create(name: "copyDependantLibs", type: Copy) { -from (configurations.testRuntime) { - include('slf4j-log4j12*') - include('log4j*jar') - include('*hamcrest*') +from (configurations.testCompileClasspath) { + include('jackson*') + include('slf4j-api*') } from (configurations.runtimeClasspath) { - exclude('kafka-clients*') + include('connect*') + include('*java*') + include('*jni*') Review comment: Why did we switch from exclude to include?
[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619882204 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ## @@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception { context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); } +@Test +public void testCreateSnapshotWithInvalidSnapshotId() throws Exception { +int localId = 0; +int otherNodeId = localId + 1; +Set<Integer> voters = Utils.mkSet(localId, otherNodeId); +int epoch = 2; + +List<String> appendRecords = Arrays.asList("a", "b", "c"); +OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.appendToLog(epoch, appendRecords) +.withAppendLingerMs(1) +.build(); + +context.becomeLeader(); +int currentEpoch = context.currentEpoch(); + +// When creating snapshot: +// 1. high watermark cannot be empty +assertEquals(OptionalLong.empty(), context.client.highWatermark()); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1)); + +// 2. high watermark must be larger than the snapshotId's endOffset +advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId); +assertNotEquals(OptionalLong.empty(), context.client.highWatermark()); +OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2)); + +// 3. the snapshotId's epoch must be no larger than the current leader epoch +OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3)); +} + +private void advanceHighWatermark(RaftClientTestContext context, Review comment: Thanks for cleaning up the code duplication.
[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619882084 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) { ); } +private void validateSnapshotId(OffsetAndEpoch snapshotId) { +Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark(); +if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { Review comment: You are correct. I think when I created the Jira I overlooked that both the snapshot id's end offset and the high-watermark are exclusive values. Updated the Jira's description. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) { ); } +private void validateSnapshotId(OffsetAndEpoch snapshotId) { +Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark(); +if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { +throw new KafkaException("Trying to create snapshot with snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " + +highWatermarkOpt + ". This likely indicates a bug in the caller, since there should be a minimum " + +"number of records between the latest snapshot and the high-watermark when creating a snapshot"); +} +int leaderEpoch = quorum().epoch(); +if (snapshotId.epoch > leaderEpoch) { +throw new KafkaException("Trying to create snapshot with snapshotId: " + snapshotId + " whose epoch is" + +" larger than the current leader epoch: " + leaderEpoch); +} Review comment: From the Jira: > The end offset and epoch of the snapshot is valid based on the leader epoch cache. How about also validating against the leader epoch cache? See https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L124.
This is important because both the snapshot and the leader epoch cache are used to validate offsets. See https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L85. The term leader epoch cache comes from the variable name `leaderEpochCache` used in `kafka.log.Log`. ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ## @@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception { context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); } +@Test +public void testCreateSnapshotWithInvalidSnapshotId() throws Exception { +int localId = 0; +int otherNodeId = localId + 1; +Set<Integer> voters = Utils.mkSet(localId, otherNodeId); +int epoch = 2; + +List<String> appendRecords = Arrays.asList("a", "b", "c"); +OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.appendToLog(epoch, appendRecords) +.withAppendLingerMs(1) +.build(); + +context.becomeLeader(); +int currentEpoch = context.currentEpoch(); + +// When creating snapshot: +// 1. high watermark cannot be empty +assertEquals(OptionalLong.empty(), context.client.highWatermark()); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1)); + +// 2. high watermark must be larger than the snapshotId's endOffset +advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId); +assertNotEquals(OptionalLong.empty(), context.client.highWatermark()); +OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2)); + +// 3. 
the snapshotId's epoch must be no larger than the current leader epoch +OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3)); +} + +private void advanceHighWatermark(RaftClientTestContext context, Review comment: Thanks for cleaning up the code duplication.
[jira] [Updated] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-10800: --- Description: When the state machine attempts to create a snapshot writer we should validate that the following is true: # The end offset of the snapshot is less than or equal to the high-watermark. # The epoch of the snapshot is equal to the quorum epoch. # The end offset and epoch of the snapshot is valid based on the leader epoch cache. Note that this validation should not be performed when the raft client creates the snapshot writer because in that case the local log is out of date and the follower should trust the snapshot id sent by the partition leader. was: When the state machine attempts to create a snapshot writer we should validate that the following is true: # The end offset and epoch of the snapshot is less than the high-watermark. # The end offset and epoch of the snapshot is valid based on the leader epoch cache. Note that this validation should not be performed when the raft client creates the snapshot writer because in that case the local log is out of date and the follower should trust the snapshot id sent by the partition leader. > Validate the snapshot id when the state machine creates a snapshot > -- > > Key: KAFKA-10800 > URL: https://issues.apache.org/jira/browse/KAFKA-10800 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: Haoran Xuan >Priority: Major > > When the state machine attempts to create a snapshot writer we should > validate that the following is true: > # The end offset of the snapshot is less than or equal to the high-watermark. > # The epoch of the snapshot is equal to the quorum epoch. > # The end offset and epoch of the snapshot is valid based on the leader > epoch cache. 
> Note that this validation should not be performed when the raft client > creates the snapshot writer because in that case the local log is out of date > and the follower should trust the snapshot id sent by the partition leader.
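A minimal sketch of the first two validations from the issue description above (a hypothetical class for illustration; the real logic lives in `KafkaRaftClient#validateSnapshotId`, and per the PR discussion the epoch check accepts any snapshot epoch up to the quorum epoch):

```java
import java.util.OptionalLong;

// Hypothetical sketch of the snapshot-id validations; the third check against
// the leader epoch cache is not modeled here.
class SnapshotIdValidator {
    static void validate(long snapshotEndOffset, int snapshotEpoch,
                         OptionalLong highWatermark, int quorumEpoch) {
        // 1. The end offset of the snapshot must be at most the high-watermark
        //    (which must be known).
        if (!highWatermark.isPresent() || highWatermark.getAsLong() < snapshotEndOffset) {
            throw new IllegalArgumentException("snapshot end offset " + snapshotEndOffset
                + " is beyond the high-watermark " + highWatermark);
        }
        // 2. The epoch of the snapshot must not exceed the quorum epoch.
        if (snapshotEpoch > quorumEpoch) {
            throw new IllegalArgumentException("snapshot epoch " + snapshotEpoch
                + " is larger than the quorum epoch " + quorumEpoch);
        }
        // 3. (Not modeled) the offset/epoch pair should also be validated
        //    against the leader epoch cache.
    }
}
```

So, for a high-watermark of 5 at quorum epoch 2, a snapshot id of (3, 2) passes, while (6, 2) fails the offset check and (3, 3) fails the epoch check.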
[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
mjsax commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r619878083 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java ## @@ -126,10 +129,10 @@ public void setup() { clientSupplier.setCluster(cluster); streamsProducer = new StreamsProducer( config, -"threadId", +processId + "-StreamThread-1", Review comment: This PR does not change `StreamsProducer`, so this parsing should have happened for `_beta` already -- what am I missing?
[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
mjsax commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r619877817 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java ## @@ -111,6 +113,7 @@ private final StringSerializer stringSerializer = new StringSerializer(); private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); +private final UUID processId = UUID.randomUUID(); Review comment: > only for eos-v2 for some reason That is weird -- if `StreamsProducer` requires the `processID` it should have required it for `_beta` already? Would be good to understand -- maybe we unmasked a bug?
[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
mjsax commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r619877425 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -603,7 +606,7 @@ boolean runLoop() { log.error("Shutting down because the Kafka cluster seems to be on a too old version. " + "Setting {}=\"{}\" requires broker version 2.5 or higher.", StreamsConfig.PROCESSING_GUARANTEE_CONFIG, - EXACTLY_ONCE_BETA); + StreamsConfig.EXACTLY_ONCE_V2); Review comment: SGTM -- if the required code changes to get the actual value are too much, I am fine with hard-coding the value, too.
[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
mjsax commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r619876369 ## File path: docs/streams/upgrade-guide.html ## @@ -93,6 +95,12 @@ Upgrade Guide and API Changes Streams API changes in 3.0.0 + + The StreamsConfig.EXACTLY_ONCE and StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and a new StreamsConfig.EXACTLY_ONCE_V2 config has been + introduced. This is the same feature as eos-beta, but renamed to highlight its production-readiness. Users of exactly-once semantics should plan to migrate to the eos-v2 config and prepare for the removal of the deprecated configs in 4.0 or after at least a year + from the release of 3.0, whichever comes last. Note that eos-v2 requires broker version 2.5 or higher, like eos-beta, so users should begin to upgrade their kafka cluster if necessary. See Review comment: Fair enough.
[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
mjsax commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r619876217 ## File path: docs/streams/upgrade-guide.html ## @@ -93,6 +95,12 @@ Upgrade Guide and API Changes Streams API changes in 3.0.0 + + The StreamsConfig.EXACTLY_ONCE and StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and a new StreamsConfig.EXACTLY_ONCE_V2 config has been Review comment: Well, we do deprecate `StreamsConfig.EXACTLY_ONCE`, too, but users might just do `config.put("processing.guarantee", "exactly_once");` (or have a config file with `"exactly_once"` in it). To me, the main change is that the config itself is deprecated, and the deprecation of the variable is just a "side effect".
[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
mjsax commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r619875764 ## File path: docs/streams/upgrade-guide.html ## @@ -93,6 +95,12 @@ Upgrade Guide and API Changes Streams API changes in 3.0.0 + + The StreamsConfig.EXACTLY_ONCE and StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and a new StreamsConfig.EXACTLY_ONCE_V2 config has been + introduced. This is the same feature as eos-beta, but renamed to highlight its production-readiness. Users of exactly-once semantics should plan to migrate to the eos-v2 config and prepare for the removal of the deprecated configs in 4.0 or after at least a year Review comment: I would be very bold about it: ``` We deprecated the processing.guarantee configuration value "exactly_once" (for EOS version 1) in favor of the improved EOS version 2, formerly configured via "exactly_once_beta". To avoid confusion about the term "beta" in the config value (it was never meant to imply it's not production ready), we furthermore renamed "exactly_once_beta" to "exactly_once_v2". ``` Or something similar.
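For readers following the thread, the migration being documented above can be sketched in a few lines. The config-value strings are the ones named in KIP-732; the class and `main` scaffolding here are illustrative only:

```java
import java.util.Properties;

public class EosMigration {
    // Returns the post-KIP-732 processing guarantee value. The deprecated
    // values are "exactly_once" (eos-v1) and "exactly_once_beta"; the new
    // value "exactly_once_v2" selects the same feature as eos-beta.
    static String guarantee() {
        Properties props = new Properties();
        // Old (deprecated): props.put("processing.guarantee", "exactly_once_beta");
        props.put("processing.guarantee", "exactly_once_v2");
        return props.getProperty("processing.guarantee");
    }

    public static void main(String[] args) {
        System.out.println(guarantee());
    }
}
```

Note that, as the upgrade guide says, eos-v2 still requires brokers on version 2.5 or higher.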
[jira] [Commented] (KAFKA-12453) Guidance on whether a topology is eligible for optimisation
[ https://issues.apache.org/jira/browse/KAFKA-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331635#comment-17331635 ] Matthias J. Sax commented on KAFKA-12453: - Thanks for asking. In general 3rd parties hold the copyright. But as a Confluent employee, I can tell you that you don't need to worry about it for this case – take whatever is helpful. For testing, check out [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes] and [https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server] . > Guidance on whether a topology is eligible for optimisation > --- > > Key: KAFKA-12453 > URL: https://issues.apache.org/jira/browse/KAFKA-12453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Patrick O'Keeffe >Priority: Major > > Since the introduction of KStream.toTable() in Kafka 2.6.x, the decision > about whether a topology is eligible for optimisation is no longer a simple > one, and is related to whether toTable() operations are preceded by key > changing operators. > This decision requires expert level knowledge, and there are serious > implications associated with getting it wrong in terms of fault tolerance > Some ideas spring to mind around how to guide developers to make the correct > decision: > # Topology.describe() could indicate whether this topology is eligible for > optimisation > # Topologies could be automatically optimised - note this may have an impact > at deployment time, in that an application reset may be required. The > developer would need to made aware of this and adjust the deployment plan > accordingly > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
dejan2609 commented on pull request #10466: URL: https://github.com/apache/kafka/pull/10466#issuecomment-826381116 @ijuma So, now I have something to show: I managed to perform some reverse-engineering and received exactly the same content in the `/streams/build/dependent-libs-${version.scala}` folder (that folder is filled with dependencies by the Gradle task `streams:copyDependantLibs`). _Mental note: it would be interesting to fully discover this task's background and usage._ Also: the Gradle version is bumped (from **6.8.3** to **7.0**). Pitfall: `./gradlew clean streams:copyDependantLibs` execution times are significantly higher compared to trunk... will try to sort that out, if possible. |git branch | Gradle 6.8.3 | Gradle 7.0 | | ---| --: | --: | | trunk | 16 seconds | **_not applicable_** | | this PR | 1 minute 55 seconds | 2 minutes 10 seconds |
[GitHub] [kafka] wenbingshen opened a new pull request #10595: MINOR: Add some metrics names
wenbingshen opened a new pull request #10595: URL: https://github.com/apache/kafka/pull/10595 As the title. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-12713) Report "REAL" broker/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590 ] Ming Liu edited comment on KAFKA-12713 at 4/25/21, 5:33 PM: The idea I am trying right now is: 1. Add waitTimeMS in FetchResponse. 2. If the fetch has to wait in purgatory due to either replica.fetch.wait.max.ms or fetch.min.bytes, then it will fill in the waitTimeMS in FetchResponse. 3. In the updateRequestMetrics() function, we will special-case the Fetch response and subtract the waitTimeMS from RemoteTime and TotalTime. Let me know if you have any suggestions/feedback. I'd like to propose a KIP for that change. was (Author: mingaliu): The idea is like this, we can: 1. Add waitTimeMS in FetchResponse. 2. If the fetch has to wait in purgatory due to either replica.fetch.wait.max.ms or fetch.min.bytes, then it will fill the waitTimeMS in FetchResponse. 3. In updateRequestMetrics() function, we will special-process the Fetch response, and remove the waitTimeMS out of RemoteTime and TotalTime. > Report "REAL" broker/consumer fetch latency > --- > > Key: KAFKA-12713 > URL: https://issues.apache.org/jira/browse/KAFKA-12713 > Project: Kafka > Issue Type: Bug >Reporter: Ming Liu >Priority: Major > > The fetch latency is an important metric to monitor for cluster > performance. With ACK=ALL, the produce latency is affected primarily by > broker fetch latency. > However, currently the reported fetch latency doesn't reflect the true fetch > latency because it sometimes needs to stay in purgatory and wait for > replica.fetch.wait.max.ms when data is not available. This greatly affects the > real P50, P99, etc. > I'd like to propose a KIP to be able to track the real fetch latency for both > broker follower and consumer. >
[jira] [Commented] (KAFKA-12713) Report "REAL" broker/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590 ] Ming Liu commented on KAFKA-12713: -- The idea is like this, we can: 1. Add waitTimeMS in FetchResponse. 2. If the fetch has to wait in purgatory due to either replica.fetch.wait.max.ms or fetch.min.bytes, then it will fill in the waitTimeMS in FetchResponse. 3. In the updateRequestMetrics() function, we will special-case the Fetch response and subtract the waitTimeMS from RemoteTime and TotalTime. > Report "REAL" broker/consumer fetch latency > --- > > Key: KAFKA-12713 > URL: https://issues.apache.org/jira/browse/KAFKA-12713 > Project: Kafka > Issue Type: Bug >Reporter: Ming Liu >Priority: Major > > The fetch latency is an important metric to monitor for cluster > performance. With ACK=ALL, the produce latency is affected primarily by > broker fetch latency. > However, currently the reported fetch latency doesn't reflect the true fetch > latency because it sometimes needs to stay in purgatory and wait for > replica.fetch.wait.max.ms when data is not available. This greatly affects the > real P50, P99, etc. > I'd like to propose a KIP to be able to track the real fetch latency for both > broker follower and consumer. >
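A tiny sketch of step 3 as proposed above. The helper name `adjustedTotalTimeMs` is illustrative (this is not the actual Kafka metrics code): the purgatory wait carried back in the FetchResponse is subtracted from the measured total time before the metric is recorded:

```java
public class FetchLatencySketch {
    // Subtract the intentional purgatory wait (waitTimeMS in the proposal)
    // from the measured total time, clamping at zero so skew between the two
    // measurements cannot yield a negative latency.
    static long adjustedTotalTimeMs(long totalTimeMs, long waitTimeMs) {
        return Math.max(0L, totalTimeMs - waitTimeMs);
    }

    public static void main(String[] args) {
        // A fetch that took 520 ms in total but spent 500 ms parked waiting
        // for fetch.min.bytes would report 20 ms of real latency.
        System.out.println(adjustedTotalTimeMs(520, 500));
        System.out.println(adjustedTotalTimeMs(10, 50)); // clamped to zero
    }
}
```

With this adjustment the reported P50/P99 would reflect actual broker work rather than the configured `replica.fetch.wait.max.ms` ceiling.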
[GitHub] [kafka] socutes opened a new pull request #10594: Merge pull request #1 from apache/trunk
socutes opened a new pull request #10594: URL: https://github.com/apache/kafka/pull/10594 merge *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-12715) ACL authentication, Host field support IP network segment
[ https://issues.apache.org/jira/browse/KAFKA-12715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331526#comment-17331526 ] xu lobo commented on KAFKA-12715: - I could add this feature and would like to hear from the community. > ACL authentication, Host field support IP network segment > - > > Key: KAFKA-12715 > URL: https://issues.apache.org/jira/browse/KAFKA-12715 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: xu lobo >Priority: Major > Attachments: image-2021-04-25-21-46-48-859.png > > > At present, with ACL authentication the Host field only supports exact matching > of the source IP, so we hope the Host field can also support matching an IP > network segment. > !image-2021-04-25-21-46-48-859.png!
[jira] [Created] (KAFKA-12715) ACL authentication, Host field support IP network segment
xu lobo created KAFKA-12715: --- Summary: ACL authentication, Host field support IP network segment Key: KAFKA-12715 URL: https://issues.apache.org/jira/browse/KAFKA-12715 Project: Kafka Issue Type: Improvement Components: core Reporter: xu lobo Attachments: image-2021-04-25-21-46-48-859.png At present, with ACL authentication the Host field only supports exact matching of the source IP, so we hope the Host field can also support matching an IP network segment. !image-2021-04-25-21-46-48-859.png!
[jira] [Resolved] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread
[ https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12702. Fix Version/s: 2.8.1 3.0.0 Resolution: Fixed > Unhandled exception caught in InterBrokerSendThread > --- > > Key: KAFKA-12702 > URL: https://issues.apache.org/jira/browse/KAFKA-12702 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Wenbing Shen >Assignee: Wenbing Shen >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > Attachments: afterFixing.png, beforeFixing.png, > image-2021-04-21-17-12-28-471.png > > > In kraft mode, if listeners and advertised.listeners are not configured with > host addresses, the host parameter value of Listener in > BrokerRegistrationRequestData will be null. When the broker is started, a > null pointer exception will be thrown, causing startup failure. > A feasible solution is to replace the empty host of endPoint in > advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in > Broker Server when building networkListeners. 
> The following is the debug log: > before fixing: > [2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread > org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending > BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGISTRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node > 2: BrokerRegistrationRequestData(brokerId=2, > clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, > listeners=[Listener(name='PLAINTEXT', host=null, port=9092, > securityProtocol=0)], features=[], rack=null) > [2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread > kafka.server.BrokerToControllerRequestThread 76) > [broker-2-to-controller-send-thread]: unhandled exception caught in > InterBrokerSendThread > java.lang.NullPointerException > at > org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515) > at > org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216) > at > org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218) > at > org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187) > at > org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101) > at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525) > at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501) > at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461) > at > kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104) > at > kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99) > at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown > Source) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at 
scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at > kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99) > at > kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73) > at > kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > [2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread > kafka.server.BrokerToControllerRequestThread 66) > [broker-2-to-controller-send-thread]: Stopped > after fixing: > [2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 > name=heartbeat org.apache.kafka.clients.NetworkClient 512) > [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending > BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGISTRATION, > apiVersion=0, clientId=2, correlationId=0) and timeout 3 to node 2: > BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', > incarnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', > host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)
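The fix described in the issue, substituting a canonical local hostname when the listener host is empty, can be sketched roughly as below. The helper name and structure are mine; the real change lives where BrokerServer builds its networkListeners:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ListenerHostFallback {
    // Return the configured host, or fall back to the canonical local
    // hostname when listeners/advertised.listeners left it unset (the case
    // that produced the NullPointerException in the log above).
    static String effectiveHost(String configuredHost) throws UnknownHostException {
        if (configuredHost == null || configuredHost.isEmpty()) {
            return InetAddress.getLocalHost().getCanonicalHostName();
        }
        return configuredHost;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(effectiveHost("broker1.example.com")); // unchanged
        System.out.println(effectiveHost(null).isEmpty());        // falls back
    }
}
```

This mirrors the "after fixing" log, where the registration request carries `host='hdpxxx.cn'` instead of `host=null`.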
[GitHub] [kafka] chia7712 commented on pull request #10586: KAFKA-12702: Fix NPE in networkListeners from BrokerServer in 2.8
chia7712 commented on pull request #10586: URL: https://github.com/apache/kafka/pull/10586#issuecomment-826324896 @wenbingshen thanks for this backport!
[GitHub] [kafka] chia7712 merged pull request #10586: KAFKA-12702: Fix NPE in networkListeners from BrokerServer in 2.8
chia7712 merged pull request #10586: URL: https://github.com/apache/kafka/pull/10586
[GitHub] [kafka] chia7712 merged pull request #10558: KAFKA-12684: Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
chia7712 merged pull request #10558: URL: https://github.com/apache/kafka/pull/10558
[jira] [Resolved] (KAFKA-12684) The valid partition list is incorrectly replaced by the successfully elected partition list
[ https://issues.apache.org/jira/browse/KAFKA-12684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12684. Resolution: Fixed > The valid partition list is incorrectly replaced by the successfully elected > partition list > --- > > Key: KAFKA-12684 > URL: https://issues.apache.org/jira/browse/KAFKA-12684 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 2.6.0, 2.7.0 >Reporter: Wenbing Shen >Assignee: Wenbing Shen >Priority: Minor > Fix For: 3.0.0 > > Attachments: election-preferred-leader.png, non-preferred-leader.png > > > When using the kafka-election-tool for preferred replica election, if there > are partitions in the elected list that are in the preferred replica, the > list of partitions already in the preferred replica will be replaced by the > successfully elected partition list. >
[GitHub] [kafka] chia7712 commented on pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users
chia7712 commented on pull request #9627: URL: https://github.com/apache/kafka/pull/9627#issuecomment-826320025 @showuon thanks for this patch!
[GitHub] [kafka] chia7712 merged pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users
chia7712 merged pull request #9627: URL: https://github.com/apache/kafka/pull/9627
[jira] [Created] (KAFKA-12714) Kafka 2.8 server not starting on Windows OS
Rajni Jain created KAFKA-12714: -- Summary: Kafka 2.8 server not starting on Windows OS Key: KAFKA-12714 URL: https://issues.apache.org/jira/browse/KAFKA-12714 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.8.0 Reporter: Rajni Jain On Windows OS, the server is not starting because of the error below. On Ubuntu, everything works as expected. kafka-server-start.bat config\kraft\server.properties [2021-04-25 16:50:15,831] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2021-04-25 16:50:16,151] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util) [2021-04-25 16:50:16,254] DEBUG Initializing task scheduler. (kafka.utils.KafkaScheduler) [2021-04-25 16:50:16,320] INFO [Log partition=@metadata-0, dir=C:\tmp\kraft-combined-logs] Recovering unflushed segment 0 (kafka.log.Log) [2021-04-25 16:50:16,335] INFO [Log partition=@metadata-0, dir=C:\tmp\kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log) [2021-04-25 16:50:16,335] DEBUG Loaded index file C:\tmp\kraft-combined-logs\@metadata-0\.index with maxEntries = 1310720, maxIndexSize = 10485760, entries = 1310720, lastOffset = 0, file position = 10485760 (kafka.log.OffsetIndex) [2021-04-25 16:50:16,335] DEBUG Truncated index C:\tmp\kraft-combined-logs\@metadata-0\.index to 0 entries; position is now 0 and last offset is now 0 (kafka.log.OffsetIndex) [2021-04-25 16:50:16,335] DEBUG Index C:\tmp\kraft-combined-logs\@metadata-0\.index was not resized because it already has size 10485760 (kafka.log.AbstractIndex) [2021-04-25 16:50:16,351] DEBUG Loaded index file C:\tmp\kraft-combined-logs\@metadata-0\.timeindex with maxEntries = 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = TimestampOffset(0,0), file position = 10485756 (kafka.log.TimeIndex) [2021-04-25 16:50:16,351] DEBUG Truncated index 
C:\tmp\kraft-combined-logs\@metadata-0\.timeindex to 0 entries; position is now 0 and last entry is now TimestampOffset(-1,0) (kafka.log.TimeIndex) [2021-04-25 16:50:16,351] DEBUG Index C:\tmp\kraft-combined-logs\@metadata-0\.timeindex was not resized because it already has size 10485756 (kafka.log.AbstractIndex) [2021-04-25 16:50:16,351] DEBUG Resized C:\tmp\kraft-combined-logs\@metadata-0\.index to 0, position is 0 and limit is 0 (kafka.log.AbstractIndex) [2021-04-25 16:50:16,351] DEBUG Resized C:\tmp\kraft-combined-logs\@metadata-0\.timeindex to 0, position is 0 and limit is 0 (kafka.log.AbstractIndex) [2021-04-25 16:50:16,351] DEBUG Resized C:\tmp\kraft-combined-logs\@metadata-0\.index to 10485760, position is 0 and limit is 10485760 (kafka.log.AbstractIndex) [2021-04-25 16:50:16,351] DEBUG Resized C:\tmp\kraft-combined-logs\@metadata-0\.timeindex to 10485756, position is 0 and limit is 10485756 (kafka.log.AbstractIndex) [2021-04-25 16:50:16,367] INFO [Log partition=@metadata-0, dir=C:\tmp\kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log) [2021-04-25 16:50:16,373] DEBUG Scheduling task PeriodicProducerExpirationCheck with initial delay 2147483647 ms and period 2147483647 ms. (kafka.utils.KafkaScheduler) [2021-04-25 16:50:16,420] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper) [2021-04-25 16:50:16,590] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$) java.nio.file.FileSystemException: C:\tmp\kraft-combined-logs\@metadata-0\quorum-state.tmp -> C:\tmp\kraft-combined-logs\@metadata-0\quorum-state: The process cannot access the file because it is being used by another process. 
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395) at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:288) at java.base/java.nio.file.Files.move(Files.java:1421) at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:904) at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:148) at org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:124) at org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:449) at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:202) at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:351) at
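The `atomicMoveWithFallback` frame in the stack trace suggests a rename pattern like the sketch below. This is an illustration of the general technique, not Kafka's actual `Utils` code; on Windows the rename in the report fails because the target file is still held open by another handle, which neither branch of the fallback can fix:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMoveSketch {
    // Try an atomic rename first; fall back to a plain replacing move on
    // file systems that do not support ATOMIC_MOVE. Neither path succeeds
    // when another process still holds the target open, as in the report.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("quorum");
        Path tmp = Files.write(dir.resolve("quorum-state.tmp"), "state".getBytes());
        atomicMoveWithFallback(tmp, dir.resolve("quorum-state"));
        System.out.println(Files.exists(dir.resolve("quorum-state")));
        System.out.println(Files.exists(tmp));
    }
}
```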
[GitHub] [kafka] tang7526 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance
tang7526 commented on a change in pull request #10588: URL: https://github.com/apache/kafka/pull/10588#discussion_r619798137 ## File path: tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java ## @@ -190,6 +166,51 @@ public static void main(String[] args) throws Exception { } +public KafkaProducer createKafkaProducer(Properties props) { +return new KafkaProducer<>(props); +} + +public static Properties readProps(List producerProps, String producerConfig, String transactionalId, +boolean transactionsEnabled) throws IOException { +Properties props = new Properties(); +if (producerConfig != null) { +props.putAll(Utils.loadProps(producerConfig)); +} +if (producerProps != null) +for (String prop : producerProps) { +String[] pieces = prop.split("="); +if (pieces.length != 2) +throw new IllegalArgumentException("Invalid property: " + prop); +props.put(pieces[0], pieces[1]); +} + +props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); +props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); +if (transactionsEnabled) +props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); +return props; +} + +public static List readPayloadFile(String payloadFilePath, String payloadDelimiter) throws IOException { +List payloadByteList = new ArrayList<>(); +if (payloadFilePath != null) { +Path path = Paths.get(payloadFilePath); +System.out.println("Reading payloads from: " + path.toAbsolutePath()); +if (Files.notExists(path) || Files.size(path) == 0) { +throw new IllegalArgumentException("File does not exist or empty file provided."); Review comment: Thank you for your opinion. I have removed that space.
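One detail worth noting in the quoted `readProps`: `prop.split("=")` together with the `pieces.length != 2` check rejects any value that itself contains `=`. A two-argument split with a limit of 2 would keep such values intact. The snippet below is a standalone illustration of the difference, not part of the PR:

```java
public class PropSplitSketch {
    public static void main(String[] args) {
        // A hypothetical producer property whose value contains '='.
        String prop = "sasl.jaas.config=Module required user=\"u\";";
        // Unlimited split breaks the value apart at every '=':
        System.out.println(prop.split("=").length);
        // A limit of 2 splits only at the first '=', preserving the value:
        String[] pieces = prop.split("=", 2);
        System.out.println(pieces[0]);
        System.out.println(pieces[1]);
    }
}
```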
[GitHub] [kafka] tang7526 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance
tang7526 commented on a change in pull request #10588: URL: https://github.com/apache/kafka/pull/10588#discussion_r619798057 ## File path: build.gradle ## @@ -1485,7 +1485,7 @@ project(':tools') { testImplementation libs.junitJupiter testImplementation project(':clients').sourceSets.test.output testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc. - +testImplementation libs.easymock Review comment: Thank you for your opinion. I have rewritten it with mockito.
[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331498#comment-17331498 ] Haoran Xuan commented on KAFKA-10800: - [~jagsancio] The PR [GitHub Pull Request #10593|https://github.com/apache/kafka/pull/10593] is ready for review now, thank you! > Validate the snapshot id when the state machine creates a snapshot > -- > > Key: KAFKA-10800 > URL: https://issues.apache.org/jira/browse/KAFKA-10800 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: Haoran Xuan >Priority: Major > > When the state machine attempts to create a snapshot writer we should > validate that the following is true: > # The end offset and epoch of the snapshot is less than the high-watermark. > # The end offset and epoch of the snapshot is valid based on the leader > epoch cache. > Note that this validation should not be performed when the raft client > creates the snapshot writer because in that case the local log is out of date > and the follower should trust the snapshot id sent by the partition leader.
[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-826304130 @jsancio Could you please help to review? Thanks! Verified locally; the failed tests do not appear to be related. See failed tests in https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10593/2/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed
[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619795184 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List records, boolean isAtomic) { ); } +private void validateSnapshotId(OffsetAndEpoch snapshotId) { +Optional highWatermarkOpt = quorum().highWatermark(); +if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { Review comment: Conceptually, `snapshotId.offset == highWatermarkOpt.get().offset` is OK, because the record at `snapshotId.offset` is not included in the snapshot, but I'm not sure if there are other restrictions, because the Jira description says: `The end offset and epoch of the snapshot is less than the high-watermark`; please kindly advise @jsancio
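For what it's worth, the boundary cases under discussion can be restated as a small self-contained predicate. The names are assumed from the PR diff, and this is an illustration rather than the actual KafkaRaftClient code: as written in the PR, the snapshot offset must be strictly below the high-watermark, and, per the question in the other thread, the epoch may equal but not exceed the quorum epoch:

```java
import java.util.OptionalLong;

public class SnapshotIdCheck {
    // Mirrors the validation in the PR: reject when there is no
    // high-watermark yet, when the snapshot offset is at or beyond it,
    // or when the snapshot epoch exceeds the current quorum epoch.
    static boolean isValid(OptionalLong highWatermark, long snapshotOffset,
                           int snapshotEpoch, int quorumEpoch) {
        if (!highWatermark.isPresent() || highWatermark.getAsLong() <= snapshotOffset) {
            return false; // offset must be strictly below the high-watermark
        }
        return snapshotEpoch <= quorumEpoch; // equal epoch OK, larger is not
    }

    public static void main(String[] args) {
        System.out.println(isValid(OptionalLong.empty(), 3, 2, 2)); // no HW yet
        System.out.println(isValid(OptionalLong.of(5), 5, 2, 2));   // offset == HW
        System.out.println(isValid(OptionalLong.of(5), 4, 2, 2));   // valid
        System.out.println(isValid(OptionalLong.of(5), 4, 3, 2));   // epoch too large
    }
}
```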
[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619793903

## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java

## @@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }

+    @Test
+    public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set voters = Utils.mkSet(localId, otherNodeId);
+        int epoch = 2;
+
+        List appendRecords = Arrays.asList("a", "b", "c");
+        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(epoch, appendRecords)
+            .withAppendLingerMs(1)
+            .build();
+
+        context.becomeLeader();
+        int currentEpoch = context.currentEpoch();
+
+        // When creating a snapshot:
+        // 1. the high watermark cannot be empty
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+        // 2. the high watermark must be larger than the snapshotId's endOffset
+        advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
+        assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+        OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2));
+
+        // 3. the current leader epoch must be no less than the snapshotId's epoch
+        OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
+    }
+
+    private void advanceHighWatermark(RaftClientTestContext context,

Review comment: Extracted this functionality to avoid duplication.
[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619793844

## File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java

## @@ -38,7 +38,7 @@ @Test public void testWritingSnapshot() throws IOException {
-    OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
+    OffsetAndEpoch id = new OffsetAndEpoch(0L, 1);

Review comment: The high watermark here is 1, so we need to make the snapshotId's endOffset < 1.
[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619793709

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java

## @@ -96,7 +96,7 @@ public BatchBuilder( }
 /**
- * Append a record to this patch. The caller must first verify there is room for the batch
+ * Append a record to this batch. The caller must first verify there is room for the batch

Review comment: Side fix