[GitHub] [kafka] satishd commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-19 Thread via GitHub


satishd commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1299301497


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;

Review Comment:
   +1 on `pollIntervalMs` not being a static field here. We can also make this a config later if needed.
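   For illustration, a minimal sketch of the reviewer's suggestion (not the actual Kafka patch; the constructor and accessor shown here are hypothetical): keep the interval as a final instance field supplied through the constructor, so tests, or a future config, can override it without a mutable static field.

```java
// Hypothetical sketch, not the actual Kafka ConsumerTask: the poll interval
// is a final per-instance field instead of a mutable static one.
class ConsumerTaskSketch implements Runnable {
    private static final long DEFAULT_POLL_INTERVAL_MS = 100L;

    private final long pollIntervalMs;

    ConsumerTaskSketch() {
        this(DEFAULT_POLL_INTERVAL_MS);
    }

    // Tests, or a future config, can supply a different interval here.
    ConsumerTaskSketch(long pollIntervalMs) {
        this.pollIntervalMs = pollIntervalMs;
    }

    long pollIntervalMs() {
        return pollIntervalMs;
    }

    @Override
    public void run() {
        // consume/poll loop elided; it would wait pollIntervalMs between polls
    }

    public static void main(String[] args) {
        System.out.println(new ConsumerTaskSketch().pollIntervalMs());    // 100
        System.out.println(new ConsumerTaskSketch(10L).pollIntervalMs()); // 10
    }
}
```

   Because no static state is mutated, concurrently constructed tasks cannot interfere with each other's timing, which is the testability concern behind the review comment.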
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-08-19 Thread Fei Xie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756373#comment-17756373
 ] 

Fei Xie commented on KAFKA-14133:
-

Sent out a few more PRs. All tests up to and including #21, 
MeteredTimestampedKeyValueStoreTest, are taken care of now.

> Remaining EasyMock to Mockito tests
> ---
>
> Key: KAFKA-14133
> URL: https://issues.apache.org/jira/browse/KAFKA-14133
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
> put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here 
> rely solely on EasyMock.{color}
> Unless stated in brackets the tests are in the streams module.
> A list of tests which, as of 2 August 2022, still need to be moved from 
> EasyMock to Mockito and which, as far as I know, have neither a Jira issue 
> nor an open pull request:
> {color:#ff8b00}In Review{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
>  # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
>  # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
>  # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
>  # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
>  # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
>  # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
>  # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
>  # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
> Christo)
>  # {color:#00875a}TopologyTest{color} (owner: Christo)
>  # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
> Christo)
>  # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
>  # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
>  # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
>  # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
> Christo)
>  # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
>  # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
>  # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
>  # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} 
> (owner: Christo)
>  # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
>  # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
>  # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
>  # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
>  # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
>  # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
>  # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
>  # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
>  # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
>  # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
>  # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
>  # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) 
> (takeover: Christo)
>  # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
>  # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
>  # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
>  # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo)
>  # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
> https://issues.apache.org/jira/browse/KAFKA-12947)
>  # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
> [~shekharrajak])
>  # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) 
> [https://github.com/apache/kafka/pull/12777] 
>  # {color:#ff8b00}AbstractStreamTest{color} (owner: Christo)
>  # {color:#ff8b00}KStreamTransformValuesTest{color} (owner: Christo)
>  # {color:#ff8b00}KTableImplTest{color} (owner: Chr

[GitHub] [kafka] olalamichelle opened a new pull request, #14259: [KAFKA-14133] Migrate EasyMock to Mockito in MeteredTimestampedKeyValueStoreTest

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14259:
URL: https://github.com/apache/kafka/pull/14259

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14258: [KAFKA-14133] Migrate EasyMock to Mockito in MeteredSessionStoreTest

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14258:
URL: https://github.com/apache/kafka/pull/14258

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14257: [KAFKA-14133] Migrate EasyMock to Mockito in MeteredKeyValueStoreTest

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14257:
URL: https://github.com/apache/kafka/pull/14257

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14256: [KAFKA-14133] Migrate EasyMock to Mockito in GlobalStateStoreProviderTest, KeyValue…

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14256:
URL: https://github.com/apache/kafka/pull/14256

   …IteratorFacadeTest, KeyValueSegmentTest, TopologyMetadataTest
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14255: [KAFKA-14133] Migrate EasyMock to Mockito in StateRestoreCallbackAdapterTest, Store…

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14255:
URL: https://github.com/apache/kafka/pull/14255

   …ToProcessorContextAdapterTest, StreamsProducerTest
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14254: [KAFKA-14133] Migrate EasyMock to Mockito in RecordCollectorTest

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14254:
URL: https://github.com/apache/kafka/pull/14254

   As titled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-15219) Support delegation tokens in KRaft

2023-08-19 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-15219.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Support delegation tokens in KRaft
> --
>
> Key: KAFKA-15219
> URL: https://issues.apache.org/jira/browse/KAFKA-15219
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Proven Provenzano
>Priority: Critical
> Fix For: 3.6.0
>
>
> Delegation tokens were introduced in KIP-48 and improved in KIP-373. KIP-900 
> paved the way for supporting them in KRaft by adding SCRAM support, but 
> delegation tokens still don't work with KRaft.
> There are multiple issues:
> - TokenManager would still try to create tokens in ZooKeeper. Instead, we 
> should forward admin requests to the controller, which would store them in 
> the metadata, similarly to SCRAM. We probably won't need new protocols, just 
> enveloping, as with other existing controller requests.
> - TokenManager should run on controller nodes only (or in mixed mode).
> - Integration tests will need to be adapted as well and parameterized with 
> ZooKeeper/KRaft.
> - Documentation needs to be improved to cover KRaft.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino commented on pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-19 Thread via GitHub


rondagostino commented on PR #14083:
URL: https://github.com/apache/kafka/pull/14083#issuecomment-1685077921

   Merged to 3.6





[GitHub] [kafka] rondagostino merged pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-19 Thread via GitHub


rondagostino merged PR #14083:
URL: https://github.com/apache/kafka/pull/14083





[GitHub] [kafka] cadonna merged pull request #14249: KAFKA-10199: Change to RUNNING if no pending task to init exist

2023-08-19 Thread via GitHub


cadonna merged PR #14249:
URL: https://github.com/apache/kafka/pull/14249





[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-08-19 Thread Fei Xie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756336#comment-17756336
 ] 

Fei Xie commented on KAFKA-14133:
-

[~christo_lolov]  I have created 4 tickets and associated PRs for the following 
tests:
 # AbstractStreamTest 
https://issues.apache.org/jira/browse/KAFKA-15385?filter=-2 
 # KStreamTransformValuesTest 
https://issues.apache.org/jira/browse/KAFKA-15382?filter=-2 
 # KTableImplTest https://issues.apache.org/jira/browse/KAFKA-15383?filter=-2 
 # KTableTransformValuesTest 
https://issues.apache.org/jira/browse/KAFKA-15384?filter=-2

Could you please review the PRs? Thank you!

> Remaining EasyMock to Mockito tests
> ---
>
> Key: KAFKA-14133
> URL: https://issues.apache.org/jira/browse/KAFKA-14133
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
> put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here 
> rely solely on EasyMock.{color}
> Unless stated in brackets the tests are in the streams module.
> A list of tests which, as of 2 August 2022, still need to be moved from 
> EasyMock to Mockito and which, as far as I know, have neither a Jira issue 
> nor an open pull request:
> {color:#ff8b00}In Review{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
>  # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
>  # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
>  # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
>  # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
>  # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
>  # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
>  # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
>  # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
> Christo)
>  # {color:#00875a}TopologyTest{color} (owner: Christo)
>  # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
> Christo)
>  # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
>  # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
>  # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
>  # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
> Christo)
>  # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
>  # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
>  # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
>  # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
>  # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} 
> (owner: Christo)
>  # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
>  # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
>  # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
>  # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
>  # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
>  # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
>  # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
>  # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
>  # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
>  # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
>  # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
>  # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) 
> (takeover: Christo)
>  # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
>  # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
>  # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
>  # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo)
>  # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
> https://issues.apache.org/jira/browse/KAFKA-12947)
>  # {color:#00875a}TimeOrderedCachingPersistentWindowSto

[GitHub] [kafka] cadonna commented on pull request #14249: KAFKA-10199: Change to RUNNING if no pending task to init exist

2023-08-19 Thread via GitHub


cadonna commented on PR #14249:
URL: https://github.com/apache/kafka/pull/14249#issuecomment-1685047002

   I got approval for this PR in https://github.com/apache/kafka/pull/14214.
   I opened a new one because all build runs on 
https://github.com/apache/kafka/pull/14214 ran into a timeout.
   





[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-19 Thread via GitHub


junrao commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1299221576


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {
         }
     }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional<RetentionSizeData> retentionSizeData;
+        private final Optional<RetentionTimeData> retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                    metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                    metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            return isSegmentDeleted;
+        }
+
+        // It removes the segments beyond the current leader's earliest epoch. Those segments are considered
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageExcepti

[GitHub] [kafka] olalamichelle opened a new pull request, #14253: [KAFKA-15382] Migrate EasyMock to Mockito in KStreamTransformValuesTest

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14253:
URL: https://github.com/apache/kafka/pull/14253

   As titled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14252: [KAFKA-15383] Migrate EasyMock to Mockito in KTableImplTest

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14252:
URL: https://github.com/apache/kafka/pull/14252

   As titled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14251: [KAFKA-15384] Migrate EasyMock to Mockito in KTableTransformValuesTest

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14251:
URL: https://github.com/apache/kafka/pull/14251

   As titled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] olalamichelle opened a new pull request, #14250: Migrate EasyMock to Mockito in AbstractStreamTest.java

2023-08-19 Thread via GitHub


olalamichelle opened a new pull request, #14250:
URL: https://github.com/apache/kafka/pull/14250

   As titled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15385) Replace EasyMock with Mockito for AbstractStreamTest

2023-08-19 Thread Fei Xie (Jira)
Fei Xie created KAFKA-15385:
---

 Summary: Replace EasyMock with Mockito for AbstractStreamTest
 Key: KAFKA-15385
 URL: https://issues.apache.org/jira/browse/KAFKA-15385
 Project: Kafka
  Issue Type: Sub-task
Reporter: Fei Xie








[jira] [Created] (KAFKA-15384) Replace EasyMock with Mockito for KTableTransformValuesTest

2023-08-19 Thread Fei Xie (Jira)
Fei Xie created KAFKA-15384:
---

 Summary: Replace EasyMock with Mockito for 
KTableTransformValuesTest
 Key: KAFKA-15384
 URL: https://issues.apache.org/jira/browse/KAFKA-15384
 Project: Kafka
  Issue Type: Sub-task
Reporter: Fei Xie








[jira] [Created] (KAFKA-15383) Replace EasyMock with Mockito for KTableImplTest

2023-08-19 Thread Fei Xie (Jira)
Fei Xie created KAFKA-15383:
---

 Summary: Replace EasyMock with Mockito for KTableImplTest
 Key: KAFKA-15383
 URL: https://issues.apache.org/jira/browse/KAFKA-15383
 Project: Kafka
  Issue Type: Sub-task
Reporter: Fei Xie








[jira] [Created] (KAFKA-15382) Replace EasyMock with Mockito for KStreamTransformValuesTest

2023-08-19 Thread Fei Xie (Jira)
Fei Xie created KAFKA-15382:
---

 Summary: Replace EasyMock with Mockito for 
KStreamTransformValuesTest
 Key: KAFKA-15382
 URL: https://issues.apache.org/jira/browse/KAFKA-15382
 Project: Kafka
  Issue Type: Sub-task
Reporter: Fei Xie








[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-08-19 Thread via GitHub


olalamichelle commented on code in PR #14078:
URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213816


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -195,4 +231,68 @@ public String getBody() {
 return Mockito.spy(httpsJwks);
 }
 
+/**
+ * A mock ScheduledExecutorService just for the test. Note that this is 
not a generally reusable mock as it does not
+ * implement some interfaces like scheduleWithFixedDelay, etc. And it does 
not return ScheduledFuture correctly.
+ */
+private class MockExecutorService implements MockTime.Listener {

Review Comment:
   Yes, it is very similar, but I had to create my own mock for 2 reasons:
   1. The MockScheduler.schedule method does not take a period parameter for 
scheduling a periodic task. I could add that function to MockScheduler, but then 
I would also have to implement the corresponding addWaiter, which would pretty 
much move all of the MockExecutorService code into MockScheduler. I don't think 
that is a good idea, since it would make MockScheduler very cumbersome. 
Moreover, if I did that, its schedule interface could not take an 
ExecutorService parameter and would need to take care of the execution itself. 
See below for more details.
   2. MockScheduler is really just a scheduler; it does not take care of the 
execution, it only submits tasks to the executor service. The flakiness of the 
test comes from the real clock-based executor service, which, because of CPU 
scheduling, cannot be 100% accurate on timings. We need a callback-based one in 
the test to make sure it is reliable. It is just that the logic behind a mock 
scheduler and a mock executor service is the same (their principles are the 
same): both have to use MockTime and be callback-based, so they are very 
similar.
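The callback-based approach argued for above can be sketched in isolation. The following is a minimal, hypothetical illustration — `DeterministicScheduler`, `advance`, and `Demo` are invented names, not Kafka's actual `MockTime`/`MockScheduler` API — showing why firing tasks synchronously when the test advances a fake clock removes any dependence on real CPU scheduling:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical deterministic scheduler: tasks are keyed by due time and run
// synchronously when the test advances the fake clock.
final class DeterministicScheduler {
    private long nowMs = 0L;
    private final TreeMap<Long, List<Runnable>> tasks = new TreeMap<>();

    void schedule(long delayMs, Runnable task) {
        tasks.computeIfAbsent(nowMs + delayMs, k -> new ArrayList<>()).add(task);
    }

    // Advancing the clock runs every task that has come due, in order.
    void advance(long deltaMs) {
        nowMs += deltaMs;
        Map.Entry<Long, List<Runnable>> entry;
        while ((entry = tasks.firstEntry()) != null && entry.getKey() <= nowMs) {
            tasks.remove(entry.getKey());
            entry.getValue().forEach(Runnable::run);
        }
    }
}

public class Demo {
    public static int fired = 0;

    public static void main(String[] args) {
        DeterministicScheduler s = new DeterministicScheduler();
        s.schedule(100, () -> fired++);
        s.advance(99);              // task not yet due
        System.out.println(fired);  // prints 0
        s.advance(1);               // clock reaches the due time
        System.out.println(fired);  // prints 1
    }
}
```

Because the test itself drives the clock, the assertion timing is exact rather than "long enough for the executor to have run".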



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-08-19 Thread via GitHub


olalamichelle commented on code in PR #14078:
URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213610


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -195,4 +231,68 @@ public String getBody() {
 return Mockito.spy(httpsJwks);
 }
 
+/**
+ * A mock ScheduledExecutorService just for the test. Note that this is 
not a generally reusable mock as it does not
+ * implement some interfaces like scheduleWithFixedDelay, etc. And it does 
not return ScheduledFuture correctly.
+ */
+private class MockExecutorService implements MockTime.Listener {
+private final MockTime time;
+
+private final TreeMap<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> waiters = new TreeMap<>();
+
+public MockExecutorService(MockTime time) {
+this.time = time;
+time.addListener(this);
+}
+
+@Override
+public synchronized void onTimeUpdated() {

Review Comment:
   This is a test-only class, so I think it should be fine? We should make sure 
the scheduled task does not throw exceptions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-08-19 Thread via GitHub


olalamichelle commented on code in PR #14078:
URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213641


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -195,4 +231,68 @@ public String getBody() {
 return Mockito.spy(httpsJwks);
 }
 
+/**
+ * A mock ScheduledExecutorService just for the test. Note that this is not a generally reusable mock as it does not
+ * implement some interfaces like scheduleWithFixedDelay, etc. And it does not return ScheduledFuture correctly.
+ */
+private class MockExecutorService implements MockTime.Listener {
+private final MockTime time;
+
+private final TreeMap<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> waiters = new TreeMap<>();
+
+public MockExecutorService(MockTime time) {
+this.time = time;
+time.addListener(this);
+}
+
+@Override
+public synchronized void onTimeUpdated() {
+long timeMs = time.milliseconds();
+while (true) {
+Map.Entry<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> entry = waiters.firstEntry();
+if ((entry == null) || (entry.getKey() > timeMs)) {
+break;
+}
+for (AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>> pair : entry.getValue()) {
+pair.getValue().complete(timeMs);
+if (pair.getKey() != null) {
+addWaiter(entry.getKey() + pair.getKey(), pair.getKey(), pair.getValue());
+}
+}
+waiters.remove(entry.getKey());
+}
+}
+
+private synchronized void addWaiter(long delayMs, Long period, KafkaFutureImpl<Long> waiter) {
+long timeMs = time.milliseconds();
+if (delayMs <= 0) {
+waiter.complete(timeMs);
+} else {
+long triggerTimeMs = timeMs + delayMs;
+List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>> futures =
+waiters.computeIfAbsent(triggerTimeMs, k -> new ArrayList<>());
+futures.add(new AbstractMap.SimpleEntry<>(period, waiter));
+}
+}
+
+/**
+ * Internal utility function for periodic or one-time refreshes.
+ *
+ * @param period null indicates a one-time refresh, otherwise it is periodic.
+ */
+public <T> ScheduledFuture<T> schedule(final Callable<T> callable, long delayMs, Long period) {
+
+KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
+waiter.thenApply((KafkaFuture.BaseFunction<Long, T>) now -> {
+try {
+callable.call();
+} catch (Throwable ignored) {
+}

Review Comment:
   This is a test-only class, so I think it should be fine? We should make sure 
the scheduled task does not throw exceptions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org







[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-08-19 Thread via GitHub


olalamichelle commented on code in PR #14078:
URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213580


##
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##
@@ -195,4 +231,68 @@ public String getBody() {
 return Mockito.spy(httpsJwks);
 }
 
+/**
+ * A mock ScheduledExecutorService just for the test. Note that this is 
not a generally reusable mock as it does not
+ * implement some interfaces like scheduleWithFixedDelay, etc. And it does 
not return ScheduledFuture correctly.
+ */
+private class MockExecutorService implements MockTime.Listener {
+private final MockTime time;
+
+private final TreeMap<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> waiters = new TreeMap<>();
+
+public MockExecutorService(MockTime time) {
+this.time = time;
+time.addListener(this);
+}
+
+@Override
+public synchronized void onTimeUpdated() {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-19 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1299185257


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -698,11 +707,329 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional<RetentionSizeData> retentionSizeData;
+private final Optional<RetentionTimeData> retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size >= 0
+if (remainingBreachedSize > 0) {
+long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+if (remainingBytes >= 0) {
+remainingBreachedSize = remainingBytes;
+return true;
+}
+}
+
+return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+}
+return isSegmentDeleted;
+}
+
+public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if (!retentionTimeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+// It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+// are ascending within an epoch.
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted && retentionSizeData.isPresent()) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+return isSegmentDeleted;
+}
+
+// It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+// unreferenced because they are not part of the current leader 
epoch lineage.
+private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageExcept
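The size-based retention accounting in `deleteRetentionSizeBreachedSegments` above can be illustrated standalone. The sketch below uses invented names (`RetentionSketch`, `segmentsToDelete`) rather than the RemoteLogManager classes; it walks segments oldest-first and deletes only while the remaining breached size stays non-negative, mirroring the `remainingBytes >= 0` check in the diff:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of remote-log size retention: given segment sizes in
// oldest-first order, delete segments until the log fits the retention budget.
public class RetentionSketch {
    static List<Long> segmentsToDelete(List<Long> segmentSizes, long totalSize, long retentionSize) {
        long remainingBreachedSize = totalSize - retentionSize; // bytes over budget
        List<Long> deleted = new ArrayList<>();
        for (long size : segmentSizes) {
            if (remainingBreachedSize <= 0) {
                break; // already within the retention budget
            }
            long remainingBytes = remainingBreachedSize - size;
            if (remainingBytes >= 0) { // same guard as in the diff above
                remainingBreachedSize = remainingBytes;
                deleted.add(size);
            } else {
                break; // deleting this segment would overshoot the breach
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        // Five 100-byte segments, 250-byte budget: 250 bytes over, so the two
        // oldest segments are deleted (300 would overshoot the breach).
        System.out.println(segmentsToDelete(List.of(100L, 100L, 100L, 100L, 100L), 500L, 250L));
    }
}
```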

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-19 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1299184981


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional<RetentionSizeData> retentionSizeData;
+private final Optional<RetentionTimeData> retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size >= 0
+if (remainingBreachedSize > 0) {
+long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+if (remainingBytes >= 0) {
+remainingBreachedSize = remainingBytes;
+return true;
+}
+}
+
+return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+}
+return isSegmentDeleted;
+}
+
+public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if (!retentionTimeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+// It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+// are ascending within an epoch.
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted && retentionSizeData.isPresent()) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+return isSegmentDeleted;
+}
+
+// It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+// unreferenced because they are not part of the current leader 
epoch lineage.
+private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageExcept

[GitHub] [kafka] cadonna commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

2023-08-19 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1299182368


##
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
 );
 
 // we need long enough timeout to by-pass the log manager's 
InitialTaskDelayMs, which is hard-coded on server side
+final long waitForPurgeMs = 6;
 TestUtils.waitForCondition(
-new RepartitionTopicVerified(currentSize -> currentSize <= 
PURGE_SEGMENT_BYTES),
-6,
-"Repartition topic " + REPARTITION_TOPIC + " not purged data after 
6 ms."
+new RepartitionTopicVerified(currentSize -> currentSize <= 4L * 
PURGE_SEGMENT_BYTES),

Review Comment:
   Merged https://github.com/apache/kafka/pull/14227



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14429) Move OffsetStorageReader from storage package to source package

2023-08-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756308#comment-17756308
 ] 

Sagar Rao commented on KAFKA-14429:
---

hey [~gharris1727], given Chris's comment above, do you think we can close this 
one?

> Move OffsetStorageReader from storage package to source package
> ---
>
> Key: KAFKA-14429
> URL: https://issues.apache.org/jira/browse/KAFKA-14429
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: needs-kip
>
> The OffsetStorageReader is an interface provided to source connectors. This 
> does not fit with the broader context of the `storage` package, which is 
> focused on sink/source-agnostic converters and serialization/deserialization.
> The current interface should be deprecated and extend from the relocated 
> interface in a different package.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent

2023-08-19 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14138:
-

Assignee: (was: Sagar Rao)

> The Exception Throwing Behavior of Transactional Producer is Inconsistent
> -
>
> Key: KAFKA-14138
> URL: https://issues.apache.org/jira/browse/KAFKA-14138
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Guozhang Wang
>Priority: Critical
>
> There's an issue for inconsistent error throwing inside Kafka Producer when 
> transactions are enabled. In short, there are two places where the received 
> error code from the brokers would be eventually thrown to the caller:
> * Recorded on the batch's metadata, via "Sender#failBatch"
> * Recorded on the txn manager, via "txnManager#handleFailedBatch".
> The former would be thrown from 1) the `Future` returned from 
> the `send`; or 2) the `callback` inside `send(record, callback)`. Whereas, 
> the latter would be thrown from `producer.send()` directly in which we call 
> `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown 
> from the former, it's not wrapped hence the direct exception (e.g. 
> ClusterAuthorizationException), whereas in the latter it's wrapped as, e.g. 
> KafkaException(ClusterAuthorizationException). And which one would be thrown 
> depends on a race condition, since we cannot control whether, by the time the 
> caller thread calls `txnManager.maybeAddPartition`, the previous 
> produceRequest's error has been sent back or not.
> For example consider the following sequence for idempotent producer:
> 1. caller thread: within future = producer.send(), call 
> recordAccumulator.append
> 2. sender thread: drain the accumulator, send the produceRequest and get the 
> error back.
> 3. caller thread: within future = producer.send(), call 
> txnManager.maybeAddPartition, in which we would check `maybeFailWithError` 
> before `isTransactional`.
> 4. caller thread: future.get()
> In a sequence where 3) happened before 2), we would only get the raw 
> exception at step 4; in a sequence where 2) happened before 3), we would 
> throw the exception immediately at 3).
> This inconsistent error throwing is pretty annoying for users since they'd 
> need to handle both cases, but many of them actually do not know this 
> trickiness. We should make the error throwing consistent, e.g. we should 
> consider: 1) which errors would be thrown from callback / future.get, and 
> which would be thrown from the `send` call directly, and these errors should 
> better be non-overlapping, 2) whether we should wrap the raw error or not, we 
> should do so consistently.
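Until the throwing behavior is made consistent, a caller has to treat the raw and wrapped forms alike. The helper below is a hypothetical sketch (names invented; `IllegalStateException` merely stands in for an error such as ClusterAuthorizationException, to keep the example self-contained) showing cause-chain unwrapping so both surfaces classify the same way:

```java
public class Unwrap {
    // Walk the cause chain so a wrapped KafkaException(error) and the raw
    // error classify identically, regardless of which path threw it.
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null && t.getCause() != t) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        RuntimeException raw = new IllegalStateException("auth failed");
        RuntimeException wrapped = new RuntimeException(raw); // stand-in for the wrapped form
        System.out.println(rootCause(wrapped) == rootCause(raw)); // prints true
    }
}
```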



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna opened a new pull request, #14249: KAFKA-10199: Change to RUNNING if no pending task to init exist

2023-08-19 Thread via GitHub


cadonna opened a new pull request, #14249:
URL: https://github.com/apache/kafka/pull/14249

   A stream thread should only change to RUNNING if there are no active tasks 
in restoration in the state updater and if there are no pending tasks to 
recycle and to init.
   
   Usually all pending tasks to init are added to the state updater in the same 
poll iteration that handles the assignment. However, if a LockException occurs 
during the initialization of a task, the task is re-added to the tasks to init 
and initialization is retried in the next poll iteration.
   
   A LockException might occur when a state directory is still locked by 
another thread right after a rebalance happened.
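The guard described above reduces to a simple conjunction. The sketch below uses invented names (`RunningGuard`, `canGoRunning`) and plain lists in place of the state updater's task sets; it is only meant to illustrate the transition condition, not the actual StreamThread code:

```java
import java.util.List;

public class RunningGuard {
    // The thread may transition to RUNNING only when nothing is left to
    // restore, recycle, or initialize.
    static boolean canGoRunning(List<String> restoring, List<String> toRecycle, List<String> toInit) {
        return restoring.isEmpty() && toRecycle.isEmpty() && toInit.isEmpty();
    }

    public static void main(String[] args) {
        // A pending task to init (e.g. one re-added after a LockException)
        // blocks the transition until it is handed to the state updater.
        System.out.println(canGoRunning(List.of(), List.of(), List.of("task-0"))); // prints false
        System.out.println(canGoRunning(List.of(), List.of(), List.of()));         // prints true
    }
}
```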
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna closed pull request #14214: KAFKA-10199: Change to RUNNING if no pending task to init exist

2023-08-19 Thread via GitHub


cadonna closed pull request #14214: KAFKA-10199: Change to RUNNING if no 
pending task to init exist
URL: https://github.com/apache/kafka/pull/14214





[GitHub] [kafka] kun98-liu opened a new pull request, #14248: MINOR: Add libs.slf4jlog4j to dependencies in project(':core')

2023-08-19 Thread via GitHub


kun98-liu opened a new pull request, #14248:
URL: https://github.com/apache/kafka/pull/14248

   ## Description
   - When I used IntelliJ to debug `kafka.core.main` following the 
[wiki](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=145722808),
 I kept getting error messages like these.
   ```
   > Task :core:Kafka.main()
   SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
   SLF4J: Defaulting to no-operation (NOP) logger implementation
   SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
   SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
   SLF4J: Defaulting to no-operation MDCAdapter implementation.
   SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further 
details.
   ```
   - My debug configuration is as follows:
   ```
   -Xmx1G
   -Xms1G
   -server
   -XX:+UseG1GC
   -XX:MaxGCPauseMillis=20
   -XX:InitiatingHeapOccupancyPercent=35
   -XX:+ExplicitGCInvokesConcurrent
   -Djava.awt.headless=true
   -Dcom.sun.management.jmxremote
   -Dcom.sun.management.jmxremote.ssl=false
   -Dcom.sun.management.jmxremote.authenticate=false
   -Dkafka.logs.dir="/Users/me/logs"
   -Dlog4j.configuration=file:/path/to/kafka/config/log4j.properties
   ```
   
   - Adding `implementation libs.slf4jlog4j` resolved my problem.
   - This change will make life easier for those who want to start contributing 
to Kafka.
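   For reference, the change amounts to one line in the `core` project's 
dependencies (placement sketch only; the surrounding build.gradle structure is 
abbreviated):

```groovy
// build.gradle -- sketch, existing entries elided
project(':core') {
  dependencies {
    // ... existing dependencies ...
    implementation libs.slf4jlog4j  // SLF4J-to-log4j binding; silences the NOP-logger warning
  }
}
```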
   
   
   ## Test
   - Ran locally and tested via the `quick-start` guide. This should have no 
impact on existing functionality.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] cadonna merged pull request #14227: MINOR: Decouple purging committed records from committing

2023-08-19 Thread via GitHub


cadonna merged PR #14227:
URL: https://github.com/apache/kafka/pull/14227





[GitHub] [kafka] cadonna commented on pull request #14227: MINOR: Decouple purging committed records from committing

2023-08-19 Thread via GitHub


cadonna commented on PR #14227:
URL: https://github.com/apache/kafka/pull/14227#issuecomment-1684911229

   Build failures are unrelated:
   ```
   Build / JDK 20 and Scala 2.13 / 
kafka.log.remote.RemoteIndexCacheTest.testClose()
   Build / JDK 20 and Scala 2.13 / 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   Build / JDK 20 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 20 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] 
Type=ZK, Name=testStartZkBrokerWithAuthorizer, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] 
Type=ZK, Name=testDualWriteScram, MetadataVersion=3.5-IV2, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] 
Type=ZK, Name=testStartZkBrokerWithAuthorizer, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 8 and Scala 2.12 / 
kafka.api.PlaintextConsumerTest.testSubsequentPatternSubscription()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] 
Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
   ```





[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-19 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756245#comment-17756245
 ] 

Jinyong Choi commented on KAFKA-15302:
--

I got it.

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the punctuation function of 
> this.context.schedule, a previous value is returned. In other words, even 
> though the value has been updated from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!'. While this 
> doesn't occur 100% of the time, by adding logs it's evident that the cache is 
> flushed during the while loop after all() is called. As a result, the named 
> cache holds a null value, causing a value to be returned from RocksDB. This 
> is observed as the value after the .get() call being different from the 
> expected value. This is possibly due to the consistent-read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}}, there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg =
>                 new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: "
>                     + entry.key + " Expected in stored(Cache or Store) value: " + storeValue
>                     + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
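The mechanism described above can be modeled with a tiny write-back cache standing in for the NamedCache in front of RocksDB (illustrative only, not the Streams internals):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model: a write-back cache in front of a backing store.
// Iterating a snapshot of the backing store while dirty entries still sit in
// the cache yields stale values -- flushing first avoids that, which mirrors
// the store.flush()-before-all() workaround from the report.
public class StaleReadSketch {
    final Map<String, Integer> backing = new HashMap<>();  // stands in for RocksDB
    final Map<String, Integer> cache = new HashMap<>();    // stands in for the NamedCache

    void put(String k, Integer v) { cache.put(k, v); }     // buffered write
    Integer get(String k) { return cache.getOrDefault(k, backing.get(k)); }
    void flush() { backing.putAll(cache); cache.clear(); }

    Integer snapshotRead(String k) { return backing.get(k); } // what a stale iterator may see

    public static void main(String[] args) {
        StaleReadSketch s = new StaleReadSketch();
        s.put("k", 1);
        s.flush();
        s.put("k", 2);                            // dirty entry, not yet flushed
        System.out.println(s.snapshotRead("k"));  // prints 1 -- stale
        System.out.println(s.get("k"));           // prints 2 -- cache-aware read
        s.flush();
        System.out.println(s.snapshotRead("k"));  // prints 2 -- consistent after flush
    }
}
```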
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>   MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache --   NamedCache allKeys() 
> size():325771
> 01:05:00.637 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache --   NamedCache keySetIterator() 
> TreeSet size():325771
> ...
> 01:05:07.052 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e