kamalcph commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1285011540
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                    customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                    " Copying will be stopped and copied segment will be attempted to clean." +
+                                    " Original metadata: {}",
+                            customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in this case.
+                        remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                    } catch (RemoteStorageException e1) {
+                        logger.error("Error while cleaning segment after custom metadata size exceeded", e1);

Review Comment:
The approach taken looks good.
Even if we fail to delete the last uploaded segment on error, it will be marked as an unreferenced segment. When the RLM task is enabled for the topic, the segment will be removed during the regular segment cleanup.