[GitHub] [kafka] satishd commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
satishd commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1299301497

## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:

@@ -64,302 +65,393 @@ class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;

Review Comment: +1, `pollIntervalMs` should not be a static field here. We can also have this as a config if needed later.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
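A minimal sketch of the suggested change, assuming constructor injection of the interval; the class name `PollingTask` and its accessor are hypothetical stand-ins, not the actual ConsumerTask code:

```java
// Sketch: keep the default as a constant, but make the effective poll
// interval an immutable instance field set via the constructor instead of a
// mutable static. Tests (or a future config) can then pass a custom value
// without touching global state. All names here are illustrative.
class PollingTask {
    private static final long DEFAULT_POLL_INTERVAL_MS = 100L;

    private final long pollIntervalMs;

    PollingTask() {
        this(DEFAULT_POLL_INTERVAL_MS);
    }

    PollingTask(long pollIntervalMs) {
        this.pollIntervalMs = pollIntervalMs;
    }

    long pollIntervalMs() {
        return pollIntervalMs;
    }
}
```

With this shape, a test can construct `new PollingTask(10L)` while production code keeps the 100 ms default, and no test can leak a changed interval into another test through a static field.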
[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756373#comment-17756373 ] Fei Xie commented on KAFKA-14133:

Sent out a few more PRs. All tests up to and including #21 (MeteredTimestampedKeyValueStoreTest) are taken care of now.

> Remaining EasyMock to Mockito tests
>
> Key: KAFKA-14133
> URL: https://issues.apache.org/jira/browse/KAFKA-14133
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Christo Lolov
> Priority: Major
>
> There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.
> Unless stated in brackets, the tests are in the streams module.
> A list of tests which still need to be moved from EasyMock to Mockito as of 2nd of August 2022, which do not have a Jira issue and for which I am not aware of any open pull requests (status shown as In Review or Merged):
> # (Merged) WorkerConnectorTest (connect) (owner: [~yash.mayya])
> # (Merged) WorkerCoordinatorTest (connect) (owner: [~yash.mayya])
> # (Merged) RootResourceTest (connect) (owner: [~yash.mayya])
> # (Merged) ByteArrayProducerRecordEquals (connect) (owner: [~yash.mayya])
> # (Merged) KStreamFlatTransformTest (owner: Christo)
> # (Merged) KStreamFlatTransformValuesTest (owner: Christo)
> # (Merged) KStreamPrintTest (owner: Christo)
> # (Merged) KStreamRepartitionTest (owner: Christo)
> # (Merged) MaterializedInternalTest (owner: Christo)
> # (Merged) TransformerSupplierAdapterTest (owner: Christo)
> # (Merged) KTableSuppressProcessorMetricsTest (owner: Christo)
> # (Merged) ClientUtilsTest (owner: Christo)
> # (Merged) HighAvailabilityStreamsPartitionAssignorTest (owner: Christo)
> # (Merged) TopologyTest (owner: Christo)
> # (Merged) KTableSuppressProcessorTest (owner: Christo)
> # (Merged) ChangeLoggingSessionBytesStoreTest (owner: Christo)
> # (Merged) ChangeLoggingTimestampedWindowBytesStoreTest (owner: Christo)
> # (Merged) ChangeLoggingWindowBytesStoreTest (owner: Christo)
> # (Merged) MeteredTimestampedWindowStoreTest (owner: Christo)
> # (Merged) StreamsRebalanceListenerTest (owner: Christo)
> # (Merged) TimestampedKeyValueStoreMaterializerTest (owner: Christo)
> # (Merged) CachingInMemoryKeyValueStoreTest (owner: Christo)
> # (Merged) CachingInMemorySessionStoreTest (owner: Christo)
> # (Merged) CachingPersistentSessionStoreTest (owner: Christo)
> # (Merged) CachingPersistentWindowStoreTest (owner: Christo)
> # (Merged) ChangeLoggingKeyValueBytesStoreTest (owner: Christo)
> # (Merged) ChangeLoggingTimestampedKeyValueBytesStoreTest (owner: Christo)
> # (Merged) CompositeReadOnlyWindowStoreTest (owner: Christo)
> # (Merged) KeyValueStoreBuilderTest (owner: Christo)
> # (Merged) RocksDBStoreTest (owner: Christo)
> # (Merged) StreamThreadStateStoreProviderTest (owner: Christo)
> # (In Review) TaskManagerTest (owner: Christo)
> # (Merged) InternalTopicManagerTest (owner: Christo)
> # (Merged) ProcessorContextImplTest (owner: Christo)
> # (Merged) WriteConsistencyVectorTest (owner: Christo)
> # (Merged) StreamsAssignmentScaleTest (owner: Christo)
> # (Merged) StreamsPartitionAssignorTest (owner: Christo)
> # (Merged) AssignmentTestUtils (owner: Christo)
> # (In Review) ProcessorStateManagerTest (owner: Matthew) (takeover: Christo)
> # (In Review) StandbyTaskTest (owner: Matthew)
> # (In Review) StoreChangelogReaderTest (owner: Matthew)
> # (In Review) StreamTaskTest (owner: Matthew)
> # (In Review) StreamThreadTest (owner: Matthew) (takeover: Christo)
> # (In Review) StreamsMetricsImplTest (owner: Dalibor) (captured in https://issues.apache.org/jira/browse/KAFKA-12947)
> # (Merged) TimeOrderedCachingPersistentWindowStoreTest (owner: [~shekharrajak])
> # (Merged) TimeOrderedWindowStoreTest (owner: [~shekharrajak]) https://github.com/apache/kafka/pull/12777
> # (In Review) AbstractStreamTest (owner: Christo)
> # (In Review) KStreamTransformValuesTest (owner: Christo)
> # (In Review) KTableImplTest (owner: Chr
[GitHub] [kafka] olalamichelle opened a new pull request, #14259: [KAFKA-14133] Migrate EasyMock to Mockito in MeteredTimestampedKeyValueStoreTest
olalamichelle opened a new pull request, #14259: URL: https://github.com/apache/kafka/pull/14259

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] olalamichelle opened a new pull request, #14258: [KAFKA-14133] Migrate EasyMock to Mockito in MeteredSessionStoreTest
olalamichelle opened a new pull request, #14258: URL: https://github.com/apache/kafka/pull/14258
[GitHub] [kafka] olalamichelle opened a new pull request, #14257: [KAFKA-14133] Migrate EasyMock to Mockito in MeteredKeyValueStoreTest
olalamichelle opened a new pull request, #14257: URL: https://github.com/apache/kafka/pull/14257
[GitHub] [kafka] olalamichelle opened a new pull request, #14256: [KAFKA-14133] Migrate EasyMock to Mockito in GlobalStateStoreProviderTest, KeyValue…
olalamichelle opened a new pull request, #14256: URL: https://github.com/apache/kafka/pull/14256

…IteratorFacadeTest, KeyValueSegmentTest, TopologyMetadataTest
[GitHub] [kafka] olalamichelle opened a new pull request, #14255: [KAFKA-14133] Migrate EasyMock to Mockito in StateRestoreCallbackAdapterTest, Store…
olalamichelle opened a new pull request, #14255: URL: https://github.com/apache/kafka/pull/14255

…ToProcessorContextAdapterTest, StreamsProducerTest
[GitHub] [kafka] olalamichelle opened a new pull request, #14254: [KAFKA-14133] Migrate EasyMock to Mockito in RecordCollectorTest
olalamichelle opened a new pull request, #14254: URL: https://github.com/apache/kafka/pull/14254

As titled.
[jira] [Resolved] (KAFKA-15219) Support delegation tokens in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino resolved KAFKA-15219.

Fix Version/s: 3.6.0
Resolution: Fixed

> Support delegation tokens in KRaft
>
> Key: KAFKA-15219
> URL: https://issues.apache.org/jira/browse/KAFKA-15219
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 3.6.0
> Reporter: Viktor Somogyi-Vass
> Assignee: Proven Provenzano
> Priority: Critical
> Fix For: 3.6.0
>
> Delegation tokens were introduced in KIP-48 and improved in KIP-373. KIP-900 paved the way for supporting them in KRaft by adding SCRAM support, but delegation tokens still do not work with KRaft.
> There are multiple issues:
> - TokenManager would still try to create tokens in ZooKeeper. Instead, we should forward admin requests to the controller, which would store them in the metadata similarly to SCRAM. We probably won't need new protocols, just enveloping, as with other existing controller requests.
> - TokenManager should run on controller nodes only (or in mixed mode).
> - Integration tests will need to be adapted as well and parameterized with ZooKeeper/KRaft.
> - Documentation needs to be improved to factor in KRaft.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rondagostino commented on pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
rondagostino commented on PR #14083: URL: https://github.com/apache/kafka/pull/14083#issuecomment-1685077921

Merged to 3.6.
[GitHub] [kafka] rondagostino merged pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
rondagostino merged PR #14083: URL: https://github.com/apache/kafka/pull/14083
[GitHub] [kafka] cadonna merged pull request #14249: KAFKA-10199: Change to RUNNING if no pending task to init exist
cadonna merged PR #14249: URL: https://github.com/apache/kafka/pull/14249
[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756336#comment-17756336 ] Fei Xie commented on KAFKA-14133:

[~christo_lolov] I have created 4 tickets and associated PRs for the following tests:
# AbstractStreamTest https://issues.apache.org/jira/browse/KAFKA-15385?filter=-2
# KStreamTransformValuesTest https://issues.apache.org/jira/browse/KAFKA-15382?filter=-2
# KTableImplTest https://issues.apache.org/jira/browse/KAFKA-15383?filter=-2
# KTableTransformValuesTest https://issues.apache.org/jira/browse/KAFKA-15384?filter=-2
Could you please review the PRs? Thank you!

> Remaining EasyMock to Mockito tests
>
> Key: KAFKA-14133
> URL: https://issues.apache.org/jira/browse/KAFKA-14133
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Christo Lolov
> Priority: Major
[GitHub] [kafka] cadonna commented on pull request #14249: KAFKA-10199: Change to RUNNING if no pending task to init exist
cadonna commented on PR #14249: URL: https://github.com/apache/kafka/pull/14249#issuecomment-1685047002

I got the approval for this PR in https://github.com/apache/kafka/pull/14214. I opened a new one because all build runs on https://github.com/apache/kafka/pull/14214 ran into a timeout.
[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
junrao commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1299221576

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -696,11 +704,327 @@ public void run() {
         }
     }

+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional retentionSizeData;
+        private final Optional retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                    metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                    metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            return isSegmentDeleted;
+        }
+
+        // It removes the segments beyond the current leader's earliest epoch. Those segments are considered
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageExcepti
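The retention-size path in the diff above keeps a running `remainingBreachedSize` budget and deletes a segment only while the whole segment still fits inside that budget. A minimal, self-contained sketch of just that accounting (class and method names are hypothetical, not the RemoteLogManager code itself):

```java
// Sketch of the retention-size accounting: segments are visited oldest-first,
// and a segment is deleted only while the entire segment still fits inside
// the remaining "breached size" budget. Names here are illustrative.
class RetentionSizeSketch {
    private long remainingBreachedSize;

    RetentionSizeSketch(long remainingBreachedSize) {
        this.remainingBreachedSize = remainingBreachedSize;
    }

    // Mirrors the deletion predicate in the diff; assumes segment sizes >= 0.
    boolean shouldDelete(long segmentSizeInBytes) {
        if (remainingBreachedSize > 0) {
            long remainingBytes = remainingBreachedSize - segmentSizeInBytes;
            if (remainingBytes >= 0) {
                // Consume the budget only when the segment is actually deleted.
                remainingBreachedSize = remainingBytes;
                return true;
            }
        }
        return false;
    }

    long remainingBreachedSize() {
        return remainingBreachedSize;
    }
}
```

Note the `>= 0` check: a segment larger than the remaining budget is kept, so deletion never overshoots the configured retention size.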
[GitHub] [kafka] olalamichelle opened a new pull request, #14253: [KAFKA-15382] Migrate EasyMock to Mockito in KStreamTransformValuesTest
olalamichelle opened a new pull request, #14253: URL: https://github.com/apache/kafka/pull/14253

As titled.
[GitHub] [kafka] olalamichelle opened a new pull request, #14252: [KAFKA-15383] Migrate EasyMock to Mockito in KTableImplTest
olalamichelle opened a new pull request, #14252: URL: https://github.com/apache/kafka/pull/14252

As titled.
[GitHub] [kafka] olalamichelle opened a new pull request, #14251: [KAFKA-15384] Migrate EasyMock to Mockito in KTableTransformValuesTest
olalamichelle opened a new pull request, #14251: URL: https://github.com/apache/kafka/pull/14251

As titled.
[GitHub] [kafka] olalamichelle opened a new pull request, #14250: Migrate EasyMock to Mockito in AbstractStreamTest.java
olalamichelle opened a new pull request, #14250: URL: https://github.com/apache/kafka/pull/14250

As titled.
[jira] [Created] (KAFKA-15385) Replace EasyMock with Mockito for AbstractStreamTest
Fei Xie created KAFKA-15385:
Summary: Replace EasyMock with Mockito for AbstractStreamTest
Key: KAFKA-15385
URL: https://issues.apache.org/jira/browse/KAFKA-15385
Project: Kafka
Issue Type: Sub-task
Reporter: Fei Xie
[jira] [Created] (KAFKA-15384) Replace EasyMock with Mockito for KTableTransformValuesTest
Fei Xie created KAFKA-15384:
Summary: Replace EasyMock with Mockito for KTableTransformValuesTest
Key: KAFKA-15384
URL: https://issues.apache.org/jira/browse/KAFKA-15384
Project: Kafka
Issue Type: Sub-task
Reporter: Fei Xie
[jira] [Created] (KAFKA-15383) Replace EasyMock with Mockito for KTableImplTest
Fei Xie created KAFKA-15383:
Summary: Replace EasyMock with Mockito for KTableImplTest
Key: KAFKA-15383
URL: https://issues.apache.org/jira/browse/KAFKA-15383
Project: Kafka
Issue Type: Sub-task
Reporter: Fei Xie
[jira] [Created] (KAFKA-15382) Replace EasyMock with Mockito for KStreamTransformValuesTest
Fei Xie created KAFKA-15382:
Summary: Replace EasyMock with Mockito for KStreamTransformValuesTest
Key: KAFKA-15382
URL: https://issues.apache.org/jira/browse/KAFKA-15382
Project: Kafka
Issue Type: Sub-task
Reporter: Fei Xie
[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'
olalamichelle commented on code in PR #14078: URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213816 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -195,4 +231,68 @@ public String getBody() { return Mockito.spy(httpsJwks); } +/** + * A mock ScheduledExecutorService just for the test. Note that this is not a generally reusable mock as it does not + * implement some interfaces like scheduleWithFixedDelay, etc. And it does not return ScheduledFuture correctly. + */ +private class MockExecutorService implements MockTime.Listener { Review Comment: Yes it is very similar but I have to create my own mock for 2 reasons: 1. MockScheduler.schedule method does not take a period parameter which schedules a periodical task. I can add this function to MockScheduler but then I will also implement the corresponding addWaiter which is pretty much just put all the MockExecutorService code into MockScheduler. I don't think it is a good idea since it will make MockScheduler very cumbersome. Moreover, if I do that its schedule interface cannot take a ExecutorService parameter and it needs to take care of the execution itself inside. See below for more details. 2. MockScheduler is really just a scheduler that does not take care of the execution. It just submits tasks to the executor service. The flakiness of the test comes from the real clock based executor service where, because of CPU scheduling, cannot be 100% accurate on timings. We need a callback-based one in the test to make sure it is reliable. It is just the logic (since there principles are the same) behind a mock scheduler and a mock executor service is the same that we have to use MockTime and be callback-based so they are very similar. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
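The deterministic, callback-based scheduling the comment above argues for can be sketched roughly as follows. This is a hypothetical minimal illustration, not the actual MockExecutorService/MockTime test code; `FakeTime` and `CallbackScheduler` are invented names. Advancing the fake clock notifies listeners, and the scheduler fires every task whose trigger time has passed, so test timing never depends on a real-time executor's CPU scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: a scheduler whose tasks fire from a mock-clock callback,
// making test timing fully deterministic.
public class CallbackScheduler {

    // Minimal stand-in for MockTime: advancing the clock notifies listeners.
    public static class FakeTime {
        private long nowMs = 0;
        private final List<Runnable> listeners = new ArrayList<>();
        public long milliseconds() { return nowMs; }
        public void addListener(Runnable listener) { listeners.add(listener); }
        public void sleep(long ms) {
            nowMs += ms;
            for (Runnable listener : listeners) {
                listener.run(); // the time update drives scheduled work, not a real clock
            }
        }
    }

    private final FakeTime time;
    // trigger time in ms -> tasks due at that time (mirrors the TreeMap of waiters)
    private final TreeMap<Long, List<Runnable>> waiters = new TreeMap<>();

    public CallbackScheduler(FakeTime time) {
        this.time = time;
        time.addListener(this::onTimeUpdated);
    }

    public void schedule(Runnable task, long delayMs) {
        waiters.computeIfAbsent(time.milliseconds() + delayMs, k -> new ArrayList<>()).add(task);
    }

    private void onTimeUpdated() {
        // run every task whose trigger time has passed, in time order
        while (!waiters.isEmpty() && waiters.firstKey() <= time.milliseconds()) {
            for (Runnable task : waiters.pollFirstEntry().getValue()) {
                task.run();
            }
        }
    }
}
```

A test can then advance `FakeTime` explicitly and observe exactly when a task fires, with no real sleeping involved.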
[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'
olalamichelle commented on code in PR #14078: URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213610 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -195,4 +231,68 @@ public String getBody() { return Mockito.spy(httpsJwks); } +/** + * A mock ScheduledExecutorService just for the test. Note that this is not a generally reusable mock as it does not + * implement some interfaces like scheduleWithFixedDelay, etc. And it does not return ScheduledFuture correctly. + */ +private class MockExecutorService implements MockTime.Listener { +private final MockTime time; + +private final TreeMap<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> waiters = new TreeMap<>(); + +public MockExecutorService(MockTime time) { +this.time = time; +time.addListener(this); +} + +@Override +public synchronized void onTimeUpdated() { Review Comment: This is a test-only class, so I think it should be fine? We should make sure the scheduled task does not throw exceptions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'
olalamichelle commented on code in PR #14078: URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213641 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -195,4 +231,68 @@ public String getBody() { return Mockito.spy(httpsJwks); } +/** + * A mock ScheduledExecutorService just for the test. Note that this is not a generally reusable mock as it does not + * implement some interfaces like scheduleWithFixedDelay, etc. And it does not return ScheduledFuture correctly. + */ +private class MockExecutorService implements MockTime.Listener { +private final MockTime time; + +private final TreeMap<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> waiters = new TreeMap<>(); + +public MockExecutorService(MockTime time) { +this.time = time; +time.addListener(this); +} + +@Override +public synchronized void onTimeUpdated() { +long timeMs = time.milliseconds(); +while (true) { +Map.Entry<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> entry = waiters.firstEntry(); +if ((entry == null) || (entry.getKey() > timeMs)) { +break; +} +for (AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>> pair : entry.getValue()) { +pair.getValue().complete(timeMs); +if (pair.getKey() != null) { +addWaiter(entry.getKey() + pair.getKey(), pair.getKey(), pair.getValue()); +} +} +waiters.remove(entry.getKey()); +} +} + +private synchronized void addWaiter(long delayMs, Long period, KafkaFutureImpl<Long> waiter) { +long timeMs = time.milliseconds(); +if (delayMs <= 0) { +waiter.complete(timeMs); +} else { +long triggerTimeMs = timeMs + delayMs; +List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>> futures = +waiters.computeIfAbsent(triggerTimeMs, k -> new ArrayList<>()); +futures.add(new AbstractMap.SimpleEntry<>(period, waiter)); +} +} + +/** + * Internal utility function for periodic or one-time refreshes. + * + * @param period null indicates a one-time refresh, otherwise it is periodic. + */ +public ScheduledFuture<?> schedule(final Callable<?> callable, long delayMs, Long period) { + +KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>(); +waiter.thenApply((KafkaFuture.BaseFunction) now -> { +try { +callable.call(); +} catch (Throwable ignored) { +} Review Comment: This is a test-only class, so I think it should be fine? We should make sure the scheduled task does not throw exceptions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
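The review question above — making sure a scheduled task's exceptions are not silently lost — can be handled in a test mock by capturing the first `Throwable` instead of dropping it, then rethrowing it when the test finishes. A hypothetical sketch (`FailureCapture` is an invented name, not part of the Kafka test):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: run tasks, remember the first failure, and let the test
// surface it later instead of swallowing Throwable as in the snippet above.
public class FailureCapture {
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    public void run(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            firstFailure.compareAndSet(null, t); // keep only the first failure
        }
    }

    public void rethrowIfFailed() {
        Throwable t = firstFailure.get();
        if (t != null) {
            throw new AssertionError("scheduled task failed", t);
        }
    }
}
```

A test would wrap each scheduled callable with `run(...)` and call `rethrowIfFailed()` at the end, so a throwing task fails the test instead of passing silently.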
[GitHub] [kafka] olalamichelle commented on a diff in pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'
olalamichelle commented on code in PR #14078: URL: https://github.com/apache/kafka/pull/14078#discussion_r1299213580 ## clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java: ## @@ -195,4 +231,68 @@ public String getBody() { return Mockito.spy(httpsJwks); } +/** + * A mock ScheduledExecutorService just for the test. Note that this is not a generally reusable mock as it does not + * implement some interfaces like scheduleWithFixedDelay, etc. And it does not return ScheduledFuture correctly. + */ +private class MockExecutorService implements MockTime.Listener { +private final MockTime time; + +private final TreeMap<Long, List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>>> waiters = new TreeMap<>(); + +public MockExecutorService(MockTime time) { +this.time = time; +time.addListener(this); +} + +@Override +public synchronized void onTimeUpdated() { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1299185257 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -698,11 +707,329 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional<RetentionSizeData> retentionSizeData; +private final Optional<RetentionTimeData> retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending within an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted && retentionSizeData.isPresent()) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +return isSegmentDeleted; +} + +// It removes the segments beyond the current leader's earliest epoch. Those segments are considered +// unreferenced because they are not part of the current leader epoch lineage. 
+private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageExcept
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1299184981 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -696,11 +704,327 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional<RetentionSizeData> retentionSizeData; +private final Optional<RetentionTimeData> retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending within an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted && retentionSizeData.isPresent()) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +return isSegmentDeleted; +} + +// It removes the segments beyond the current leader's earliest epoch. Those segments are considered +// unreferenced because they are not part of the current leader epoch lineage. 
+private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageExcept
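The retention-size predicate quoted in the diffs above (delete the oldest remote segments while the log still exceeds `retention.bytes`) can be isolated into a small sketch. This is a hypothetical illustration of the bookkeeping only, not Kafka's RemoteLogManager code; `RetentionSketch` and `segmentsToDelete` are invented names. `remainingBreachedSize` is the number of bytes by which the total log exceeds `retention.bytes`; walking segments oldest-first, a segment is eligible for deletion only while removing it keeps the remaining breach non-negative, mirroring the predicate passed to `deleteRemoteLogSegment`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the retention-size bookkeeping, not actual Kafka code.
public class RetentionSketch {
    public static List<Integer> segmentsToDelete(long[] segmentSizes, long remainingBreachedSize) {
        List<Integer> toDelete = new ArrayList<>();
        for (int i = 0; i < segmentSizes.length; i++) {
            // Assumption carried over from the original code: segment sizes are >= 0.
            if (remainingBreachedSize > 0) {
                long remainingBytes = remainingBreachedSize - segmentSizes[i];
                if (remainingBytes >= 0) {
                    remainingBreachedSize = remainingBytes;
                    toDelete.add(i); // this segment falls entirely beyond retention.bytes
                }
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        // Log exceeds retention.bytes by 100 bytes; segments of 40 bytes each, oldest first.
        System.out.println(segmentsToDelete(new long[]{40, 40, 40}, 100)); // prints [0, 1]
    }
}
```

Note the third segment survives: deleting it would shrink the log below `retention.bytes`, which is exactly why the original predicate requires `remainingBytes >= 0`.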
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1299182368 ## streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java: ## @@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception { ); // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side +final long waitForPurgeMs = 6; TestUtils.waitForCondition( -new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES), -6, -"Repartition topic " + REPARTITION_TOPIC + " not purged data after 6 ms." +new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES), Review Comment: Merged https://github.com/apache/kafka/pull/14227 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14429) Move OffsetStorageReader from storage package to source package
[ https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756308#comment-17756308 ] Sagar Rao commented on KAFKA-14429: --- hey [~gharris1727], given Chris's comment above, do you think we can close this one ? > Move OffsetStorageReader from storage package to source package > --- > > Key: KAFKA-14429 > URL: https://issues.apache.org/jira/browse/KAFKA-14429 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Priority: Minor > Labels: needs-kip > > The OffsetStorageReader is an interface provided to source connectors. This > does not fit with the broader context of the `storage` package, which is > focused on sink/source-agnostic converters and serialization/deserialization. > The current interface should be deprecated and extend from the relocated > interface in a different package. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent
[ https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14138: - Assignee: (was: Sagar Rao) > The Exception Throwing Behavior of Transactional Producer is Inconsistent > - > > Key: KAFKA-14138 > URL: https://issues.apache.org/jira/browse/KAFKA-14138 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Guozhang Wang >Priority: Critical > > There's an issue for inconsistent error throwing inside Kafka Producer when > transactions are enabled. In short, there are two places where the received > error code from the brokers would be eventually thrown to the caller: > * Recorded on the batch's metadata, via "Sender#failBatch" > * Recorded on the txn manager, via "txnManager#handleFailedBatch". > The former would be thrown from 1) the `Future` returned from > the `send`; or 2) the `callback` inside `send(record, callback)`. Whereas, > the latter would be thrown from `producer.send()` directly in which we call > `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown > from the former, it's not wrapped hence the direct exception (e.g. > ClusterAuthorizationException), whereas in the latter it's wrapped as, e.g. > KafkaException(ClusterAuthorizationException). And which one would be thrown > depend on a race condition since we cannot control by the time the caller > thread calls `txnManager.maybeAddPartition`, if the previous produceRequest's > error has been sent back or not. > For example consider the following sequence for idempotent producer: > 1. caller thread: within future = producer.send(), call > recordAccumulator.append > 2. sender thread: drain the accumulator, send the produceRequest and get the > error back. > 3. caller thread: within future = producer.send(), call > txnManager.maybeAddPartition, in which we would check `maybeFailWithError` > before `isTransactional`. > 4. 
caller thread: future.get() > In a sequence where 3) happened before 2), we would only get the raw > exception at step 4; in a sequence where 2) happened before 3), then we would > throw the exception immediately at 3). > This inconsistent error throwing is pretty annoying for users since they'd > need to handle both cases, but many of them actually do not know this > trickiness. We should make the error throwing consistent, e.g. we should > consider: 1) which errors would be thrown from callback / future.get, and > which would be thrown from the `send` call directly, and these errors should > better be non-overlapping, 2) whether we should wrap the raw error or not, we > should do so consistently. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cadonna opened a new pull request, #14249: KAFKA-10199: Change to RUNNING if no pending task to init exist
cadonna opened a new pull request, #14249: URL: https://github.com/apache/kafka/pull/14249 A stream thread should only change to RUNNING if there are no active tasks in restoration in the state updater and if there are no pending tasks to recycle and to init. Usually all pending tasks to init are added to the state updater in the same poll iteration that handles the assignment. However, if a LockException occurs during the initialization of a task, the task is re-added to the tasks to init and initialization is retried in the next poll iteration. A LockException might occur when a state directory is still locked by another thread because a rebalance just happened. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna closed pull request #14214: KAFKA-10199: Change to RUNNING if no pending task to init exist
cadonna closed pull request #14214: KAFKA-10199: Change to RUNNING if no pending task to init exist URL: https://github.com/apache/kafka/pull/14214 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kun98-liu opened a new pull request, #14248: MINOR: Add libs.slf4jlog4j to dependencies in project(':core')
kun98-liu opened a new pull request, #14248: URL: https://github.com/apache/kafka/pull/14248 ## Description - When I use IntelliJ to debug `kafka.core.main` following the [wiki](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=145722808), I kept getting error messages like these: ``` > Task :core:Kafka.main() SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder". SLF4J: Defaulting to no-operation MDCAdapter implementation. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details. ``` - My debug configuration is: ``` -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dkafka.logs.dir="/Users/me/logs" -Dlog4j.configuration=file:/path/to/kafka/config/log4j.properties ``` - Adding `implementation libs.slf4jlog4j` resolved my problem. - This change will make life easier for those who want to start contributing to Kafka. ## Test - Ran locally and tested via `quick-start`. Should have no impact on existing functionality. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
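The one-line change this PR describes would look roughly as follows in the core project's build configuration. This is a sketch, not the exact diff; `libs.slf4jlog4j` is the dependency-catalog alias the PR message itself names, and the surrounding block layout is assumed:

```groovy
// build.gradle — sketch of the dependency addition under project(':core')
project(':core') {
  dependencies {
    // binds SLF4J to log4j so org.slf4j.impl.StaticLoggerBinder is on the runtime classpath
    implementation libs.slf4jlog4j
  }
}
```

With the binding present, SLF4J no longer falls back to its no-operation logger when running `kafka.core.main` from the IDE.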
[GitHub] [kafka] cadonna merged pull request #14227: MINOR: Decouple purging committed records from committing
cadonna merged PR #14227: URL: https://github.com/apache/kafka/pull/14227
[GitHub] [kafka] cadonna commented on pull request #14227: MINOR: Decouple purging committed records from committing
cadonna commented on PR #14227: URL: https://github.com/apache/kafka/pull/14227#issuecomment-1684911229

Build failures are unrelated:

```
Build / JDK 20 and Scala 2.13 / kafka.log.remote.RemoteIndexCacheTest.testClose()
Build / JDK 20 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
Build / JDK 20 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 20 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testStartZkBrokerWithAuthorizer, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWriteScram, MetadataVersion=3.5-IV2, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testStartZkBrokerWithAuthorizer, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
Build / JDK 8 and Scala 2.12 / kafka.api.PlaintextConsumerTest.testSubsequentPatternSubscription()
Build / JDK 8 and Scala 2.12 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
```
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756245#comment-17756245 ]

Jinyong Choi commented on KAFKA-15302:
--------------------------------------

I got it.

> Stale value returned when using store.all() in punctuation function.
> --------------------------------------------------------------------
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>
> When using the store.all() function within the punctuation function of this.context.schedule, the previous value is returned. In other words, even though the value has been updated from 1 to 2, all() doesn't return 2; instead, it returns 1.
>
> In the provided test code you can see the output 'BROKEN !!!'. While this doesn't occur 100% of the time, by adding logs it's evident that the cache is flushed during the while loop after all() is called. As a result, the named cache holds a null value, causing the return of a value from RocksDB. This is observed as the value after the .get() call being different from the expected value. This is possibly due to the consistent-read functionality of RocksDB, although the exact cause is not certain.
>
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be any errors.
>
> * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>
> {code:java}
> private void forwardAll(final long timestamp) {
>     // System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>
> * log file (add log in stream source)
>
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
>
> # log file
> ...
> 01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
> 01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator all()
> 01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
> 01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
> 01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() TreeSet size():325771
> ...
> 01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e
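The behaviour described in the quoted issue, where an iterator over the underlying store misses writes that are still sitting in the caching layer until a flush happens, can be mimicked with a deliberately simplified, self-contained sketch. This is plain Java with no Kafka dependency; `CachedStore` and its method names are invented for illustration and only model the cache-over-RocksDB layering, not Kafka Streams' actual iterator merging:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a write cache layered over a persistent store
// (loosely analogous to NamedCache over RocksDBStore).
class CachedStore {
    private final Map<String, Integer> cache = new HashMap<>();   // dirty, unflushed writes
    private final Map<String, Integer> backing = new HashMap<>(); // stands in for RocksDB

    void put(String key, Integer value) {
        cache.put(key, value);
    }

    // get() consults the cache first, so it always sees the latest write.
    Integer get(String key) {
        return cache.containsKey(key) ? cache.get(key) : backing.get(key);
    }

    // flush() pushes dirty entries down to the backing store.
    void flush() {
        backing.putAll(cache);
        cache.clear();
    }

    // A snapshot taken only from the backing store misses unflushed writes —
    // analogous to the stale values the issue reports from store.all().
    Map<String, Integer> backingSnapshot() {
        return new HashMap<>(backing);
    }
}

public class StaleReadDemo {
    public static void main(String[] args) {
        CachedStore store = new CachedStore();
        store.put("k", 1);
        store.flush();       // value 1 reaches the backing store
        store.put("k", 2);   // value 2 lives only in the cache

        // get() sees 2, but a backing-store-only view still sees 1: the mismatch.
        System.out.println("get():      " + store.get("k"));                    // 2
        System.out.println("snapshot(): " + store.backingSnapshot().get("k")); // 1 (stale)

        // The workaround mentioned in the issue: flush before iterating.
        store.flush();
        System.out.println("snapshot(): " + store.backingSnapshot().get("k")); // 2
    }
}
```

This mirrors the reported symptom: `get()` (cache-aware) and the backing-store iterator disagree until a flush runs, which is why calling `store.flush()` before `all()` makes the errors disappear in the reporter's test.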