Guillaume Mallet created KAFKA-17062:
----------------------------------------

             Summary: RemoteLogManager - RemoteStorageException causes data loss
                 Key: KAFKA-17062
                 URL: https://issues.apache.org/jira/browse/KAFKA-17062
             Project: Kafka
          Issue Type: Bug
          Components: Tiered-Storage
            Reporter: Guillaume Mallet


When Tiered Storage is configured, retention.bytes defines the limit for the amount of data stored in the filesystem and in remote storage. However, a failure while offloading to remote storage can cause segments to be dropped before the retention limit is met.

What happens

Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes), we would expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment locally (the local segment), and possibly more if the remote storage is offline, i.e. segments in the following RemoteLogSegmentStates in the RemoteLogMetadataManager (RLMM):
* Segment 3 ({{COPY_SEGMENT_FINISHED}})
* Segment 2 ({{COPY_SEGMENT_FINISHED}})
* Segment 1 ({{COPY_SEGMENT_FINISHED}})

Let's assume copying to remote storage starts failing when segment 4 rolls.
At the first iteration of an RLMTask we will have:
* [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] is called first
** RLMM becomes aware of Segment 4 and adds it to the metadata:
*** Segment 4 ({{COPY_SEGMENT_STARTED}}),
*** Segment 3 ({{COPY_SEGMENT_FINISHED}}),
*** Segment 2 ({{COPY_SEGMENT_FINISHED}}),
*** Segment 1 ({{COPY_SEGMENT_FINISHED}})
** An exception is raised during the copy operation ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93] in RemoteStorageManager), which is caught with the error message “{{Error occurred while copying log segments of partition}}”, and no further copy will be attempted for the duration of this RLMTask.
** At that point the segment will never move to {{COPY_SEGMENT_FINISHED}}; it will eventually transition to {{DELETE_SEGMENT_STARTED}} and be cleaned up when the associated segment is deleted.
* [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122] is then called
** Retention size is computed in [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296] as the sum of all segment sizes regardless of their state, so the computed size of the topic is 1 (local) + 4 (remote) = 5GB, which exceeds {{retention.bytes}} (4GB)
** Segment 1, being the oldest, will be dropped.

At the second iteration after [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395] (default: 30s), the same will happen.
The RLMM will now have 2 x Segment 4 in a {{COPY_SEGMENT_STARTED}} state, each with a different {{RemoteLogSegmentId}}, and Segment 2 will be dropped. The same will happen to Segment 3 after another iteration.

At that point, the RLMM is composed of 4 copies of Segment 4 in {{COPY_SEGMENT_STARTED}} state. Segment 4 is then marked for deletion, which increases the log start offset (LSO) at the same time and causes the UnifiedLog to delete the local and remote data for Segment 4, including its metadata.

Under those circumstances Kafka can quickly delete segments that were not meant for deletion, causing data loss.

Steps to reproduce the problem:

1. Enable tiered storage
{code:bash}
mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
cat <<EOF >> config/kraft/server.properties
remote.log.storage.system.enable=True
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
remote.log.manager.task.interval.ms=5000
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.dir=/tmp/tieredStorage
EOF
{code}

2. Start a Kafka server with the following classpath. This is needed so we can use the test class LocalTieredStorage as an implementation of RemoteStorageManager.
{code:bash}
export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
{code}

3. In a separate shell, create the topic and produce enough records to fill the remote log
{code:bash}
bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
  --config retention.bytes=1000000000 --config segment.bytes=100000000 \
  --config remote.storage.enable=true --config local.retention.bytes=1
bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
  --throughput -1 --record-size 1000 \
  --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
{code}

4. In a separate shell, watch the remote log directory content
{code:bash}
watch -n 1 -- ls -R /tmp/tieredStorage/kafka-tiered-storage/
{code}

5. Once all logs are sent to the remote storage (when the server log output stops; this should take around 2 min), stop the Kafka server

6. Edit {{copyLogSegmentData()}} in LocalTieredStorage (LocalTieredStorage#L309) so that it throws a {{RemoteStorageException}}, which disables the ability to store new remote segments.

7. Rebuild Kafka
{code:bash}
./gradlew testJar
{code}

8. Restart the Kafka server
{code:bash}
bin/kafka-server-start.sh config/kraft/server.properties
{code}

9. Send enough data for one segment roll
{code:bash}
bin/kafka-producer-perf-test.sh \
  --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
  --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
{code}

All data in the remote directory will start getting deleted, whereas we would expect only that no more writes reach the remote storage.
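
The deletion sequence described under "What happens" can be reproduced on paper with a small script. This is only a toy sketch, not Kafka code: the variable names are made up for illustration, every segment is assumed to be exactly 1GB, and it models just the two behaviours reported above (a metadata entry is added before each failed copy, and the retention size counts every entry regardless of state).

```bash
# Toy model of repeated RLMTask iterations; sizes are in GB.
# Hypothetical names throughout; this does not call any Kafka code.
retention_gb=4   # retention.bytes from the example (4GB)
local_gb=1       # one local segment
segment_gb=1     # assumed size of every segment

# Remote metadata entries, oldest first, written as "segment:state".
remote="1:COPY_SEGMENT_FINISHED 2:COPY_SEGMENT_FINISHED 3:COPY_SEGMENT_FINISHED"
count=3
dropped=""

for iteration in 1 2 3 4; do
  # Copy step: the metadata entry is added, then the copy fails,
  # so the entry stays in COPY_SEGMENT_STARTED.
  remote="${remote:+$remote }4:COPY_SEGMENT_STARTED"
  count=$((count + 1))

  # Cleanup step: the size counts every entry, whatever its state.
  total_gb=$((local_gb + count * segment_gb))
  while [ "$total_gb" -gt "$retention_gb" ] && [ "$count" -gt 0 ]; do
    oldest=${remote%% *}                         # first (oldest) entry
    dropped="${dropped:+$dropped }${oldest%%:*}"  # record its segment number
    case "$remote" in
      *" "*) remote=${remote#* } ;;              # remove the first entry
      *)     remote="" ;;
    esac
    count=$((count - 1))
    total_gb=$((total_gb - segment_gb))
  done
  echo "iteration $iteration: dropped so far: $dropped | remote: $remote"
done
```

Under these assumptions the model drops Segments 1, 2 and 3 in the first three iterations, and in the fourth iteration, when the metadata holds four Segment 4 entries, it drops the oldest Segment 4 entry, even though no new data was ever stored remotely.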