kamalcph commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1285011540


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                    customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                    " Copying will be stopped and copied segment will be attempted to clean." +
+                                    " Original metadata: {}",
+                            customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in this case.
+                        remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                    } catch (RemoteStorageException e1) {
+                        logger.error("Error while cleaning segment after custom metadata size exceeded", e1);

Review Comment:
   The approach taken looks good. Even if we fail to delete the last uploaded segment on error, it will be marked as an unreferenced segment, and when the RLM task is enabled for the topic, it will be removed in the regular segment cleanup cycle.


