[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1284145024

## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java: ##
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
      */
     private final int segmentSizeInBytes;
+    /**
+     * Custom metadata.

Review Comment:
Thanks, updated

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1282989826

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ##
@@ -81,6 +81,13 @@ public final class RemoteLogManagerConfig {
     public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = "Listener name of the local broker to which it should get connected if " +
             "needed by RemoteLogMetadataManager implementation.";
+    public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_SIZE_PROP = "remote.log.metadata.custom.metadata.max.size";

Review Comment:
Sure. Done this and updated the KIP.
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1282971156

## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java: ##
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
      */
     private final int segmentSizeInBytes;
+    /**
+     * Custom metadata.

Review Comment:
Would you prefer it here or in the `CustomMetadata` Javadoc?
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1282941966

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -623,10 +632,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        if (customMetadata.isPresent()) {
+            long customMetadataSize = customMetadata.get().value().length;
+            if (customMetadataSize > this.customMetadataSizeLimit) {
+                CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                " Copying will be stopped and copied segment will be attempted to clean." +
+                                " Original metadata: {}",
+                        customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                try {
+                    // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                    // However, the update itself will not be stored in this case.
+                    remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));

Review Comment:
Here's my reasoning why not: https://github.com/apache/kafka/pull/13984#discussion_r1280742713
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281763529

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+        if (customMetadata.isPresent()) {
+            long customMetadataSize = customMetadata.get().value().length;
+            if (customMetadataSize > customMetadataSizeLimit) {
+                CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                " Copying will be stopped and copied segment will be attempted to clean." +
+                                " Original metadata: {}",
+                        customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                try {
+                    // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                    // However, the update itself will not be stored in this case.
+                    remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                } catch (RemoteStorageException e1) {
+                    logger.error("Error while cleaning segment after custom metadata size exceeded", e1);

Review Comment:
If this error message doesn't appear, the deletion was successful. However, it may be a good idea to make this explicit, so I added info logging for a successful deletion and a suggestion to clean up manually in case of an error.

> If we don't even care if the segment is deleted or not at all, why did we have to delete it anyway?

On a high level, you're right, we don't care: the operator will fix the issue by increasing the custom metadata size limit or, more radically, by deleting the topic or disabling remote storage for it. In any case, there shouldn't be any garbage left on the remote storage. Cleaning may matter when the placement on the remote storage is nondeterministic. In that case, a subsequent write attempt will not overwrite the files but will write new ones, leaving some garbage behind. So, to combat this and generally as a measure of hygiene, a best-effort deletion attempt is made.
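A minimal sketch of the best-effort cleanup described above, in plain Java. It deliberately avoids Kafka's real classes: `SegmentStore`, `SizeLimitExceeded`, and `BestEffortCopy` are hypothetical stand-ins, and log lines are collected in a list rather than sent to a logger. It checks the size of the custom metadata returned by the copy, attempts the deletion, logs the outcome explicitly either way (info on success, a manual-cleanup hint on failure), and then throws so that the caller stops copying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a remote storage plugin; NOT Kafka's RemoteStorageManager API.
interface SegmentStore {
    // Uploads the segment and returns opaque custom metadata, if the plugin produces any.
    Optional<byte[]> copy(String segmentId);

    // Removes whatever was uploaded for the segment; may fail.
    void delete(String segmentId) throws Exception;
}

// Hypothetical checked exception that tells the caller to stop copying.
class SizeLimitExceeded extends Exception {
    SizeLimitExceeded(String message) {
        super(message);
    }
}

final class BestEffortCopy {
    final List<String> log = new ArrayList<>(); // log lines collected here instead of a real logger
    private final SegmentStore store;
    private final int customMetadataSizeLimit;

    BestEffortCopy(SegmentStore store, int customMetadataSizeLimit) {
        this.store = store;
        this.customMetadataSizeLimit = customMetadataSizeLimit;
    }

    void copyAndValidate(String segmentId) throws SizeLimitExceeded {
        Optional<byte[]> custom = store.copy(segmentId);
        if (custom.isPresent() && custom.get().length > customMetadataSizeLimit) {
            int size = custom.get().length;
            try {
                // Best effort: remove the uploaded files so that a retry writing to new,
                // nondeterministic locations does not leave the old ones behind as garbage.
                store.delete(segmentId);
                log.add("INFO deleted " + segmentId + " after custom metadata size limit was exceeded");
            } catch (Exception e) {
                log.add("ERROR failed to delete " + segmentId + ", clean it up manually: " + e.getMessage());
            }
            throw new SizeLimitExceeded("custom metadata size " + size
                    + " exceeds configured limit " + customMetadataSizeLimit);
        }
        log.add("INFO copied " + segmentId);
    }
}
```

Because a failed deletion is only logged, the cleanup never masks the original size-limit problem; the exception is what stops the task.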
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281701101

## core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java: ##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {

Review Comment:
`storage.api` is a subproject for exposing public stuff. `CustomMetadataSizeLimitExceededException` is an implementation detail of `RemoteLogManager`. `RemoteStorageException` is different because it's also a public exception. That's why I'm a bit unsure whether we should expose it by putting it in `storage.api`.
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281661247

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();

Review Comment:
Sure, done this
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281646428

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -572,6 +573,9 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             } else {
                 logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", copiedOffset, lso);
             }
+        } catch (CustomMetadataSizeLimitExceededException e) {
+            // Only stop this task. Logging is done where the exception is thrown.
+            this.cancel();

Review Comment:
Yes, good point. I also added a metrics check in `testCustomMetadataSizeExceedsLimit`.
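A sketch of the "only stop this task" behavior: each partition's copy task catches the internal exception and cancels itself, while the other tasks keep running on later rounds. `CancellableTask`, `CopyTask`, and `MetadataTooLargeException` are hypothetical simplifications, not Kafka's `RLMTask` or `CancellableRunnable`; the exception is package-private, in line with treating it as an implementation detail.

```java
// Hypothetical package-private exception: an implementation detail, never thrown to callers.
class MetadataTooLargeException extends Exception {
    MetadataTooLargeException(String message) {
        super(message);
    }
}

// Minimal stand-in for a cancellable periodic task (not Kafka's CancellableRunnable).
abstract class CancellableTask implements Runnable {
    private volatile boolean cancelled = false;

    void cancel() {
        cancelled = true;
    }

    boolean isCancelled() {
        return cancelled;
    }
}

final class CopyTask extends CancellableTask {
    private final int customMetadataSize;
    private final int limit;
    int attempts = 0; // how many times the copy logic actually ran

    CopyTask(int customMetadataSize, int limit) {
        this.customMetadataSize = customMetadataSize;
        this.limit = limit;
    }

    @Override
    public void run() {
        if (isCancelled()) {
            return; // a cancelled task does nothing on later schedules
        }
        try {
            copyOnce();
        } catch (MetadataTooLargeException e) {
            // Only this task stops; tasks for other partitions keep running.
            // Logging would happen where the exception is thrown.
            cancel();
        }
    }

    private void copyOnce() throws MetadataTooLargeException {
        attempts++;
        if (customMetadataSize > limit) {
            throw new MetadataTooLargeException("size " + customMetadataSize + " > " + limit);
        }
    }
}
```

Because the exception never leaves `run()`, callers that schedule tasks do not need to know it exists.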
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281602105

## storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java: ##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTest {
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+
+    @Test
+    void createWithUpdates() {
+        int brokerIdStart = 0;
+        int timestampStarted = 0;
+        int brokerIdFinished = 1;
+        int timestampFinished = 1;
+        long startOffset = 0L;
+        long endOffset = 100L;
+        int segmentSize = 123;
+        long maxTimestamp = -1L;
+
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+                maxTimestamp, brokerIdStart, timestampStarted, segmentSize,
+                segmentLeaderEpochs);
+
+        CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 3});
+        RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                segmentId, timestampFinished, Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                brokerIdFinished);
+        RemoteLogSegmentMetadata updatedMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+        RemoteLogSegmentMetadata expectedUpdatedMetadata = new RemoteLogSegmentMetadata(
+                segmentId, startOffset, endOffset,
+                maxTimestamp, brokerIdFinished, timestampFinished, segmentSize, Optional.of(customMetadata),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                segmentLeaderEpochs
+        );
+        assertEquals(expectedUpdatedMetadata, updatedMetadata);

Review Comment:
Good point, done this
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281591602

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -239,96 +240,186 @@ void testStartup() {
         assertEquals(logDir, capture.getValue().get("log.dir"));
     }
-    // This test creates 2 log segments, 1st one has start offset of 0, 2nd one (and active one) has start offset of 150.

Review Comment:
After resolving another bunch of conflicts with `trunk`, I dropped this approach and made `testCustomMetadataSizeExceedsLimit` independent.
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280742713

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+        if (customMetadata.isPresent()) {
+            long customMetadataSize = customMetadata.get().value().length;
+            if (customMetadataSize > customMetadataSizeLimit) {
+                CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                " Copying will be stopped and copied segment will be attempted to clean." +
+                                " Original metadata: {}",
+                        customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                try {
+                    // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                    // However, the update itself will not be stored in this case.
+                    remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));

Review Comment:
Do you mean switching the segment state to `DELETE_SEGMENT_STARTED` and ultimately to `DELETE_SEGMENT_FINISHED`?
The idea was to roll back to the state at L629, i.e. before `remoteLogStorageManager.copyLogSegmentData` is called (by undoing the copy), and let the operator decide what's next. I analyzed the metadata flow, and it seems both approaches lead to the same final result: deleting only the data and leaving the segment in the `COPY_SEGMENT_STARTED` state (the current approach), and fully deleting the metadata (the one you propose). However, the current approach is a bit simpler and has fewer steps that can fail. So I think we should keep it, unless I'm missing some factor. What do you think?
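A toy model of the two rollback options being compared, focused on the "fewer steps that can fail" argument. The enum mirrors the state names mentioned in the thread but is a local type, not Kafka's `RemoteLogSegmentState`; `Rollback` is hypothetical, and the assumption that the proposed flow publishes `DELETE_SEGMENT_STARTED`, deletes the data, then publishes `DELETE_SEGMENT_FINISHED` is illustrative. The sketch does not attempt to reproduce the analysis that both paths have the same observable outcome; it only records which metadata state each path leaves and how many remote operations it needs.

```java
import java.util.List;

// Mirrors the state names used in the discussion; a local enum, not Kafka's class.
enum SegmentState {
    COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
}

final class Rollback {
    final List<String> steps;      // remote operations, each a potential failure point
    final SegmentState finalState; // metadata state left behind if every step succeeds

    private Rollback(List<String> steps, SegmentState finalState) {
        this.steps = steps;
        this.finalState = finalState;
    }

    // Current approach: delete only the uploaded data; the metadata stays in COPY_SEGMENT_STARTED.
    static Rollback dataOnly() {
        return new Rollback(List.of("deleteLogSegmentData"), SegmentState.COPY_SEGMENT_STARTED);
    }

    // Proposed approach (assumed flow): also walk the metadata through the deletion states.
    static Rollback fullDeletion() {
        return new Rollback(List.of(
                "update metadata to " + SegmentState.DELETE_SEGMENT_STARTED,
                "deleteLogSegmentData",
                "update metadata to " + SegmentState.DELETE_SEGMENT_FINISHED),
                SegmentState.DELETE_SEGMENT_FINISHED);
    }

    // Chance that all steps succeed, assuming each fails independently with probability p.
    double successProbability(double p) {
        return Math.pow(1.0 - p, steps.size());
    }
}
```

Under the independent-failure assumption with p = 0.1, the one-step path succeeds with probability 0.9 and the three-step path with 0.9³ = 0.729.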
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280718946

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ##
@@ -393,6 +413,7 @@ public boolean equals(Object o) {
                 && Objects.equals(remoteLogMetadataManagerClassName, that.remoteLogMetadataManagerClassName)
                 && Objects.equals(remoteLogMetadataManagerClassPath, that.remoteLogMetadataManagerClassPath)
                 && Objects.equals(remoteLogMetadataManagerListenerName, that.remoteLogMetadataManagerListenerName)
+                && remoteLogMetadataCustomMetadataMaxSize == that.remoteLogMetadataCustomMetadataMaxSize

Review Comment:
Sure!
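When a field is added to a config class, `equals` and `hashCode` should change together. A minimal sketch with a hypothetical two-field class (not Kafka's `RemoteLogManagerConfig`): the reference field is compared with the null-safe `Objects.equals`, the primitive `int` with `==` as in the diff above, and both feed `Objects.hash`.

```java
import java.util.Objects;

// Hypothetical, trimmed-down config holder; NOT Kafka's RemoteLogManagerConfig.
final class RemoteConfigSketch {
    private final String metadataManagerListenerName;
    private final int customMetadataMaxSize;

    RemoteConfigSketch(String metadataManagerListenerName, int customMetadataMaxSize) {
        this.metadataManagerListenerName = metadataManagerListenerName;
        this.customMetadataMaxSize = customMetadataMaxSize;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        RemoteConfigSketch that = (RemoteConfigSketch) o;
        // Reference fields go through the null-safe Objects.equals; primitives compare with ==.
        return Objects.equals(metadataManagerListenerName, that.metadataManagerListenerName)
                && customMetadataMaxSize == that.customMetadataMaxSize;
    }

    @Override
    public int hashCode() {
        // Every field used in equals also feeds hashCode, so equal objects hash equally.
        return Objects.hash(metadataManagerListenerName, customMetadataMaxSize);
    }
}
```

Forgetting the new field in either method would make two configs that differ only in the limit compare as equal, which is the kind of slip the review comment above catches.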
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280718863

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ##
@@ -169,6 +176,11 @@ public final class RemoteLogManagerConfig {
                         new ConfigDef.NonEmptyString(),
                         MEDIUM,
                         REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+                .defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_SIZE_PROP, INT,

Review Comment:
Yes, fixed
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280710406

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);

Review Comment:
It's needed during validation, in the rollback phase: the deletion attempt there needs the updated metadata, including the custom part.
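Why the `COPY_SEGMENT_FINISHED` update carries the custom metadata even though it is never stored in the rollback case: the rollback builds a full metadata object with `createWithUpdates` and hands it to the delete call, and a plugin may need its own custom metadata to find the uploaded files. The classes below are hypothetical simplifications (not Kafka's `RemoteLogSegmentMetadata` or `RemoteLogSegmentMetadataUpdate`), and `LocatingStore` is an invented plugin that can only locate files through that metadata.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical, simplified update carrying only the fields relevant here.
final class MetadataUpdate {
    final String state;
    final Optional<byte[]> customMetadata;

    MetadataUpdate(String state, Optional<byte[]> customMetadata) {
        this.state = state;
        this.customMetadata = customMetadata;
    }
}

// Hypothetical, simplified segment metadata; NOT Kafka's RemoteLogSegmentMetadata.
final class SegmentMetadata {
    final String segmentId;
    final String state;
    final Optional<byte[]> customMetadata;

    SegmentMetadata(String segmentId, String state, Optional<byte[]> customMetadata) {
        this.segmentId = segmentId;
        this.state = state;
        this.customMetadata = customMetadata;
    }

    // Builds a new object from this metadata plus the update; 'this' is left unchanged.
    SegmentMetadata createWithUpdates(MetadataUpdate update) {
        return new SegmentMetadata(segmentId, update.state, update.customMetadata);
    }
}

// Hypothetical plugin that can only locate uploaded files through its custom metadata.
final class LocatingStore {
    static String locate(SegmentMetadata metadata) {
        return metadata.customMetadata
                .map(bytes -> metadata.segmentId + "@" + new String(bytes, StandardCharsets.UTF_8))
                .orElseThrow(() -> new IllegalStateException("no custom metadata: cannot locate files"));
    }
}
```

Passing the original `COPY_SEGMENT_STARTED` metadata to such a plugin would fail, because it has no custom metadata yet; the merged object does.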
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280708300

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();

Review Comment:
Yeah, it's global. Do you mean making it a field in `RLMTask` and passing it in the constructor?
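A sketch of the suggestion under discussion: read the global limit once in the outer manager and pass it to each task's constructor, instead of reaching into `RemoteLogManager.this.rlmConfig` from inside the task. `ManagerSketch` and `TaskSketch` are hypothetical names, not Kafka classes; the comparison uses `>`, as in the quoted diff, so custom metadata exactly at the limit is accepted.

```java
import java.util.Optional;

// Hypothetical manager that reads the global limit once from its configuration.
final class ManagerSketch {
    private final int customMetadataSizeLimit;

    ManagerSketch(int configuredLimit) {
        this.customMetadataSizeLimit = configuredLimit;
    }

    // Each per-partition task receives the value in its constructor,
    // instead of reaching into the outer instance's config on every copy.
    TaskSketch newTask(String partition) {
        return new TaskSketch(partition, customMetadataSizeLimit);
    }
}

// Hypothetical per-partition task holding the limit as a plain field.
final class TaskSketch {
    final String partition;
    private final int customMetadataSizeLimit;

    TaskSketch(String partition, int customMetadataSizeLimit) {
        this.partition = partition;
        this.customMetadataSizeLimit = customMetadataSizeLimit;
    }

    boolean exceedsLimit(Optional<byte[]> customMetadata) {
        return customMetadata.isPresent() && customMetadata.get().length > customMetadataSizeLimit;
    }
}
```

A side benefit of this design is that a task can be constructed and tested with any limit without building a full configuration object.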
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280704854

## core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java: ##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {

Review Comment:
I'm not sure. As you said, it's internal and is only supposed to be used inside `RemoteLogManager.java`. Am I missing something?
[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1258358905

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -239,96 +240,186 @@ void testStartup() {
         assertEquals(logDir, capture.getValue().get("log.dir"));
     }
-    // This test creates 2 log segments, 1st one has start offset of 0, 2nd one (and active one) has start offset of 150.

Review Comment:
To make the review easier, in this test file:
- `testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment` was moved to a test subclass;
- the code that initializes the log mock was extracted to `setUp`;
- a new test, `testCustomMetadataSizeExceedsLimit`, was added.