[GitHub] [kafka] showuon commented on pull request #9888: KAFKA-12194: use stateListener to catch each state change

2021-01-14 Thread GitBox


showuon commented on pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#issuecomment-760714184


   @cadonna, thanks for the comments. I've updated it in this commit:
https://github.com/apache/kafka/pull/9888/commits/46898d994bb3c3495aea967ace9a07549f7fc1e5.
 Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change

2021-01-14 Thread GitBox


showuon commented on a change in pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#discussion_r557960104



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -90,24 +91,64 @@ public void setup() {
 );
 }
 
+private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException {
+kafkaStreams.start();
+waitForStateTransition(KafkaStreams.State.RUNNING);
+}
+
 @After
 public void teardown() throws IOException {
+stateTransitionHistory.clear();
 purgeLocalStreamsState(properties);
 }
 
+private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) {
+// we store each new state in state transition so that we won't miss any state change
+kafkaStreams.setStateListener(
+(newState, oldState) -> stateTransitionHistory.add(newState)
+);
+}
+
+private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException {
+waitForCondition(
+() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected),
+DEFAULT_DURATION.toMillis(),
+() -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s",
+expected, stateTransitionHistory)
+);
+}

Review comment:
   We can't just check that the current state becomes `RUNNING`, because after
we add or remove threads, the state doesn't change immediately. That is, if we check
whether the state is `RUNNING` right after adding/removing threads, the check will pass
even though the rebalance hasn't started yet, which causes the test to fail. So I
still use `stateTransitionHistory` to check the state, and I also check that the
last state in the history is `RUNNING`. That should be more reliable.
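A minimal, self-contained sketch of the history-based check described above. It does not use Kafka: `StreamsState` and `StateListener` are hypothetical stand-ins for `KafkaStreams.State` and `KafkaStreams.StateListener`, and `StateTransitionRecorder` is a hypothetical helper. The sketch assumes the history is cleared right before a thread is added or removed, so a `RUNNING` recorded at startup cannot satisfy the wait; it then waits until the most recent recorded state is `RUNNING`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for KafkaStreams.State (subset of states).
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

// Hypothetical stand-in for KafkaStreams.StateListener.
@FunctionalInterface
interface StateListener {
    void onChange(StreamsState newState, StreamsState oldState);
}

// Hypothetical test helper: records every new state so that no transition is missed.
final class StateTransitionRecorder {
    // The listener is invoked from other threads while the test thread reads,
    // so a thread-safe list is used.
    private final List<StreamsState> history = new CopyOnWriteArrayList<>();

    StateListener listener() {
        return (newState, oldState) -> history.add(newState);
    }

    // Call right before adding/removing a thread so stale states are not counted.
    void clear() {
        history.clear();
    }

    List<StreamsState> snapshot() {
        return new ArrayList<>(history);
    }

    // True only if at least one state was recorded and the most recent one is `expected`.
    boolean lastStateIs(StreamsState expected) {
        List<StreamsState> copy = snapshot();
        return !copy.isEmpty() && copy.get(copy.size() - 1) == expected;
    }

    // Polls until the most recent recorded state is `expected`, or fails after `timeoutMs`.
    void waitForLastState(StreamsState expected, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!lastStateIs(expected)) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Did not reach " + expected + " in time. Observed: " + snapshot());
            }
            Thread.sleep(10);
        }
    }
}
```

After `clear()`, the empty history keeps `lastStateIs(RUNNING)` false until a new transition actually arrives, which is exactly the case where a plain "current state is `RUNNING`" check would pass too early.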










[GitHub] [kafka] showuon commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change

2021-01-14 Thread GitBox


showuon commented on a change in pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#discussion_r557939529



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -90,24 +91,64 @@ public void setup() {
 );
 }
 
+private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException {
+kafkaStreams.start();
+waitForStateTransition(KafkaStreams.State.RUNNING);
+}
+
 @After
 public void teardown() throws IOException {
+stateTransitionHistory.clear();
 purgeLocalStreamsState(properties);
 }
 
+private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) {
+// we store each new state in state transition so that we won't miss any state change
+kafkaStreams.setStateListener(
+(newState, oldState) -> stateTransitionHistory.add(newState)
+);
+}
+
+private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException {
+waitForCondition(
+() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected),
+DEFAULT_DURATION.toMillis(),
+() -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s",
+expected, stateTransitionHistory)
+);
+}
+
+// verify if there's the state change from "before" state into "after" state
+private boolean hasStateTransition(final KafkaStreams.State before, final KafkaStreams.State after) {
+// should have at least 2 states in history
+if (stateTransitionHistory.size() < 2) {
+return false;
+}
+
+for (int i = 0; i < stateTransitionHistory.size() - 1; i++) {
+if (stateTransitionHistory.get(i).equals(before) && stateTransitionHistory.get(i + 1).equals(after)) {
+return true;
+}
+}
+return false;

Review comment:
   I used a `for` loop because I thought there could be cases with
other state changes after `RUNNING`, e.g. `DEAD`. But after your question, I
think that if that happened, the test should fail anyway. So checking the last 2
elements is good. Updated. Thanks.
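A sketch of the simplified check agreed on here, next to the original loop for comparison. `StreamsState` is again a hypothetical stand-in for `KafkaStreams.State`, and `TransitionCheck` is a hypothetical helper that takes the recorded history as a list.

```java
import java.util.List;

// Hypothetical stand-in for KafkaStreams.State (subset of states).
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

// Hypothetical helper operating on a recorded state history.
final class TransitionCheck {
    private TransitionCheck() {
    }

    // Simplified check: true only if the two most recent states are `before` then `after`.
    // Any later transition (e.g. to PENDING_SHUTDOWN) makes it false, so the test fails.
    static boolean endsWithTransition(List<StreamsState> history, StreamsState before, StreamsState after) {
        int n = history.size();
        return n >= 2 && history.get(n - 2) == before && history.get(n - 1) == after;
    }

    // Original loop-based variant: true if the pair appears anywhere in the history.
    static boolean containsTransition(List<StreamsState> history, StreamsState before, StreamsState after) {
        for (int i = 0; i + 1 < history.size(); i++) {
            if (history.get(i) == before && history.get(i + 1) == after) {
                return true;
            }
        }
        return false;
    }
}
```

With the loop, a history of `[REBALANCING, RUNNING, PENDING_SHUTDOWN]` still passes; the last-two check rejects it, which matches the conclusion that such a run should fail the test.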









[GitHub] [kafka] showuon commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change

2021-01-14 Thread GitBox


showuon commented on a change in pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#discussion_r557937603



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -119,37 +160,47 @@ public void shouldAddStreamThread() throws Exception {
 .sorted().toArray(),
 equalTo(new String[] {"1", "2", "3"})
 );
-waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.REBALANCING, DEFAULT_DURATION);
-waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+
+waitForStateTransition(KafkaStreams.State.RUNNING);
+assertTrue(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING));

Review comment:
   OK, updated. Thanks.









[jira] [Resolved] (KAFKA-12191) SslTransportTls12Tls13Test can replace 'assumeTrue' by (junit 5) conditional test

2021-01-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12191.

Fix Version/s: 2.8.0
   Resolution: Fixed

> SslTransportTls12Tls13Test can replace 'assumeTrue' by (junit 5) conditional test
> -
>
> Key: KAFKA-12191
> URL: https://issues.apache.org/jira/browse/KAFKA-12191
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: GeordieMai
>Priority: Minor
> Fix For: 2.8.0
>
>
> The conditional test is more readable than assumeTrue in testing code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #9899: KAFKA-12191 SslTransportTls12Tls13Test can replace 'assumeTrue' by (j…

2021-01-14 Thread GitBox


chia7712 merged pull request #9899:
URL: https://github.com/apache/kafka/pull/9899


   







[jira] [Resolved] (KAFKA-12189) ShellTest can replace 'assumeTrue' by (junit 5) conditional test

2021-01-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12189.

Fix Version/s: 2.8.0
   Resolution: Fixed

> ShellTest can replace 'assumeTrue' by (junit 5) conditional test
> 
>
> Key: KAFKA-12189
> URL: https://issues.apache.org/jira/browse/KAFKA-12189
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: GeordieMai
>Priority: Minor
> Fix For: 2.8.0
>
>
> from https://github.com/apache/kafka/pull/9874#discussion_r556664433
> The conditional test is more readable than assumeTrue in testing code.





[GitHub] [kafka] chia7712 merged pull request #9898: KAFKA-12189 ShellTest can replace 'assumeTrue' by (junit 5) condition…

2021-01-14 Thread GitBox


chia7712 merged pull request #9898:
URL: https://github.com/apache/kafka/pull/9898


   







[GitHub] [kafka] chia7712 commented on a change in pull request #9887: KAFKA-12195 Fix synchronization issue happening in KafkaStreams

2021-01-14 Thread GitBox


chia7712 commented on a change in pull request #9887:
URL: https://github.com/apache/kafka/pull/9887#discussion_r557882254



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -989,16 +989,20 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 public Optional removeStreamThread() {
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
-for (final StreamThread streamThread : threads) {
-if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {
-streamThread.shutdown();
-if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+synchronized (threads) {

Review comment:
   Makes sense. Let me use a copy to handle the lock issue.
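A minimal sketch of the "copy under the lock, block outside it" idea, assuming the concern is holding the `threads` monitor while waiting on a thread that may itself need that monitor to exit. `WorkerRegistry` and `changeLock` are hypothetical stand-ins for `KafkaStreams` and `changeThreadCount`, and `interrupt()`/`join()` stand in for `StreamThread.shutdown()` and `waitOnThreadState(StreamThread.State.DEAD)`. It also skips the original special case of a thread removing itself; it is not the actual `KafkaStreams` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical registry illustrating "copy under the lock, block outside it".
final class WorkerRegistry {
    // Serializes add/remove operations, like `changeThreadCount` in the diff.
    private final Object changeLock = new Object();
    // Shared list, guarded by its own monitor.
    private final List<Thread> threads = new ArrayList<>();

    void add(Thread t) {
        synchronized (threads) {
            threads.add(t);
        }
    }

    int size() {
        synchronized (threads) {
            return threads.size();
        }
    }

    // Stops and removes one live worker other than the caller.
    Optional<String> removeOne() throws InterruptedException {
        synchronized (changeLock) {
            final List<Thread> copy;
            synchronized (threads) {
                copy = new ArrayList<>(threads); // hold the list lock only long enough to copy
            }
            for (Thread t : copy) {
                if (t.isAlive() && t != Thread.currentThread()) {
                    t.interrupt(); // stand-in for streamThread.shutdown()
                    t.join();      // stand-in for waiting until DEAD; `threads` is not locked here
                    synchronized (threads) {
                        threads.remove(t);
                    }
                    return Optional.of(t.getName());
                }
            }
            return Optional.empty();
        }
    }
}
```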









[jira] [Commented] (KAFKA-12199) Migrate connect:runtime module to JUnit 5

2021-01-14 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265720#comment-17265720
 ] 

Chia-Ping Tsai commented on KAFKA-12199:


[~ijuma] PowerMock does not yet have native support for JUnit 5
(https://github.com/powermock/powermock/issues/830), and a lot of tests
depend on PowerMock functions. This means the PR could get bigger and more
complicated, as we would not only migrate JUnit 4 to JUnit 5 but also migrate
PowerMock to another mock tool.

In order to simplify the review work, we should address the following tasks before
upgrading JUnit:

1. select a mock tool which can work with JUnit 5 - I prefer Mockito as it
works well with JUnit 5
2. rewrite all tests which depend on PowerMock - this task can be split
into different sub-tasks, as I believe the new mock tool will bring a bunch
of changes.

WDYT?

> Migrate connect:runtime module to JUnit 5
> -
>
> Key: KAFKA-12199
> URL: https://issues.apache.org/jira/browse/KAFKA-12199
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>






[jira] [Resolved] (KAFKA-12203) Migrate connect:mirror-client module to JUnit 5

2021-01-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12203.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Migrate connect:mirror-client module to JUnit 5
> ---
>
> Key: KAFKA-12203
> URL: https://issues.apache.org/jira/browse/KAFKA-12203
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.8.0
>
>






[GitHub] [kafka] chia7712 merged pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5

2021-01-14 Thread GitBox


chia7712 merged pull request #9889:
URL: https://github.com/apache/kafka/pull/9889


   







[GitHub] [kafka] chia7712 commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5

2021-01-14 Thread GitBox


chia7712 commented on pull request #9889:
URL: https://github.com/apache/kafka/pull/9889#issuecomment-760659526


   ```
   
org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect
   ```
   
   It is tracked by #9888 and #9887.







[GitHub] [kafka] abbccdda commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id

2021-01-14 Thread GitBox


abbccdda commented on a change in pull request #9871:
URL: https://github.com/apache/kafka/pull/9871#discussion_r557858498



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -125,9 +125,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
 final EpochState initialState;
 if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
 throw new IllegalStateException("Configured voter set: " + voters
-+ " is different from the voter set read from the state file: " + election.voters() +
-". Check if the quorum configuration is up to date, " +
-"or wipe out the local state file if necessary");
++ " is different from the voter set read from the state file: " + election.voters()
++ ". Check if the quorum configuration is up to date, "
++ "or wipe out the local state file if necessary");
+} else if (election.hasVoted() && !isVoter()) {
+String localIdDescription = localId.isPresent() ?
+localId.getAsInt() + " is not a voter" :
+"is undefined";
+throw new IllegalStateException("Initialized quorum state " + election

Review comment:
   Fair enough, we could relax this later when we work out the static
quorum changes.
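A standalone sketch of the two startup checks in this hunk, to make the invariant being discussed explicit: a voter set in the state file must match the configuration, and only a voter can have voted, so an observer (possibly without an id) with a recorded vote is rejected. `QuorumStartupCheck.validate` and its parameters are hypothetical. The sketch assumes `isVoter()` means "the local id is present and is in the configured voter set", and the wording of the second exception message is illustrative only, since the original line is truncated in the diff.

```java
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical standalone version of the two startup checks in QuorumState#initialize.
final class QuorumStartupCheck {
    private QuorumStartupCheck() {
    }

    static void validate(Set<Integer> configuredVoters, Set<Integer> persistedVoters,
                         boolean hasVoted, OptionalInt localId) {
        // 1. A non-empty voter set in the state file must match the configuration.
        if (!persistedVoters.isEmpty() && !configuredVoters.equals(persistedVoters)) {
            throw new IllegalStateException("Configured voter set: " + configuredVoters
                + " is different from the voter set read from the state file: " + persistedVoters
                + ". Check if the quorum configuration is up to date, "
                + "or wipe out the local state file if necessary");
        }
        // Assumption: a node is a voter only if it has an id and that id is a configured voter.
        boolean isVoter = localId.isPresent() && configuredVoters.contains(localId.getAsInt());
        // 2. Only voters can have cast a vote; observers (possibly without an id) cannot.
        if (hasVoted && !isVoter) {
            String localIdDescription = localId.isPresent()
                ? localId.getAsInt() + " is not a voter"
                : "is undefined";
            // Illustrative message: the original line is truncated in the diff.
            throw new IllegalStateException("Persisted state records a vote, but local id " + localIdDescription);
        }
    }
}
```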









[GitHub] [kafka] chia7712 commented on pull request #9890: KAFKA-12198: Migrate connect:json module to JUnit 5

2021-01-14 Thread GitBox


chia7712 commented on pull request #9890:
URL: https://github.com/apache/kafka/pull/9890#issuecomment-760616110


   @dengziming Could you fix the conflicting files? thanks!







[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557828817



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest(
 FetchRequestData.FetchPartition request,
 long currentTimeMs
 ) {
-Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch());
-if (errorOpt.isPresent()) {
-return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-}
+try {
+Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch());
+if (errorOpt.isPresent()) {
+return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
+}
 
-long fetchOffset = request.fetchOffset();
-int lastFetchedEpoch = request.lastFetchedEpoch();
-LeaderState state = quorum.leaderStateOrThrow();
-Optional divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-if (divergingEpochOpt.isPresent()) {
-Optional divergingEpoch =
-divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset()
-.setEpoch(offsetAndEpoch.epoch)
-.setEndOffset(offsetAndEpoch.offset));
-return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark());
-} else {
-LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+long fetchOffset = request.fetchOffset();
+int lastFetchedEpoch = request.lastFetchedEpoch();
+LeaderState state = quorum.leaderStateOrThrow();
+ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
 
-if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
-onUpdateLeaderHighWatermark(state, currentTimeMs);
+final Records records;
+if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) {
+LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+
+if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
+onUpdateLeaderHighWatermark(state, currentTimeMs);
+}
+
+records = info.records;
+} else {
+records = MemoryRecords.EMPTY;
 }
 
-return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark());
+return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark());
+} catch (Exception e) {
+logger.error("Caught unexpected error in fetch completion of request {}", request, e);
+return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty());
 }
 }
 
 /**
  * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which
  * is the largest epoch such that subsequent records are known to diverge.
  */
-private Optional validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) {
-if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-return Optional.empty();
+private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) {
+if (log.startOffset() == 0 && fetchOffset == 0) {
+if (lastFetchedEpoch != 0) {
+logger.warn(
+"Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero",
+fetchOffset,
+lastFetchedEpoch
+);
+}
+return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
 }
 
-OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch)
-.orElse(new OffsetAndEpoch(-1L, -1));
-if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) {
-return Optional.of(endOffsetAndEpoch);
+
+Optional endOffsetAndEpochOpt = log
+.endOffsetForEpoch(lastFetchedEpoch)
+.flatMap(endOffsetAndEpoch -> {
+if (endOffsetAndEpoch.epoch == lastFetchedEpoch && endOffsetAndEpoch.offset == log.startOffset()) {
+// This means that either:
+// 1. The lastFetchedEpoch is smaller than any known epoch
+// 2. The current leader epoch is lastFetchedEpoch and the log is empty.
+// Assume that there is not diverging information
+return Optional.empty();
+} else {
+  

[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557829548



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2216,7 +2316,7 @@ public void complete() {
 // These fields are visible to both the Raft IO thread and the listener
 // and are protected through synchronization on this `ListenerContext` instance
 private BatchReader lastSent = null;
-private long lastAckedOffset = 0;
+private long lastAckedEndOffset = 0;

Review comment:
   I don't feel too strongly about it, but the new name is a little
confusing to me. Why would the client only be acking end offsets? It is especially
confusing when I see this: `lastAckedEndOffset = logStartOffset` 🙂. I think
we probably need a comment here regardless.









[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557818816



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +221,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// Do not let the state machine create snapshots older than the latest snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+// Since snapshots are less than the high-watermark absolute offset comparison is okay.

Review comment:
   Is it useful here to ensure that `snapshotId` is indeed lower than the 
high watermark?
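A sketch of what the suggested extra check could look like next to the existing one. `OffsetAndEpoch` and `SnapshotIdCheck` are simplified, hypothetical stand-ins written in Java (the real method is Scala in `KafkaMetadataLog`). The diff does not show what happens when the existing check fails, so throwing `IllegalArgumentException` is an assumption, as is treating the snapshot id's offset as an exclusive end offset that may equal the high watermark.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.raft.OffsetAndEpoch.
final class OffsetAndEpoch {
    final long offset;
    final int epoch;

    OffsetAndEpoch(long offset, int epoch) {
        this.offset = offset;
        this.epoch = epoch;
    }

    @Override
    public String toString() {
        return "OffsetAndEpoch(offset=" + offset + ", epoch=" + epoch + ")";
    }
}

final class SnapshotIdCheck {
    private SnapshotIdCheck() {
    }

    // Rejects a snapshot id that covers records past the high watermark (uncommitted data)
    // or that is older than the latest existing snapshot.
    static void validate(OffsetAndEpoch snapshotId, Optional<OffsetAndEpoch> latest, long highWatermark) {
        // Assumption: snapshotId.offset is an exclusive end offset, so it may equal the high watermark.
        if (snapshotId.offset > highWatermark) {
            throw new IllegalArgumentException(
                "Snapshot " + snapshotId + " is past the high watermark " + highWatermark);
        }
        latest.ifPresent(l -> {
            // Same condition as in the diff: either component going backwards is rejected.
            if (l.epoch > snapshotId.epoch || l.offset > snapshotId.offset) {
                throw new IllegalArgumentException(
                    "Snapshot " + snapshotId + " is older than the latest snapshot " + l);
            }
        });
    }
}
```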









[GitHub] [kafka] showuon commented on pull request #9888: KAFKA-12194: use stateListener to catch each state change

2021-01-14 Thread GitBox


showuon commented on pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#issuecomment-760605267


   The test will fail; I'll work on it later. Please don't review yet. Thanks.







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

2021-01-14 Thread GitBox


abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557825070



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -139,8 +138,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 request: RequestChannel.Request,
 handler: RequestChannel.Request => Unit
   ): Unit = {
-def responseCallback(response: AbstractResponse): Unit = {
-  sendForwardedResponse(request, response)
+def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = {
+  responseEither match {
+case Left(response) => sendForwardedResponse(request, response)
+case Right(error) => closeConnection(request, Collections.singletonMap(error, 1))

Review comment:
   Sounds good. I prefer using `info` here since it should be a rare case.









[GitHub] [kafka] showuon commented on pull request #9733: KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest

2021-01-14 Thread GitBox


showuon commented on pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#issuecomment-760604903


   @mjsax, I've reverted my change for the state change. Now there's only the
exception handler improvement.
   
   As for the state change fix, thinking about it again after the holidays: if the test is
not flaky now, why should we change it and maybe make it unreliable again? So
my suggestion is that we keep monitoring this test, and if it fails again, we can
discuss it again. What do you think?







[GitHub] [kafka] hachikuji commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557643619



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -16,28 +16,41 @@
  */
 package kafka.raft
 
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.util.NoSuchElementException
 import java.util.Optional
+import java.util.concurrent.ConcurrentSkipListSet
 
-import kafka.log.{AppendOrigin, Log}
+import kafka.log.{AppendOrigin, Log, SnapshotGenerated}
 import kafka.server.{FetchHighWatermark, FetchLogEnd}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.raft
-import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog}
+import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation
+  // and other threads will access this object. This object is used to efficiently notify the polling thread

Review comment:
   What other threads? 

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -370,7 +375,9 @@ class Log(@volatile private var _dir: File,
   throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
   }
 
-  def highWatermark: Long = highWatermarkMetadata.messageOffset
+  def highWatermark: Long = _highWatermarkMetadata.messageOffset
+
+  def highWatermarkMetadata: LogOffsetMetadata = _highWatermarkMetadata

Review comment:
   Could we instead either expose `fetchHighWatermarkMetadata` or make use 
of `fetchOffsetSnapshot` in `KafkaMetadataLog`? 

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +221,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// Do not let the state machine create snapshots older than the latest snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+// Since snapshots are less than the high-watermark absolute offset comparison is okay.

Review comment:
   Is it useful here to ensure that `snapshotId` is lower than the
high watermark?

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -871,15 +875,32 @@ private FetchResponseData buildFetchResponse(
 .setLeaderEpoch(quorum.epoch())
 .setLeaderId(quorum.leaderIdOrNil());
 
-divergingEpoch.ifPresent(partitionData::setDivergingEpoch);
+switch (validatedOffsetAndEpoch.type()) {
+case DIVERGING:
+partitionData.divergingEpoch()
+.setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch)
+.setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset);
+break;
+case SNAPSHOT:
+partitionData.snapshotId()
+.setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch)
+.setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset);
+break;
+default:
+}
 });
 }
 
 private FetchResponseData buildEmptyFetchResponse(
 Errors error,
 Optional highWatermark
 ) {
-return buildFetchResponse(error, MemoryRecords.EMPTY, Optional.empty(), highWatermark);
+return buildFetchResponse(
+error,
+MemoryRecords.EMPTY,
+ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(-1, -1)),

Review comment:
   Not sure it's worth creating another type, but it is a little surprising 
to see `valid(new OffsetAndEpoch(-1, -1))`.
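To make this point concrete, a simplified sketch of a validation result with `VALID`, `DIVERGING`, and `SNAPSHOT` kinds, mirroring the `switch` in `buildFetchResponse`. All names here (`OffsetAndEpoch`, `ValidatedFetch`, `FetchResponseSketch.describe`) are hypothetical stand-ins. Passing an `Optional` so that error responses carry no validation result at all is just one possible alternative to the `valid(new OffsetAndEpoch(-1, -1))` placeholder, not what the PR does.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.raft.OffsetAndEpoch.
final class OffsetAndEpoch {
    final long offset;
    final int epoch;

    OffsetAndEpoch(long offset, int epoch) {
        this.offset = offset;
        this.epoch = epoch;
    }
}

// Hypothetical, simplified version of a fetch validation result.
final class ValidatedFetch {
    enum Type { VALID, DIVERGING, SNAPSHOT }

    private final Type type;
    private final OffsetAndEpoch offsetAndEpoch;

    private ValidatedFetch(Type type, OffsetAndEpoch offsetAndEpoch) {
        this.type = type;
        this.offsetAndEpoch = offsetAndEpoch;
    }

    static ValidatedFetch valid(OffsetAndEpoch o) { return new ValidatedFetch(Type.VALID, o); }
    static ValidatedFetch diverging(OffsetAndEpoch o) { return new ValidatedFetch(Type.DIVERGING, o); }
    static ValidatedFetch snapshot(OffsetAndEpoch o) { return new ValidatedFetch(Type.SNAPSHOT, o); }

    Type type() { return type; }
    OffsetAndEpoch offsetAndEpoch() { return offsetAndEpoch; }
}

final class FetchResponseSketch {
    static final String NONE = "no diverging epoch or snapshot id";

    private FetchResponseSketch() {
    }

    // Describes which optional field a response would carry. An empty Optional stands in
    // for error responses, so no (-1, -1) placeholder is needed.
    static String describe(Optional<ValidatedFetch> validation) {
        if (!validation.isPresent()) {
            return NONE;
        }
        ValidatedFetch v = validation.get();
        switch (v.type()) {
            case DIVERGING:
                return "divergingEpoch=" + v.offsetAndEpoch().epoch + "@" + v.offsetAndEpoch().offset;
            case SNAPSHOT:
                return "snapshotId=" + v.offsetAndEpoch().epoch + "@" + v.offsetAndEpoch().offset;
            default:
                return NONE;
        }
    }
}
```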

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2216,7 +2316,7 @@ public void complete() {
 // These fields are visible to both the Raft IO thread and the listener
 // and are protected through synchronization on this `ListenerContext` instance
 private BatchReader lastSent = null;
-private long lastAckedOffset = 0;
+private long lastAckedEndOffset = 0;

Review comment:
  

[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

2021-01-14 Thread GitBox


abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557823644



##
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##
@@ -77,7 +79,7 @@ class ForwardingManagerImpl(
 
   override def forwardRequest(
 request: RequestChannel.Request,
-responseCallback: AbstractResponse => Unit
+responseCallback: Either[AbstractResponse, Errors] => Unit

Review comment:
   I'm inclined to keep it as it is, since the current return type gives the 
caller a root cause message.









[GitHub] [kafka] showuon commented on pull request #9733: KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest

2021-01-14 Thread GitBox


showuon commented on pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#issuecomment-760597684


   OK, working on it now.







[GitHub] [kafka] abbccdda commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

2021-01-14 Thread GitBox


abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557810929



##
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
  * Get the latest version supported by the broker within an allowed range of versions
  */
 public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
-ApiVersion usableVersion = supportedVersions.get(apiKey);
-if (usableVersion == null)
-throw new UnsupportedVersionException("The broker does not support " + apiKey);
-return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion);
 }
 
-private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
-  short minAllowedVersion, short maxAllowedVersion) {
-short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
-short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
-if (minVersion > maxVersion)
+private short latestUsableVersion(ApiKeys apiKey,
+  ApiVersion supportedVersions,
+  short minAllowedVersion,
+  short maxAllowedVersion) {
+if (supportedVersions == null)

Review comment:
   Yea, actually we could remove the private function since it now has a 
single caller.









[GitHub] [kafka] hachikuji commented on a change in pull request #9883: MINOR: Generalize server startup to make way for KIP-500

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9883:
URL: https://github.com/apache/kafka/pull/9883#discussion_r557807090



##
File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
+import kafka.raft.KafkaRaftManager
+import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
+import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.{AppInfoParser, Time}
+import org.apache.kafka.raft.internals.StringSerde
+
+/**
+ * Partially stubbed implementation of the KIP-500 server which relies on a self-managed
+ * Raft quorum for replication of the `@metadata` topic, which stores all of
+ * the cluster metadata.
+ */
+class KafkaRaftServer(

Review comment:
   Makes sense. Added a comment above.









[GitHub] [kafka] hachikuji merged pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


hachikuji merged pull request #9881:
URL: https://github.com/apache/kafka/pull/9881


   







[GitHub] [kafka] cmccabe commented on a change in pull request #9883: MINOR: Generalize server startup to make way for KIP-500

2021-01-14 Thread GitBox


cmccabe commented on a change in pull request #9883:
URL: https://github.com/apache/kafka/pull/9883#discussion_r557799746



##
File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
+import kafka.raft.KafkaRaftManager
+import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
+import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.{AppInfoParser, Time}
+import org.apache.kafka.raft.internals.StringSerde
+
+/**
+ * Partially stubbed implementation of the KIP-500 server which relies on a self-managed
+ * Raft quorum for replication of the `@metadata` topic, which stores all of
+ * the cluster metadata.
+ */
+class KafkaRaftServer(

Review comment:
   Can we have a comment at least describing the fact that it can contain 
either a controller, a broker, or both?
   










[GitHub] [kafka] hachikuji commented on a change in pull request #9600: KAFKA-10674: Controller API version bond with forwardable APIs

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557783391



##
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
  * Get the latest version supported by the broker within an allowed range of versions
  */
 public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
-ApiVersion usableVersion = supportedVersions.get(apiKey);
-if (usableVersion == null)
-throw new UnsupportedVersionException("The broker does not support " + apiKey);
-return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion);
 }
 
-private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
-  short minAllowedVersion, short maxAllowedVersion) {
-short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
-short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
-if (minVersion > maxVersion)
+private short latestUsableVersion(ApiKeys apiKey,
+  ApiVersion supportedVersions,
+  short minAllowedVersion,
+  short maxAllowedVersion) {
+if (supportedVersions == null)

Review comment:
   nit: since we moved the null check here, why don't we remove the 
parameter as well and call `supportedVersions.get(apiKey)` here?

##
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##
@@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi
 return apiKeys;
 }
 
+public static ApiVersionsResponseKeyCollection commonApiVersionsWithActiveController(final byte minMagic,

Review comment:
   Maybe `intersectControllerApiVersions`?

##
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
  * Get the latest version supported by the broker within an allowed range of versions
  */
 public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {

Review comment:
   We could probably simplify this so that it takes a single `ApiVersion` 
parameter?
   
   By the way, the implementation above `latestUsableVersion(ApiKeys apiKey)` 
looks a bit odd, since it basically does an intersection of the latest supported 
version with itself. A little helper (say `latestSupportedOrThrow`) might simplify this.
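A minimal Java sketch of the helper suggested above, under a simplified model: `ApiKey` and `VersionRange` are hypothetical stand-ins for Kafka's `ApiKeys` and `ApiVersion`, and the JDK's `UnsupportedOperationException` stands in for Kafka's `UnsupportedVersionException`. The null check lives in one `latestSupportedOrThrow` lookup, and the bounded overload intersects the broker's range with the caller's allowed range using the same `Math.max`/`Math.min` pattern as the diff.

```java
import java.util.EnumMap;
import java.util.Map;

public class VersionIntersectionSketch {

    // Hypothetical stand-in for Kafka's ApiKeys enum.
    enum ApiKey { PRODUCE, FETCH }

    // Hypothetical inclusive [min, max] range, standing in for ApiVersion.
    static final class VersionRange {
        final short min;
        final short max;

        VersionRange(short min, short max) {
            this.min = min;
            this.max = max;
        }
    }

    private final Map<ApiKey, VersionRange> supported = new EnumMap<>(ApiKey.class);

    void register(ApiKey key, int min, int max) {
        supported.put(key, new VersionRange((short) min, (short) max));
    }

    // The single place that performs the lookup and the "not supported" check.
    private VersionRange latestSupportedOrThrow(ApiKey key) {
        VersionRange range = supported.get(key);
        if (range == null)
            throw new UnsupportedOperationException("The broker does not support " + key);
        return range;
    }

    // Intersect the broker's range with the caller's allowed range and
    // return the highest version they have in common.
    short latestUsableVersion(ApiKey key, short minAllowed, short maxAllowed) {
        VersionRange range = latestSupportedOrThrow(key);
        short min = (short) Math.max(minAllowed, range.min);
        short max = (short) Math.min(maxAllowed, range.max);
        if (min > max)
            throw new UnsupportedOperationException("No version of " + key
                + " in [" + minAllowed + ", " + maxAllowed + "] is supported by the broker");
        return max;
    }

    // Assumption: with no caller-imposed bounds there is nothing to intersect,
    // so the broker's maximum is returned directly.
    short latestUsableVersion(ApiKey key) {
        return latestSupportedOrThrow(key).max;
    }

    public static void main(String[] args) {
        VersionIntersectionSketch versions = new VersionIntersectionSketch();
        versions.register(ApiKey.FETCH, 0, 12);
        System.out.println(versions.latestUsableVersion(ApiKey.FETCH));                       // 12
        System.out.println(versions.latestUsableVersion(ApiKey.FETCH, (short) 4, (short) 9)); // 9
    }
}
```

With the lookup centralized, neither overload needs its own null check, which is the simplification the comment is after.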

##
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -140,6 +143,16 @@ class BrokerToControllerChannelManager(
   callback
 ))
   }
+
+  def controllerApiVersions(): Option[NodeApiVersions] =
+requestThread.activeControllerAddress() match {
+  case Some(activeController) =>
+if (activeController.id() == config.brokerId)
+  Some(currentNodeApiVersions)
+else
+  Option(apiVersions.get(activeController.idString()))
+  case None => None

Review comment:
   Seems like we can replace the `match` with a `flatMap`?

##
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -205,20 +226,20 @@ class BrokerToControllerRequestThread(
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
 if (response.wasDisconnected()) {
-  activeController = None
+  updateControllerAddress(None)
   requestQueue.putFirst(request)
 } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
   // just close the controller connection and wait for metadata cache update in doWork
-  networkClient.disconnect(activeController.get.idString)
-  activeController = None
+  networkClient.disconnect(activeControllerAddress().get.idString)

Review comment:
   This is another slippery looking case. Can we just rewrite this as a 
`foreach` so that we don't need to worry about it?

##
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -190,11 +211,11 @@ class BrokerToControllerRequestThread(
   if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
 requestIter.remove()
 request.callback.onTimeout()
-  } else if (activeController.isDefined) {
+  } else if (activeControllerAddress().isDefined) {
 requestIter.remove()
 return Some(RequestAndCompletio

[GitHub] [kafka] bob-barrett opened a new pull request #9902: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-01-14 Thread GitBox


bob-barrett opened a new pull request #9902:
URL: https://github.com/apache/kafka/pull/9902


   This patch changes the NetworkClient behavior to re-resolve the target node's 
hostname after disconnecting from an established connection, rather than 
waiting until the previously resolved addresses are exhausted. This handles 
the scenario where the node's IP addresses change during the lifetime of the 
connection, and means that the client no longer has to attempt connections to 
every stale, previously resolved IP address before it resolves the hostname again.
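A minimal sketch of the behavior described above, under stated assumptions: `AddressRotationSketch` is a hypothetical class, not `NetworkClient`'s actual internals, addresses are modeled as plain strings, and resolution goes through an injected function so no real DNS lookup happens. A failed connection attempt advances through the cached addresses and re-resolves only after they are exhausted, while closing an established connection re-resolves immediately.

```java
import java.util.List;
import java.util.function.Function;

public class AddressRotationSketch {
    private final String host;
    // Hypothetical resolver hook (hostname -> IP strings) so no real DNS is needed.
    private final Function<String, List<String>> resolver;
    private List<String> addresses;
    private int index;

    AddressRotationSketch(String host, Function<String, List<String>> resolver) {
        this.host = host;
        this.resolver = resolver;
        resolve();
    }

    // The address the next connection attempt would use.
    String current() {
        return addresses.get(index);
    }

    // A connection attempt failed: try the next cached address, and only
    // re-resolve once every cached address has been tried.
    void onConnectFailure() {
        index++;
        if (index >= addresses.size())
            resolve();
    }

    // An established connection was closed: re-resolve straight away,
    // since the node's IPs may have changed while it was connected.
    void onDisconnectAfterEstablished() {
        resolve();
    }

    private void resolve() {
        List<String> resolved = resolver.apply(host);
        if (resolved.isEmpty())
            throw new IllegalStateException("No addresses resolved for " + host);
        addresses = resolved;
        index = 0;
    }
}
```

Injecting the resolver keeps the sketch testable; a real client would call something like `InetAddress.getAllByName` instead.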
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5

2021-01-14 Thread GitBox


ijuma commented on pull request #9889:
URL: https://github.com/apache/kafka/pull/9889#issuecomment-760552078


   The failed Streams tests appear unrelated, but they seem to be failing very 
often.







[GitHub] [kafka] ijuma merged pull request #9892: KAFKA-12201: Migrate connect:basic-auth-extensio module to JUnit 5

2021-01-14 Thread GitBox


ijuma merged pull request #9892:
URL: https://github.com/apache/kafka/pull/9892


   







[jira] [Resolved] (KAFKA-10896) TimeoutException when Producer try to send message to Kafka Broker

2021-01-14 Thread Nguyen Cong Hieu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nguyen Cong Hieu resolved KAFKA-10896.
--
Resolution: Not A Bug

> TimeoutException when Producer try to send message to Kafka Broker
> --
>
> Key: KAFKA-10896
> URL: https://issues.apache.org/jira/browse/KAFKA-10896
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config
>Affects Versions: 2.6.0
>Reporter: Nguyen Cong Hieu
>Priority: Major
> Attachments: console_consumer.png, console_producer.png, 
> producerconfig.png, producercreate.png, serverconfig.png, 
> timeout_exception.png
>
>
> Dear All,
>  
> While setting up a new Kafka environment, everything works normally when I use 
> the consumer and producer binaries shipped with Kafka. 
>  
> However, when I run a small Java application that creates a producer to send 
> messages for a topic to the broker, it fails with a timeout exception: 
> "Caused by: org.apache.kafka.common.errors.TimeoutException: Topic otp not 
> present in metadata after 6 ms."
>  
> I checked some confluence pages related to Kafka but could not resolve it yet.
>  
> Please help me resolve this, and please see my file attachments 
> for more details about the broker information and sample source code.
> Best regards.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe opened a new pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-14 Thread GitBox


cmccabe opened a new pull request #9901:
URL: https://github.com/apache/kafka/pull/9901


   







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9887: KAFKA-12195 Fix synchronization issue happening in KafkaStreams

2021-01-14 Thread GitBox


wcarlson5 commented on a change in pull request #9887:
URL: https://github.com/apache/kafka/pull/9887#discussion_r557739729



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -989,16 +989,20 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 public Optional removeStreamThread() {
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
-for (final StreamThread streamThread : threads) {
-if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {
-streamThread.shutdown();
-if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+synchronized (threads) {

Review comment:
   Why are we using the threads object as a lock here? Is that what 
`Collections.synchronizedList(new LinkedList<>());` uses internally?
   
   I am a little concerned that we are holding `changeThreadCount` while 
waiting for `threads`.
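For context on the first question: the `Collections.synchronizedList` Javadoc requires callers to synchronize manually on the returned list while iterating over it, and the wrapper uses itself as its internal monitor, so `synchronized (threads)` takes the same lock that individual `add`/`remove` calls take. The sketch below is a hypothetical reduction of the pattern (string names instead of `StreamThread`s, and a plain `Object` standing in for `changeThreadCount`). Nesting the two locks this way is only deadlock-free as long as no other code path acquires `threads` first and then `changeThreadCount`.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class SynchronizedListSketch {
    // Mirrors how the `threads` field is assumed to be declared.
    private final List<String> threads = Collections.synchronizedList(new LinkedList<>());
    // Hypothetical stand-in for the `changeThreadCount` lock object.
    private final Object changeThreadCount = new Object();

    void add(String name) {
        // Individual calls are synchronized internally on the wrapper itself.
        threads.add(name);
    }

    int size() {
        return threads.size();
    }

    // Iterating is a compound action, so it must hold the list's own monitor.
    String removeFirstMatching(String prefix) {
        synchronized (changeThreadCount) {   // outer lock: always taken first
            synchronized (threads) {         // inner lock: same monitor add()/size() use
                Iterator<String> it = threads.iterator();
                while (it.hasNext()) {
                    String name = it.next();
                    if (name.startsWith(prefix)) {
                        it.remove();
                        return name;
                    }
                }
                return null;
            }
        }
    }
}
```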









[jira] [Created] (KAFKA-12209) Add the timeline data structures for the KIP-631 controller

2021-01-14 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12209:


 Summary: Add the timeline data structures for the KIP-631 
controller
 Key: KAFKA-12209
 URL: https://issues.apache.org/jira/browse/KAFKA-12209
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe








[jira] [Resolved] (KAFKA-12183) Add the KIP-631 metadata record definitions

2021-01-14 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12183.
--
Fix Version/s: 2.8.0
   Resolution: Fixed

> Add the KIP-631 metadata record definitions
> ---
>
> Key: KAFKA-12183
> URL: https://issues.apache.org/jira/browse/KAFKA-12183
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> Add the KIP-631 metadata record definitions





[GitHub] [kafka] cmccabe opened a new pull request #9900: KAFKA-12208: Rename AdminManager to ZkAdminManager

2021-01-14 Thread GitBox


cmccabe opened a new pull request #9900:
URL: https://github.com/apache/kafka/pull/9900


   







[jira] [Created] (KAFKA-12208) Rename AdminManager to ZkAdminManager

2021-01-14 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12208:


 Summary: Rename AdminManager to ZkAdminManager
 Key: KAFKA-12208
 URL: https://issues.apache.org/jira/browse/KAFKA-12208
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe








[GitHub] [kafka] hachikuji commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9871:
URL: https://github.com/apache/kafka/pull/9871#discussion_r557725416



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -2182,6 +2182,46 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
 assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
 }
 
+@Test
+public void testObserverFetchWithNoLocalId() throws Exception {
+// When no `localId` is defined, the client will behave as an observer.

Review comment:
   Makes sense. I will move it.









[GitHub] [kafka] hachikuji commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9871:
URL: https://github.com/apache/kafka/pull/9871#discussion_r557725206



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -125,9 +125,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
 final EpochState initialState;
 if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
 throw new IllegalStateException("Configured voter set: " + voters
-+ " is different from the voter set read from the state file: " + election.voters() +
-". Check if the quorum configuration is up to date, " +
-"or wipe out the local state file if necessary");
++ " is different from the voter set read from the state file: " + election.voters()
++ ". Check if the quorum configuration is up to date, "
++ "or wipe out the local state file if necessary");
+} else if (election.hasVoted() && !isVoter()) {
+String localIdDescription = localId.isPresent() ?
+localId.getAsInt() + " is not a voter" :
+"is undefined";
+throw new IllegalStateException("Initialized quorum state " + election

Review comment:
   I thought I would start out strict and relax later if needed. Worst 
case, the user would just remove or edit the `quorum-state` file.









[GitHub] [kafka] abbccdda commented on a change in pull request #9871: KAFKA-12161; Support raft observers with optional id

2021-01-14 Thread GitBox


abbccdda commented on a change in pull request #9871:
URL: https://github.com/apache/kafka/pull/9871#discussion_r557722131



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -2182,6 +2182,46 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
 assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
 }
 
+@Test
+public void testObserverFetchWithNoLocalId() throws Exception {
+// When no `localId` is defined, the client will behave as an observer.

Review comment:
   Wondering whether we could copy this comment to the raft client localId 
field.

##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -125,9 +125,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
 final EpochState initialState;
 if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
 throw new IllegalStateException("Configured voter set: " + voters
-+ " is different from the voter set read from the state file: " + election.voters() +
-". Check if the quorum configuration is up to date, " +
-"or wipe out the local state file if necessary");
++ " is different from the voter set read from the state file: " + election.voters()
++ ". Check if the quorum configuration is up to date, "
++ "or wipe out the local state file if necessary");
+} else if (election.hasVoted() && !isVoter()) {
+String localIdDescription = localId.isPresent() ?
+localId.getAsInt() + " is not a voter" :
+"is undefined";
+throw new IllegalStateException("Initialized quorum state " + election

Review comment:
   So a node could not transition from follower to observer through restarts?









[GitHub] [kafka] ableegoldman commented on pull request #9887: KAFKA-12195 Fix synchronization issue happening in KafkaStreams

2021-01-14 Thread GitBox


ableegoldman commented on pull request #9887:
URL: https://github.com/apache/kafka/pull/9887#issuecomment-760488841


   Hey @wcarlson5 can you also take a look at this?







[GitHub] [kafka] abbccdda merged pull request #9834: MINOR: fix typo in TimeIndex

2021-01-14 Thread GitBox


abbccdda merged pull request #9834:
URL: https://github.com/apache/kafka/pull/9834


   







[GitHub] [kafka] cadonna commented on a change in pull request #9888: KAFKA-12194: use stateListener to catch each state change

2021-01-14 Thread GitBox


cadonna commented on a change in pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#discussion_r557647216



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -119,37 +160,47 @@ public void shouldAddStreamThread() throws Exception {
 .sorted().toArray(),
 equalTo(new String[] {"1", "2", "3"})
 );
-waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.REBALANCING, DEFAULT_DURATION);
-waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+
+waitForStateTransition(KafkaStreams.State.RUNNING);
+assertTrue(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING));

Review comment:
   We normally use `assertThat()` in new and refactored code. Please also 
change the other occurrences.
   
   ```suggestion
   assertThat(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING), is(true));
   ```

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -90,24 +91,64 @@ public void setup() {
 );
 }
 
+private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException {
+kafkaStreams.start();
+waitForStateTransition(KafkaStreams.State.RUNNING);
+}
+
 @After
 public void teardown() throws IOException {
+stateTransitionHistory.clear();
 purgeLocalStreamsState(properties);
 }
 
+private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) {
+// we store each new state in state transition so that we won't miss any state change
+kafkaStreams.setStateListener(
+(newState, oldState) -> stateTransitionHistory.add(newState)
+);
+}
+
+private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException {
+waitForCondition(
+() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected),
+DEFAULT_DURATION.toMillis(),
+() -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s",
+expected, stateTransitionHistory)
+);
+}
+
+// verify if there's the state change from "before" state into "after" state
+private boolean hasStateTransition(final KafkaStreams.State before, final KafkaStreams.State after) {
+// should have at least 2 states in history
+if (stateTransitionHistory.size() < 2) {
+return false;
+}
+
+for (int i = 0; i < stateTransitionHistory.size() - 1; i++) {
+if (stateTransitionHistory.get(i).equals(before) && stateTransitionHistory.get(i + 1).equals(after)) {
+return true;
+}
+}
+return false;

Review comment:
   Why do we need a `for`-loop here? Wouldn't it suffice to verify the last 
two elements of the history and check if those two elements are a `REBALANCING` 
followed by a `RUNNING`? 
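To make the difference concrete, here is a small Java sketch comparing the loop from the diff with the reviewer's last-two-elements check. The `State` enum is a stand-in for a subset of `KafkaStreams.State`, and both method names are hypothetical. Because the history list is only cleared in `teardown()`, a `REBALANCING` followed by `RUNNING` recorded during startup would already satisfy the loop version, whereas checking only the tail asks whether the most recent transition was the expected one.

```java
import java.util.Arrays;
import java.util.List;

public class StateHistorySketch {
    // Stand-in for a subset of KafkaStreams.State.
    enum State { CREATED, REBALANCING, RUNNING }

    // The loop from the diff: true if `before` is immediately followed by
    // `after` anywhere in the recorded history.
    static boolean hasTransitionAnywhere(List<State> history, State before, State after) {
        for (int i = 0; i + 1 < history.size(); i++) {
            if (history.get(i) == before && history.get(i + 1) == after)
                return true;
        }
        return false;
    }

    // The reviewer's alternative: only the two most recent states are inspected.
    static boolean endsWithTransition(List<State> history, State before, State after) {
        int n = history.size();
        return n >= 2 && history.get(n - 2) == before && history.get(n - 1) == after;
    }

    public static void main(String[] args) {
        // REBALANCING -> RUNNING happened earlier, but the latest state is REBALANCING.
        List<State> history = Arrays.asList(State.REBALANCING, State.RUNNING, State.REBALANCING);
        System.out.println(hasTransitionAnywhere(history, State.REBALANCING, State.RUNNING)); // true
        System.out.println(endsWithTransition(history, State.REBALANCING, State.RUNNING));    // false
    }
}
```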

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -90,24 +91,64 @@ public void setup() {
 );
 }
 
+private void startStreamsAndWaitForRunning(final KafkaStreams kafkaStreams) throws InterruptedException {
+kafkaStreams.start();
+waitForStateTransition(KafkaStreams.State.RUNNING);
+}
+
 @After
 public void teardown() throws IOException {
+stateTransitionHistory.clear();
 purgeLocalStreamsState(properties);
 }
 
+private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) {
+// we store each new state in state transition so that we won't miss any state change
+kafkaStreams.setStateListener(
+(newState, oldState) -> stateTransitionHistory.add(newState)
+);
+}
+
+private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException {
+waitForCondition(
+() -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected),
+DEFAULT_DURATION.toMillis(),
+() -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s",
+expected, stateTransitionHistory)
+);
+}

Review comment:
   Couldn't we simply wait for the current state to become `RUNNING`?
   
   ```suggestion
   private void waitForRunning() throws Exception {
   waitForCondition(
   () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
   DEFAULT_DURATION.toMillis(),
   () -> String.format("Client did not transi

[jira] [Updated] (KAFKA-12207) Do not maintain list of latest producer append information

2021-01-14 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12207:

Labels: needs-kip  (was: )

> Do not maintain list of latest producer append information 
> ---
>
> Key: KAFKA-12207
> URL: https://issues.apache.org/jira/browse/KAFKA-12207
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> For each producerId writing to each partition, we maintain a list of the 5 
> most recent appended sequence numbers and the corresponding offsets in the 
> log. If a producer fails to receive a successful response and retries the 
> Produce request, then we can still return the offset of the successful 
> append, which is returned to the user inside `RecordMetadata`. (Note that the 
> limit of 5 most recent appends is where we derive the limit on the max number 
> of inflight requests that the producer is allowed when idempotence is 
> enabled.)
> This is only a "best-effort" attempt to return the offset of the append. For 
> example, we do not populate the full list of recent appends when the log is 
> reloaded. Only the latest sequence/offset are reloaded from the snapshot. If 
> we receive a duplicate and we do not have the offset, then the broker 
> currently handles this by returning OUT_OF_ORDER_SEQUENCE.
> In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was 
> intended to handle this case and the producer already checks for it. If the 
> producer sees this error in the response, then the `send` is considered 
> successful, but the producer returns -1 as both the offset and timestamp 
> inside `RecordMetadata`.
> The reason we never implemented this on the broker is probably because we 
> allow the sequence numbers of the producer to wrap around after reaching 
> Int.MaxValue. What we considered in the past is fixing a number like 1000 and 
> requiring that the sequence be within that range to be considered a 
> duplicate. A better solution going forward is to let the producer bump the 
> epoch when the sequence hits Int.MaxValue, but we still have to allow 
> sequence numbers to wrap for compatibility.
> Given the loose guarantees that we already have here, I'm considering whether 
> the additional bookkeeping and the required memory are worth preserving. As 
> an alternative, we could consider the following:
> 1. The broker will only maintain the latest sequence/offset for each 
> producerId
> 2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within 
> 1000 of the latest sequence (accounting for overflow). 
> 3. Instead of wrapping around sequence numbers, the producer will bump the 
> epoch if possible. It's worth noting that the idempotent producer can freely 
> bump the epoch, so the only time we should ever need to wrap the sequence is 
> for the transactional producer when it is used on a broker which does not 
> support the `InitProducerId` version which allows epoch bumps.
> 4. We can remove the restriction on `max.in.flight.requests.per.connection` 
> and document that if the offset is required in `RecordMetadata`, then the 
> user must set this to 1. Internally, if connecting to an old broker which 
> does not support epoch bumps, then we can restrict the number of inflight 
> requests to 5.
> The benefit in the end is that we can reduce the memory usage for producer 
> state and the complexity to manage that state. It also gives us a path to 
> removing the annoying config restriction and a better policy for sequence 
> overflow.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12207) Do not maintain list of latest producer append information

2021-01-14 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12207:

Issue Type: Improvement  (was: Bug)

> Do not maintain list of latest producer append information 
> ---
>
> Key: KAFKA-12207
> URL: https://issues.apache.org/jira/browse/KAFKA-12207
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>





[jira] [Created] (KAFKA-12207) Do not maintain list of latest producer append information

2021-01-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12207:
---

 Summary: Do not maintain list of latest producer append information
 Key: KAFKA-12207
 URL: https://issues.apache.org/jira/browse/KAFKA-12207
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


For each producerId writing to each partition, we maintain a list of the 5 most 
recent appended sequence numbers and the corresponding offsets in the log. If a 
producer fails to receive a successful response and retries the Produce 
request, then we can still return the offset of the successful append, which is 
returned to the user inside `RecordMetadata`. (Note that the limit of 5 most 
recent appends is where we derive the limit on the max number of inflight 
requests that the producer is allowed when idempotence is enabled.)

This is only a "best-effort" attempt to return the offset of the append. For 
example, we do not populate the full list of recent appends when the log is 
reloaded. Only the latest sequence/offset are reloaded from the snapshot. If we 
receive a duplicate and we do not have the offset, then the broker currently 
handles this by returning OUT_OF_ORDER_SEQUENCE.

In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was intended 
to handle this case and the producer already checks for it. If the producer 
sees this error in the response, then the `send` is considered successful, but 
the producer returns -1 as both the offset and timestamp inside 
`RecordMetadata`.

The reason we never implemented this on the broker is probably because we allow 
the sequence numbers of the producer to wrap around after reaching 
Int.MaxValue. What we considered in the past is fixing a number like 1000 and 
requiring that the sequence be within that range to be considered a duplicate. 
A better solution going forward is to let the producer bump the epoch when the 
sequence hits Int.MaxValue, but we still have to allow sequence numbers to wrap 
for compatibility.

Given the loose guarantees that we already have here, I'm considering whether 
the additional bookkeeping and the required memory are worth preserving. As an 
alternative, we could consider the following:

1. The broker will only maintain the latest sequence/offset for each producerId
2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within 
1000 of the latest sequence (accounting for overflow). 
3. Instead of wrapping around sequence numbers, the producer will bump the 
epoch if possible. It's worth noting that the idempotent producer can freely 
bump the epoch, so the only time we should ever need to wrap the sequence is 
for the transactional producer when it is used on a broker which does not 
support the `InitProducerId` version which allows epoch bumps.
4. We can remove the restriction on `max.in.flight.requests.per.connection` and 
document that if the offset is required in `RecordMetadata`, then the user must 
set this to 1. Internally, if connecting to an old broker which does not 
support epoch bumps, then we can restrict the number of inflight requests to 5.
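
Items 2 and 4 above can be made concrete with a small Java sketch. It assumes sequences are non-negative `int`s that wrap from `Integer.MAX_VALUE` back to 0, takes the "number like 1000" as the duplicate window, and uses hypothetical class and method names; it only illustrates the proposal and is not broker or producer code.

```java
/**
 * Hypothetical sketch of the bookkeeping proposed in KAFKA-12207; not broker or producer code.
 * Assumes sequences are non-negative ints that wrap from Integer.MAX_VALUE back to 0.
 */
final class ProducerSequenceSketch {
    // Assumed duplicate window: the "number like 1000" mentioned in the proposal.
    static final int DUPLICATE_WINDOW = 1000;
    // In-flight cap kept for brokers that cannot bump the epoch (item 4).
    static final int LEGACY_MAX_IN_FLIGHT = 5;

    /** How many steps seq lies behind latest, counting across the wrap at Integer.MAX_VALUE. */
    static long stepsBehind(int latest, int seq) {
        if (latest >= seq) {
            return (long) latest - seq;
        }
        // latest has wrapped past Integer.MAX_VALUE back to 0 since seq was written
        return (long) latest + ((long) Integer.MAX_VALUE - seq) + 1;
    }

    /** Item 2: any sequence at or within the window behind the latest one is a duplicate. */
    static boolean isDuplicate(int latest, int seq) {
        return stepsBehind(latest, seq) <= DUPLICATE_WINDOW;
    }

    /** Item 4: only cap in-flight requests when the broker cannot bump the epoch. */
    static int effectiveMaxInFlight(int configured, boolean brokerSupportsEpochBump) {
        return brokerSupportsEpochBump ? configured : Math.min(configured, LEGACY_MAX_IN_FLIGHT);
    }
}
```

Using `long` arithmetic avoids `int` overflow when the distance crosses the wrap point; a sequence numerically ahead of the latest one comes out as almost a full cycle behind, so it is not treated as a duplicate.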

The benefit in the end is that we can reduce the memory usage for producer 
state and the complexity to manage that state. It also gives us a path to 
removing the annoying config restriction and a better policy for sequence 
overflow.





[GitHub] [kafka] C0urante commented on pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)

2021-01-14 Thread GitBox


C0urante commented on pull request #9880:
URL: https://github.com/apache/kafka/pull/9880#issuecomment-760448619


   Thanks Randall!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9715: Upstream ApisUtils from kip-500

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r557669397



##
File path: core/src/main/scala/kafka/server/AuthHelper.scala
##
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.lang.{Byte => JByte}
+import java.util.Collections
+
+import kafka.network.RequestChannel
+import kafka.security.authorizer.AclEntry
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.CLUSTER
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
+
+import scala.jdk.CollectionConverters._
+
+
+class AuthHelper(val requestChannel: RequestChannel,
+ val authorizer: Option[Authorizer]) {

Review comment:
   Adding `val` to the constructor arguments makes them `public` members. Glancing at the code, it doesn't look like this is needed.

##
File path: core/src/main/scala/kafka/server/RequestHandlerHelper.scala
##
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.network.RequestChannel
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
+import org.apache.kafka.common.utils.Time
+
+
+object RequestHandlerHelper {
+
+  def onLeadershipChange(groupCoordinator: GroupCoordinator,
+ txnCoordinator: TransactionCoordinator,
+ updatedLeaders: Iterable[Partition],
+ updatedFollowers: Iterable[Partition]): Unit = {
+// for each new leader or follower, call coordinator to handle consumer group migration.
+// this callback is invoked under the replica state change lock to ensure proper order of
+// leadership changes
+updatedLeaders.foreach { partition =>
+  if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+groupCoordinator.onElection(partition.partitionId)
+  else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+txnCoordinator.onElection(partition.partitionId, partition.getLeaderEpoch)
+}
+
+updatedFollowers.foreach { partition =>
+  if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
+groupCoordinator.onResignation(partition.partitionId)
+  else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
+txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
+}
+  }
+}
+
+
+
+class RequestHandlerHelper(val requestChannel: RequestChannel,
+   val quotas: QuotaManagers,
+   val time: Time,
+   val logPrefix: String) extends Logging {

Review comm

[GitHub] [kafka] lct45 commented on a change in pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics

2021-01-14 Thread GitBox


lct45 commented on a change in pull request #9895:
URL: https://github.com/apache/kafka/pull/9895#discussion_r557642098



##
File path: docs/ops.html
##
@@ -2133,6 +2141,137 @@ RocksDB Properties-based Metrics
+  RocksDB Properties-based Metrics:
+  All of the following properties-based metrics have a recording level of info and are recorded when the
+  metrics are accessed.
+  If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,

Review comment:
   ```suggestion
 If a state store consists of multiple RocksDB instances, as is the case for aggregations over time and session windows,
   ```

##
File path: docs/ops.html
##
@@ -2133,6 +2141,137 @@ RocksDB Properties-based Metrics
+  RocksDB Properties-based Metrics:
+  All of the following properties-based metrics have a recording level of info and are recorded when the
+  metrics are accessed.
+  If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
+  each metric reports the sum over the RocksDB instances of the state store, except for the block cache metrics

Review comment:
   ```suggestion
 each metric reports the sum over all the RocksDB instances of the state store, except for the block cache metrics
   ```

##
File path: docs/ops.html
##
@@ -2032,17 +2032,25 @@ RocksDB Metrics
-  All of the following metrics have a recording level of debug.
-  The metrics are collected every minute from the RocksDB state stores.
-  If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
-  each metric reports an aggregation over the RocksDB instances of the state store.
+  RocksDB Metrics are grouped into statistics-based metrics and properties-based metrics.
+  The former are recorded from statistics that a RocksDB state store collects whereas the latter are recorded from
+  properties that RocksDB exposes.
   Note that the store-scope for built-in RocksDB state stores are currently the following:
   
 rocksdb-state (for RocksDB backed key-value store)
 rocksdb-window-state (for RocksDB backed window store)
 rocksdb-session-state (for RocksDB backed session store)
   
 
+  
+  RocksDB Statistics-based Metrics:
+  All of the following statistics-based metrics have a recording level of debug because collecting
+  statistics in <a href="https://github.com/facebook/rocksdb/wiki/Statistics#stats-level-and-performance-costs">RocksDB</a>
+  may have an impact on performance.
+  Statistics-based metrics are collected every minute from the RocksDB state stores.
+  If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,

Review comment:
   ```suggestion
 If a state store consists of multiple RocksDB instances, as is the case for aggregations over time and session windows,
   ```

##
File path: docs/ops.html
##
@@ -2032,17 +2032,25 @@ RocksDB Metrics
-  All of the following metrics have a recording level of debug.
-  The metrics are collected every minute from the RocksDB state stores.
-  If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
-  each metric reports an aggregation over the RocksDB instances of the state store.
+  RocksDB Metrics are grouped into statistics-based metrics and properties-based metrics.
+  The former are recorded from statistics that a RocksDB state store collects whereas the latter are recorded from
+  properties that RocksDB exposes.

Review comment:
   I'm still a little confused about the difference between the two after reading this. Maybe an example of something that is exposed by RocksDB but not collected by RocksDB would help.

##
File path: docs/ops.html
##
@@ -2133,6 +2141,137 @@ RocksDB Properties-based Metrics
+  RocksDB Properties-based Metrics:
+  All of the following properties-based metrics have a recording level of info and are recorded when the
+  metrics are accessed.
+  If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows,
+  each metric reports the sum over the RocksDB instances of the state store, except for the block cache metrics
+  block-cache-*. The block cache metrics report the sum over all RocksDB instances if each instance uses its
+  own block cache, and they report the recorded value from only one instance if a single block cache is shared
+  among all instances.
+
+  
+
+
+  Metric/Attribute name
+  Description
+  Mbean name
+
+
+  num-immutable-mem-table
+  The number of immutable memtables that have not yet been flushed.
+  
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+   

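The ops.html diff above describes how properties-based metrics are aggregated when a state store is backed by several RocksDB instances: most metrics are summed, while the block-cache-* metrics are summed only if each instance has its own cache. A minimal Java sketch of that rule, assuming each instance's value is already available as a `long`; the class and method names are hypothetical and this is not the Kafka Streams implementation.

```java
import java.util.List;

/**
 * Hypothetical illustration of the aggregation rule for properties-based
 * RocksDB metrics; not the Kafka Streams implementation.
 */
final class RocksDbMetricAggregationSketch {
    /** Most properties-based metrics: sum the value over all RocksDB instances of the store. */
    static long sumOverInstances(List<Long> perInstance) {
        long sum = 0L;
        for (long value : perInstance) {
            sum += value;
        }
        return sum;
    }

    /**
     * block-cache-* metrics: sum when each instance uses its own cache, but report a single
     * instance's value when one cache is shared (summing would count the same cache N times).
     */
    static long blockCacheMetric(List<Long> perInstance, boolean sharedCache) {
        if (perInstance.isEmpty()) {
            return 0L;
        }
        return sharedCache ? perInstance.get(0) : sumOverInstances(perInstance);
    }
}
```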
[GitHub] [kafka] mjsax commented on pull request #9733: KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest

2021-01-14 Thread GitBox


mjsax commented on pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#issuecomment-760424092


   @showuon Sorry for the late response. Pretty busy atm.
   
   We found that the fix for the exception handler contained in this PR is also blocking https://github.com/apache/kafka/pull/9720 (cc @wcarlson5) -- could you extract this fix into a separate PR that we can merge quickly to unblock 9720? Otherwise, we might need to extract your fix into 9720 to be able to move forward with it.
   
   Hope to cycle back to this PR soon.
   
   Thanks.







[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557643648



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {
 
+  private[this] var startSnapshotId = snapshotIds

Review comment:
   Scala generates different code for `private[this]` vs `private` when used on a field.
   
   Scala's visibility model is different from the JVM's visibility model. Because of this, when using `private` on a field, Scala generates accessor methods for the field. When using `private[this]`, Scala doesn't generate these accessor methods. For example:
   
   ```scala
   final class PrivateThisTest {
     private[this] var privateThis = Some(10)
     private var justPrivate = Some(20)
   }
   ```
   generates (`javap -p ...`) the following class:
   ```java
   public final class PrivateThisTest {
 private scala.Some privateThis;
 private scala.Some justPrivate;
 private scala.Some justPrivate();
 private void justPrivate_$eq(scala.Some);
 public PrivateThisTest();
   }
   ```









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557624393



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +130,25 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
-public RaftConfig(Properties props) {
-super(CONFIG, props);
-}
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
 
-public RaftConfig(Map props) {
-super(CONFIG, props);
+public RaftConfig(Map props) {
+this(props, true);
 }
 
 protected RaftConfig(Map props, boolean doLog) {
 super(CONFIG, props, doLog);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);

Review comment:
   Yeah, I think that's reasonable. Added.









[GitHub] [kafka] hachikuji commented on a change in pull request #9883: MINOR: Generalize server startup to make way for KIP-500

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9883:
URL: https://github.com/apache/kafka/pull/9883#discussion_r557622659



##
File path: core/src/main/scala/kafka/server/KafkaRaftController.scala
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+class KafkaRaftController {

Review comment:
   Decided to go with `ControllerServer` and `BrokerServer`. As Colin mentioned offline, the "server" suffix is helpful in conveying that this component is responsible for starting sockets and such. This distinguishes `ControllerServer` from the actual implementation of the controller logic.









[GitHub] [kafka] hachikuji commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557611820



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +130,25 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
-public RaftConfig(Properties props) {
-super(CONFIG, props);
-}
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
 
-public RaftConfig(Map props) {
-super(CONFIG, props);
+public RaftConfig(Map props) {
+this(props, true);
 }
 
 protected RaftConfig(Map props, boolean doLog) {
 super(CONFIG, props, doLog);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);

Review comment:
   Is it worth storing a field for voter connections as well to be consistent?
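
One way to act on this question: parse the voter list once in the constructor, alongside the timeout fields from the diff, and expose it as an unmodifiable map. This is a self-contained Java sketch, not `RaftConfig` itself; it assumes a `quorum.voters` property in the comma-separated `{id}@{host}:{port}` format, and the class name, property names, and 2000 ms default are hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not RaftConfig: parse every setting once in the
 * constructor and keep it in a final field, including the voter connections.
 */
final class RaftConfigSketch {
    // Assumed property names and default, for illustration only.
    static final String QUORUM_VOTERS = "quorum.voters";
    static final String QUORUM_REQUEST_TIMEOUT_MS = "quorum.request.timeout.ms";
    static final String DEFAULT_REQUEST_TIMEOUT_MS = "2000";

    private final int requestTimeoutMs;
    private final Map<Integer, InetSocketAddress> voterConnections;

    RaftConfigSketch(Map<String, String> props) {
        this.requestTimeoutMs = Integer.parseInt(
            props.getOrDefault(QUORUM_REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT_MS));
        this.voterConnections = Collections.unmodifiableMap(
            parseVoters(props.getOrDefault(QUORUM_VOTERS, "")));
    }

    /** Parses "id@host:port,id@host:port" into an insertion-ordered map of id to address. */
    static Map<Integer, InetSocketAddress> parseVoters(String value) {
        Map<Integer, InetSocketAddress> voters = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int at = trimmed.indexOf('@');
            int colon = trimmed.lastIndexOf(':');
            if (at <= 0 || colon <= at + 1) {
                throw new IllegalArgumentException("Invalid voter entry: " + trimmed);
            }
            int id = Integer.parseInt(trimmed.substring(0, at));
            String host = trimmed.substring(at + 1, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while the config is being parsed
            voters.put(id, InetSocketAddress.createUnresolved(host, port));
        }
        return voters;
    }

    int requestTimeoutMs() {
        return requestTimeoutMs;
    }

    Map<Integer, InetSocketAddress> voterConnections() {
        return voterConnections;
    }
}
```

Wrapping the parsed map with `Collections.unmodifiableMap` keeps the config object effectively immutable, in line with the `final` timeout fields in the diff.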









[jira] [Commented] (KAFKA-10792) Source tasks can block herder thread by hanging during stop

2021-01-14 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265116#comment-17265116
 ] 

Randall Hauch commented on KAFKA-10792:
---

Thanks for the backport PR for the `2.5` branch, [~ChrisEgerton]. I've merged 
it to the `2.5` branch for inclusion in the next patch release (2.5.2).

> Source tasks can block herder thread by hanging during stop
> ---
>
> Key: KAFKA-10792
> URL: https://issues.apache.org/jira/browse/KAFKA-10792
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.4.2, 2.5.1, 2.7.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
>
> If a source task blocks during its {{stop}} method, the herder thread will 
> also block, which can cause issues with detecting rebalances, reconfiguring 
> connectors, and other vital functions of a Connect worker.
> This occurs because the call to {{SourceTask::stop}} occurs on the herder's 
> thread, instead of on the source task's own dedicated thread. This can be 
> fixed by moving the call to {{SourceTask::stop}} onto the source task's 
> dedicated thread and aligning with the current approach for {{Connector}} 
> instances and {{SinkTask}} instances.
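
The fix described above moves the call to `SourceTask::stop` off the herder thread and onto the task's own thread. A simplified, self-contained Java sketch of that idea, assuming the herder only signals the task and the task's thread then invokes `stop()` itself; `SlowStoppingTask`, `TaskRunner`, and `triggerStop` are hypothetical names, not the Connect worker classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of calling stop() on the task's own thread; not the Connect worker code. */
final class StopOnTaskThreadSketch {
    /** Stand-in for a source task whose stop() hangs for a while. */
    static final class SlowStoppingTask {
        final CountDownLatch stopped = new CountDownLatch(1);

        void stop() throws InterruptedException {
            Thread.sleep(500); // simulate a connector that blocks during shutdown
            stopped.countDown();
        }
    }

    /** Runs on the task's dedicated thread; the herder only requests a stop. */
    static final class TaskRunner implements Runnable {
        private final SlowStoppingTask task;
        private final AtomicBoolean stopRequested = new AtomicBoolean(false);

        TaskRunner(SlowStoppingTask task) {
            this.task = task;
        }

        /** Called from the herder thread; only sets a flag, so it returns immediately. */
        void triggerStop() {
            stopRequested.set(true);
        }

        @Override
        public void run() {
            try {
                while (!stopRequested.get()) {
                    Thread.sleep(5); // placeholder for the task's poll loop
                }
                task.stop(); // the potentially blocking call runs here, off the herder thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true if the herder-side stop request returned before the task finished stopping. */
    static boolean herderIsNotBlocked() {
        SlowStoppingTask task = new SlowStoppingTask();
        TaskRunner runner = new TaskRunner(task);
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        taskThread.submit(runner);
        try {
            runner.triggerStop(); // herder side
            boolean returnedEarly = task.stopped.getCount() == 1; // stop() is still sleeping
            boolean finished = task.stopped.await(5, TimeUnit.SECONDS);
            return returnedEarly && finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            taskThread.shutdown();
        }
    }
}
```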





[jira] [Updated] (KAFKA-10792) Source tasks can block herder thread by hanging during stop

2021-01-14 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10792:
--
Fix Version/s: 2.5.2






[GitHub] [kafka] rhauch merged pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)

2021-01-14 Thread GitBox


rhauch merged pull request #9880:
URL: https://github.com/apache/kafka/pull/9880


   







[GitHub] [kafka] rhauch commented on pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)

2021-01-14 Thread GitBox


rhauch commented on pull request #9880:
URL: https://github.com/apache/kafka/pull/9880#issuecomment-760387979


   There seem to be environmental build issues; the builds are failing with 
"JDK" unknown and "maven" unknown.
   
   I'm going to go ahead and merge this PR to the `2.5` branch.







[GitHub] [kafka] g1geordie opened a new pull request #9899: KAFKA-12191 SslTransportTls12Tls13Test can replace 'assumeTrue' by (j…

2021-01-14 Thread GitBox


g1geordie opened a new pull request #9899:
URL: https://github.com/apache/kafka/pull/9899


   SslTransportTls12Tls13Test can replace 'assumeTrue' with a (JUnit 5) conditional test.
   Tested on Java 8:
   ```
   testCiphersSuiteForTls12FailsForTls13()
   Disabled on JRE version: 1.8.0_275
   
   1.27 s passed testCiphersSuiteForTls12()
   
   ignored testCiphersSuiteForTls13()
   Disabled on JRE version: 1.8.0_275
   
   ignored testCiphersSuiteFailForServerTls12ClientTls13()
   Disabled on JRE version: 1.8.0_275
   ```
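The "Disabled on JRE version" lines above are the kind of skip reason JUnit 5's JRE conditions report (for example `@EnabledOnJre` or `@EnabledForJreRange`). The PR text does not show which annotation it uses. The decision such a condition makes can be sketched without JUnit by parsing the major version from the `java.specification.version` system property, which reads "1.8" on Java 8 and "11" on Java 11. `JreCheck` and its methods are hypothetical names.

```java
// Hypothetical helper mirroring the decision a JRE-range test condition makes.
final class JreCheck {
    private JreCheck() {}

    // "1.8" -> 8 (legacy "1.x" scheme), "11" -> 11, "17" -> 17.
    static int majorVersion(String specVersion) {
        String[] parts = specVersion.split("\\.");
        if (parts[0].equals("1") && parts.length > 1) {
            return Integer.parseInt(parts[1]);
        }
        return Integer.parseInt(parts[0]);
    }

    // True when the running JVM is at least the given major version.
    static boolean isAtLeast(int minMajor) {
        return majorVersion(System.getProperty("java.specification.version")) >= minMajor;
    }
}
```

A declarative annotation moves this check out of the test body. As a result, the runner can report the test as skipped with a reason, instead of the test aborting partway through an `assumeTrue`.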







[GitHub] [kafka] rhauch commented on pull request #9880: KAFKA-10792 (2.5 backport): Prevent source task shutdown from blocking herder thread (#9669)

2021-01-14 Thread GitBox


rhauch commented on pull request #9880:
URL: https://github.com/apache/kafka/pull/9880#issuecomment-760385519


   I'm trying to get the builds to run properly, but I did pull locally and the 
build of that local branch does pass.







[GitHub] [kafka] chia7712 commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5

2021-01-14 Thread GitBox


chia7712 commented on pull request #9889:
URL: https://github.com/apache/kafka/pull/9889#issuecomment-760384843


   > Given the test results for the last build, I think you can merge this, 
right?
   
   It produced some failed tests, but they pass locally. I want to check QA again :)







[GitHub] [kafka] g1geordie opened a new pull request #9898: KAFKA-12189 ShellTest can replace 'assumeTrue' by (junit 5) condition…

2021-01-14 Thread GitBox


g1geordie opened a new pull request #9898:
URL: https://github.com/apache/kafka/pull/9898


   ShellTest can replace 'assumeTrue' with a (JUnit 5) conditional test.
   
   I also tested on Windows:
   ```
   ShellTest > testEchoHello() SKIPPED
   ShellTest > testRunProgramWithErrorReturn() SKIPPED
   ShellTest > testHeadDevZero() SKIPPED
   ShellTest > testAttemptToRunNonExistentProgram() SKIPPED
   
   ```
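The SKIPPED results above are what a JUnit 5 OS condition, such as `@DisabledOnOs(OS.WINDOWS)`, reports on Windows. The PR text does not show which annotation it actually uses. Test names like `testHeadDevZero` suggest these tests depend on Unix tools. The underlying check amounts to inspecting the `os.name` system property. `OsCheck` is a hypothetical helper that illustrates that logic; it is not JUnit's implementation.

```java
import java.util.Locale;

// Hypothetical helper: the decision behind a "skip on Windows" test condition.
final class OsCheck {
    private OsCheck() {}

    // os.name values on Windows start with "Windows" (e.g. "Windows 10").
    static boolean isWindows(String osName) {
        return osName.toLowerCase(Locale.ROOT).startsWith("windows");
    }

    // True when Unix shell tools such as echo or head are expected to be available.
    static boolean shellTestsSupported() {
        return !isWindows(System.getProperty("os.name", ""));
    }
}
```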







[GitHub] [kafka] ijuma commented on pull request #9889: KAFKA-12203 Migrate connect:mirror-client module to JUnit 5

2021-01-14 Thread GitBox


ijuma commented on pull request #9889:
URL: https://github.com/apache/kafka/pull/9889#issuecomment-760378170


   @chia7712 Given the test results for the last build, I think you can merge 
this, right?







[GitHub] [kafka] ijuma commented on a change in pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions

2021-01-14 Thread GitBox


ijuma commented on a change in pull request #9876:
URL: https://github.com/apache/kafka/pull/9876#discussion_r557596784



##
File path: build.gradle
##
@@ -1010,6 +1015,53 @@ project(':core') {
   }
 }
 
+project(':metadata') {
+  archivesBaseName = "kafka-metadata"
+
+  dependencies {
+compile project(':clients')
+compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
+compileOnly libs.log4j
+testCompile libs.junitJupiter
+testCompile libs.hamcrest
+testCompile libs.slf4jlog4j
+testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest

Review comment:
   This is not right; the mentioned annotation is only used for JUnit 4. I submitted a minor PR that at least fixes the comment:
https://github.com/apache/kafka/pull/9897









[GitHub] [kafka] ijuma opened a new pull request #9897: MINOR: Remove incorrect code in metadata module build

2021-01-14 Thread GitBox


ijuma opened a new pull request #9897:
URL: https://github.com/apache/kafka/pull/9897


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] jolshan commented on pull request #9896: KAFKA-12206: o.a.k.common.Uuid should implement Comparable

2021-01-14 Thread GitBox


jolshan commented on pull request #9896:
URL: https://github.com/apache/kafka/pull/9896#issuecomment-760375020


   LGTM







[GitHub] [kafka] wcarlson5 commented on pull request #9888: KAFKA-12194: use stateListener to catch each state change

2021-01-14 Thread GitBox


wcarlson5 commented on pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#issuecomment-760368501


   @showuon These changes look good. Thanks for shoring up these tests







[GitHub] [kafka] cmccabe opened a new pull request #9896: KAFKA-12206: o.a.k.common.Uuid should implement Comparable

2021-01-14 Thread GitBox


cmccabe opened a new pull request #9896:
URL: https://github.com/apache/kafka/pull/9896


   







[jira] [Created] (KAFKA-12206) o.a.k.common.Uuid should implement Comparable

2021-01-14 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12206:


 Summary: o.a.k.common.Uuid should implement Comparable 
 Key: KAFKA-12206
 URL: https://issues.apache.org/jira/browse/KAFKA-12206
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
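Implementing `Comparable` gives ids a natural ordering, so they can be kept in sorted collections such as `TreeSet` or used as `TreeMap` keys. Below is a minimal sketch, assuming the id is held as two `long` halves. `SimpleUuid` is a hypothetical class, not Kafka's `Uuid` source. Signed comparison of each half is an arbitrary but consistent choice here.

```java
// Hypothetical 128-bit id stored as two longs (not Kafka's actual Uuid class).
final class SimpleUuid implements Comparable<SimpleUuid> {
    private final long mostSignificantBits;
    private final long leastSignificantBits;

    SimpleUuid(long mostSignificantBits, long leastSignificantBits) {
        this.mostSignificantBits = mostSignificantBits;
        this.leastSignificantBits = leastSignificantBits;
    }

    @Override
    public int compareTo(SimpleUuid other) {
        // Order by the high 64 bits first, then break ties with the low 64 bits.
        int cmp = Long.compare(mostSignificantBits, other.mostSignificantBits);
        return cmp != 0 ? cmp : Long.compare(leastSignificantBits, other.leastSignificantBits);
    }

    // equals/hashCode agree with compareTo, so sorted and hashed collections behave the same.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleUuid)) return false;
        SimpleUuid that = (SimpleUuid) o;
        return mostSignificantBits == that.mostSignificantBits
            && leastSignificantBits == that.leastSignificantBits;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(mostSignificantBits) + Long.hashCode(leastSignificantBits);
    }
}
```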








[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557586528



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],

Review comment:
   Done.









[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557586344



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
 log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+// Truncate the log fully if the latest snapshot is greater than the log end offset
+var truncated = false

Review comment:
   Done. Used Scala's `match`.









[GitHub] [kafka] cmccabe merged pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions

2021-01-14 Thread GitBox


cmccabe merged pull request #9876:
URL: https://github.com/apache/kafka/pull/9876


   







[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557586025



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -131,6 +179,10 @@ class KafkaMetadataLog(
 }
   }
 
+  override def highWatermark: Long = {

Review comment:
   Done.









[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions

2021-01-14 Thread Gary Russell (Jira)


[https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265081#comment-17265081]

Gary Russell commented on KAFKA-12190:
--

Given that this is a critical bug, and a regression in a point release (2.6.1), 
doesn't this warrant a fairly swift 2.6.2 release?

> Failure on Windows due to an UnsupportedOperationException when 
> StateDirectory sets file permissions
> 
>
> Key: KAFKA-12190
> URL: https://issues.apache.org/jira/browse/KAFKA-12190
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
>Reporter: Andy Wilkinson
>Priority: Critical
> Fix For: 2.8.0
>
>
> There appears to be a regression in Kafka 2.6.1 due to [the 
> changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that 
> causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're 
> seeing failures in Spring Boot's CI on Windows such as the following:
> {noformat}
> Caused by: java.lang.UnsupportedOperationException: (No message provided)
> at java.nio.file.Files.setPosixFilePermissions(Files.java:2044)
> at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:745)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:585)
> at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316)
> at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
> at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
> at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
> at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
> at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940)
> at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423)
> at org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61)
> at org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.<init>(AssertProviderApplicationContextInvocationHandler.java:48)
> at org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382)
> at org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392)
> at org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381)
> at org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfigurationTests.whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff(KafkaMetricsAutoConfigurationTests.java:92)
> {noformat}
> The same code worked without changes using Kafka 2.6.0.
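The trace shows `Files.setPosixFilePermissions` throwing `UnsupportedOperationException`. The JDK documents this exception for file systems that do not support `PosixFileAttributeView`, which is the typical situation on Windows. One way to guard such a call is sketched below. `StateDirPermissions.restrictToOwnerIfSupported` is a hypothetical helper, not the change that went into Kafka.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Hypothetical helper (not Kafka's StateDirectory code).
final class StateDirPermissions {
    private StateDirPermissions() {}

    // Returns true if owner-only permissions were applied, false if the file system has no POSIX view.
    static boolean restrictToOwnerIfSupported(Path dir) throws IOException {
        if (!dir.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            // On such file systems setPosixFilePermissions would throw
            // UnsupportedOperationException, as in the stack trace above.
            return false;
        }
        Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rwx------");
        Files.setPosixFilePermissions(dir, ownerOnly);
        return true;
    }
}
```

An alternative is to catch `UnsupportedOperationException` around the call. Checking the supported views up front keeps the fallback explicit, without using the exception for control flow.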





[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557585643



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -79,13 +94,30 @@ class KafkaMetadataLog(
   throw new IllegalArgumentException("Attempt to append an empty record set")
 
 val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
+
+if (appendInfo.rolled) {
+  log.deleteOldSegments()
+}
+
 new LogAppendInfo(appendInfo.firstOffset.getOrElse {
   throw new KafkaException("Append failed unexpectedly")
 }, appendInfo.lastOffset)
   }
 
   override def lastFetchedEpoch: Int = {
-log.latestEpoch.getOrElse(0)
+log.latestEpoch.getOrElse {
+  latestSnapshotId.map { snapshotId =>
+val logEndOffset = endOffset().offset
+if (snapshotId.offset == logEndOffset) {

Review comment:
   I made this explicit by checking both log start offset and log end 
offset.

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
 log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+// Truncate the log fully if the latest snapshot is greater than the log end offset
+var truncated = false
+latestSnapshotId.ifPresent { snapshotId =>
+  if (snapshotId.epoch > log.latestEpoch.getOrElse(0) ||

Review comment:
   Done.









[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557585891



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots.
+// This can result in the leader having a snapshot that is less than the follower's snapshot. I think that the Raft
+// Client checks against this and aborts. If so, then this check and exception is okay.
+
+// Don't let the state machine create snapshots older than the latest snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+// Since snapshots are less than the high-watermark, absolute offset comparison is okay.
+throw new IllegalArgumentException(
+  s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
+)
+  }
+}
+
+FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = {
 try {
-  Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  if (snapshotIds.contains(snapshotId)) {
+Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  } else {
+Optional.empty()
+  }
 } catch {
-  case e: NoSuchFileException => Optional.empty()
+  case e: NoSuchFileException =>
+Optional.empty()
 }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+try {
+  Optional.of(snapshotIds.last)
+} catch {
+  case _: NoSuchElementException =>
+Optional.empty()
+}
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+startSnapshotId;

Review comment:
   Hehe. Done.
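The snapshot bookkeeping in the diff above works as follows. It keeps a sorted `ConcurrentSkipListSet` of snapshot ids. `latestSnapshotId` returns empty when there are none. `createSnapshot` rejects an id whose epoch or offset is below the latest one, and `readSnapshot` only serves known ids. A Java sketch of the same checks is below. `SnapshotRegistry` and its nested `SnapshotId` are hypothetical, and ordering by epoch and then offset is an assumption of this sketch. For simplicity, the id is registered as soon as it passes the check.

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical registry mirroring the snapshot-id checks in the diff (not KafkaMetadataLog).
final class SnapshotRegistry {

    // Hypothetical (offset, epoch) id, ordered by epoch and then offset (an assumption).
    static final class SnapshotId implements Comparable<SnapshotId> {
        final long offset;
        final int epoch;

        SnapshotId(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public int compareTo(SnapshotId other) {
            int cmp = Integer.compare(epoch, other.epoch);
            return cmp != 0 ? cmp : Long.compare(offset, other.offset);
        }

        @Override
        public String toString() {
            return "SnapshotId(offset=" + offset + ", epoch=" + epoch + ")";
        }
    }

    // Sorted and thread-safe, so last() is the newest id.
    private final ConcurrentSkipListSet<SnapshotId> snapshotIds = new ConcurrentSkipListSet<>();

    Optional<SnapshotId> latestSnapshotId() {
        try {
            return Optional.of(snapshotIds.last());
        } catch (NoSuchElementException e) {
            // Catching here (rather than checking isEmpty() first) avoids a race with concurrent removals.
            return Optional.empty();
        }
    }

    void createSnapshot(SnapshotId id) {
        latestSnapshotId().ifPresent(latest -> {
            // Same condition as the diff: reject an id whose epoch or offset is below the latest one's.
            if (latest.epoch > id.epoch || latest.offset > id.offset) {
                throw new IllegalArgumentException("Attempting to create a snapshot (" + id
                    + ") that is not greater than the latest snapshot (" + latest + ")");
            }
        });
        snapshotIds.add(id);
    }

    // Only ids known to the registry are readable, as in the diff's readSnapshot.
    boolean hasSnapshot(SnapshotId id) {
        return snapshotIds.contains(id);
    }
}
```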









[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557584847



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {
 
+  private[this] var startSnapshotId = snapshotIds

Review comment:
   Give me a sec to reply to this.

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -79,13 +94,30 @@ class KafkaMetadataLog(
   throw new IllegalArgumentException("Attempt to append an empty record set")
 
 val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
+
+if (appendInfo.rolled) {
+  log.deleteOldSegments()
+}
+
 new LogAppendInfo(appendInfo.firstOffset.getOrElse {
   throw new KafkaException("Append failed unexpectedly")
 }, appendInfo.lastOffset)
   }
 
   override def lastFetchedEpoch: Int = {
-log.latestEpoch.getOrElse(0)
+log.latestEpoch.getOrElse {
+  latestSnapshotId.map { snapshotId =>
+val logEndOffset = endOffset().offset
+if (snapshotId.offset == logEndOffset) {
+  snapshotId.epoch
+} else {
+  throw new KafkaException(
+s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset)"
+  )
+}
+  } orElse(0)

Review comment:
   Done.
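The `lastFetchedEpoch` change above has three cases. It prefers the log's own latest epoch. If the log has no epoch, it falls back to the latest snapshot's epoch, but only when that snapshot ends exactly at the log end offset. Otherwise it defaults to 0. The same decision is sketched in Java below. `FetchEpoch` and its nested `SnapshotId` are hypothetical, and the sketch throws `IllegalStateException` where the diff throws `KafkaException`.

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical sketch of the lastFetchedEpoch fallback (not KafkaMetadataLog itself).
final class FetchEpoch {
    private FetchEpoch() {}

    // Hypothetical (offset, epoch) pair.
    static final class SnapshotId {
        final long offset;
        final int epoch;

        SnapshotId(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    static int lastFetchedEpoch(OptionalInt logLatestEpoch, Optional<SnapshotId> latestSnapshot, long logEndOffset) {
        // 1. The log's own epoch always wins when present.
        if (logLatestEpoch.isPresent()) {
            return logLatestEpoch.getAsInt();
        }
        // 2. No epoch and no snapshot: nothing has been fetched yet.
        if (!latestSnapshot.isPresent()) {
            return 0;
        }
        // 3. A snapshot is only a valid source of the epoch if it ends where the log ends.
        SnapshotId snapshot = latestSnapshot.get();
        if (snapshot.offset == logEndOffset) {
            return snapshot.epoch;
        }
        throw new IllegalStateException("Snapshot end offset " + snapshot.offset
            + " does not match log end offset " + logEndOffset);
    }
}
```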









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557584646



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +133,41 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
+
 public RaftConfig(Properties props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);
 }
 
-public RaftConfig(Map props) {
+public RaftConfig(Map props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);

Review comment:
   Perfect. Removed.









[GitHub] [kafka] hachikuji commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557584121



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +133,41 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
+
 public RaftConfig(Properties props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);
 }
 
-public RaftConfig(Map props) {
+public RaftConfig(Map props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);

Review comment:
   Yeah, that's why I said it was redundant in the other comment 🙂 









[GitHub] [kafka] cmccabe commented on a change in pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions

2021-01-14 Thread GitBox


cmccabe commented on a change in pull request #9876:
URL: https://github.com/apache/kafka/pull/9876#discussion_r557580624



##
File path: build.gradle
##
@@ -1010,6 +1015,53 @@ project(':core') {
   }
 }
 
+project(':metadata') {
+  archivesBaseName = "kafka-metadata"
+
+  dependencies {
+compile project(':clients')
+compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
+compileOnly libs.log4j
+testCompile libs.junitJupiter
+testCompile libs.hamcrest
+testCompile libs.slf4jlog4j
+testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest

Review comment:
   Later PRs add IntegrationTest to the metadata module, so it's simpler to just keep it in this PR.









[GitHub] [kafka] cmccabe commented on a change in pull request #9876: KAFKA-12183: Add the KIP-631 metadata record definitions

2021-01-14 Thread GitBox


cmccabe commented on a change in pull request #9876:
URL: https://github.com/apache/kafka/pull/9876#discussion_r557580302



##
File path: build.gradle
##
@@ -1010,6 +1015,53 @@ project(':core') {
   }
 }
 
+project(':metadata') {
+  archivesBaseName = "kafka-metadata"
+
+  dependencies {
+compile project(':clients')
+compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
+compileOnly libs.log4j
+testCompile libs.junitJupiter
+testCompile libs.hamcrest

Review comment:
   yes, we use it









[GitHub] [kafka] dajac commented on a change in pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches

2021-01-14 Thread GitBox


dajac commented on a change in pull request #9832:
URL: https://github.com/apache/kafka/pull/9832#discussion_r557576590



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##
@@ -598,6 +618,16 @@ synchronized Integer sequenceNumber(TopicPartition topicPartition) {
 return topicPartitionBookkeeper.getPartition(topicPartition).nextSequence;
 }
 
+/**
+ * Returns the current producer id/epoch of the given TopicPartition.
+ */
+synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition topicPartition) {
+if (!isTransactional())
+topicPartitionBookkeeper.addPartition(topicPartition);

Review comment:
   I wonder if we could create the partition state once and for all, instead of having to do this here and in other places. I need to find where we could do so.
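One way to create per-partition state in a single place, as this comment suggests, is to have the lookup itself create missing entries with `Map.computeIfAbsent`. Callers then never need a separate `addPartition` step. The sketch also shows the behaviour named in the KAFKA-12152 title: when the epoch is bumped, partitions without in-flight batches restart their sequence at 0. `SequenceBookkeeper` and `PartitionState` are hypothetical, partitions are keyed by a plain string instead of `TopicPartition`, and this is not the `TransactionManager` implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping; partitions are keyed by a plain "topic-partition" string.
final class SequenceBookkeeper {
    static final class PartitionState {
        int nextSequence = 0;
        int inflightBatches = 0;
    }

    private final Map<String, PartitionState> partitions = new HashMap<>();

    // Single creation point: every accessor goes through here, so no addPartition call is needed.
    private PartitionState state(String topicPartition) {
        return partitions.computeIfAbsent(topicPartition, tp -> new PartitionState());
    }

    synchronized int nextSequence(String topicPartition) {
        return state(topicPartition).nextSequence;
    }

    synchronized void recordSend(String topicPartition) {
        PartitionState s = state(topicPartition);
        s.nextSequence++;
        s.inflightBatches++;
    }

    synchronized void recordAck(String topicPartition) {
        state(topicPartition).inflightBatches--;
    }

    // On an epoch bump, partitions with nothing in flight restart at sequence 0.
    // Partitions that still have in-flight batches are left unchanged in this sketch.
    synchronized void onEpochBump() {
        for (PartitionState s : partitions.values()) {
            if (s.inflightBatches == 0) {
                s.nextSequence = 0;
            }
        }
    }
}
```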









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557575111



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Node;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class RaftTestUtil {
+public static RaftConfig buildRaftConfig(int requestTimeoutMs,

Review comment:
   Fixed.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557574782



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java
##
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Properties;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

Review comment:
   Done. Removed.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557574380



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +133,41 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
+
 public RaftConfig(Properties props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);
 }
 
-public RaftConfig(Map props) {
+public RaftConfig(Map props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);

Review comment:
   Actually, that means I can remove the `public RaftConfig(Properties props)` constructor. Does that make sense @hachikuji?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches

2021-01-14 Thread GitBox


dajac commented on a change in pull request #9832:
URL: https://github.com/apache/kafka/pull/9832#discussion_r557574182



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##
@@ -567,8 +571,18 @@ private void bumpIdempotentProducerEpoch() {
 
 this.topicPartitionBookkeeper.startSequencesAtBeginning(topicPartition, this.producerIdAndEpoch);
 this.partitionsWithUnresolvedSequences.remove(topicPartition);
 }
-
 this.partitionsToRewriteSequences.clear();
+
+// When the epoch is bumped, reset the sequences for the partition(s) that do not
+// have in-flight batches. Sequences of the in-flight batches that did not trigger
+// the epoch bump are handled when their last in-flight batch is completed.
+for (TopicPartition topicPartition : this.topicPartitionBookkeeper.partitions()) {

Review comment:
   Thanks for your comment. I do agree with your conclusion about point 2. I have managed to reproduce this in a unit test as well.
   
   Regarding your suggestion to store the producer id/epoch per partition, I like it. I actually thought about this as well but went conservative for my first draft. I have updated the PR to reflect your suggestion. Please take a look and let me know what you think.
   
   I think that there is still room for improvement in the PR. In particular, I have to review the unit tests. Some might not be entirely relevant to the changes done in this PR, and some might need to be refactored a bit more. I am not sure.
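The per-partition producer id/epoch idea discussed above can be sketched as follows. This is a hypothetical, simplified model, not the actual `TransactionManager` code: `PartitionSequenceBook`, `PartitionEntry`, and their methods are illustrative names, and partitions are plain strings. Each partition remembers which epoch its sequences belong to, so an epoch bump resets only the partitions without in-flight batches; the others are reset once their last in-flight batch completes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition state; partitions are plain strings instead of TopicPartition.
final class PartitionEntry {
    long producerId;
    short epoch;
    int nextSequence = 0;
    int inflightBatches = 0;

    PartitionEntry(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }
}

// Hypothetical bookkeeper that remembers, per partition, which producer id/epoch
// its sequence numbers belong to.
final class PartitionSequenceBook {
    private final Map<String, PartitionEntry> partitions = new HashMap<>();
    private final long producerId;
    private short epoch;

    PartitionSequenceBook(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    PartitionEntry entry(String partition) {
        return partitions.computeIfAbsent(partition, p -> new PartitionEntry(producerId, epoch));
    }

    // Assigns the next sequence to a new batch and marks the batch as in flight.
    int batchSent(String partition) {
        PartitionEntry e = entry(partition);
        e.inflightBatches++;
        return e.nextSequence++;
    }

    // Epoch bump: only partitions without in-flight batches move to the new
    // epoch and restart their sequences right away.
    void bumpEpoch() {
        epoch++;
        for (PartitionEntry e : partitions.values()) {
            if (e.inflightBatches == 0) {
                resetToCurrent(e);
            }
        }
    }

    // When the last in-flight batch of a partition completes, a partition still
    // tagged with an old producer id/epoch is moved to the current one.
    void batchCompleted(String partition) {
        PartitionEntry e = partitions.get(partition);
        if (e == null || e.inflightBatches == 0) {
            return;
        }
        e.inflightBatches--;
        if (e.inflightBatches == 0 && (e.producerId != producerId || e.epoch != epoch)) {
            resetToCurrent(e);
        }
    }

    private void resetToCurrent(PartitionEntry e) {
        e.producerId = producerId;
        e.epoch = epoch;
        e.nextSequence = 0;
    }
}
```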





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557573785



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +133,41 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
+
 public RaftConfig(Properties props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);
 }
 
-public RaftConfig(Map props) {
+public RaftConfig(Map props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);

Review comment:
   Ah, thanks for catching that. It turns out both public constructors can use the protected constructor (`Properties` implements `Map`). I will refactor both.
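Because `java.util.Properties` extends `Hashtable<Object, Object>` and therefore implements `Map<Object, Object>`, a single constructor taking `Map<?, ?>` accepts a `Properties` instance as well as any other map. A minimal sketch with a hypothetical `QuorumSettings` class (not the real `RaftConfig`); the key string and the 2000 ms fallback are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical config holder: one Map<?, ?> constructor serves both HashMap and
// Properties callers, since Properties implements Map<Object, Object>.
final class QuorumSettings {
    static final String REQUEST_TIMEOUT_MS = "quorum.request.timeout.ms";

    private final int requestTimeoutMs;

    QuorumSettings(Map<?, ?> props) {
        Object raw = props.get(REQUEST_TIMEOUT_MS);
        // Accept String values (typical for Properties) or Integer values.
        this.requestTimeoutMs = raw == null ? 2000 : Integer.parseInt(raw.toString());
    }

    int requestTimeoutMs() {
        return requestTimeoutMs;
    }
}
```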





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


aloknnikhil commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557571170



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +133,41 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
+
 public RaftConfig(Properties props) {

Review comment:
   It looks like a number of tests use this constructor:
   - `testSingleQuorumVoterConnections`
   - `testMultiQuorumVoterConnections`
   - `testInvalidQuorumVotersConfig`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9881: MINOR: Initialize QuorumState lazily in RaftClient.initialize()

2021-01-14 Thread GitBox


hachikuji commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557559024



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +133,41 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
+
 public RaftConfig(Properties props) {

Review comment:
   nit: I think this constructor is redundant. Can we remove it?

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -129,16 +133,41 @@ public String toString() {
 QUORUM_LINGER_MS_DOC);
 }
 
+private final int requestTimeoutMs;
+private final int retryBackoffMs;
+private final int electionTimeoutMs;
+private final int electionBackoffMaxMs;
+private final int fetchTimeoutMs;
+private final int appendLingerMs;
+
 public RaftConfig(Properties props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
+retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
+electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
+electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
+fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
+appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);
 }
 
-public RaftConfig(Map props) {
+public RaftConfig(Map props) {
 super(CONFIG, props);
+requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);

Review comment:
   nit: the typical approach is to invoke the other constructor, e.g. `this(props, true)`.
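The `this(props, true)` suggestion is standard Java constructor chaining: the convenience constructor delegates to a canonical one, so each `final` field is assigned in exactly one place. A sketch with a hypothetical `LingerSettings` class; the keys, defaults, and the meaning given to the boolean flag (printing the parsed values) are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical settings class illustrating constructor delegation with this(...).
final class LingerSettings {
    private final int appendLingerMs;
    private final int fetchTimeoutMs;
    private final boolean logValues;

    // Convenience constructor: delegates instead of repeating the assignments.
    LingerSettings(Map<?, ?> props) {
        this(props, true);
    }

    // Canonical constructor: the only place where the final fields are assigned.
    // The boolean (whether to print the parsed values) is an illustrative assumption.
    LingerSettings(Map<?, ?> props, boolean logValues) {
        this.appendLingerMs = intOrDefault(props, "append.linger.ms", 25);
        this.fetchTimeoutMs = intOrDefault(props, "fetch.timeout.ms", 2000);
        this.logValues = logValues;
        if (logValues) {
            System.out.println("appendLingerMs=" + appendLingerMs + ", fetchTimeoutMs=" + fetchTimeoutMs);
        }
    }

    private static int intOrDefault(Map<?, ?> props, String key, int fallback) {
        Object raw = props.get(key);
        return raw == null ? fallback : Integer.parseInt(raw.toString());
    }

    int appendLingerMs() { return appendLingerMs; }
    int fetchTimeoutMs() { return fetchTimeoutMs; }
    boolean logValues() { return logValues; }
}
```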

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2163,7 +2177,13 @@ public Long scheduleAppend(int epoch, List records) {
 
 @Override
 public void close() {
-kafkaRaftMetrics.close();
+if (kafkaRaftMetrics != null) {
+kafkaRaftMetrics.close();
+}
+}
+
+public QuorumState quorum() {

Review comment:
   Any chance we can make this default (package-private) access?
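The null check in `close()` follows from lazy initialization: if `close()` runs before `initialize()`, the metrics object was never created. A minimal sketch of that pattern, together with the default (package-private) accessor asked about above; `LazyClient`, `ClientMetrics`, and the placeholder `"unattached"` quorum string are hypothetical stand-ins, not `KafkaRaftClient` members.

```java
// Hypothetical stand-in for a closeable metrics object.
final class ClientMetrics implements AutoCloseable {
    private boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical client whose state is created in initialize() rather than in the constructor.
final class LazyClient implements AutoCloseable {
    private ClientMetrics metrics;
    private String quorum;

    void initialize() {
        quorum = "unattached";
        metrics = new ClientMetrics();
    }

    // Default (package-private) access: callable from tests in the same package,
    // but not part of the public API.
    String quorum() {
        return quorum;
    }

    ClientMetrics metrics() {
        return metrics;
    }

    @Override
    public void close() {
        // close() can run before initialize(), so metrics may still be null.
        if (metrics != null) {
            metrics.close();
        }
    }
}
```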

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Node;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class RaftTestUtil {
+public static RaftConfig buildRaftConfig(int requestTimeoutMs,

Review comment:
   nit: the convention in the raft code is to put the first parameter on its own line:
   ```
   public static RaftConfig buildRaftConfig(
       int requestTimeoutMs,
       int retryBackoffMs,
       ...
       List voterNodes
   ) {
   ```

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java
##
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Properties;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

Review comment:
   This does not seem to be used?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557565077



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
  override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots.
+// This can result in the leader having a snapshot that is less than the follower's snapshot. I think that the Raft
+// Client checks against this and aborts. If so, then this check and exception is okay.
+
+// Don't let the state machine create snapshots older than the latest snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+// Since snapshots are less than the high-watermark, absolute offset comparison is okay.
+throw new IllegalArgumentException(
+  s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
+)
+  }
+}
+
+FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
  override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = {
 try {
-  Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  if (snapshotIds.contains(snapshotId)) {
+Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  } else {
+Optional.empty()
+  }
 } catch {
-  case e: NoSuchFileException => Optional.empty()
+  case e: NoSuchFileException =>
+Optional.empty()
 }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+try {
+  Optional.of(snapshotIds.last)
+} catch {
+  case _: NoSuchElementException =>
+Optional.empty()
+}
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+startSnapshotId;
+  }
+
+  override def snapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+snapshotIds.add(snapshotId)
+  }
+
+  override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): Boolean = {

Review comment:
   Yes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557563029



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
  override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots.
+// This can result in the leader having a snapshot that is less than the follower's snapshot. I think that the Raft
+// Client checks against this and aborts. If so, then this check and exception is okay.
+
+// Don't let the state machine create snapshots older than the latest snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+// Since snapshots are less than the high-watermark, absolute offset comparison is okay.
+throw new IllegalArgumentException(
+  s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
+)
+  }
+}
+
+FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
  override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = {
 try {
-  Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  if (snapshotIds.contains(snapshotId)) {
+Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  } else {
+Optional.empty()
+  }
 } catch {
-  case e: NoSuchFileException => Optional.empty()
+  case e: NoSuchFileException =>
+Optional.empty()
 }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+try {
+  Optional.of(snapshotIds.last)
+} catch {
+  case _: NoSuchElementException =>
+Optional.empty()
+}
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {

Review comment:
   Sounds good. In this PR it is not technically the oldest snapshot, but it will be when we implement deleting snapshots in https://issues.apache.org/jira/browse/KAFKA-12205
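To illustrate the distinction drawn above: if snapshot ids are kept in a sorted set, its smallest element is the oldest snapshot on disk, which can be older than the log start snapshot until old snapshots are actually deleted. The sketch is hypothetical (`SnapshotRef`, `SnapshotRegistry`, and `deleteBefore` are illustrative names, and ordering by epoch and then offset is an assumption); deletion itself is the follow-up work in KAFKA-12205, not part of this PR.

```java
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical snapshot id; ordering by epoch, then offset, is an assumption.
final class SnapshotRef implements Comparable<SnapshotRef> {
    final long offset;
    final int epoch;

    SnapshotRef(long offset, int epoch) {
        this.offset = offset;
        this.epoch = epoch;
    }

    @Override
    public int compareTo(SnapshotRef other) {
        int byEpoch = Integer.compare(epoch, other.epoch);
        return byEpoch != 0 ? byEpoch : Long.compare(offset, other.offset);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SnapshotRef && compareTo((SnapshotRef) o) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(offset, epoch);
    }
}

// Hypothetical registry of frozen snapshot ids, kept sorted.
final class SnapshotRegistry {
    private final NavigableSet<SnapshotRef> ids = new ConcurrentSkipListSet<>();

    void frozen(SnapshotRef id) {
        ids.add(id);
    }

    // Oldest retained snapshot. Until deletion exists, this can be older
    // than the log start snapshot.
    Optional<SnapshotRef> oldest() {
        return ids.stream().findFirst();
    }

    // Illustrative deletion: removes every id strictly smaller than logStart,
    // after which oldest() and the log start snapshot coincide.
    void deleteBefore(SnapshotRef logStart) {
        ids.headSet(logStart, false).clear();
    }

    int size() {
        return ids.size();
    }
}
```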





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics

2021-01-14 Thread GitBox


cadonna commented on pull request #9895:
URL: https://github.com/apache/kafka/pull/9895#issuecomment-760336972


   Call for review: @guozhangwang @ableegoldman @lct45 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #9895: KAFKA-9924: Add docs for RocksDB properties-based metrics

2021-01-14 Thread GitBox


cadonna opened a new pull request #9895:
URL: https://github.com/apache/kafka/pull/9895


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557557769



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
  override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots.
+// This can result in the leader having a snapshot that is less than the follower's snapshot. I think that the Raft
+// Client checks against this and aborts. If so, then this check and exception is okay.
+
+// Don't let the state machine create snapshots older than the latest snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+// Since snapshots are less than the high-watermark, absolute offset comparison is okay.
+throw new IllegalArgumentException(
+  s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
+)
+  }
+}
+
+FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
  override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = {
 try {
-  Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  if (snapshotIds.contains(snapshotId)) {
+Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  } else {
+Optional.empty()
+  }
 } catch {
-  case e: NoSuchFileException => Optional.empty()
+  case e: NoSuchFileException =>
+Optional.empty()
 }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+try {
+  Optional.of(snapshotIds.last)
+} catch {
+  case _: NoSuchElementException =>

Review comment:
   I agree. Unfortunately, `pollLast` removes the element from the set. I don't think we have a choice but to do this.
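On the `pollLast` point: in a Java `NavigableSet`, `last()` throws `NoSuchElementException` when the set is empty (hence the try/catch, assuming `snapshotIds` is such a set), while `pollLast()` returns `null` when empty but otherwise removes the element it returns. One non-mutating, exception-free alternative is to take the first element of the descending view. A hypothetical Java sketch: the real code is Scala, and the concrete set type is not visible in this hunk, so `ConcurrentSkipListSet` in the usage is an assumption.

```java
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;

final class LatestLookup {
    // Mirrors the Scala try/catch around `snapshotIds.last`.
    static <T> Optional<T> viaLast(NavigableSet<T> ids) {
        try {
            return Optional.of(ids.last());
        } catch (NoSuchElementException e) {
            return Optional.empty();
        }
    }

    // Non-mutating and exception-free: first element of the descending view.
    static <T> Optional<T> viaDescending(NavigableSet<T> ids) {
        return ids.descendingSet().stream().findFirst();
    }

    // Also yields the greatest element, but removes it from the set.
    static <T> Optional<T> viaPollLast(NavigableSet<T> ids) {
        return Optional.ofNullable(ids.pollLast());
    }
}
```

With a concurrent set, the descending-view form also avoids the window between a separate `isEmpty()` check and a call to `last()`.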





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557557769



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+// This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+// Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+// Do let the state machine create snapshots older than the latest snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+// Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+throw new IllegalArgumentException(
+  s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+)
+  }
+}
+
+FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
 try {
-  Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  if (snapshotIds.contains(snapshotId)) {
+Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  } else {
+Optional.empty()
+  }
 } catch {
-  case e: NoSuchFileException => Optional.empty()
+  case e: NoSuchFileException =>
+Optional.empty()
 }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+try {
+  Optional.of(snapshotIds.last)
+} catch {
+  case _: NoSuchElementException =>

Review comment:
   I agree. Unfortunately, `pollLast` removes the element from the list. I 
don't think we have a choice but to do this. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-14 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557556584



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
  override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots.

Review comment:
   If you look at the if statement below this comment...
   ```scala
   if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
   ```
   That naked offset comparison is only valid if snapshots are always less than the high-watermark and we never truncate past the high-watermark.
   
   I checked this after writing the comment. Outside of data loss, we don't allow followers to truncate past the high-watermark.
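The check under discussion can be isolated as below. This is a hypothetical Java transcription of the Scala condition (`OffsetEpoch` and `requireNotOlder` are illustrative names): it rejects a new snapshot id when the latest id has a larger epoch or a larger offset. As written, an id equal to the latest one is not rejected, and comparing raw offsets is only sound under the assumption stated above, that snapshots stay below the high-watermark and followers never truncate past it.

```java
import java.util.Optional;

// Hypothetical snapshot id exposing offset/epoch the way the Scala code reads them.
final class OffsetEpoch {
    final long offset;
    final int epoch;

    OffsetEpoch(long offset, int epoch) {
        this.offset = offset;
        this.epoch = epoch;
    }

    @Override
    public String toString() {
        return "OffsetEpoch(offset=" + offset + ", epoch=" + epoch + ")";
    }
}

final class SnapshotGuard {
    // Same condition as the Scala check: reject the candidate when the latest
    // snapshot has a larger epoch or a larger offset.
    static void requireNotOlder(Optional<OffsetEpoch> latestSnapshot, OffsetEpoch candidate) {
        latestSnapshot.ifPresent(latest -> {
            if (latest.epoch > candidate.epoch || latest.offset > candidate.offset) {
                throw new IllegalArgumentException("Attempting to create a snapshot (" + candidate
                    + ") that is not greater than the latest snapshot (" + latest + ")");
            }
        });
    }
}
```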





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10877) Instantiating loggers for every FetchContext causes low request handler idle pool ratio.

2021-01-14 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265009#comment-17265009
 ] 

Ismael Juma commented on KAFKA-10877:
-

Sounds good.

> Instantiating loggers for every FetchContext causes low request handler idle 
> pool ratio.
> 
>
> Key: KAFKA-10877
> URL: https://issues.apache.org/jira/browse/KAFKA-10877
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sean McCauliff
>Assignee: Sean McCauliff
>Priority: Major
>
> JDK11 has removed some classes used by log4j2 to initialize logging contexts.
> Now log4j2 uses StackWalker to discover where it has been instantiated.
> StackWalker is apparently very expensive.
> Kafka has a Logging trait. Classes which want to log application messages
> get access to the methods provided by the trait by mixing it in using "with
> Logging". When this is done on a Scala object (a singleton), this is fine, as
> the logging context in the Logging trait is initialized at most once.
> When this is done on a class (e.g. class X extends Logging), the logging context
> is potentially created for each instance. The logging context is needed to
> determine whether a log message will be emitted. So if the method debug("log me")
> is called, the logging context is still initialized to determine whether debug
> logging is enabled. Initializing the logging context calls StackWalker.
> This can't be avoided even if the log message would never be written to the
> log.
> IncrementalFetchContext is one such class: it inherits from Logging and
> incurs a very high CPU cost. It also does this inside of locks.
>  
>  
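The cost difference described above comes down to how often the logger is initialized. A minimal Java sketch, with a hypothetical `expensiveLoggerInit()` that merely counts calls in place of the stack-walking initialization: a per-instance field runs it for every object, while a `static` field (the Java analogue of putting the logger in a Scala `object`) runs it once per class. This illustrates the mechanism only; it is not the fix adopted for this ticket.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LoggerInitCounter {
    static final AtomicInteger INITS = new AtomicInteger();

    // Hypothetical stand-in for an expensive logger lookup (e.g. one that walks the stack).
    static Logger expensiveLoggerInit(Class<?> owner) {
        INITS.incrementAndGet();
        return Logger.getLogger(owner.getName());
    }
}

// Logger created once per instance: the cost grows with the number of objects.
final class PerInstanceContext {
    private final Logger log = LoggerInitCounter.expensiveLoggerInit(PerInstanceContext.class);

    boolean debugEnabled() {
        return log.isLoggable(Level.FINE);
    }
}

// Logger created once per class, when the class is initialized.
final class SharedLoggerContext {
    private static final Logger LOG = LoggerInitCounter.expensiveLoggerInit(SharedLoggerContext.class);

    boolean debugEnabled() {
        return LOG.isLoggable(Level.FINE);
    }
}
```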



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >