ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280742713


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                    customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                    " Copying will be stopped and copied segment will be attempted to clean." +
+                                    " Original metadata: {}",
+                            customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in this case.
+                        remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
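   In isolation, the guard in this hunk boils down to comparing the byte length of the optional custom metadata against the configured limit. A minimal, self-contained sketch of that check (the `CustomMetadata` record below is a simplified stand-in for Kafka's nested `RemoteLogSegmentMetadata.CustomMetadata`, and the limit value is arbitrary):

```java
import java.util.Optional;

// Simplified stand-in for Kafka's CustomMetadata (an opaque byte[] payload);
// for illustration only, not the real class.
record CustomMetadata(byte[] value) {}

public class CustomMetadataSizeCheck {
    // Mirrors the guard in the hunk: metadata is rejected only when it is
    // present and its payload is larger than the configured limit.
    static boolean exceedsLimit(Optional<CustomMetadata> customMetadata, int limitBytes) {
        return customMetadata
                .map(cm -> (long) cm.value().length > limitBytes)
                .orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(exceedsLimit(Optional.of(new CustomMetadata(new byte[128])), 64)); // true
        System.out.println(exceedsLimit(Optional.empty(), 64));                               // false
    }
}
```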

Review Comment:
   Do you mean switching the segment state to `DELETE_SEGMENT_STARTED` and ultimately to `DELETE_SEGMENT_FINISHED`? The idea was to roll back to the state at L629, i.e. before calling `remoteLogStorageManager.copyLogSegmentData` (by undoing the copying), and let the operator decide what's next. I analyzed the metadata flow, and it seems both approaches produce the same final result: deleting only the data and leaving the segment in `COPY_SEGMENT_STARTED` (the current one), and fully deleting the metadata (the one you propose). However, the current one is a bit simpler and has fewer steps that can fail, so I think we should keep it unless I'm missing some factor. What do you think?
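   To make the comparison concrete, here is a hedged sketch (not the real Kafka API; `SegmentState` and `RemoteSegment` below are hypothetical stand-ins) of the point that both rollback options end with no segment data in remote storage, differing only in the metadata state they leave behind:

```java
// Hypothetical model of the two rollback options discussed above; names and
// fields are illustrative stand-ins, not Kafka's actual types.
public class RollbackSketch {
    enum SegmentState { COPY_SEGMENT_STARTED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    static final class RemoteSegment {
        SegmentState state = SegmentState.COPY_SEGMENT_STARTED;
        boolean dataPresent = true; // data was uploaded before the limit check failed

        // Current approach: delete only the copied data; the metadata stays
        // in COPY_SEGMENT_STARTED (one remote call, fewer steps to fail).
        void rollbackKeepingMetadata() {
            dataPresent = false;
        }

        // Proposed alternative: additionally walk the metadata through the
        // delete states, fully removing the segment record.
        void rollbackDeletingMetadata() {
            dataPresent = false;
            state = SegmentState.DELETE_SEGMENT_STARTED;
            state = SegmentState.DELETE_SEGMENT_FINISHED;
        }
    }

    public static void main(String[] args) {
        RemoteSegment a = new RemoteSegment();
        a.rollbackKeepingMetadata();
        RemoteSegment b = new RemoteSegment();
        b.rollbackDeletingMetadata();
        // Either way no segment data remains remotely; only the recorded state differs.
        System.out.println(a.dataPresent + " " + a.state); // false COPY_SEGMENT_STARTED
        System.out.println(b.dataPresent + " " + b.state); // false DELETE_SEGMENT_FINISHED
    }
}
```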



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
