ivanyu commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1281763529
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########

@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);

         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+        if (customMetadata.isPresent()) {
+            long customMetadataSize = customMetadata.get().value().length;
+            if (customMetadataSize > customMetadataSizeLimit) {
+                CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                logger.error("Custom metadata size {} exceeds configured limit {}."
+                        + " Copying will be stopped and copied segment will be attempted to clean."
+                        + " Original metadata: {}",
+                        customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                try {
+                    // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                    // However, the update itself will not be stored in this case.
+                    remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                } catch (RemoteStorageException e1) {
+                    logger.error("Error while cleaning segment after custom metadata size exceeded", e1);

Review Comment:
   If this error message doesn't appear, it means the deletion was successful.
   However, it may be a good idea to make this explicit, so I added info logging for a successful deletion and a suggestion to clean up manually in case of an error.

   > If we don't even care if the segment is deleted or not at all, why did we have to delete it anyway?

   At a high level, you're right, we don't care: the operator will fix the issue by increasing the custom metadata size limit or, more radically, by deleting the topic or disabling remote storage for it. In either case, there shouldn't be any garbage left on the remote storage. Cleaning matters when the placement of segments on the remote storage is nondeterministic: in that case, a subsequent write attempt will not overwrite the existing files but will write new ones, leaving some garbage behind. To combat this, and generally as a measure of hygiene, a best-effort deletion attempt is made.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
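As a standalone illustration of the pattern discussed in this review (a size-limit check on returned custom metadata, followed by a best-effort cleanup whose failure is logged but not propagated), here is a hedged Java sketch. The class name, the `byte[]` stand-in for `CustomMetadata`, and the `Runnable` delete action are hypothetical simplifications, not the actual Kafka types from the diff.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for the size-check + best-effort-cleanup
// pattern in the RemoteLogManager diff above. Real Kafka types differ.
public class CustomMetadataLimitCheck {

    /** Returns true if the (optional) custom metadata fits within the limit, so copying may proceed. */
    public static boolean withinLimit(Optional<byte[]> customMetadata, int limitBytes) {
        return customMetadata.map(m -> m.length <= limitBytes).orElse(true);
    }

    /**
     * Best-effort cleanup: attempt the deletion, log the outcome either way,
     * and swallow the failure, mirroring the "we don't strictly depend on
     * deletion succeeding" rationale from the review. Returns whether it succeeded.
     */
    public static boolean bestEffortDelete(Runnable deleteAction) {
        try {
            deleteAction.run();
            // Explicit success logging, as suggested in the review comment.
            System.out.println("Successfully cleaned segment after custom metadata size exceeded");
            return true;
        } catch (RuntimeException e) {
            System.out.println("Error while cleaning segment after custom metadata size exceeded: "
                    + e.getMessage() + ". Please clean it up manually.");
            return false;
        }
    }
}
```

The key design point is that `bestEffortDelete` never rethrows: leftover files only matter when remote placement is nondeterministic, so a failed cleanup degrades to a hygiene issue for the operator rather than an error on the copy path.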