[ 
https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863384#comment-17863384
 ] 

Guillaume Mallet commented on KAFKA-17062:
------------------------------------------

[~satish.duggana] Thanks for the context.

 

> Any retried operation for copying a segment will have a unique id and it will 
>create a new file/object in the remote store. This will make sure we will not 
>reference partially copied data in later reads and these semantics may vary 
>across different remote storages like object stores. The safe way is to 
>generate a new uuid and try copying the failed segment again.

If my understanding is correct, we could drop the idempotency expected of the 
[copy 
operation|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L82-L86]
 since we never rely on it (the {{RemoteLogSegmentMetadata}} will always have a 
different ID).
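To make the point concrete, here is a minimal sketch (with hypothetical types, not Kafka's actual classes) of why idempotency is never exercised on the copy path: every retry builds new metadata with a fresh id, so a failed attempt's remote object is never referenced again.

```java
import java.util.UUID;

// Hypothetical minimal model of the retry-with-new-id behavior described above.
public class RetryCopySketch {
    record SegmentId(UUID id) {}
    record SegmentMetadata(SegmentId segmentId, long startOffset) {}

    // Each copy attempt builds new metadata with a brand-new id; a retry
    // therefore writes a different remote object instead of overwriting the
    // one left behind by the failed attempt.
    static SegmentMetadata newAttempt(long startOffset) {
        return new SegmentMetadata(new SegmentId(UUID.randomUUID()), startOffset);
    }

    public static void main(String[] args) {
        SegmentMetadata first = newAttempt(0L);
        SegmentMetadata retry = newAttempt(0L); // same log segment, new id
        System.out.println(first.segmentId().equals(retry.segmentId())); // false
    }
}
```

Since the two ids never collide, a remote store that only guarantees overwrite-on-same-key semantics is never asked to overwrite anything.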

>  One possible tradeoff that can be explored is do not delete the data sooner 
> but we may end up occupying more than the targeted storage in a few scenarios 
> which will eventually be cleaned up.

You said above that "Any retried operation for copying a segment will have a 
unique id and it will create a new file/object in the remote store." Given 
that, I fail to see the drawbacks of attempting to delete those segments 
earlier, since they are expected to be independent from the ones for which the 
copy succeeded. Could you help me understand why we wouldn't want to do that?

It would also make the tradeoff easier to accept, as the few scenarios where we 
end up occupying more than the targeted storage would be resolved faster.

> RemoteLogManager - RemoteStorageException causes data loss
> ----------------------------------------------------------
>
>                 Key: KAFKA-17062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17062
>             Project: Kafka
>          Issue Type: Bug
>          Components: Tiered-Storage
>    Affects Versions: 3.8.0, 3.7.1, 3.9.0
>            Reporter: Guillaume Mallet
>            Assignee: Guillaume Mallet
>            Priority: Major
>              Labels: tiered-storage
>
> When Tiered Storage is configured, retention.bytes defines the limit for the 
> amount of data stored in the filesystem and in remote storage. However a 
> failure while offloading to remote storage can cause segments to be dropped 
> before the retention limit is met.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and a 
> {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes) we would 
> expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment 
> locally (the local segment) and possibly more if the remote storage is 
> offline. i.e. segments in the following RemoteLogSegmentStates in the 
> RemoteLogMetadataManager (RLMM) :
>  * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
> Let's assume the RLMM starts failing when segment 4 rolls. At the first 
> iteration of an RLMTask we will have:
>  * 
> [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773]
>  : is called first
>  ** RLMM becomes aware of Segment 4 and adds it to the metadata:
>  *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
>  *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
>  ** An exception is raised during the copy operation 
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
>  in RemoteStorageManager) which is caught with the error message “{{Error 
> occurred while copying log segments of partition}}” and no further copy will 
> be attempted for the duration of this RLMTask.
>  ** At that point the Segment will never move to {{COPY_SEGMENT_FINISHED}} 
> but will transition to {{DELETE_SEGMENT_STARTED}} eventually before being 
> cleaned up when the associated segment is deleted.
>  * 
> [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]
>  is then called
>  ** Retention size is computed in 
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
>  as the sum of all the segments' sizes regardless of their state, so the 
> computed size of the topic is 5 GB: 1 GB (local) + 4 GB (remote).
>  ** Segment 1, being the oldest, will be dropped.
> At the second iteration after 
> [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
>  (default: 30s), the same will happen. The RLMM will now have 2 x Segment 4 
> in a {{COPY_SEGMENT_STARTED}} state each with a different 
> {{RemoteLogSegmentId}} and Segment 2 will be dropped. The same will happen to 
> Segment 3 after another iteration.
> At that point, the RLMM is composed of 4 copies of Segment 4 in 
> {{COPY_SEGMENT_STARTED}} state. Segment 4 is marked for deletion, increasing 
> the log start offset (LSO) at the same time and causing the UnifiedLog to 
> delete the local and remote data for Segment 4, including its metadata.
> Under those circumstances Kafka can quickly delete segments that were not 
> meant for deletion causing a data loss.
> Steps to reproduce the problem:
> 1. Enable tiered storage
> {code:bash}
> mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
> cat <<EOF >> config/kraft/server.properties
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> remote.log.manager.task.interval.ms=5000
> remote.log.metadata.manager.listener.name=PLAINTEXT
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rsm.config.dir=/tmp/tieredStorage
> EOF
> {code}
> 2. Start a Kafka server with the following classpath. This is needed so we 
> can use test class LocalTieredStorage as an implementation of 
> RemoteStorageManager.
> {code:bash}
> export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
> export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
> config/kraft/server.properties
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 3. In a separate shell, create the topic and produce enough records to fill 
> the remote log
> {code:bash}
> bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 
> \
>    --config retention.bytes=1000000000 --config segment.bytes=100000000 \
>    --config remote.storage.enable=true --config local.retention.bytes=1
> bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
>    --throughput -1 --record-size 1000 \
>    --producer-props acks=1 batch.size=100000  bootstrap.servers=localhost:9092
> {code}
> 4. In a separate shell, watch the remote log directory content
> {code:bash}
> watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
> {code}
> 5. Once all logs are sent to the remote storage (when the server log output 
> stops; this should take around 2 min), stop the Kafka server.
> 6. Edit {{LocalTieredStorage}} at line 309, in {{copyLogSegmentData()}}, so 
> that it throws a {{RemoteStorageException}}, disabling the ability to store 
> new remote segments.
> 7. Rebuild Kafka
> {code:bash}
>  ./gradlew testJar
> {code}
> 8. Restart the Kafka server
> {code:bash}
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 9. Send enough data for one segment to roll
> {code:bash}
> bin/kafka-producer-perf-test.sh \
>   --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
>   --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> All data in the remote directory starts getting deleted, whereas we would 
> expect only that no new writes happen to the remote storage.
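The retention arithmetic in the description above can be sketched as follows (hypothetical minimal types, not Kafka's actual classes): because the size is summed over all segments regardless of copy state, a segment stuck in {{COPY_SEGMENT_STARTED}} still counts against {{retention.bytes}} and pushes out a fully copied older segment.

```java
import java.util.List;

// Hypothetical model of the retention-size computation described in the issue.
public class RetentionSizeSketch {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }
    record Segment(String id, long sizeBytes, State state) {}

    // Current behavior: sum every segment's size, whatever its copy state.
    static long totalSizeAllStates(List<Segment> segments) {
        return segments.stream().mapToLong(Segment::sizeBytes).sum();
    }

    // Alternative: count only segments whose copy actually finished.
    static long totalSizeFinishedOnly(List<Segment> segments) {
        return segments.stream()
                .filter(s -> s.state() == State.COPY_SEGMENT_FINISHED)
                .mapToLong(Segment::sizeBytes)
                .sum();
    }

    public static void main(String[] args) {
        long gb = 1L << 30;
        List<Segment> remote = List.of(
                new Segment("seg1", gb, State.COPY_SEGMENT_FINISHED),
                new Segment("seg2", gb, State.COPY_SEGMENT_FINISHED),
                new Segment("seg3", gb, State.COPY_SEGMENT_FINISHED),
                new Segment("seg4", gb, State.COPY_SEGMENT_STARTED)); // failed copy
        long local = gb; // the active local segment
        // Counting the failed copy makes apparent usage 5 GB > 4 GB retention,
        // so the cleaner deletes Segment 1 even though nothing new landed remotely.
        System.out.println(local + totalSizeAllStates(remote));    // 5368709120
        System.out.println(local + totalSizeFinishedOnly(remote)); // 4294967296
    }
}
```

With the state filter, the computed size stays at the expected 4 GB and no finished segment is evicted because of a failed copy attempt.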



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
