ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1281763529


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                    customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                    " Copying will be stopped and copied segment will be attempted to clean." +
+                                    " Original metadata: {}",
+                            customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in this case.
+                        remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                    } catch (RemoteStorageException e1) {
+                        logger.error("Error while cleaning segment after custom metadata size exceeded", e1);

Review Comment:
   If this error message doesn't appear, it means the deletion was successful. However, it may be a good idea to make this explicit, so I added info logging for successful deletion and, in case of an error, a suggestion to clean up manually.
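   
   For illustration, the cleanup handling with the added logging might look roughly like this (a sketch based on the diff above; the exact log messages in the PR may differ):
   
   ```java
   try {
       // Best-effort cleanup of the segment data that was already copied.
       remoteLogStorageManager.deleteLogSegmentData(
               copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
       logger.info("Successfully cleaned segment {} after custom metadata size exceeded",
               copySegmentStartedRlsm.remoteLogSegmentId());
   } catch (RemoteStorageException e1) {
       logger.error("Error while cleaning segment {} after custom metadata size exceeded, " +
                       "please clean it up on the remote storage manually",
               copySegmentStartedRlsm.remoteLogSegmentId(), e1);
   }
   ```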
   
   > If we don't even care if the segment is deleted or not at all, why did we 
have to delete it anyway?
   
   On the high level, you're right, we don't care: the operator will fix the issue by increasing the custom metadata size limit or, more radically, by deleting the topic or disabling remote storage for it. In any case, there shouldn't be any garbage on the remote storage. Cleaning may matter when the placement on the remote storage is nondeterministic: in that case, a subsequent write attempt will not overwrite the existing files but will write new ones, leaving some garbage behind. So to combat this, and generally as a measure of hygiene, a best-effort attempt at deletion is made.
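   
   To illustrate with a purely hypothetical plugin (not code from this PR): if a `RemoteStorageManager` derives object keys nondeterministically, e.g. with a per-upload random component, a retried copy of the same log segment lands under a new key, so the earlier upload is never overwritten and lingers unless `deleteLogSegmentData` is called for it:
   
   ```java
   // Hypothetical RSM key scheme with a nondeterministic component per upload attempt.
   // A second copy attempt for the same segment produces a different key, so the
   // objects written by the first attempt remain on remote storage as garbage
   // unless they are explicitly deleted.
   String objectKeyFor(RemoteLogSegmentMetadata metadata) {
       return metadata.topicIdPartition().toString()
               + "/" + metadata.startOffset()
               + "-" + java.util.UUID.randomUUID(); // differs on every attempt
   }
   ```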


