[GitHub] [kafka] showuon commented on pull request #9888: KAFKA-12194: use stateListener to catch each state change
showuon commented on pull request #9888: URL: https://github.com/apache/kafka/pull/9888#issuecomment-760714184 @cadonna , thanks for the comments. I've updated in this commit: https://github.com/apache/kafka/pull/9888/commits/46898d994bb3c3495aea967ace9a07549f7fc1e5. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change
showuon commented on a change in pull request #9888: URL: https://github.com/apache/kafka/pull/9888#discussion_r557960104 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -90,24 +91,64 @@ public void setup() { ); } +private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException { +kafkaStreams.start(); +waitForStateTransition(KafkaStreams.State.RUNNING); +} + @After public void teardown() throws IOException { +stateTransitionHistory.clear(); purgeLocalStreamsState(properties); } +private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) { +// we store each new state in state transition so that we won't miss any state change +kafkaStreams.setStateListener( +(newState, oldState) -> stateTransitionHistory.add(newState) +); +} + +private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException { +waitForCondition( +() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected), +DEFAULT_DURATION.toMillis(), +() -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s", +expected, stateTransitionHistory) +); +} Review comment: We can't just wait for the current state to be `RUNNING`, because the state does not change immediately after we add or remove threads. If we check for `RUNNING` right after adding or removing threads, the check passes even though the rebalance has not happened yet, which causes the test to fail. So I still use `stateTransitionHistory` to check the state, and I now also check that the last state in the history is `RUNNING`. That should be better.
[GitHub] [kafka] showuon commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change
showuon commented on a change in pull request #9888: URL: https://github.com/apache/kafka/pull/9888#discussion_r557939529 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -90,24 +91,64 @@ public void setup() { ); } +private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException { +kafkaStreams.start(); +waitForStateTransition(KafkaStreams.State.RUNNING); +} + @After public void teardown() throws IOException { +stateTransitionHistory.clear(); purgeLocalStreamsState(properties); } +private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) { +// we store each new state in state transition so that we won't miss any state change +kafkaStreams.setStateListener( +(newState, oldState) -> stateTransitionHistory.add(newState) +); +} + +private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException { +waitForCondition( +() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected), +DEFAULT_DURATION.toMillis(), +() -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s", +expected, stateTransitionHistory) +); +} + +// verify if there's the state change from "before" state into "after" state +private boolean hasStateTransition(final KafkaStreams.State before, final KafkaStreams.State after) { +// should have at least 2 states in history +if (stateTransitionHistory.size() < 2) { +return false; +} + +for (int i = 0; i < stateTransitionHistory.size() - 1; i++) { +if (stateTransitionHistory.get(i).equals(before) && stateTransitionHistory.get(i + 1).equals(after)) { +return true; +} +} +return false; Review comment: I used a `for` loop because I thought some other state change could follow RUNNING, e.g. DEAD. But after your question, I think the test should fail in that case anyway, so checking the last 2 elements is enough.
Updated. Thanks.
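The history-based check discussed in this thread can be sketched without Kafka on the classpath. This is only an illustration under stated assumptions: `State` is a hypothetical stand-in for `KafkaStreams.State`, and `StateHistorySketch`, `onChange`, and `endsWithTransition` are names invented here, not the code in the PR. The sketch records every new state from a listener-style callback and then asks whether the last two recorded states are `before` followed by `after`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class StateHistorySketch {
    // Hypothetical stand-in for KafkaStreams.State; only a few values are modeled.
    enum State { CREATED, REBALANCING, RUNNING, ERROR }

    // Listener callbacks may arrive on a different thread than the test thread,
    // so the history uses a thread-safe list.
    private final List<State> history = new CopyOnWriteArrayList<>();

    // Same shape as the (newState, oldState) listener in the diff: only the new state is stored.
    void onChange(State newState, State oldState) {
        history.add(newState);
    }

    // True only if the two most recent recorded states are `before` followed by `after`.
    boolean endsWithTransition(State before, State after) {
        // Copy first so that the size and the elements come from one consistent view.
        List<State> snapshot = new ArrayList<>(history);
        int n = snapshot.size();
        return n >= 2 && snapshot.get(n - 2) == before && snapshot.get(n - 1) == after;
    }

    public static void main(String[] args) {
        StateHistorySketch sketch = new StateHistorySketch();
        sketch.onChange(State.REBALANCING, State.CREATED);
        sketch.onChange(State.RUNNING, State.REBALANCING);
        System.out.println(sketch.endsWithTransition(State.REBALANCING, State.RUNNING));

        // A later transition moves the pair away from the tail, so the check no longer holds.
        sketch.onChange(State.ERROR, State.RUNNING);
        System.out.println(sketch.endsWithTransition(State.REBALANCING, State.RUNNING));
    }
}
```

Checking only the tail, rather than scanning the whole history, means any unexpected transition after `RUNNING` makes the check fail, which matches the reasoning in the comment above.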
[GitHub] [kafka] showuon commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change
showuon commented on a change in pull request #9888: URL: https://github.com/apache/kafka/pull/9888#discussion_r557937603 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -119,37 +160,47 @@ public void shouldAddStreamThread() throws Exception { .sorted().toArray(), equalTo(new String[] {"1", "2", "3"}) ); -waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.REBALANCING, DEFAULT_DURATION); -waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); + +waitForStateTransition(KafkaStreams.State.RUNNING); +assertTrue(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING)); Review comment: OK, Updated. Thanks.
[jira] [Resolved] (KAFKA-12191) SslTransportTls12Tls13Test can replace 'assumeTrue' by (junit 5) conditional test
[ https://issues.apache.org/jira/browse/KAFKA-12191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12191. Fix Version/s: 2.8.0 Resolution: Fixed > SslTransportTls12Tls13Test can replace 'assumeTrue' by (junit 5) conditional > test > - > > Key: KAFKA-12191 > URL: https://issues.apache.org/jira/browse/KAFKA-12191 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: GeordieMai >Priority: Minor > Fix For: 2.8.0 > > > The conditional test is more readable than 'assumeTrue' in test code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 merged pull request #9899: KAFKA-12191 SslTransportTls12Tls13Test can replace 'assumeTrue' by (j…
chia7712 merged pull request #9899: URL: https://github.com/apache/kafka/pull/9899
[jira] [Resolved] (KAFKA-12189) ShellTest can replace 'assumeTrue' by (junit 5) conditional test
[ https://issues.apache.org/jira/browse/KAFKA-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12189. Fix Version/s: 2.8.0 Resolution: Fixed > ShellTest can replace 'assumeTrue' by (junit 5) conditional test > > > Key: KAFKA-12189 > URL: https://issues.apache.org/jira/browse/KAFKA-12189 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: GeordieMai >Priority: Minor > Fix For: 2.8.0 > > > from https://github.com/apache/kafka/pull/9874#discussion_r556664433 > The conditional test is more readable than 'assumeTrue' in test code.
[GitHub] [kafka] chia7712 merged pull request #9898: KAFKA-12189 ShellTest can replace 'assumeTrue' by (junit 5) condition…
chia7712 merged pull request #9898: URL: https://github.com/apache/kafka/pull/9898
[GitHub] [kafka] chia7712 commented on a change in pull request #9887: KAFKA-12195 Fix synchronization issue happening in KafkaStreams
chia7712 commented on a change in pull request #9887: URL: https://github.com/apache/kafka/pull/9887#discussion_r557882254 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -989,16 +989,20 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin public Optional removeStreamThread() { if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { -for (final StreamThread streamThread : threads) { -if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { -streamThread.shutdown(); -if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +synchronized (threads) { Review comment: Makes sense. Let me use a copy to handle the lock issue.
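One common reading of "use a copy" is to snapshot the shared list while holding its lock and then iterate over the snapshot, so that slow work such as waiting for a thread to die does not happen inside `synchronized (threads)`. The following is a minimal sketch of that idea, not the change that was made in the PR: thread names stand in for `StreamThread` objects, and `SnapshotIterationSketch`, `snapshot`, and `removeOneExcept` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class SnapshotIterationSketch {
    // Shared list; thread names stand in for StreamThread instances (assumption).
    private final List<String> threads = Collections.synchronizedList(new ArrayList<>());

    void add(String name) {
        threads.add(name);
    }

    // Copy under the list's own lock; callers then iterate without holding it.
    List<String> snapshot() {
        synchronized (threads) {
            return new ArrayList<>(threads);
        }
    }

    // Removes one entry whose name differs from `caller`.
    // The loop runs over a copy, so concurrent add/remove on `threads`
    // cannot cause a ConcurrentModificationException here.
    Optional<String> removeOneExcept(String caller) {
        for (String name : snapshot()) {
            // remove() returns false if another caller already removed this entry.
            if (!name.equals(caller) && threads.remove(name)) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        SnapshotIterationSketch sketch = new SnapshotIterationSketch();
        sketch.add("thread-1");
        sketch.add("thread-2");
        System.out.println(sketch.removeOneExcept("thread-1"));
        System.out.println(sketch.snapshot());
    }
}
```

The trade-off is that the snapshot can be stale by the time it is used, which is why the sketch relies on the boolean result of `remove` rather than assuming the entry is still present.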
[jira] [Commented] (KAFKA-12199) Migrate connect:runtime module to JUnit 5
[ https://issues.apache.org/jira/browse/KAFKA-12199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265720#comment-17265720 ] Chia-Ping Tsai commented on KAFKA-12199: [~ijuma] PowerMock does not yet have native support for JUnit 5 (https://github.com/powermock/powermock/issues/830), and a lot of tests depend on PowerMock functions. This means the PR could get bigger and more complicated, as we would not only migrate JUnit 4 to JUnit 5 but also migrate PowerMock to another mock tool. To simplify the review work, we should address the following tasks before upgrading JUnit. 1. Select a mock tool that works with JUnit 5. I prefer Mockito, as it works well with JUnit 5. 2. Rewrite all tests that depend on PowerMock. This task can be split into separate sub-tasks, as I believe the new mock tool will bring a lot of changes. WDYT? > Migrate connect:runtime module to JUnit 5 > - > > Key: KAFKA-12199 > URL: https://issues.apache.org/jira/browse/KAFKA-12199 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major >
[jira] [Resolved] (KAFKA-12203) Migrate connect:mirror-client module to JUnit 5
[ https://issues.apache.org/jira/browse/KAFKA-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12203. Fix Version/s: 2.8.0 Resolution: Fixed > Migrate connect:mirror-client module to JUnit 5 > --- > > Key: KAFKA-12203 > URL: https://issues.apache.org/jira/browse/KAFKA-12203 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 2.8.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 merged pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5
chia7712 merged pull request #9889: URL: https://github.com/apache/kafka/pull/9889
[GitHub] [kafka] chia7712 commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5
chia7712 commented on pull request #9889: URL: https://github.com/apache/kafka/pull/9889#issuecomment-760659526 ``` org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect ``` It is tracked by #9888 and #9887.
[GitHub] [kafka] abbccdda commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id
abbccdda commented on a change in pull request #9871: URL: https://github.com/apache/kafka/pull/9871#discussion_r557858498 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -125,9 +125,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException, final EpochState initialState; if (!election.voters().isEmpty() && !voters.equals(election.voters())) { throw new IllegalStateException("Configured voter set: " + voters -+ " is different from the voter set read from the state file: " + election.voters() + -". Check if the quorum configuration is up to date, " + -"or wipe out the local state file if necessary"); ++ " is different from the voter set read from the state file: " + election.voters() ++ ". Check if the quorum configuration is up to date, " ++ "or wipe out the local state file if necessary"); +} else if (election.hasVoted() && !isVoter()) { +String localIdDescription = localId.isPresent() ? +localId.getAsInt() + " is not a voter" : +"is undefined"; +throw new IllegalStateException("Initialized quorum state " + election Review comment: Fair enough, we could relax this later when we experiment with the static quorum changes.
[GitHub] [kafka] chia7712 commented on pull request #9890: KAFKA-12198: Migrate connect:json module to JUnit 5
chia7712 commented on pull request #9890: URL: https://github.com/apache/kafka/pull/9890#issuecomment-760616110 @dengziming Could you fix the conflicting files? Thanks!
[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
hachikuji commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557828817 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest( FetchRequestData.FetchPartition request, long currentTimeMs ) { -Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); -if (errorOpt.isPresent()) { -return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); -} +try { +Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); +if (errorOpt.isPresent()) { +return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); +} -long fetchOffset = request.fetchOffset(); -int lastFetchedEpoch = request.lastFetchedEpoch(); -LeaderState state = quorum.leaderStateOrThrow(); -Optional divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - -if (divergingEpochOpt.isPresent()) { -Optional divergingEpoch = -divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() -.setEpoch(offsetAndEpoch.epoch) -.setEndOffset(offsetAndEpoch.offset)); -return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); -} else { -LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); +long fetchOffset = request.fetchOffset(); +int lastFetchedEpoch = request.lastFetchedEpoch(); +LeaderState state = quorum.leaderStateOrThrow(); +ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); -if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { -onUpdateLeaderHighWatermark(state, currentTimeMs); +final Records records; +if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { +LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); + +if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { 
+onUpdateLeaderHighWatermark(state, currentTimeMs); +} + +records = info.records; +} else { +records = MemoryRecords.EMPTY; } -return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); +return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); +} catch (Exception e) { +logger.error("Caught unexpected error in fetch completion of request {}", request, e); +return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty()); } } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. */ -private Optional validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { -if (fetchOffset == 0 && lastFetchedEpoch == 0) { -return Optional.empty(); +private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { +if (log.startOffset() == 0 && fetchOffset == 0) { +if (lastFetchedEpoch != 0) { +logger.warn( +"Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", +fetchOffset, +lastFetchedEpoch +); +} +return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); } -OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch) -.orElse(new OffsetAndEpoch(-1L, -1)); -if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) { -return Optional.of(endOffsetAndEpoch); + +Optional endOffsetAndEpochOpt = log +.endOffsetForEpoch(lastFetchedEpoch) +.flatMap(endOffsetAndEpoch -> { +if (endOffsetAndEpoch.epoch == lastFetchedEpoch && endOffsetAndEpoch.offset == log.startOffset()) { +// This means that either: +// 1. The lastFetchedEpoch is smaller than any known epoch +// 2. The current leader epoch is lastFetchedEpoch and the log is empty. +// Assume that there is not diverging information +return Optional.empty(); +} else { +
[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
hachikuji commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557829548 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2216,7 +2316,7 @@ public void complete() { // These fields are visible to both the Raft IO thread and the listener // and are protected through synchronization on this `ListenerContext` instance private BatchReader lastSent = null; -private long lastAckedOffset = 0; +private long lastAckedEndOffset = 0; Review comment: I don't feel too strongly about it, but the new name is a little confusing to me. Why would the client only be acking end offsets? Especially confusing when I see this: `lastAckedEndOffset = logStartOffset` 🙂 . I think we probably need a comment here regardless.
[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
hachikuji commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557818816 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +221,102 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// Do not let the state machine create snapshots older than the latest snapshot +latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { +// Since snapshots are less than the high-watermark absolute offset comparison is okay. Review comment: Is it useful here to ensure that `snapshotId` is indeed lower than the high watermark?
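To make the question concrete, here is a hedged sketch of what validating a snapshot id against both the latest snapshot and the high watermark could look like. It is written in Java rather than the Scala of the diff, and everything in it is an assumption for illustration: the `OffsetAndEpoch` class is a local stand-in, `validate` and `isOlderThan` are hypothetical names, and treating an offset equal to the high watermark as acceptable is a guess, not something the thread settles.

```java
import java.util.Optional;

class SnapshotIdCheckSketch {
    // Local stand-in for the raft OffsetAndEpoch type (assumption).
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    // Mirrors the condition in the diff: the candidate is rejected if the latest
    // snapshot has a larger epoch or a larger offset.
    static boolean isOlderThan(OffsetAndEpoch candidate, OffsetAndEpoch latest) {
        return latest.epoch > candidate.epoch || latest.offset > candidate.offset;
    }

    // Hypothetical combined check: also refuse ids beyond the high watermark.
    static void validate(OffsetAndEpoch candidate, Optional<OffsetAndEpoch> latest, long highWatermark) {
        if (candidate.offset > highWatermark) {
            throw new IllegalArgumentException(
                "Snapshot offset " + candidate.offset + " is above the high watermark " + highWatermark);
        }
        latest.ifPresent(l -> {
            if (isOlderThan(candidate, l)) {
                throw new IllegalArgumentException("Snapshot id is older than the latest snapshot");
            }
        });
    }

    public static void main(String[] args) {
        // Accepted: newer than the latest snapshot and not above the high watermark.
        validate(new OffsetAndEpoch(10L, 2), Optional.of(new OffsetAndEpoch(5L, 1)), 20L);
        System.out.println("accepted");
    }
}
```

With the high-watermark bound enforced up front, the plain offset comparison against the latest snapshot rests on an explicit check rather than on the assumption stated in the diff's comment.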
[GitHub] [kafka] showuon commented on pull request #9888: KAFKA-12194: use stateListener to catch each state change
showuon commented on pull request #9888: URL: https://github.com/apache/kafka/pull/9888#issuecomment-760605267 The test will fail; I will work on it later. Please don't review yet. Thanks.
[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs
abbccdda commented on a change in pull request #9600: URL: https://github.com/apache/kafka/pull/9600#discussion_r557825070 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -139,8 +138,11 @@ class KafkaApis(val requestChannel: RequestChannel, request: RequestChannel.Request, handler: RequestChannel.Request => Unit ): Unit = { -def responseCallback(response: AbstractResponse): Unit = { - sendForwardedResponse(request, response) +def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = { + responseEither match { +case Left(response) => sendForwardedResponse(request, response) +case Right(error) => closeConnection(request, Collections.singletonMap(error, 1)) Review comment: Sounds good. I prefer using `info` here since it should be a rare case.
[GitHub] [kafka] showuon commented on pull request #9733: KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest
showuon commented on pull request #9733: URL: https://github.com/apache/kafka/pull/9733#issuecomment-760604903 @mjsax, I've reverted my change for the state change, so this PR now contains only the exception handler improvement. As for the state change fix, my thinking after the holidays is: if the test is not flaky now, why should we change it and maybe make it unreliable again? So my suggestion is that we keep monitoring this test, and if it fails again, we can revisit the discussion. What do you think?
[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
hachikuji commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557643619 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -16,28 +16,41 @@ */ package kafka.raft +import java.nio.file.Files import java.nio.file.NoSuchFileException +import java.util.NoSuchElementException import java.util.Optional +import java.util.concurrent.ConcurrentSkipListSet -import kafka.log.{AppendOrigin, Log} +import kafka.log.{AppendOrigin, Log, SnapshotGenerated} import kafka.server.{FetchHighWatermark, FetchLogEnd} import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.raft -import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog} +import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog} import org.apache.kafka.snapshot.FileRawSnapshotReader import org.apache.kafka.snapshot.FileRawSnapshotWriter import org.apache.kafka.snapshot.RawSnapshotReader import org.apache.kafka.snapshot.RawSnapshotWriter +import org.apache.kafka.snapshot.Snapshots import scala.compat.java8.OptionConverters._ -class KafkaMetadataLog( +final class KafkaMetadataLog private ( log: Log, + // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation + // and other threads will access this object. This object is used to efficiently notify the polling thread Review comment: What other threads? 
## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -370,7 +375,9 @@ class Log(@volatile private var _dir: File, throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed") } - def highWatermark: Long = highWatermarkMetadata.messageOffset + def highWatermark: Long = _highWatermarkMetadata.messageOffset + + def highWatermarkMetadata: LogOffsetMetadata = _highWatermarkMetadata Review comment: Could we instead either expose `fetchHighWatermarkMetadata` or make use of `fetchOffsetSnapshot` in `KafkaMetadataLog`? ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +221,102 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// Do not let the state machine create snapshots older than the latest snapshot +latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { +// Since snapshots are less than the high-watermark absolute offset comparison is okay. Review comment: Is it useful here to ensure that `snapshotId` here is lower than the high watermark? 
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -871,15 +875,32 @@ private FetchResponseData buildFetchResponse( .setLeaderEpoch(quorum.epoch()) .setLeaderId(quorum.leaderIdOrNil()); -divergingEpoch.ifPresent(partitionData::setDivergingEpoch); +switch (validatedOffsetAndEpoch.type()) { +case DIVERGING: +partitionData.divergingEpoch() + .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch) + .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset); +break; +case SNAPSHOT: +partitionData.snapshotId() + .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch) + .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset); +break; +default: +} }); } private FetchResponseData buildEmptyFetchResponse( Errors error, Optional highWatermark ) { -return buildFetchResponse(error, MemoryRecords.EMPTY, Optional.empty(), highWatermark); +return buildFetchResponse( +error, +MemoryRecords.EMPTY, +ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(-1, -1)), Review comment: Not sure it's worth creating another type, but it is a little surprising to see `valid(new OffsetAndEpoch(-1, -1))`. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2216,7 +2316,7 @@ public void complete() { // These fields are visible to both the Raft IO thread and the listener // and are protected through synchronization on this `ListenerContext` instance private BatchReader lastSent = null; -private long lastAckedOffset = 0; +private long lastAckedEndOffset = 0; Review comment:
[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs
abbccdda commented on a change in pull request #9600: URL: https://github.com/apache/kafka/pull/9600#discussion_r557823644 ## File path: core/src/main/scala/kafka/server/ForwardingManager.scala ## @@ -77,7 +79,7 @@ class ForwardingManagerImpl( override def forwardRequest( request: RequestChannel.Request, -responseCallback: AbstractResponse => Unit +responseCallback: Either[AbstractResponse, Errors] => Unit Review comment: I'm inclined to keep it as it is, since the current return type gives the caller a root cause message.
[GitHub] [kafka] showuon commented on pull request #9733: KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest
showuon commented on pull request #9733: URL: https://github.com/apache/kafka/pull/9733#issuecomment-760597684 OK, working on it now.
[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs
abbccdda commented on a change in pull request #9600: URL: https://github.com/apache/kafka/pull/9600#discussion_r557810929 ## File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java ## @@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) { * Get the latest version supported by the broker within an allowed range of versions */ public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) { -ApiVersion usableVersion = supportedVersions.get(apiKey); -if (usableVersion == null) -throw new UnsupportedVersionException("The broker does not support " + apiKey); -return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion); +return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion); } -private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions, - short minAllowedVersion, short maxAllowedVersion) { -short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion); -short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion); -if (minVersion > maxVersion) +private short latestUsableVersion(ApiKeys apiKey, + ApiVersion supportedVersions, + short minAllowedVersion, + short maxAllowedVersion) { +if (supportedVersions == null) Review comment: Yeah, actually we could remove the private function, since it now has a single caller.
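The simplification discussed in this thread (one method that does the lookup, the null check, and the range intersection) can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `UsableVersionSketch`, `Api`, `Range`, and `support` are hypothetical stand-ins for `NodeApiVersions`, `ApiKeys`, and `ApiVersion`, and `IllegalStateException` stands in for `UnsupportedVersionException` so the example has no Kafka dependency.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified stand-in for NodeApiVersions; all names are illustrative only.
final class UsableVersionSketch {
    enum Api { PRODUCE, FETCH }

    // Inclusive [min, max] range of versions the broker supports for one API.
    static final class Range {
        final short min;
        final short max;

        Range(int min, int max) {
            this.min = (short) min;
            this.max = (short) max;
        }
    }

    private final Map<Api, Range> supported = new EnumMap<>(Api.class);

    UsableVersionSketch support(Api api, int min, int max) {
        supported.put(api, new Range(min, max));
        return this;
    }

    // One method: the lookup, the null check, and the range intersection live together.
    short latestUsableVersion(Api api, short oldestAllowed, short latestAllowed) {
        Range range = supported.get(api);
        if (range == null)
            throw new IllegalStateException("The broker does not support " + api);
        short min = (short) Math.max(oldestAllowed, range.min);
        short max = (short) Math.min(latestAllowed, range.max);
        if (min > max)
            throw new IllegalStateException("No usable version of " + api + " in the allowed range");
        return max;
    }
}
```

With the private overload folded in, there is only one place where a missing API key can surface, which is the point of the review nit.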
[GitHub] [kafka] hachikuji commented on a change in pull request #9883: MINOR: Generalize server startup to make way for KIP-500
hachikuji commented on a change in pull request #9883: URL: https://github.com/apache/kafka/pull/9883#discussion_r557807090 ## File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics} +import kafka.raft.KafkaRaftManager +import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} +import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.{AppInfoParser, Time} +import org.apache.kafka.raft.internals.StringSerde + +/** + * Partially stubbed implementation of the KIP-500 server which relies on a self-managed + * Raft quorum for replication of the `@metadata` topic, which stores all of + * the cluster metadata. + */ +class KafkaRaftServer( Review comment: Makes sense. Added a comment above.
[GitHub] [kafka] hachikuji merged pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
hachikuji merged pull request #9881: URL: https://github.com/apache/kafka/pull/9881
[GitHub] [kafka] cmccabe commented on a change in pull request #9883: MINOR: Generalize server startup to make way for KIP-500
cmccabe commented on a change in pull request #9883: URL: https://github.com/apache/kafka/pull/9883#discussion_r557799746 ## File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics} +import kafka.raft.KafkaRaftManager +import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} +import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.{AppInfoParser, Time} +import org.apache.kafka.raft.internals.StringSerde + +/** + * Partially stubbed implementation of the KIP-500 server which relies on a self-managed + * Raft quorum for replication of the `@metadata` topic, which stores all of + * the cluster metadata. + */ +class KafkaRaftServer( Review comment: Can we have a comment at least describing the fact that it can contain either a controller, a broker, or both?
[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs
hachikuji commented on a change in pull request #9600: URL: https://github.com/apache/kafka/pull/9600#discussion_r557783391 ## File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java ## @@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) { * Get the latest version supported by the broker within an allowed range of versions */ public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) { -ApiVersion usableVersion = supportedVersions.get(apiKey); -if (usableVersion == null) -throw new UnsupportedVersionException("The broker does not support " + apiKey); -return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion); +return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion); } -private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions, - short minAllowedVersion, short maxAllowedVersion) { -short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion); -short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion); -if (minVersion > maxVersion) +private short latestUsableVersion(ApiKeys apiKey, + ApiVersion supportedVersions, + short minAllowedVersion, + short maxAllowedVersion) { +if (supportedVersions == null) Review comment: nit: since we moved the null check here, why don't we remove the parameter as well and call `supportedVersions.get(apiKey)` here? ## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ## @@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi return apiKeys; } +public static ApiVersionsResponseKeyCollection commonApiVersionsWithActiveController(final byte minMagic, Review comment: Maybe `intersectControllerApiVersions`? 
## File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java ## @@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) { * Get the latest version supported by the broker within an allowed range of versions */ public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) { Review comment: We could probably simplify this so that it takes a single `ApiVersion` parameter? By the way, the implementation of `latestUsableVersion(ApiKeys apiKey)` above looks a little odd, since it basically intersects the latest supported version with itself. A little helper (say `latestSupportedOrThrow`) might simplify this. ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -140,6 +143,16 @@ class BrokerToControllerChannelManager( callback )) } + + def controllerApiVersions(): Option[NodeApiVersions] = +requestThread.activeControllerAddress() match { + case Some(activeController) => +if (activeController.id() == config.brokerId) + Some(currentNodeApiVersions) +else + Option(apiVersions.get(activeController.idString())) + case None => None Review comment: Seems like we can replace the `match` with a `flatMap`? ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -205,20 +226,20 @@ class BrokerToControllerRequestThread( private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = { if (response.wasDisconnected()) { - activeController = None + updateControllerAddress(None) requestQueue.putFirst(request) } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) { // just close the controller connection and wait for metadata cache update in doWork - networkClient.disconnect(activeController.get.idString) - activeController = None + networkClient.disconnect(activeControllerAddress().get.idString) Review comment: This is another slippery-looking case.
Can we just rewrite this as a `foreach` so that we don't need to worry about it? ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -190,11 +211,11 @@ class BrokerToControllerRequestThread( if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) { requestIter.remove() request.callback.onTimeout() - } else if (activeController.isDefined) { + } else if (activeControllerAddress().isDefined) { requestIter.remove() return Some(RequestAndCompletio
[GitHub] [kafka] bob-barrett opened a new pull request #9902: KAFKA-12193: Re-resolve IPs after a client disconnects
bob-barrett opened a new pull request #9902: URL: https://github.com/apache/kafka/pull/9902 This patch changes the NetworkClient behavior to re-resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously resolved addresses are exhausted. This handles the scenario in which the node's IP addresses have changed during the lifetime of the connection, so the client no longer has to try each stale IP address before re-resolving. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
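The behavior described in this PR can be illustrated with a small sketch. It is not the NetworkClient implementation: `ReResolvingAddresses`, its method names, and the injected `resolver` function (standing in for a DNS lookup) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not Kafka's actual connection-state class.
final class ReResolvingAddresses {
    private final String host;
    private final Function<String, List<String>> resolver; // assumed DNS lookup hook
    private List<String> addresses = new ArrayList<>();
    private int index = 0;

    ReResolvingAddresses(String host, Function<String, List<String>> resolver) {
        this.host = host;
        this.resolver = resolver;
    }

    // Returns the address to try next, resolving the hostname if nothing is cached.
    String currentAddress() {
        if (addresses.isEmpty()) {
            addresses = new ArrayList<>(resolver.apply(host));
            index = 0;
            if (addresses.isEmpty())
                throw new IllegalStateException("No addresses found for " + host);
        }
        return addresses.get(index);
    }

    // A connection attempt failed: advance, and re-resolve only once every address was tried.
    void connectAttemptFailed() {
        index++;
        if (index >= addresses.size())
            clear();
    }

    // An established connection dropped: forget the cache so the next attempt re-resolves.
    void establishedConnectionLost() {
        clear();
    }

    private void clear() {
        addresses = new ArrayList<>();
        index = 0;
    }
}
```

The two failure paths are deliberately different: a failed attempt walks through the cached list, while the loss of a connection that had been established discards the list immediately, on the assumption that the addresses may have changed in the meantime.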
[GitHub] [kafka] ijuma commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5
ijuma commented on pull request #9889: URL: https://github.com/apache/kafka/pull/9889#issuecomment-760552078 The failed Streams tests seem unrelated, but they seem to be failing very often.
[GitHub] [kafka] ijuma merged pull request #9892: KAFKA-12201: Migrate connect:basic-auth-extension module to JUnit 5
ijuma merged pull request #9892: URL: https://github.com/apache/kafka/pull/9892
[jira] [Resolved] (KAFKA-10896) TimeoutException when Producer try to send message to Kafka Broker
[ https://issues.apache.org/jira/browse/KAFKA-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nguyen Cong Hieu resolved KAFKA-10896. -- Resolution: Not A Bug > TimeoutException when Producer try to send message to Kafka Broker > -- > > Key: KAFKA-10896 > URL: https://issues.apache.org/jira/browse/KAFKA-10896 > Project: Kafka > Issue Type: Bug > Components: clients, config >Affects Versions: 2.6.0 >Reporter: Nguyen Cong Hieu >Priority: Major > Attachments: console_consumer.png, console_producer.png, > producerconfig.png, producercreate.png, serverconfig.png, > timeout_exception.png > > > Dear All, > > I set up a new Kafka environment, and everything works normally when I use > the consumer and producer binaries shipped with Kafka. > > However, when I run a small Java application that creates a producer to send > a message for a topic to the broker, it fails with a timeout exception and the message > "Caused by: org.apache.kafka.common.errors.TimeoutException: Topic otp not > present in metadata after 6 ms." > > I checked some Confluence pages related to Kafka but could not resolve it yet. > > Please help me resolve this, and please see my file attachments > for more details about the broker information and sample source code. > Best regards. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe opened a new pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe opened a new pull request #9901: URL: https://github.com/apache/kafka/pull/9901
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9887: KAFKA-12195 Fix synchronization issue happening in KafkaStreams
wcarlson5 commented on a change in pull request #9887: URL: https://github.com/apache/kafka/pull/9887#discussion_r557739729 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -989,16 +989,20 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin public Optional removeStreamThread() { if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { -for (final StreamThread streamThread : threads) { -if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { -streamThread.shutdown(); -if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +synchronized (threads) { Review comment: Why are we using the `threads` object as a lock here? Is that what `Collections.synchronizedList(new LinkedList<>());` uses internally? I am a little concerned that we are holding `changeThreadCount` while waiting for `threads`.
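For context on the question above: the Javadoc for `Collections.synchronizedList` requires callers to synchronize manually on the returned list while iterating over it, because only individual calls such as `add` or `remove` are synchronized by the wrapper. The sketch below shows that pattern together with a fixed lock order. It is only an illustration: `SynchronizedListSketch` and `removeOtherThan` are hypothetical, and thread names stand in for `StreamThread` instances.

```java
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Hypothetical reduction of the locking pattern under review; not the KafkaStreams code.
final class SynchronizedListSketch {
    private final List<String> threads = Collections.synchronizedList(new LinkedList<>());
    private final Object changeThreadCount = new Object();

    // Single calls are already synchronized by the wrapper.
    void add(String name) {
        threads.add(name);
    }

    // Removes and returns the first name other than `self`, or null if there is none.
    String removeOtherThan(String self) {
        // Lock order is always changeThreadCount first, then threads.
        synchronized (changeThreadCount) {
            // Iteration is a compound action, so the list itself must be locked.
            synchronized (threads) {
                for (String name : threads) {
                    if (!name.equals(self)) {
                        threads.remove(name);
                        return name; // return right away; the iterator is not used again
                    }
                }
            }
        }
        return null;
    }
}
```

Nesting the two monitors is safe from deadlock only if no code path acquires `changeThreadCount` while already holding `threads`. Any blocking call made inside the inner block still keeps both locks held, which is the substance of the reviewer's concern.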
[jira] [Created] (KAFKA-12209) Add the timeline data structures for the KIP-631 controller
Colin McCabe created KAFKA-12209: Summary: Add the timeline data structures for the KIP-631 controller Key: KAFKA-12209 URL: https://issues.apache.org/jira/browse/KAFKA-12209 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe
[jira] [Resolved] (KAFKA-12183) Add the KIP-631 metadata record definitions
[ https://issues.apache.org/jira/browse/KAFKA-12183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12183. -- Fix Version/s: 2.8.0 Resolution: Fixed > Add the KIP-631 metadata record definitions > --- > > Key: KAFKA-12183 > URL: https://issues.apache.org/jira/browse/KAFKA-12183 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > Fix For: 2.8.0 > > > Add the KIP-631 metadata record definitions
[GitHub] [kafka] cmccabe opened a new pull request #9900: KAFKA-12208: Rename AdminManager to ZkAdminManager
cmccabe opened a new pull request #9900: URL: https://github.com/apache/kafka/pull/9900
[jira] [Created] (KAFKA-12208) Rename AdminManager to ZkAdminManager
Colin McCabe created KAFKA-12208: Summary: Rename AdminManager to ZkAdminManager Key: KAFKA-12208 URL: https://issues.apache.org/jira/browse/KAFKA-12208 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe
[GitHub] [kafka] hachikuji commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id
hachikuji commented on a change in pull request #9871: URL: https://github.com/apache/kafka/pull/9871#discussion_r557725416 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -2182,6 +2182,46 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception { assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch()); } +@Test +public void testObserverFetchWithNoLocalId() throws Exception { +// When no `localId` is defined, the client will behave as an observer. Review comment: Makes sense. I will move it.
[GitHub] [kafka] hachikuji commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id
hachikuji commented on a change in pull request #9871: URL: https://github.com/apache/kafka/pull/9871#discussion_r557725206 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -125,9 +125,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException, final EpochState initialState; if (!election.voters().isEmpty() && !voters.equals(election.voters())) { throw new IllegalStateException("Configured voter set: " + voters -+ " is different from the voter set read from the state file: " + election.voters() + -". Check if the quorum configuration is up to date, " + -"or wipe out the local state file if necessary"); ++ " is different from the voter set read from the state file: " + election.voters() ++ ". Check if the quorum configuration is up to date, " ++ "or wipe out the local state file if necessary"); +} else if (election.hasVoted() && !isVoter()) { +String localIdDescription = localId.isPresent() ? +localId.getAsInt() + " is not a voter" : +"is undefined"; +throw new IllegalStateException("Initialized quorum state " + election Review comment: I thought I would start out strict and relax later if needed. Worst case, the user would just remove or edit the `quorum-state` file.
[GitHub] [kafka] abbccdda commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id
abbccdda commented on a change in pull request #9871: URL: https://github.com/apache/kafka/pull/9871#discussion_r557722131 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -2182,6 +2182,46 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception { assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch()); } +@Test +public void testObserverFetchWithNoLocalId() throws Exception { +// When no `localId` is defined, the client will behave as an observer. Review comment: Wondering whether we could copy this comment to the raft client `localId` field. ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -125,9 +125,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException, final EpochState initialState; if (!election.voters().isEmpty() && !voters.equals(election.voters())) { throw new IllegalStateException("Configured voter set: " + voters -+ " is different from the voter set read from the state file: " + election.voters() + -". Check if the quorum configuration is up to date, " + -"or wipe out the local state file if necessary"); ++ " is different from the voter set read from the state file: " + election.voters() ++ ". Check if the quorum configuration is up to date, " ++ "or wipe out the local state file if necessary"); +} else if (election.hasVoted() && !isVoter()) { +String localIdDescription = localId.isPresent() ? +localId.getAsInt() + " is not a voter" : +"is undefined"; +throw new IllegalStateException("Initialized quorum state " + election Review comment: So a node could not transition from follower to observer across restarts?
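The strict startup check being discussed can be reduced to a few lines. The sketch below is an assumption-laden simplification: `QuorumInitCheck` and `validate` are hypothetical, the persisted election state is collapsed into a single `hasVoted` flag, and the exception text is illustrative because the real message is truncated in the diff above.

```java
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical reduction of the QuorumState.initialize() check; not the actual Kafka code.
final class QuorumInitCheck {
    // A node with no local id, or with an id outside the voter set, acts as an observer.
    static boolean isVoter(OptionalInt localId, Set<Integer> voters) {
        return localId.isPresent() && voters.contains(localId.getAsInt());
    }

    // Rejects a persisted vote when the node is now configured as an observer.
    static void validate(OptionalInt localId, Set<Integer> voters, boolean hasVoted) {
        if (hasVoted && !isVoter(localId, voters)) {
            String localIdDescription = localId.isPresent()
                ? localId.getAsInt() + " is not a voter"
                : "is undefined";
            // Illustrative wording only; the original message is cut off in the quoted diff.
            throw new IllegalStateException(
                "Persisted state contains a vote, but the local id " + localIdDescription);
        }
    }
}
```

Under this rule a former voter that restarts as an observer fails fast until its persisted state is removed or edited, which matches the "start out strict and relax later" position taken in the reply.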
[GitHub] [kafka] ableegoldman commented on pull request #9887: KAFKA-12195 Fix synchronization issue happening in KafkaStreams
ableegoldman commented on pull request #9887: URL: https://github.com/apache/kafka/pull/9887#issuecomment-760488841 Hey @wcarlson5 can you also take a look at this?
[GitHub] [kafka] abbccdda merged pull request #9834: MINOR: fix typo in TimeIndex
abbccdda merged pull request #9834: URL: https://github.com/apache/kafka/pull/9834
[GitHub] [kafka] cadonna commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change
cadonna commented on a change in pull request #9888: URL: https://github.com/apache/kafka/pull/9888#discussion_r557647216 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -119,37 +160,47 @@ public void shouldAddStreamThread() throws Exception { .sorted().toArray(), equalTo(new String[] {"1", "2", "3"}) ); -waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.REBALANCING, DEFAULT_DURATION); -waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); + +waitForStateTransition(KafkaStreams.State.RUNNING); +assertTrue(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING)); Review comment: We normally use `assertThat()` in new and refactored code. Please also change the other occurrences. ```suggestion assertThat(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING), is(true)); ``` ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -90,24 +91,64 @@ public void setup() { ); } +private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException { +kafkaStreams.start(); +waitForStateTransition(KafkaStreams.State.RUNNING); +} + @After public void teardown() throws IOException { +stateTransitionHistory.clear(); purgeLocalStreamsState(properties); } +private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) { +// we store each new state in state transition so that we won't miss any state change +kafkaStreams.setStateListener( +(newState, oldState) -> stateTransitionHistory.add(newState) +); +} + +private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException { +waitForCondition( +() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected), +DEFAULT_DURATION.toMillis(), +() -> String.format("Client did not 
change to the %s state in time. Observed new state transitions: %s", +expected, stateTransitionHistory) +); +} + +// verify if there's the state change from "before" state into "after" state +private boolean hasStateTransition(final KafkaStreams.State before, final KafkaStreams.State after) { +// should have at least 2 states in history +if (stateTransitionHistory.size() < 2) { +return false; +} + +for (int i = 0; i < stateTransitionHistory.size() - 1; i++) { +if (stateTransitionHistory.get(i).equals(before) && stateTransitionHistory.get(i + 1).equals(after)) { +return true; +} +} +return false; Review comment: Why do we need a `for`-loop here? Wouldn't it suffice to verify the last two elements of the history and check if those two elements are a `REBALANCING` followed by a `RUNNING`? ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -90,24 +91,64 @@ public void setup() { ); } +private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException { +kafkaStreams.start(); +waitForStateTransition(KafkaStreams.State.RUNNING); +} + @After public void teardown() throws IOException { +stateTransitionHistory.clear(); purgeLocalStreamsState(properties); } +private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) { +// we store each new state in state transition so that we won't miss any state change +kafkaStreams.setStateListener( +(newState, oldState) -> stateTransitionHistory.add(newState) +); +} + +private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException { +waitForCondition( +() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected), +DEFAULT_DURATION.toMillis(), +() -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s", +expected, stateTransitionHistory) +); +} Review comment: Couldn't we simply wait for the current state to become `RUNNING`? 
```suggestion private void waitForRunning() throws Exception { waitForCondition( () -> kafkaStreams.state() == KafkaStreams.State.RUNNING, DEFAULT_DURATION.toMillis(), () -> String.format("Client did not transi
[jira] [Updated] (KAFKA-12207) Do not maintain list of latest producer append information
[ https://issues.apache.org/jira/browse/KAFKA-12207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12207: Labels: needs-kip (was: ) > Do not maintain list of latest producer append information > --- > > Key: KAFKA-12207 > URL: https://issues.apache.org/jira/browse/KAFKA-12207 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > Labels: needs-kip > > For each producerId writing to each partition, we maintain a list of the 5 > most recent appended sequence numbers and the corresponding offsets in the > log. If a producer fails to receive a successful response and retries the > Produce request, then we can still return the offset of the successful > append, which is returned to the user inside `RecordMetadata`. (Note that the > limit of 5 most recent appends is where we derive the limit on the max number > of inflight requests that the producer is allowed when idempotence is > enabled.) > This is only a "best-effort" attempt to return the offset of the append. For > example, we do not populate the full list of recent appends when the log is > reloaded. Only the latest sequence/offset are reloaded from the snapshot. If > we receive a duplicate and we do not have the offset, then the broker > currently handles this by returning OUT_OF_ORDER_SEQUENCE. > In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was > intended to handle this case and the producer already checks for it. If the > producer sees this error in the response, then the `send` is considered > successful, but the producer returns -1 as both the offset and timestamp > inside `RecordMetadata`. > The reason we never implemented this on the broker is probably because we > allow the sequence numbers of the producer to wrap around after reaching > Int.MaxValue. What we considered in the past is fixing a number like 1000 and > requiring that the sequence be within that range to be considered a > duplicate. 
A better solution going forward is to let the producer bump the > epoch when the sequence hits Int.MaxValue, but we still have to allow > sequence numbers to wrap for compatibility. > Given the loose guarantees that we already have here, I'm considering whether > the additional bookkeeping and the required memory are worth preserving. As > an alternative, we could consider the following: > 1. The broker will only maintain the latest sequence/offset for each > producerId > 2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within > 1000 of the latest sequence (accounting for overflow). > 3. Instead of wrapping around sequence numbers, the producer will bump the > epoch if possible. It's worth noting that the idempotent producer can freely > bump the epoch, so the only time we should ever need to wrap the sequence is > for the transactional producer when it is used on a broker which does not > support the `InitProducerId` version which allows epoch bumps. > 4. We can remove the restriction on `max.in.flight.requests.per.connection` > and document that if the offset is required in `RecordMetadata`, then the > user must set this to 1. Internally, if connecting to an old broker which > does not support epoch bumps, then we can restrict the number of inflight > requests to 5. > The benefit in the end is that we can reduce the memory usage for producer > state and the complexity to manage that state. It also gives us a path to > removing the annoying config restriction and a better policy for sequence > overflow. -- This message was sent by Atlassian Jira (v8.3.4#803005)
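Item 2 of the proposal (treating any sequence within 1000 of the latest one as a duplicate, accounting for overflow) could look roughly like the sketch below. It assumes sequence numbers are non-negative and wrap from `Integer.MAX_VALUE` back to 0. `SequenceWindow`, `distanceBehind`, and `isLikelyDuplicate` are hypothetical names, not broker code.

```java
// Hypothetical sketch of the proposed duplicate window; not the broker implementation.
final class SequenceWindow {
    static final int WINDOW = 1000; // the figure mentioned in the proposal

    // How far `seq` lies behind `latest`, assuming sequences run 0..Integer.MAX_VALUE
    // and then wrap back to 0. Uses long arithmetic so the sum cannot overflow.
    static long distanceBehind(int latest, int seq) {
        if (seq <= latest)
            return (long) latest - seq;
        // seq is numerically larger, so it can only be "behind" via a wrap-around.
        return (long) latest + ((long) Integer.MAX_VALUE - seq) + 1;
    }

    // True when the sequence is close enough behind the latest one to be treated as a
    // duplicate (a broker would answer DUPLICATE_SEQUENCE_NUMBER under the proposal).
    static boolean isLikelyDuplicate(int latest, int seq) {
        return distanceBehind(latest, seq) <= WINDOW;
    }
}
```

For example, with a latest sequence of 5 shortly after a wrap, a retried batch with sequence `Integer.MAX_VALUE - 10` is 16 steps behind and falls inside the window, while sequence 6 is treated as almost a full cycle behind and does not.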
[jira] [Updated] (KAFKA-12207) Do not maintain list of latest producer append information
[ https://issues.apache.org/jira/browse/KAFKA-12207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-12207:
Issue Type: Improvement (was: Bug)

> Do not maintain list of latest producer append information
> ---
>
> Key: KAFKA-12207
> URL: https://issues.apache.org/jira/browse/KAFKA-12207
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Priority: Major
>
> For each producerId writing to each partition, we maintain a list of the 5 most recent appended sequence numbers and the corresponding offsets in the log. If a producer fails to receive a successful response and retries the Produce request, then we can still return the offset of the successful append, which is returned to the user inside `RecordMetadata`. (Note that the limit of 5 most recent appends is where we derive the limit on the max number of inflight requests that the producer is allowed when idempotence is enabled.)
>
> This is only a "best-effort" attempt to return the offset of the append. For example, we do not populate the full list of recent appends when the log is reloaded. Only the latest sequence/offset are reloaded from the snapshot. If we receive a duplicate and we do not have the offset, then the broker currently handles this by returning OUT_OF_ORDER_SEQUENCE.
>
> In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was intended to handle this case and the producer already checks for it. If the producer sees this error in the response, then the `send` is considered successful, but the producer returns -1 as both the offset and timestamp inside `RecordMetadata`.
>
> The reason we never implemented this on the broker is probably because we allow the sequence numbers of the producer to wrap around after reaching Int.MaxValue. What we considered in the past is fixing a number like 1000 and requiring that the sequence be within that range to be considered a duplicate. A better solution going forward is to let the producer bump the epoch when the sequence hits Int.MaxValue, but we still have to allow sequence numbers to wrap for compatibility.
>
> Given the loose guarantees that we already have here, I'm considering whether the additional bookkeeping and the required memory are worth preserving. As an alternative, we could consider the following:
>
> 1. The broker will only maintain the latest sequence/offset for each producerId.
> 2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within 1000 of the latest sequence (accounting for overflow).
> 3. Instead of wrapping around sequence numbers, the producer will bump the epoch if possible. It's worth noting that the idempotent producer can freely bump the epoch, so the only time we should ever need to wrap the sequence is for the transactional producer when it is used on a broker which does not support the `InitProducerId` version which allows epoch bumps.
> 4. We can remove the restriction on `max.in.flight.requests.per.connection` and document that if the offset is required in `RecordMetadata`, then the user must set this to 1. Internally, if connecting to an old broker which does not support epoch bumps, then we can restrict the number of inflight requests to 5.
>
> The benefit in the end is that we can reduce the memory usage for producer state and the complexity to manage that state. It also gives us a path to removing the annoying config restriction and a better policy for sequence overflow.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
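To make point 2 of the proposal concrete, here is a minimal sketch of a "within 1000 of the latest sequence (accounting for overflow)" check. It is not broker code: the class and method names are hypothetical, the window is treated as inclusive, and sequences are assumed to lie in [0, Int.MaxValue] and to wrap from Int.MaxValue back to 0.

```java
// Hypothetical sketch, not Kafka broker code.
final class SequenceWindowSketch {
    // Window size taken from the "within 1000" figure in the proposal (assumed inclusive).
    static final long DUPLICATE_WINDOW = 1000L;

    // Assumption: sequences are in [0, Integer.MAX_VALUE] and wrap from MAX_VALUE to 0.
    static final long SEQUENCE_SPACE = (long) Integer.MAX_VALUE + 1L;

    /** How many steps the candidate sits behind the latest sequence, modulo the wrap. */
    static long distanceBehind(int latestSeq, int candidateSeq) {
        return Math.floorMod((long) latestSeq - (long) candidateSeq, SEQUENCE_SPACE);
    }

    /** True if the candidate is the latest sequence or at most DUPLICATE_WINDOW behind it. */
    static boolean looksLikeDuplicate(int latestSeq, int candidateSeq) {
        return distanceBehind(latestSeq, candidateSeq) <= DUPLICATE_WINDOW;
    }
}
```

Under these assumptions, a retried batch at sequence Int.MaxValue - 2 that arrives after the producer has wrapped to 3 is 6 steps behind the latest sequence, so it would fall inside the window, while a sequence one ahead of the latest would not.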
[jira] [Created] (KAFKA-12207) Do not maintain list of latest producer append information
Jason Gustafson created KAFKA-12207:
---
Summary: Do not maintain list of latest producer append information
Key: KAFKA-12207
URL: https://issues.apache.org/jira/browse/KAFKA-12207
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson

For each producerId writing to each partition, we maintain a list of the 5 most recent appended sequence numbers and the corresponding offsets in the log. If a producer fails to receive a successful response and retries the Produce request, then we can still return the offset of the successful append, which is returned to the user inside `RecordMetadata`. (Note that the limit of 5 most recent appends is where we derive the limit on the max number of inflight requests that the producer is allowed when idempotence is enabled.)

This is only a "best-effort" attempt to return the offset of the append. For example, we do not populate the full list of recent appends when the log is reloaded. Only the latest sequence/offset are reloaded from the snapshot. If we receive a duplicate and we do not have the offset, then the broker currently handles this by returning OUT_OF_ORDER_SEQUENCE.

In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was intended to handle this case and the producer already checks for it. If the producer sees this error in the response, then the `send` is considered successful, but the producer returns -1 as both the offset and timestamp inside `RecordMetadata`.

The reason we never implemented this on the broker is probably because we allow the sequence numbers of the producer to wrap around after reaching Int.MaxValue. What we considered in the past is fixing a number like 1000 and requiring that the sequence be within that range to be considered a duplicate. A better solution going forward is to let the producer bump the epoch when the sequence hits Int.MaxValue, but we still have to allow sequence numbers to wrap for compatibility.

Given the loose guarantees that we already have here, I'm considering whether the additional bookkeeping and the required memory are worth preserving. As an alternative, we could consider the following:

1. The broker will only maintain the latest sequence/offset for each producerId.
2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within 1000 of the latest sequence (accounting for overflow).
3. Instead of wrapping around sequence numbers, the producer will bump the epoch if possible. It's worth noting that the idempotent producer can freely bump the epoch, so the only time we should ever need to wrap the sequence is for the transactional producer when it is used on a broker which does not support the `InitProducerId` version which allows epoch bumps.
4. We can remove the restriction on `max.in.flight.requests.per.connection` and document that if the offset is required in `RecordMetadata`, then the user must set this to 1. Internally, if connecting to an old broker which does not support epoch bumps, then we can restrict the number of inflight requests to 5.

The benefit in the end is that we can reduce the memory usage for producer state and the complexity to manage that state. It also gives us a path to removing the annoying config restriction and a better policy for sequence overflow.
[GitHub] [kafka] C0urante commented on pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)
C0urante commented on pull request #9880: URL: https://github.com/apache/kafka/pull/9880#issuecomment-760448619 Thanks Randall! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9715: Upstream ApisUtils from kip-500
jsancio commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r557669397

## File path: core/src/main/scala/kafka/server/AuthHelper.scala ##
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+
+class AuthHelper(val requestChannel: RequestChannel,
+                 val authorizer: Option[Authorizer]) {

Review comment:
Adding `val` to the constructor arguments makes them `public` members. Glancing at the code, it doesn't look like this is needed.
## File path: core/src/main/scala/kafka/server/RequestHandlerHelper.scala ##
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.network.RequestChannel
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
+import org.apache.kafka.common.utils.Time
+
+
+object RequestHandlerHelper {
+
+  def onLeadershipChange(groupCoordinator: GroupCoordinator,
+                         txnCoordinator: TransactionCoordinator,
+                         updatedLeaders: Iterable[Partition],
+                         updatedFollowers: Iterable[Partition]): Unit = {
+    // for each new leader or follower, call coordinator to handle consumer group migration.
+    // this callback is invoked under the replica state change lock to ensure proper order of
+    // leadership changes
+    updatedLeaders.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onElection(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
+    }
+
+    updatedFollowers.foreach { partition =>
+      if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+        groupCoordinator.onResignation(partition.partitionId)
+      else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+        txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
+    }
+  }
+}
+
+
+
+class RequestHandlerHelper(val requestChannel: RequestChannel,
+                           val quotas: QuotaManagers,
+                           val time: Time,
+                           val logPrefix: String) extends Logging {

Review comm
[GitHub] [kafka] lct45 commented on a change in pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics
lct45 commented on a change in pull request #9895:
URL: https://github.com/apache/kafka/pull/9895#discussion_r557642098

## File path: docs/ops.html ##
@@ -2133,6 +2141,137 @@ RocksDB Properties-based Metrics
+ RocksDB Properties-based Metrics:
+ All of the following properties-based metrics have a recording level of info and are recorded when the
+ metrics are accessed.
+ If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,

Review comment:
```suggestion
If a state store consists of multiple RocksDB instances, as is the case for aggregations over time and session windows,
```

## File path: docs/ops.html ##
@@ -2133,6 +2141,137 @@ RocksDB Properties-based Metrics
+ RocksDB Properties-based Metrics:
+ All of the following properties-based metrics have a recording level of info and are recorded when the
+ metrics are accessed.
+ If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
+ each metric reports the sum over the RocksDB instances of the state store, except for the block cache metrics

Review comment:
```suggestion
each metric reports the sum over all the RocksDB instances of the state store, except for the block cache metrics
```

## File path: docs/ops.html ##
@@ -2032,17 +2032,25 @@ RocksDB Metrics
- All of the following metrics have a recording level of debug.
- The metrics are collected every minute from the RocksDB state stores.
- If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
- each metric reports an aggregation over the RocksDB instances of the state store.
+ RocksDB Metrics are grouped into statistics-based metrics and properties-based metrics.
+ The former are recorded from statistics that a RocksDB state store collects whereas the latter are recorded from
+ properties that RocksDB exposes.
  Note that the store-scope for built-in RocksDB state stores are currently the following:
  rocksdb-state (for RocksDB backed key-value store)
  rocksdb-window-state (for RocksDB backed window store)
  rocksdb-session-state (for RocksDB backed session store)
+
+ RocksDB Statistics-based Metrics:
+ All of the following statistics-based metrics have a recording level of debug because collecting
+ statistics in https://github.com/facebook/rocksdb/wiki/Statistics#stats-level-and-performance-costs RocksDB
+ may have an impact on performance.
+ Statistics-based metrics are collected every minute from the RocksDB state stores.
+ If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,

Review comment:
```suggestion
If a state store consists of multiple RocksDB instances, as is the case for aggregations over time and session windows,
```

## File path: docs/ops.html ##
@@ -2032,17 +2032,25 @@ RocksDB Metrics
- All of the following metrics have a recording level of debug.
- The metrics are collected every minute from the RocksDB state stores.
- If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
- each metric reports an aggregation over the RocksDB instances of the state store.
+ RocksDB Metrics are grouped into statistics-based metrics and properties-based metrics.
+ The former are recorded from statistics that a RocksDB state store collects whereas the latter are recorded from
+ properties that RocksDB exposes.

Review comment:
I'm still a little confused about the difference between the two after reading this. Maybe an example of something that is exposed by RocksDB but not collected by RocksDB would help.

## File path: docs/ops.html ##
@@ -2133,6 +2141,137 @@ RocksDB Properties-based Metrics
+ RocksDB Properties-based Metrics:
+ All of the following properties-based metrics have a recording level of info and are recorded when the
+ metrics are accessed.
+ If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
+ each metric reports the sum over the RocksDB instances of the state store, except for the block cache metrics
+ block-cache-*. The block cache metrics report the sum over all RocksDB instances if each instance uses its
+ own block cache, and they report the recorded value from only one instance if a single block cache is shared
+ among all instances.
+
+
+
+
+ Metric/Attribute name
+ Description
+ Mbean name
+
+
+ num-immutable-mem-table
+ The number of immutable memtables that have not yet been flushed.
+ kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
[GitHub] [kafka] mjsax commented on pull request #9733: KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest
mjsax commented on pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#issuecomment-760424092

@showuon Sorry for the late response. Pretty busy atm. We found that the fix for the exception handler contained in this PR is also blocking https://github.com/apache/kafka/pull/9720 (cc @wcarlson5) -- could you extract this fix into a separate PR that we can merge quickly to unblock 9720? Otherwise we might need to extract your fix into 9720 to be able to move forward with it. Hope to cycle back to this PR soon. Thanks.
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557643648

## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ##
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots

 import scala.compat.java8.OptionConverters._

-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {

+  private[this] var startSnapshotId = snapshotIds

Review comment:
Scala generates different code for `private[this]` vs `private` when used on a field. Scala's visibility model is different from the JVM's visibility model. Because of this, when using `private` on a field, Scala generates access methods for the field. When using `private[this]`, Scala doesn't generate these access methods. For example:

```scala
final class PrivateThisTest {
  private[this] var privateThis = Some(10);
  private var justPrivate = Some(20);
}
```

generates (`javap -p ...`) the following class:

```java
public final class PrivateThisTest {
  private scala.Some privateThis;
  private scala.Some justPrivate;
  private scala.Some justPrivate();
  private void justPrivate_$eq(scala.Some);
  public PrivateThisTest();
}
```
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557624393

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ##
@@ -129,16 +130,25 @@ public String toString() {
             QUORUM_LINGER_MS_DOC);
     }

-    public RaftConfig(Properties props) {
-        super(CONFIG, props);
-    }
+    private final int requestTimeoutMs;
+    private final int retryBackoffMs;
+    private final int electionTimeoutMs;
+    private final int electionBackoffMaxMs;
+    private final int fetchTimeoutMs;
+    private final int appendLingerMs;

-    public RaftConfig(Map props) {
-        super(CONFIG, props);
+    public RaftConfig(Map props) {
+        this(props, true);
     }

     protected RaftConfig(Map props, boolean doLog) {
         super(CONFIG, props, doLog);
+        requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+        retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+        electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+        electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+        fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+        appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);

Review comment:
Yea, I think that's reasonable. Added.
[GitHub] [kafka] hachikuji commented on a change in pull request #9883: MINOR: Generalize server startup to make way for KIP-500
hachikuji commented on a change in pull request #9883:
URL: https://github.com/apache/kafka/pull/9883#discussion_r557622659

## File path: core/src/main/scala/kafka/server/KafkaRaftController.scala ##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+class KafkaRaftController {

Review comment:
Decided to go with `ControllerServer` and `BrokerServer`. As Colin mentioned offline, the "server" suffix is helpful in conveying that this component is responsible for starting sockets and such. This distinguishes `ControllerServer` from the actual implementation of the controller logic.
[GitHub] [kafka] hachikuji commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
hachikuji commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557611820

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ##
@@ -129,16 +130,25 @@ public String toString() {
             QUORUM_LINGER_MS_DOC);
     }

-    public RaftConfig(Properties props) {
-        super(CONFIG, props);
-    }
+    private final int requestTimeoutMs;
+    private final int retryBackoffMs;
+    private final int electionTimeoutMs;
+    private final int electionBackoffMaxMs;
+    private final int fetchTimeoutMs;
+    private final int appendLingerMs;

-    public RaftConfig(Map props) {
-        super(CONFIG, props);
+    public RaftConfig(Map props) {
+        this(props, true);
     }

     protected RaftConfig(Map props, boolean doLog) {
         super(CONFIG, props, doLog);
+        requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+        retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+        electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+        electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+        fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+        appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);

Review comment:
Is it worth storing a field for voter connections as well to be consistent?
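The pattern under review here (read each setting once in the constructor and keep it in a final field) can be sketched in isolation. This is only an illustration and not the real `RaftConfig`: the class name, the keys, and the defaults below are all made up, and the parsing helper stands in for whatever the real config base class does.

```java
import java.util.Map;

// Hypothetical sketch, not the real RaftConfig.
final class CachedConfigSketch {
    // Made-up keys for illustration only.
    static final String REQUEST_TIMEOUT_MS = "sketch.request.timeout.ms";
    static final String RETRY_BACKOFF_MS = "sketch.retry.backoff.ms";

    private final int requestTimeoutMs;
    private final int retryBackoffMs;

    CachedConfigSketch(Map<String, ?> props) {
        // Parse once here; the accessors below then return plain ints.
        this.requestTimeoutMs = intValue(props, REQUEST_TIMEOUT_MS, 20_000);
        this.retryBackoffMs = intValue(props, RETRY_BACKOFF_MS, 100);
    }

    // Accepts either a Number or a numeric string; falls back to the default when absent.
    private static int intValue(Map<String, ?> props, String key, int defaultValue) {
        Object raw = props.get(key);
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        return Integer.parseInt(raw.toString().trim());
    }

    int requestTimeoutMs() {
        return requestTimeoutMs;
    }

    int retryBackoffMs() {
        return retryBackoffMs;
    }
}
```

One consequence of this design is that later changes to the source map are not observed by the config object, which is usually what is wanted for immutable startup configuration.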
[jira] [Commented] (KAFKA-10792) Source tasks can block herder thread by hanging during stop
[ https://issues.apache.org/jira/browse/KAFKA-10792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265116#comment-17265116 ]

Randall Hauch commented on KAFKA-10792:
---
Thanks for the backport PR for the `2.5` branch, [~ChrisEgerton]. I've merged it to the `2.5` branch for inclusion in the next patch release (2.5.2).

> Source tasks can block herder thread by hanging during stop
> ---
>
> Key: KAFKA-10792
> URL: https://issues.apache.org/jira/browse/KAFKA-10792
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.4.2, 2.5.1, 2.7.0
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Major
> Fix For: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> If a source task blocks during its {{stop}} method, the herder thread will also block, which can cause issues with detecting rebalances, reconfiguring connectors, and other vital functions of a Connect worker.
>
> This occurs because the call to {{SourceTask::stop}} occurs on the herder's thread, instead of on the source task's own dedicated thread. This can be fixed by moving the call to {{SourceTask::stop}} onto the source task's dedicated thread and aligning with the current approach for {{Connector}} instances and {{SinkTask}} instances.
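The fix described in the issue (have the coordinating thread only signal the stop, and run the potentially blocking `stop` call on the task's own thread) can be illustrated with a small standalone sketch. None of this is Kafka Connect code: `Task`, `TaskRunner`, and `demo` are hypothetical names, and the latch-based wait stands in for a real task's work loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Kafka Connect code.
final class StopOffloadSketch {
    /** Stand-in for a task whose stop() may block. */
    interface Task {
        void stop();
    }

    /** Runs on the task's dedicated thread and calls stop() there. */
    static final class TaskRunner implements Runnable {
        private final Task task;
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        private final CountDownLatch stopped = new CountDownLatch(1);

        TaskRunner(Task task) {
            this.task = task;
        }

        /** Called from the coordinating (herder-like) thread; only signals, never blocks. */
        void requestStop() {
            stopRequested.countDown();
        }

        boolean awaitStopped(long timeout, TimeUnit unit) throws InterruptedException {
            return stopped.await(timeout, unit);
        }

        @Override
        public void run() {
            try {
                stopRequested.await(); // stands in for the task's normal work loop
                task.stop();           // a hang here blocks only this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stopped.countDown();
            }
        }
    }

    /** Returns true if requestStop() returned while stop() was still blocked. */
    static boolean demo() {
        CountDownLatch release = new CountDownLatch(1);
        Task slowTask = () -> {
            try {
                release.await(); // simulate a stop() that hangs until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        TaskRunner runner = new TaskRunner(slowTask);
        Thread taskThread = new Thread(runner, "task-thread");
        taskThread.setDaemon(true);
        taskThread.start();
        try {
            runner.requestStop(); // the caller is free to continue immediately
            boolean stillStopping = !runner.awaitStopped(100, TimeUnit.MILLISECONDS);
            release.countDown();  // let the simulated stop() finish
            boolean finished = runner.awaitStopped(5, TimeUnit.SECONDS);
            return stillStopping && finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch the calling thread can bound how long it waits via `awaitStopped`, so a task that never returns from `stop` costs one stuck task thread rather than a stuck coordinator.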
[jira] [Updated] (KAFKA-10792) Source tasks can block herder thread by hanging during stop
[ https://issues.apache.org/jira/browse/KAFKA-10792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-10792:
--
Fix Version/s: 2.5.2

> Source tasks can block herder thread by hanging during stop
> ---
>
> Key: KAFKA-10792
> URL: https://issues.apache.org/jira/browse/KAFKA-10792
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.4.2, 2.5.1, 2.7.0
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Major
> Fix For: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> If a source task blocks during its {{stop}} method, the herder thread will also block, which can cause issues with detecting rebalances, reconfiguring connectors, and other vital functions of a Connect worker.
>
> This occurs because the call to {{SourceTask::stop}} occurs on the herder's thread, instead of on the source task's own dedicated thread. This can be fixed by moving the call to {{SourceTask::stop}} onto the source task's dedicated thread and aligning with the current approach for {{Connector}} instances and {{SinkTask}} instances.
[GitHub] [kafka] rhauch merged pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)
rhauch merged pull request #9880: URL: https://github.com/apache/kafka/pull/9880
[GitHub] [kafka] rhauch commented on pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)
rhauch commented on pull request #9880: URL: https://github.com/apache/kafka/pull/9880#issuecomment-760387979 There seem to be environmental build issues; the builds are failing with "JDK" unknown and "maven" unknown. I'm going to go ahead and merge this PR to the `2.5` branch.
[GitHub] [kafka] g1geordie opened a new pull request #9899: KAFKA-12191 SslTransportTls12Tls13Test can replace 'assumeTrue' by (j…
g1geordie opened a new pull request #9899:
URL: https://github.com/apache/kafka/pull/9899

SslTransportTls12Tls13Test can replace 'assumeTrue' with a (JUnit 5) conditional test.

Tested on Java 8:

```
testCiphersSuiteForTls12FailsForTls13() Disabled on JRE version: 1.8.0_275
1.27 s passed testCiphersSuiteForTls12()
ignored testCiphersSuiteForTls13() Disabled on JRE version: 1.8.0_275
ignored testCiphersSuiteFailForServerTls12ClientTls13() Disabled on JRE version: 1.8.0_275
```
[GitHub] [kafka] rhauch commented on pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)
rhauch commented on pull request #9880: URL: https://github.com/apache/kafka/pull/9880#issuecomment-760385519 I'm trying to get the builds to run properly, but I did pull locally and the build of that local branch does pass.
[GitHub] [kafka] chia7712 commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5
chia7712 commented on pull request #9889:
URL: https://github.com/apache/kafka/pull/9889#issuecomment-760384843

> Given the test results for the last build, I think you can merge this, right?

It produced some failed tests, but they pass locally. I want to check QA again :)
[GitHub] [kafka] g1geordie opened a new pull request #9898: KAFKA-12189 ShellTest can replace 'assumeTrue' by (junit 5) condition…
g1geordie opened a new pull request #9898:
URL: https://github.com/apache/kafka/pull/9898

ShellTest can replace 'assumeTrue' with a (JUnit 5) conditional test.

I also tested on Windows:

```
ShellTest > testEchoHello() SKIPPED
ShellTest > testRunProgramWithErrorReturn() SKIPPED
ShellTest > testHeadDevZero() SKIPPED
ShellTest > testAttemptToRunNonExistentProgram() SKIPPED
```
[GitHub] [kafka] ijuma commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5
ijuma commented on pull request #9889: URL: https://github.com/apache/kafka/pull/9889#issuecomment-760378170 @chia7712 Given the test results for the last build, I think you can merge this, right?
[GitHub] [kafka] ijuma commented on a change in pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions
ijuma commented on a change in pull request #9876:
URL: https://github.com/apache/kafka/pull/9876#discussion_r557596784

## File path: build.gradle ##
@@ -1010,6 +1015,53 @@ project(':core') {
   }
 }

+project(':metadata') {
+  archivesBaseName = "kafka-metadata"
+
+  dependencies {
+    compile project(':clients')
+    compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
+    compileOnly libs.log4j
+    testCompile libs.junitJupiter
+    testCompile libs.hamcrest
+    testCompile libs.slf4jlog4j
+    testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest

Review comment:
This is not right: the mentioned annotation is only used for JUnit 4. I submitted a minor PR that fixes the comment at least: https://github.com/apache/kafka/pull/9897
[GitHub] [kafka] ijuma opened a new pull request #9897: MINOR: Remove incorrect code in metadata module build
ijuma opened a new pull request #9897:
URL: https://github.com/apache/kafka/pull/9897

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on pull request #9896: KAFKA-12206: o.a.k.common.Uuid should implement Comparable
jolshan commented on pull request #9896: URL: https://github.com/apache/kafka/pull/9896#issuecomment-760375020 LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on pull request #9888: KAFKA-12194: use stateListener to catch each state change
wcarlson5 commented on pull request #9888: URL: https://github.com/apache/kafka/pull/9888#issuecomment-760368501 @showuon These changes look good. Thanks for shoring up these tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #9896: KAFKA-12206: o.a.k.common.Uuid should implement Comparable
cmccabe opened a new pull request #9896: URL: https://github.com/apache/kafka/pull/9896 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12206) o.a.k.common.Uuid should implement Comparable
Colin McCabe created KAFKA-12206: Summary: o.a.k.common.Uuid should implement Comparable Key: KAFKA-12206 URL: https://issues.apache.org/jira/browse/KAFKA-12206 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.3.4#803005)
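For context on what KAFKA-12206 asks for, here is a minimal sketch of a 128-bit identifier that implements `Comparable`. `SketchUuid` and its field names are hypothetical; this is not the code of `org.apache.kafka.common.Uuid`, and the ordering chosen in the actual PR may differ from the signed high-word-then-low-word comparison assumed here.

```java
import java.util.Objects;

// Hypothetical stand-in for a 128-bit id; NOT org.apache.kafka.common.Uuid.
final class SketchUuid implements Comparable<SketchUuid> {
    private final long mostSignificantBits;
    private final long leastSignificantBits;

    SketchUuid(long mostSignificantBits, long leastSignificantBits) {
        this.mostSignificantBits = mostSignificantBits;
        this.leastSignificantBits = leastSignificantBits;
    }

    // Assumption of this sketch: signed comparison of the high word, then the low word.
    @Override
    public int compareTo(SketchUuid other) {
        int cmp = Long.compare(mostSignificantBits, other.mostSignificantBits);
        return cmp != 0 ? cmp : Long.compare(leastSignificantBits, other.leastSignificantBits);
    }

    // equals/hashCode are kept consistent with compareTo so sorted and hashed
    // collections agree on which ids are the same.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchUuid)) return false;
        SketchUuid that = (SketchUuid) o;
        return mostSignificantBits == that.mostSignificantBits
            && leastSignificantBits == that.leastSignificantBits;
    }

    @Override
    public int hashCode() {
        return Objects.hash(mostSignificantBits, leastSignificantBits);
    }
}
```

With a natural ordering in place, such ids could be used directly as keys in a `TreeMap` or as elements of a `ConcurrentSkipListSet` without supplying a separate `Comparator`.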
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557586528 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader import org.apache.kafka.snapshot.FileRawSnapshotWriter import org.apache.kafka.snapshot.RawSnapshotReader import org.apache.kafka.snapshot.RawSnapshotWriter +import org.apache.kafka.snapshot.Snapshots import scala.compat.java8.OptionConverters._ -class KafkaMetadataLog( +final class KafkaMetadataLog private ( log: Log, + snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch], Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557586344 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -113,6 +145,22 @@ class KafkaMetadataLog( log.truncateTo(offset) } + override def truncateFullyToLatestSnapshot(): Boolean = { +// Truncate the log fully if the latest snapshot is greater than the log end offset +var truncated = false Review comment: Done. Used Scala's `match`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions
cmccabe merged pull request #9876: URL: https://github.com/apache/kafka/pull/9876 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557586025 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -131,6 +179,10 @@ class KafkaMetadataLog( } } + override def highWatermark: Long = { Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions
[ https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265081#comment-17265081 ]

Gary Russell commented on KAFKA-12190:
--

Given that this is a critical bug, and a regression in a point release (2.6.1), doesn't this warrant a fairly swift 2.6.2 release?

> Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions
>
> Key: KAFKA-12190
> URL: https://issues.apache.org/jira/browse/KAFKA-12190
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.1
> Reporter: Andy Wilkinson
> Priority: Critical
> Fix For: 2.8.0
>
> There appears to be a regression in Kafka 2.6.1 due to [the changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're seeing failures in Spring Boot's CI on Windows such as the following:
> {noformat}
> Caused by: java.lang.UnsupportedOperationException: (No message provided)
> at java.nio.file.Files.setPosixFilePermissions(Files.java:2044)
> at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:745)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:585)
> at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316)
> at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
> at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
> at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
> at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
> at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940)
> at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423)
> at org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61)
> at org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.<init>(AssertProviderApplicationContextInvocationHandler.java:48)
> at org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382)
> at org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381)
> at org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfigurationTests.whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff(KafkaMetricsAutoConfigurationTests.java:92)
> {noformat}
> The same code worked without changes using Kafka 2.6.0.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
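The stack trace above ends in `Files.setPosixFilePermissions`, which throws `UnsupportedOperationException` when the underlying file system has no POSIX attribute view, as is typically the case on Windows. The following is a minimal sketch of one way to guard such a call. `PermissionSketch` and `restrictToOwner` are hypothetical names, and this is not necessarily how the Kafka fix is implemented.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Hypothetical helper; not part of Kafka's StateDirectory.
final class PermissionSketch {
    // Returns true if owner-only permissions were applied, false if the
    // file system does not expose a POSIX attribute view.
    static boolean restrictToOwner(Path dir) throws IOException {
        if (!dir.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            // Skip instead of letting setPosixFilePermissions throw
            // UnsupportedOperationException.
            return false;
        }
        Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rwx------");
        Files.setPosixFilePermissions(dir, ownerOnly);
        return true;
    }
}
```

Checking the supported attribute views up front keeps the decision explicit; catching `UnsupportedOperationException` around the call would be an alternative with the same effect.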
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557585643 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -79,13 +94,30 @@ class KafkaMetadataLog( throw new IllegalArgumentException("Attempt to append an empty record set") val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) + +if (appendInfo.rolled) { + log.deleteOldSegments() +} + new LogAppendInfo(appendInfo.firstOffset.getOrElse { throw new KafkaException("Append failed unexpectedly") }, appendInfo.lastOffset) } override def lastFetchedEpoch: Int = { -log.latestEpoch.getOrElse(0) +log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => +val logEndOffset = endOffset().offset +if (snapshotId.offset == logEndOffset) { Review comment: I made this explicit by checking both log start offset and log end offset. ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -113,6 +145,22 @@ class KafkaMetadataLog( log.truncateTo(offset) } + override def truncateFullyToLatestSnapshot(): Boolean = { +// Truncate the log fully if the latest snapshot is greater than the log end offset +var truncated = false +latestSnapshotId.ifPresent { snapshotId => + if (snapshotId.epoch > log.latestEpoch.getOrElse(0) || Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557585891 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +199,106 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots. +// This can result in the leader having a snapshot that is less that the follower's snapshot. I think that the Raft +// Client checks against this and aborts. If so, then this check and exception is okay. + +// Do let the state machine create snapshots older than the latest snapshot +latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { +// Since snapshots are less than the high-watermark absolute offset comparison is okay. +throw new IllegalArgumentException( + s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" +) + } +} + +FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)) } override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = { try { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + if (snapshotIds.contains(snapshotId)) { +Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } else { +Optional.empty() + } } catch { - case e: NoSuchFileException => Optional.empty() + case e: NoSuchFileException => +Optional.empty() } } + override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = { +try { + Optional.of(snapshotIds.last) +} catch { + case _: NoSuchElementException => +Optional.empty() +} + } + + override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = { +startSnapshotId; Review comment: Hehe. Done. 
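The diff above implements `latestSnapshotId()` by calling `last` on a `ConcurrentSkipListSet` and mapping `NoSuchElementException` to an empty `Optional`. A rough Java sketch of the same pattern follows. `SnapshotIdSketch` and its nested `Id` class are hypothetical stand-ins for the Scala code and for `raft.OffsetAndEpoch`, and the ordering used here (epoch first, then offset) is an assumption of the sketch.

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;

final class SnapshotIdSketch {
    // Hypothetical stand-in for raft.OffsetAndEpoch.
    static final class Id implements Comparable<Id> {
        final long offset;
        final int epoch;

        Id(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        // Assumed ordering: by epoch, ties broken by offset.
        @Override
        public int compareTo(Id other) {
            int cmp = Integer.compare(epoch, other.epoch);
            return cmp != 0 ? cmp : Long.compare(offset, other.offset);
        }
    }

    // last() throws NoSuchElementException on an empty set, so the empty
    // case is translated into Optional.empty().
    static Optional<Id> latest(ConcurrentSkipListSet<Id> ids) {
        try {
            return Optional.of(ids.last());
        } catch (NoSuchElementException e) {
            return Optional.empty();
        }
    }
}
```

Catching the exception, rather than testing `isEmpty()` before calling `last()`, avoids a window in which another thread could remove the final element between the check and the read.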
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557584847 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader import org.apache.kafka.snapshot.FileRawSnapshotWriter import org.apache.kafka.snapshot.RawSnapshotReader import org.apache.kafka.snapshot.RawSnapshotWriter +import org.apache.kafka.snapshot.Snapshots import scala.compat.java8.OptionConverters._ -class KafkaMetadataLog( +final class KafkaMetadataLog private ( log: Log, + snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch], topicPartition: TopicPartition, - maxFetchSizeInBytes: Int = 1024 * 1024 + maxFetchSizeInBytes: Int ) extends ReplicatedLog { + private[this] var startSnapshotId = snapshotIds Review comment: Give me a sec to reply to this. ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -79,13 +94,30 @@ class KafkaMetadataLog( throw new IllegalArgumentException("Attempt to append an empty record set") val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) + +if (appendInfo.rolled) { + log.deleteOldSegments() +} + new LogAppendInfo(appendInfo.firstOffset.getOrElse { throw new KafkaException("Append failed unexpectedly") }, appendInfo.lastOffset) } override def lastFetchedEpoch: Int = { -log.latestEpoch.getOrElse(0) +log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => +val logEndOffset = endOffset().offset +if (snapshotId.offset == logEndOffset) { + snapshotId.epoch +} else { + throw new KafkaException( +s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + +s"Expected the snapshot's end offset to match the logs end offset ($logEndOffset)" + ) +} + } orElse(0) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
aloknnikhil commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557584646 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -129,16 +133,41 @@ public String toString() { QUORUM_LINGER_MS_DOC); } +private final int requestTimeoutMs; +private final int retryBackoffMs; +private final int electionTimeoutMs; +private final int electionBackoffMaxMs; +private final int fetchTimeoutMs; +private final int appendLingerMs; + public RaftConfig(Properties props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); +retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); +electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); +electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); +fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); +appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG); } -public RaftConfig(Map props) { +public RaftConfig(Map props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); Review comment: Perfect. Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
hachikuji commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557584121 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -129,16 +133,41 @@ public String toString() { QUORUM_LINGER_MS_DOC); } +private final int requestTimeoutMs; +private final int retryBackoffMs; +private final int electionTimeoutMs; +private final int electionBackoffMaxMs; +private final int fetchTimeoutMs; +private final int appendLingerMs; + public RaftConfig(Properties props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); +retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); +electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); +electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); +fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); +appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG); } -public RaftConfig(Map props) { +public RaftConfig(Map props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); Review comment: Yeah, that's why I said it was redundant in the other comment 🙂 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions
cmccabe commented on a change in pull request #9876: URL: https://github.com/apache/kafka/pull/9876#discussion_r557580624 ## File path: build.gradle ## @@ -1010,6 +1015,53 @@ project(':core') { } } +project(':metadata') { + archivesBaseName = "kafka-metadata" + + dependencies { +compile project(':clients') +compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes +compileOnly libs.log4j +testCompile libs.junitJupiter +testCompile libs.hamcrest +testCompile libs.slf4jlog4j +testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest Review comment: later PRs add IntegrationTest to the metadata module... simpler to just keep it in this PR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions
cmccabe commented on a change in pull request #9876: URL: https://github.com/apache/kafka/pull/9876#discussion_r557580302 ## File path: build.gradle ## @@ -1010,6 +1015,53 @@ project(':core') { } } +project(':metadata') { + archivesBaseName = "kafka-metadata" + + dependencies { +compile project(':clients') +compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes +compileOnly libs.log4j +testCompile libs.junitJupiter +testCompile libs.hamcrest Review comment: yes, we use it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches
dajac commented on a change in pull request #9832: URL: https://github.com/apache/kafka/pull/9832#discussion_r557576590 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ## @@ -598,6 +618,16 @@ synchronized Integer sequenceNumber(TopicPartition topicPartition) { return topicPartitionBookkeeper.getPartition(topicPartition).nextSequence; } +/** + * Returns the current producer id/epoch of the given TopicPartition. + */ +synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition topicPartition) { +if (!isTransactional()) +topicPartitionBookkeeper.addPartition(topicPartition); Review comment: I wonder if we could create the partition state once and for all, instead of having to do this here and in other places. I still need to find where we could do so. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
aloknnikhil commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557575111 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Node; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class RaftTestUtil { +public static RaftConfig buildRaftConfig(int requestTimeoutMs, Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
aloknnikhil commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557574782 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java ## @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Properties; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; Review comment: Done. Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
aloknnikhil commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557574380 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -129,16 +133,41 @@ public String toString() { QUORUM_LINGER_MS_DOC); } +private final int requestTimeoutMs; +private final int retryBackoffMs; +private final int electionTimeoutMs; +private final int electionBackoffMaxMs; +private final int fetchTimeoutMs; +private final int appendLingerMs; + public RaftConfig(Properties props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); +retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); +electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); +electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); +fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); +appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG); } -public RaftConfig(Map props) { +public RaftConfig(Map props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); Review comment: Actually, that means I can remove the `public RaftConfig(Properties props)` constructor. Does that make sense @hachikuji? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches
dajac commented on a change in pull request #9832: URL: https://github.com/apache/kafka/pull/9832#discussion_r557574182 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ## @@ -567,8 +571,18 @@ private void bumpIdempotentProducerEpoch() { this.topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition, this.producerIdAndEpoch); this.partitionsWithUnresolvedSequences.remove(topicPartition); } - this.partitionsToRewriteSequences.clear(); + +// When the epoch is bumped, reset the sequences for the partitions(s) that do not +// have in-flight batches. Sequences of the in-flight batches that did not triggered +// the epoch bump are treated when their last in-flight batch is completed. +for (TopicPartition topicPartition : this.topicPartitionBookkeeper.partitions()) { Review comment: Thanks for your comment. I do agree with your conclusion about point 2. I have managed to reproduce this in a unit test as well. Regarding your suggestion to store the producer id/epoch per partition, I like it. I actually thought about this as well but went conservative for my first draft. I have updated the PR to reflect your suggestion. Please take a look and let me know what you think. I think that there is still room for improvement in the PR. In particular, I have to review the unit tests. Some might no longer be relevant after the changes in this PR, and others might need to be refactored a bit more. I am not sure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
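The approach discussed in this comment, tracking the producer epoch per partition and, on an epoch bump, resetting sequences only for partitions without in-flight batches, could be sketched roughly as below. Every name here (`PartitionBookkeeperSketch`, `PartitionState`, `onEpochBump`, `onBatchCompleted`) is hypothetical and partitions are keyed by plain strings; this illustrates the idea only and is not the `TransactionManager` code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping; not Kafka's TransactionManager.
final class PartitionBookkeeperSketch {
    static final class PartitionState {
        int epoch;            // producer epoch last applied to this partition
        int nextSequence;     // next sequence number to assign
        int inFlightBatches;  // batches sent but not yet completed
    }

    private final Map<String, PartitionState> partitions = new HashMap<>();

    // Creates the state on first access, so callers need no separate "add" step.
    PartitionState state(String partition) {
        return partitions.computeIfAbsent(partition, p -> new PartitionState());
    }

    // On an epoch bump, partitions with nothing in flight restart at sequence 0
    // under the new epoch; the others keep their old epoch for now.
    void onEpochBump(int newEpoch) {
        for (PartitionState s : partitions.values()) {
            if (s.inFlightBatches == 0) {
                s.epoch = newEpoch;
                s.nextSequence = 0;
            }
        }
    }

    // When the last in-flight batch of a lagging partition completes,
    // that partition catches up to the current epoch.
    void onBatchCompleted(String partition, int currentEpoch) {
        PartitionState s = state(partition);
        if (s.inFlightBatches > 0) {
            s.inFlightBatches--;
        }
        if (s.inFlightBatches == 0 && s.epoch != currentEpoch) {
            s.epoch = currentEpoch;
            s.nextSequence = 0;
        }
    }
}
```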
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
aloknnikhil commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557573785 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -129,16 +133,41 @@ public String toString() { QUORUM_LINGER_MS_DOC); } +private final int requestTimeoutMs; +private final int retryBackoffMs; +private final int electionTimeoutMs; +private final int electionBackoffMaxMs; +private final int fetchTimeoutMs; +private final int appendLingerMs; + public RaftConfig(Properties props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); +retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); +electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); +electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); +fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); +appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG); } -public RaftConfig(Map props) { +public RaftConfig(Map props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); Review comment: Ah thanks for catching that. Turns out both public constructors can use the protected constructor (Properties implements Map). Will refactor both. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
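The refactoring discussed here relies on `java.util.Properties` being a `Map` (it extends `Hashtable<Object, Object>`), so a `Properties`-based constructor can delegate to a `Map`-based one instead of repeating the field assignments. A minimal sketch, assuming a made-up `ConfigSketch` class, a made-up key name, and a made-up default; it is not the real `RaftConfig`:

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical config class; not org.apache.kafka.raft.RaftConfig.
final class ConfigSketch {
    private final int requestTimeoutMs;

    // Properties is a Map, so this constructor only forwards. The cast selects
    // the Map overload rather than recursing into this constructor.
    ConfigSketch(Properties props) {
        this((Map<?, ?>) props);
    }

    // Single place where fields are initialized.
    ConfigSketch(Map<?, ?> props) {
        Object raw = props.get("request.timeout.ms"); // key name is an assumption
        this.requestTimeoutMs = raw == null ? 1000 : Integer.parseInt(raw.toString());
    }

    int requestTimeoutMs() {
        return requestTimeoutMs;
    }
}
```

Because a `Properties` instance can be passed wherever a `Map<?, ?>` is accepted, the forwarding constructor is itself optional, which is the redundancy pointed out in the review.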
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
aloknnikhil commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557571170 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -129,16 +133,41 @@ public String toString() { QUORUM_LINGER_MS_DOC); } +private final int requestTimeoutMs; +private final int retryBackoffMs; +private final int electionTimeoutMs; +private final int electionBackoffMaxMs; +private final int fetchTimeoutMs; +private final int appendLingerMs; + public RaftConfig(Properties props) { Review comment: Looks like there are a bunch of tests that seem to be using this constructor - `testSingleQuorumVoterConnections` - `testMultiQuorumVoterConnections` - `testInvalidQuorumVotersConfig` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()
hachikuji commented on a change in pull request #9881: URL: https://github.com/apache/kafka/pull/9881#discussion_r557559024 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -129,16 +133,41 @@ public String toString() { QUORUM_LINGER_MS_DOC); } +private final int requestTimeoutMs; +private final int retryBackoffMs; +private final int electionTimeoutMs; +private final int electionBackoffMaxMs; +private final int fetchTimeoutMs; +private final int appendLingerMs; + public RaftConfig(Properties props) { Review comment: nit: I think this constructor is redundant. can we remove it? ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -129,16 +133,41 @@ public String toString() { QUORUM_LINGER_MS_DOC); } +private final int requestTimeoutMs; +private final int retryBackoffMs; +private final int electionTimeoutMs; +private final int electionBackoffMaxMs; +private final int fetchTimeoutMs; +private final int appendLingerMs; + public RaftConfig(Properties props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); +retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); +electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); +electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); +fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); +appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG); } -public RaftConfig(Map props) { +public RaftConfig(Map props) { super(CONFIG, props); +requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); Review comment: nit: the typical thing to do is invoke the other constructor. e.g. 
`this(props, true)` ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2163,7 +2177,13 @@ public Long scheduleAppend(int epoch, List records) { @Override public void close() { -kafkaRaftMetrics.close(); +if (kafkaRaftMetrics != null) { +kafkaRaftMetrics.close(); +} +} + +public QuorumState quorum() { Review comment: Any chance we can make this default access? ## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Node; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class RaftTestUtil { +public static RaftConfig buildRaftConfig(int requestTimeoutMs, Review comment: nit: convention in the raft code is to align first parameter on next line: ``` public static RaftConfig buildRaftConfig( int requestTimeoutMs, int retryBackoffMs, ... 
List voterNodes ) { ``` ## File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java ## @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Properties; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; Review comment: This does not seem to be used?
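The constructor nit in the review above (invoke the other constructor rather than repeating the field assignments) can be sketched roughly as follows. This is only an illustration: `SketchConfig`, its two fields, the key names, and the default values are hypothetical stand-ins and do not reflect the actual `RaftConfig` or `AbstractConfig` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for a config class; not the real RaftConfig.
final class SketchConfig {
    private final int requestTimeoutMs;
    private final int retryBackoffMs;

    // Convenience constructor: converts the input and delegates,
    // so the field assignments are not duplicated here.
    SketchConfig(Properties props) {
        this(toMap(props));
    }

    // Canonical constructor: the only place where the final fields are assigned.
    // Key names and defaults are made up for this sketch.
    SketchConfig(Map<String, Object> props) {
        this.requestTimeoutMs = intValue(props, "request.timeout.ms", 2000);
        this.retryBackoffMs = intValue(props, "retry.backoff.ms", 20);
    }

    // Must be static because it runs before the instance is initialized.
    private static Map<String, Object> toMap(Properties props) {
        Map<String, Object> map = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            map.put(name, props.getProperty(name));
        }
        return map;
    }

    private static int intValue(Map<String, Object> props, String key, int defaultValue) {
        Object value = props.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.toString());
    }

    int requestTimeoutMs() {
        return requestTimeoutMs;
    }

    int retryBackoffMs() {
        return retryBackoffMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "500");
        SketchConfig config = new SketchConfig(props);
        System.out.println(config.requestTimeoutMs() + " " + config.retryBackoffMs());
    }
}
```

With delegation, adding a new field later means touching one constructor only, which is presumably the motivation behind the nit.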
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557565077 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +199,106 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots. +// This can result in the leader having a snapshot that is less that the follower's snapshot. I think that the Raft +// Client checks against this and aborts. If so, then this check and exception is okay. + +// Do let the state machine create snapshots older than the latest snapshot +latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { +// Since snapshots are less than the high-watermark absolute offset comparison is okay. +throw new IllegalArgumentException( + s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" +) + } +} + +FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)) } override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = { try { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + if (snapshotIds.contains(snapshotId)) { +Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } else { +Optional.empty() + } } catch { - case e: NoSuchFileException => Optional.empty() + case e: NoSuchFileException => +Optional.empty() } } + override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = { +try { + Optional.of(snapshotIds.last) +} catch { + case _: NoSuchElementException => +Optional.empty() +} + } + + override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = { +startSnapshotId; + } + + override def snapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = { 
+snapshotIds.add(snapshotId) + } + + override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): Boolean = { Review comment: Yes.
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557563029 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +199,106 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots. +// This can result in the leader having a snapshot that is less that the follower's snapshot. I think that the Raft +// Client checks against this and aborts. If so, then this check and exception is okay. + +// Do let the state machine create snapshots older than the latest snapshot +latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { +// Since snapshots are less than the high-watermark absolute offset comparison is okay. +throw new IllegalArgumentException( + s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" +) + } +} + +FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)) } override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = { try { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + if (snapshotIds.contains(snapshotId)) { +Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } else { +Optional.empty() + } } catch { - case e: NoSuchFileException => Optional.empty() + case e: NoSuchFileException => +Optional.empty() } } + override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = { +try { + Optional.of(snapshotIds.last) +} catch { + case _: NoSuchElementException => +Optional.empty() +} + } + + override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = { Review comment: Sounds good. 
In this PR it is not technically the oldest snapshot, but it will be once we implement snapshot deletion in https://issues.apache.org/jira/browse/KAFKA-12205
[GitHub] [kafka] cadonna commented on pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics
cadonna commented on pull request #9895: URL: https://github.com/apache/kafka/pull/9895#issuecomment-760336972 Call for review: @guozhangwang @ableegoldman @lct45
[GitHub] [kafka] cadonna opened a new pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics
cadonna opened a new pull request #9895: URL: https://github.com/apache/kafka/pull/9895 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557557769 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +199,106 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots. +// This can result in the leader having a snapshot that is less that the follower's snapshot. I think that the Raft +// Client checks against this and aborts. If so, then this check and exception is okay. + +// Do let the state machine create snapshots older than the latest snapshot +latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { +// Since snapshots are less than the high-watermark absolute offset comparison is okay. +throw new IllegalArgumentException( + s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" +) + } +} + +FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)) } override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = { try { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + if (snapshotIds.contains(snapshotId)) { +Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } else { +Optional.empty() + } } catch { - case e: NoSuchFileException => Optional.empty() + case e: NoSuchFileException => +Optional.empty() } } + override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = { +try { + Optional.of(snapshotIds.last) +} catch { + case _: NoSuchElementException => Review comment: I agree. Unfortunately, `pollLast` removes the element from the set. I don't think we have a choice but to do this.
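The trade-off mentioned in the comment above can be illustrated with a small Java sketch. It assumes that `snapshotIds` is a sorted concurrent set such as `java.util.concurrent.ConcurrentSkipListSet`; the diff does not show its type, so that is an assumption, and `LatestElementSketch` is a hypothetical name. On such a set, `last()` throws `NoSuchElementException` when the set is empty, while `pollLast()` returns `null` on an empty set but removes the element it returns.

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical helper; the set type is an assumption, not taken from the PR.
final class LatestElementSketch {
    // Reads the greatest element without removing it.
    // last() throws on an empty set, so the exception is mapped to Optional.empty().
    static <T> Optional<T> latest(ConcurrentSkipListSet<T> set) {
        try {
            return Optional.of(set.last());
        } catch (NoSuchElementException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        ConcurrentSkipListSet<Long> ids = new ConcurrentSkipListSet<>();
        System.out.println(latest(ids));

        ids.add(10L);
        ids.add(42L);
        System.out.println(latest(ids) + ", size=" + ids.size());

        // pollLast() avoids the exception but mutates the set.
        Long removed = ids.pollLast();
        System.out.println(removed + ", size=" + ids.size());
    }
}
```

One possible reason for catching the exception instead of checking `isEmpty()` first is that the check and the read would not be atomic on a concurrent set; the comment itself does not state this, so treat it as a guess.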
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557556584 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +199,106 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots. Review comment: If you look at the if statement below this comment... ```scala if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { ``` That naked offset comparison is only valid if snapshots are always less than the high-watermark and we never truncate past the high-watermark. I checked this after writing the comment. Outside of data loss, we don't allow followers to truncate past the high-watermark.
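The check discussed above can be sketched in Java as follows. The nested `OffsetAndEpoch` class is a hypothetical stand-in for `raft.OffsetAndEpoch`, and `validate` is an illustrative name. The sketch mirrors the condition from the diff and, like it, relies on the stated invariant that snapshots stay below the high-watermark and the log is never truncated past it.

```java
import java.util.Optional;

final class SnapshotIdCheckSketch {
    // Hypothetical stand-in for raft.OffsetAndEpoch.
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public String toString() {
            return "(offset=" + offset + ", epoch=" + epoch + ")";
        }
    }

    // Rejects a candidate whose epoch or offset is smaller than the latest snapshot's.
    // As written, a candidate equal to the latest snapshot passes the check.
    static void validate(Optional<OffsetAndEpoch> latest, OffsetAndEpoch candidate) {
        latest.ifPresent(l -> {
            if (l.epoch > candidate.epoch || l.offset > candidate.offset) {
                throw new IllegalArgumentException(
                    "Snapshot " + candidate + " is older than the latest snapshot " + l);
            }
        });
    }

    public static void main(String[] args) {
        Optional<OffsetAndEpoch> latest = Optional.of(new OffsetAndEpoch(10L, 2));
        validate(latest, new OffsetAndEpoch(20L, 2)); // accepted
        try {
            validate(latest, new OffsetAndEpoch(5L, 3)); // smaller offset
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```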
[jira] [Commented] (KAFKA-10877) Instantiating loggers for every FetchContext causes low request handler idle pool ratio.
[ https://issues.apache.org/jira/browse/KAFKA-10877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265009#comment-17265009 ] Ismael Juma commented on KAFKA-10877: - Sounds good. > Instantiating loggers for every FetchContext causes low request handler idle > pool ratio. > > > Key: KAFKA-10877 > URL: https://issues.apache.org/jira/browse/KAFKA-10877 > Project: Kafka > Issue Type: Bug >Reporter: Sean McCauliff >Assignee: Sean McCauliff >Priority: Major > > JDK11 has removed some classes used by log4j2 to initialize logging contexts. > Now log4j2 uses StackWalker to discover where it has been instantiated. > StackWalker is apparently very expensive. > Kafka has a Logging trait. Classes which want to log application messages > get access to the methods provided by the trait by mixing them in using "with > Logging". When this is done on scala object (a singleton) this is fine as > the logging context in the Logging trait is only initialized at most once. > When this is done on class (e.g. class X extends Logging) the logging context > is potentially created for each instance. The logging context is needed to > determine if a log message will be emitted. So if the method debug("log me") > is called the logging context is still initialized to determine if debug > logging is enabled. Initializing the logging context calls StackWalker. > This can't be avoided even if the log message would never be written to the > log. > IncrementalFetchContext is one such class that is inheriting from Logging and > incurring a very high cpu cost. It also does this inside of locks. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
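The cost pattern described in the issue can be illustrated with a simplified Java sketch. Everything here is hypothetical: `LogContext` only simulates an expensive logging context by counting constructions, and it is created eagerly, whereas the issue describes lazy initialization on first use. It is not log4j2 or Kafka's `Logging` trait.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LoggerInitSketch {
    // Counts how often the simulated logging context is built.
    static final AtomicInteger INITIALIZATIONS = new AtomicInteger();

    // Hypothetical stand-in for a logging context that is costly to construct.
    static final class LogContext {
        LogContext() {
            INITIALIZATIONS.incrementAndGet();
        }

        boolean isDebugEnabled() {
            return false;
        }
    }

    // Pattern from the issue: every instance builds its own context,
    // even though no debug message is ever written.
    static final class PerInstance {
        private final LogContext log = new LogContext();

        boolean handle() {
            return log.isDebugEnabled();
        }
    }

    // Alternative: one context shared by all instances of the class.
    static final class Shared {
        private static final LogContext LOG = new LogContext();

        boolean handle() {
            return LOG.isDebugEnabled();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new PerInstance().handle();
        }
        System.out.println("after per-instance: " + INITIALIZATIONS.get());
        for (int i = 0; i < 3; i++) {
            new Shared().handle();
        }
        System.out.println("after shared: " + INITIALIZATIONS.get());
    }
}
```

In this sketch the per-instance variant pays the construction cost once per object, while the shared variant pays it once per class, which corresponds to the difference between mixing the trait into a class and into a singleton object.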