[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-04 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1284145024


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java:
##
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
  */
 private final int segmentSizeInBytes;
 
+/**
+ * Custom metadata.

Review Comment:
   Thanks, updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-03 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1282989826


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -81,6 +81,13 @@ public final class RemoteLogManagerConfig {
 public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = 
"Listener name of the local broker to which it should get connected if " +
 "needed by RemoteLogMetadataManager implementation.";
 
+public static final String 
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_SIZE_PROP = 
"remote.log.metadata.custom.metadata.max.size";

Review Comment:
   Sure. I've done this and updated the KIP.






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-03 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1282971156


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java:
##
@@ -66,6 +68,11 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
  */
 private final int segmentSizeInBytes;
 
+/**
+ * Custom metadata.

Review Comment:
   Would you prefer it here or in the `CustomMetadata` Javadoc?






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-03 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1282941966


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -623,10 +632,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+if (customMetadata.isPresent()) {
+long customMetadataSize = customMetadata.get().value().length;
+if (customMetadataSize > this.customMetadataSizeLimit) {
+CustomMetadataSizeLimitExceededException e = new 
CustomMetadataSizeLimitExceededException();
+logger.error("Custom metadata size {} exceeds configured 
limit {}." +
+" Copying will be stopped and copied 
segment will be attempted to clean." +
+" Original metadata: {}",
+customMetadataSize, this.customMetadataSizeLimit, 
copySegmentStartedRlsm, e);
+try {
+// For deletion, we provide back the custom metadata 
by creating a new metadata object from the update.
+// However, the update itself will not be stored in 
this case.
+
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));

Review Comment:
   Here's my reasoning why not: 
https://github.com/apache/kafka/pull/13984#discussion_r1280742713






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-02 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281763529


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+int customMetadataSizeLimit = 
RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+if (customMetadata.isPresent()) {
+long customMetadataSize = customMetadata.get().value().length;
+if (customMetadataSize > customMetadataSizeLimit) {
+CustomMetadataSizeLimitExceededException e = new 
CustomMetadataSizeLimitExceededException();
+logger.error("Custom metadata size {} exceeds configured 
limit {}." +
+" Copying will be stopped and copied 
segment will be attempted to clean." +
+" Original metadata: {}",
+customMetadataSize, customMetadataSizeLimit, 
copySegmentStartedRlsm, e);
+try {
+// For deletion, we provide back the custom metadata 
by creating a new metadata object from the update.
+// However, the update itself will not be stored in 
this case.
+
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+} catch (RemoteStorageException e1) {
+logger.error("Error while cleaning segment after 
custom metadata size exceeded", e1);

Review Comment:
   If this error message doesn't appear, it means the deletion was successful. 
However, it may be a good idea to make this explicit, so I added info logging 
for successful deletion and a suggestion to clean manually in case of an error.
   
   > If we don't even care if the segment is deleted or not at all, why did we 
have to delete it anyway?
   
   At a high level, you're right, we don't care: the operator will fix the 
issue by increasing the custom metadata size limit or, more radically, by 
deleting the topic or disabling remote storage for it. In any case, there 
shouldn't be any garbage on the remote storage.
   
   Cleaning may matter when the placement on the remote storage is 
nondeterministic. In this case, the subsequent write attempt will not overwrite 
the files but will write new ones, thus leaving some garbage. So to combat this, 
and generally as a measure of hygiene, a best-effort deletion attempt is made.
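
   The flow described in this comment could be sketched roughly as below. This is only an illustration under stated assumptions: `InMemoryStoreSketch`, `SizeLimitExceededSketch` and `CopyWithLimitSketch` are hypothetical stand-ins, not the real `RemoteStorageManager`, exception or `RemoteLogManager` code. It shows the size check after the copy, the best-effort cleanup with explicit logging for both outcomes, and the exception that stops further copying.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-ins for illustration; none of these are real Kafka types.
class SizeLimitExceededSketch extends Exception { }

class InMemoryStoreSketch {
    final Set<String> stored = new HashSet<>();
    private final int metadataSize;
    private final boolean failOnDelete;

    InMemoryStoreSketch(int metadataSize, boolean failOnDelete) {
        this.metadataSize = metadataSize;
        this.failOnDelete = failOnDelete;
    }

    // Pretends to upload the segment and returns plugin-produced custom metadata.
    Optional<byte[]> copy(String segmentId) {
        stored.add(segmentId);
        return Optional.of(new byte[metadataSize]);
    }

    void delete(String segmentId) throws Exception {
        if (failOnDelete) {
            throw new Exception("simulated remote storage failure");
        }
        stored.remove(segmentId);
    }
}

class CopyWithLimitSketch {
    static void copySegment(InMemoryStoreSketch store, String segmentId, int limit)
            throws SizeLimitExceededSketch {
        Optional<byte[]> customMetadata = store.copy(segmentId);
        if (customMetadata.isPresent() && customMetadata.get().length > limit) {
            System.err.println("Custom metadata size " + customMetadata.get().length
                    + " exceeds limit " + limit + "; attempting cleanup");
            try {
                store.delete(segmentId);
                System.out.println("Cleaned up " + segmentId);
            } catch (Exception cleanupError) {
                // Best effort only: report the failure and suggest manual cleanup.
                System.err.println("Cleanup of " + segmentId
                        + " failed, manual cleanup may be needed: " + cleanupError.getMessage());
            }
            // Thrown whether or not the cleanup worked, so the caller stops copying.
            throw new SizeLimitExceededSketch();
        }
        // Within the limit: the real code would publish COPY_SEGMENT_FINISHED here.
    }

    public static void main(String[] args) {
        InMemoryStoreSketch store = new InMemoryStoreSketch(8, false);
        try {
            copySegment(store, "segment-0", 4);
        } catch (SizeLimitExceededSketch e) {
            System.out.println("Copy stopped; segment still stored: "
                    + store.stored.contains("segment-0"));
        }
    }
}
```

   In this sketch a failed cleanup does not change the outcome for the caller; it only changes what gets logged, which matches the best-effort intent above.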






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-02 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281701101


##
core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {

Review Comment:
   `storage.api` is a subproject for exposing public APIs. 
`CustomMetadataSizeLimitExceededException` is an implementation detail of 
`RemoteLogManager`. `RemoteStorageException` is different because it's also a 
public exception. That's why I'm a bit unsure if we should expose it by putting 
it in `storage.api`.






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-02 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281661247


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+int customMetadataSizeLimit = 
RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();

Review Comment:
   Sure, done.






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-02 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281646428


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -572,6 +573,9 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 } else {
 logger.debug("Skipping copying segments, current 
read-offset:{}, and LSO:{}", copiedOffset, lso);
 }
+} catch (CustomMetadataSizeLimitExceededException e) {
+// Only stop this task. Logging is done where the exception is 
thrown.
+this.cancel();

Review Comment:
   Yes, good point. I also added a metrics check in 
`testCustomMetadataSizeExceedsLimit`.
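
   The catch block in the diff above stops only the task that hit the limit. A minimal sketch of that pattern follows; `CancellableTaskSketch` and `LimitExceededSketch` are hypothetical names used for illustration and are not the real `RLMTask` or exception classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins; not the real RLMTask or exception classes.
class LimitExceededSketch extends Exception { }

abstract class CancellableTaskSketch implements Runnable {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    int attempts = 0;

    // One copy pass; may hit the custom metadata size limit.
    abstract void copySegments() throws LimitExceededSketch;

    void cancel() { cancelled.set(true); }
    boolean isCancelled() { return cancelled.get(); }

    @Override
    public void run() {
        if (isCancelled()) {
            return; // A cancelled task does no further work.
        }
        attempts++;
        try {
            copySegments();
        } catch (LimitExceededSketch e) {
            // Only stop this task; the failure was already logged where it was thrown.
            cancel();
        }
    }
}

class TaskCancellationDemo {
    public static void main(String[] args) {
        CancellableTaskSketch failing = new CancellableTaskSketch() {
            @Override
            void copySegments() throws LimitExceededSketch {
                throw new LimitExceededSketch();
            }
        };
        CancellableTaskSketch healthy = new CancellableTaskSketch() {
            @Override
            void copySegments() { }
        };
        // Simulate three scheduler ticks for both tasks.
        for (int i = 0; i < 3; i++) {
            failing.run();
            healthy.run();
        }
        System.out.println("failing: attempts=" + failing.attempts
                + ", cancelled=" + failing.isCancelled());
        System.out.println("healthy: attempts=" + healthy.attempts
                + ", cancelled=" + healthy.isCancelled());
    }
}
```

   In this sketch, tasks for other partitions keep running, and the cancelled one stays idle until something outside the sketch (for example, an operator raising the limit and restarting) intervenes.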






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-02 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281602105


##
storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTest {
+private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(),
+new TopicPartition("foo", 0));
+
+@Test
+void createWithUpdates() {
+int brokerIdStart = 0;
+int timestampStarted = 0;
+int brokerIdFinished = 1;
+int timestampFinished = 1;
+long startOffset = 0L;
+long endOffset = 100L;
+int segmentSize = 123;
+long maxTimestamp = -1L;
+
+Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+segmentLeaderEpochs.put(0, 0L);
+RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+maxTimestamp, brokerIdStart, timestampStarted, segmentSize,
+segmentLeaderEpochs);
+
+CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 
3});
+RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(
+segmentId, timestampFinished, Optional.of(customMetadata), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+brokerIdFinished);
+RemoteLogSegmentMetadata updatedMetadata = 
segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+RemoteLogSegmentMetadata expectedUpdatedMetadata = new 
RemoteLogSegmentMetadata(
+segmentId, startOffset, endOffset,
+maxTimestamp, brokerIdFinished, timestampFinished, 
segmentSize, Optional.of(customMetadata),
+RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+segmentLeaderEpochs
+);
+assertEquals(expectedUpdatedMetadata, updatedMetadata);

Review Comment:
   Good point, done.






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-02 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281591602


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -239,96 +240,186 @@ void testStartup() {
 assertEquals(logDir, capture.getValue().get("log.dir"));
 }
 
-// This test creates 2 log segments, 1st one has start offset of 0, 2nd 
one (and active one) has start offset of 150.

Review Comment:
   After resolving another bunch of conflicts with `trunk`, I dropped this 
approach and made `testCustomMetadataSizeExceedsLimit` independent.






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-01 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280742713


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+int customMetadataSizeLimit = 
RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+if (customMetadata.isPresent()) {
+long customMetadataSize = customMetadata.get().value().length;
+if (customMetadataSize > customMetadataSizeLimit) {
+CustomMetadataSizeLimitExceededException e = new 
CustomMetadataSizeLimitExceededException();
+logger.error("Custom metadata size {} exceeds configured 
limit {}." +
+" Copying will be stopped and copied 
segment will be attempted to clean." +
+" Original metadata: {}",
+customMetadataSize, customMetadataSizeLimit, 
copySegmentStartedRlsm, e);
+try {
+// For deletion, we provide back the custom metadata 
by creating a new metadata object from the update.
+// However, the update itself will not be stored in 
this case.
+
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));

Review Comment:
   Do you mean switching the segment state to `DELETE_SEGMENT_STARTED` and 
ultimately to `DELETE_SEGMENT_FINISHED`? The idea was to try to roll back to 
the state that existed at L629, i.e. before calling 
`remoteLogStorageManager.copyLogSegmentData` (by undoing the copying), and let 
the operator decide what's next. I analyzed the metadata flow, and it seems both 
approaches will have the same final result: deleting only the data and leaving 
the segment in `COPY_SEGMENT_STARTED` (the current one), or fully deleting the 
metadata (the one you propose). However, the current one is a bit simpler and 
has fewer steps that can fail. So I think we should keep it unless I'm missing 
some factor. What do you think?
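
   To make the comparison concrete, here is a toy model of the two options. Only the state names come from the discussion; `RemoteSegmentSketch` and the step counter are hypothetical, and the model does not attempt to show the later metadata flow that the "same final result" claim relies on. It only shows what each option leaves behind immediately and how many fallible steps it takes.

```java
// Hypothetical model; only the state names come from the discussion above.
enum SegmentStateSketch {
    COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
}

class RemoteSegmentSketch {
    SegmentStateSketch state = SegmentStateSketch.COPY_SEGMENT_STARTED;
    boolean dataPresent = true; // the copy has already happened
    int fallibleSteps = 0;      // remote or metadata calls that could fail

    void deleteData() {
        dataPresent = false;
        fallibleSteps++;
    }

    void publish(SegmentStateSketch next) {
        state = next;
        fallibleSteps++;
    }
}

class RollbackOptionsSketch {
    // Current approach: undo the copy and leave the metadata untouched.
    static RemoteSegmentSketch dataOnlyRollback() {
        RemoteSegmentSketch segment = new RemoteSegmentSketch();
        segment.deleteData();
        return segment;
    }

    // Proposed alternative: also walk the segment through the delete states.
    static RemoteSegmentSketch fullMetadataDeletion() {
        RemoteSegmentSketch segment = new RemoteSegmentSketch();
        segment.publish(SegmentStateSketch.DELETE_SEGMENT_STARTED);
        segment.deleteData();
        segment.publish(SegmentStateSketch.DELETE_SEGMENT_FINISHED);
        return segment;
    }

    public static void main(String[] args) {
        RemoteSegmentSketch a = dataOnlyRollback();
        RemoteSegmentSketch b = fullMetadataDeletion();
        System.out.println("data-only: state=" + a.state + ", steps=" + a.fallibleSteps);
        System.out.println("full:      state=" + b.state + ", steps=" + b.fallibleSteps);
    }
}
```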






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-01 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280718946


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -393,6 +413,7 @@ public boolean equals(Object o) {
 && Objects.equals(remoteLogMetadataManagerClassName, 
that.remoteLogMetadataManagerClassName)
 && Objects.equals(remoteLogMetadataManagerClassPath, 
that.remoteLogMetadataManagerClassPath)
 && Objects.equals(remoteLogMetadataManagerListenerName, 
that.remoteLogMetadataManagerListenerName)
+&& remoteLogMetadataCustomMetadataMaxSize == 
that.remoteLogMetadataCustomMetadataMaxSize

Review Comment:
   Sure!






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-01 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280718863


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -169,6 +176,11 @@ public final class RemoteLogManagerConfig {
   new ConfigDef.NonEmptyString(),
   MEDIUM,
   
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+  
.defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_SIZE_PROP, INT,

Review Comment:
   Yes, fixed






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-01 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280710406


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);

Review Comment:
   It's needed during the validation, in the rollback phase: there's an attempt 
to delete that needs the updated metadata, including the custom piece.
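
   As an illustration of why the update is built before the validation, here is a simplified sketch of merging an update into the started metadata so that the custom piece is available to the delete call. `SegmentMetadataSketch` and `MetadataUpdateSketch` are hypothetical, cut-down stand-ins for `RemoteLogSegmentMetadata` and `RemoteLogSegmentMetadataUpdate`; the segment ID check is an assumption of this sketch.

```java
import java.util.Optional;

// Hypothetical, heavily simplified stand-ins for the real metadata classes.
final class MetadataUpdateSketch {
    final String segmentId;
    final long timestampMs;
    final Optional<byte[]> customMetadata;
    final String state;
    final int brokerId;

    MetadataUpdateSketch(String segmentId, long timestampMs,
                         Optional<byte[]> customMetadata, String state, int brokerId) {
        this.segmentId = segmentId;
        this.timestampMs = timestampMs;
        this.customMetadata = customMetadata;
        this.state = state;
        this.brokerId = brokerId;
    }
}

final class SegmentMetadataSketch {
    final String segmentId;
    final long startOffset;
    final long endOffset;
    final long timestampMs;
    final int brokerId;
    final Optional<byte[]> customMetadata;
    final String state;

    SegmentMetadataSketch(String segmentId, long startOffset, long endOffset, long timestampMs,
                          int brokerId, Optional<byte[]> customMetadata, String state) {
        this.segmentId = segmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.timestampMs = timestampMs;
        this.brokerId = brokerId;
        this.customMetadata = customMetadata;
        this.state = state;
    }

    // Returns a new object; the original metadata is left unchanged.
    SegmentMetadataSketch createWithUpdates(MetadataUpdateSketch update) {
        if (!segmentId.equals(update.segmentId)) {
            throw new IllegalArgumentException("Update is for a different segment");
        }
        return new SegmentMetadataSketch(segmentId, startOffset, endOffset, update.timestampMs,
                update.brokerId, update.customMetadata, update.state);
    }
}

class CreateWithUpdatesDemo {
    public static void main(String[] args) {
        SegmentMetadataSketch started = new SegmentMetadataSketch(
                "segment-0", 0L, 100L, 0L, 0, Optional.empty(), "COPY_SEGMENT_STARTED");
        MetadataUpdateSketch finished = new MetadataUpdateSketch(
                "segment-0", 1L, Optional.of(new byte[]{0, 1, 2, 3}), "COPY_SEGMENT_FINISHED", 1);
        // The merged object is what a delete call would receive during rollback.
        SegmentMetadataSketch forDeletion = started.createWithUpdates(finished);
        System.out.println("custom metadata available for delete: "
                + forDeletion.customMetadata.isPresent());
    }
}
```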






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-01 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280708300


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+int customMetadataSizeLimit = 
RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();

Review Comment:
   Yeah, it's global. Do you mean making it a field in `RLMTask` and passing it 
in the constructor?
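
   For reference, the constructor-injection option asked about here might look like the sketch below. `ConfigSketch`, `ManagerSketch` and `CopyTaskSketch` are hypothetical stand-ins, not the real `RemoteLogManagerConfig`, `RemoteLogManager` or `RLMTask`; the point is only that the limit is read from the config once and handed to each task as a final field.

```java
// Hypothetical stand-ins; not the real config, manager, or task classes.
class ConfigSketch {
    private final int customMetadataMaxSize;

    ConfigSketch(int customMetadataMaxSize) {
        this.customMetadataMaxSize = customMetadataMaxSize;
    }

    int customMetadataMaxSize() {
        return customMetadataMaxSize;
    }
}

class CopyTaskSketch {
    // Fixed at construction time instead of being looked up on every copy.
    private final int customMetadataSizeLimit;

    CopyTaskSketch(int customMetadataSizeLimit) {
        this.customMetadataSizeLimit = customMetadataSizeLimit;
    }

    boolean exceedsLimit(byte[] customMetadata) {
        return customMetadata.length > customMetadataSizeLimit;
    }
}

class ManagerSketch {
    private final ConfigSketch config;

    ManagerSketch(ConfigSketch config) {
        this.config = config;
    }

    // The global value is passed to each task when the task is created.
    CopyTaskSketch newTask() {
        return new CopyTaskSketch(config.customMetadataMaxSize());
    }

    public static void main(String[] args) {
        CopyTaskSketch task = new ManagerSketch(new ConfigSketch(128)).newTask();
        System.out.println("129 bytes exceeds limit: " + task.exceedsLimit(new byte[129]));
        System.out.println("128 bytes exceeds limit: " + task.exceedsLimit(new byte[128]));
    }
}
```

   Passing the value in also makes the task easier to test in isolation, since a test can construct it with any limit without building a full config.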






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-01 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280704854


##
core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {

Review Comment:
   I'm not sure. As you said, it's internal and is supposed to be used only 
inside `RemoteLogManager.java`. Am I missing something?






[GitHub] [kafka] ivanyu commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-07-10 Thread via GitHub


ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1258358905


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -239,96 +240,186 @@ void testStartup() {
 assertEquals(logDir, capture.getValue().get("log.dir"));
 }
 
-// This test creates 2 log segments, 1st one has start offset of 0, 2nd 
one (and active one) has start offset of 150.

Review Comment:
   To make the review easier: in this test file,
   - `testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment` was moved to a 
test subclass;
   - The code that initializes the log mock was extracted to `setUp`;
   - A new test `testCustomMetadataSizeExceedsLimit` was added.


