Guillaume Mallet created KAFKA-17062:
----------------------------------------

             Summary: RemoteLogManager - RemoteStorageException causes data loss
                 Key: KAFKA-17062
                 URL: https://issues.apache.org/jira/browse/KAFKA-17062
             Project: Kafka
          Issue Type: Bug
          Components: Tiered-Storage
            Reporter: Guillaume Mallet


When Tiered Storage is configured, {{retention.bytes}} defines the limit for the total amount of data stored on the local filesystem and in remote storage. However, a failure while offloading to remote storage can cause segments to be dropped before the retention limit is reached.

What happens

Assuming a topic configured with {{retention.bytes=4294967296}} (4 GB) and {{local.retention.bytes=1073741824}} (1 GB, equal to {{segment.bytes}}), we would expect Kafka to keep up to 3 segments (3 GB) in the remote store and 1 segment locally (the local segment), and possibly more if the remote storage is offline, i.e. segments in the following {{RemoteLogSegmentState}}s in the RemoteLogMetadataManager (RLMM):
 * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
 * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
 * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
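
For reference, here is an illustrative sketch (not part of Kafka itself, the helper class name is made up for this example) of how those per-segment states can be listed through the public {{RemoteLogMetadataManager}} API:
{code:java}
import java.util.Iterator;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

// Illustrative helper: print every remote segment known to the RLMM together with
// its RemoteLogSegmentState (COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, ...).
final class RemoteSegmentStateDump {
    static void dump(RemoteLogMetadataManager rlmm, TopicIdPartition tp) throws RemoteStorageException {
        Iterator<RemoteLogSegmentMetadata> it = rlmm.listRemoteLogSegments(tp);
        while (it.hasNext()) {
            RemoteLogSegmentMetadata m = it.next();
            System.out.printf("%s offsets=[%d..%d] state=%s%n",
                    m.remoteLogSegmentId(), m.startOffset(), m.endOffset(), m.state());
        }
    }
}
{code}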

Let's assume the RLMM starts failing when segment 4 rolls. At the first iteration of an RLMTask:
 * [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] is called first
 ** RLMM becomes aware of Segment 4 and adds it to the metadata:
 *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
 *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
 *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
 *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
 ** An exception is raised during the copy operation ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93] in RemoteStorageManager). It is caught with the error message “{{Error occurred while copying log segments of partition}}”, and no further copy is attempted for the duration of this RLMTask.
 ** At that point the segment will never move to {{COPY_SEGMENT_FINISHED}}; it will eventually transition to {{DELETE_SEGMENT_STARTED}} before being cleaned up when the associated segment is deleted.
 * [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122] is then called
 ** Retention size is computed in [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296] as the sum of all segment sizes regardless of their state, so the computed size of the topic is 5 GB (1 GB local + 4 GB remote), which exceeds the 4 GB retention limit (see the sketch after this list).
 ** Segment 1, being the oldest, is therefore dropped.
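
For illustration, here is a minimal sketch of the size computation described above, assuming the public {{RemoteLogMetadataManager}} API; it is not the actual {{buildRetentionSizeData}} code, but it shows why metadata entries stuck in {{COPY_SEGMENT_STARTED}} still count toward the retention budget:
{code:java}
import java.util.Iterator;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

// Illustrative sketch only, not the actual RemoteLogManager implementation.
final class RetentionSizeSketch {
    // Sums the size of every remote segment metadata entry with no filter on
    // RemoteLogSegmentState, so orphan entries left in COPY_SEGMENT_STARTED by
    // failed uploads inflate the total and push older, fully copied segments
    // over the retention.bytes limit.
    static long computeRemoteSize(RemoteLogMetadataManager rlmm, TopicIdPartition tp)
            throws RemoteStorageException {
        long remoteSize = 0L;
        Iterator<RemoteLogSegmentMetadata> segments = rlmm.listRemoteLogSegments(tp);
        while (segments.hasNext()) {
            remoteSize += segments.next().segmentSizeInBytes();
        }
        // In the scenario above: 1 GB local + 4 GB counted here = 5 GB > retention.bytes (4 GB),
        // so the oldest remote segment is deleted even though only 3 GB was ever fully copied.
        return remoteSize;
    }
}
{code}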

At the second iteration, after [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395] (default: 30s), the same thing happens. The RLMM now has 2 copies of Segment 4 in {{COPY_SEGMENT_STARTED}} state, each with a different {{RemoteLogSegmentId}}, and Segment 2 is dropped. The same happens to Segment 3 after another iteration.
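
Each retry also starts from a brand-new segment id, roughly as in the sketch below (illustrative; the exact code in {{RemoteLogManager}} may differ), which is why the failed attempts accumulate as distinct orphan metadata entries instead of replacing each other:
{code:java}
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;

// Illustrative sketch: every copy attempt is keyed by a freshly generated
// RemoteLogSegmentId, so a retried upload never resumes or overwrites the
// metadata of a previous failed attempt; each failure leaves one more entry
// stuck in COPY_SEGMENT_STARTED.
final class SegmentIdPerAttemptSketch {
    static RemoteLogSegmentId newAttemptId(TopicIdPartition topicIdPartition) {
        return new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
    }
}
{code}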

At that point, the RLMM contains 4 copies of Segment 4 in {{COPY_SEGMENT_STARTED}} state. Segment 4 is then marked for deletion, which advances the log start offset (LSO) and causes the UnifiedLog to delete the local and remote data for Segment 4, including its metadata.

Under these circumstances, Kafka can quickly delete segments that were not meant for deletion, causing data loss.

Steps to reproduce the problem:

1. Enable tiered storage

{code:bash}
mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
cat <<EOF >> config/kraft/server.properties
remote.log.storage.system.enable=True
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
remote.log.manager.task.interval.ms=5000
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.dir=/tmp/tieredStorage
EOF
{code}

2. Start a Kafka server with the following classpath. This is needed so we can use the test class {{LocalTieredStorage}} as the RemoteStorageManager implementation.
{code:bash}
export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
{code}
3. In a separate shell, create the topic and produce enough records to fill the 
remote log
{code:bash}
bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
   --config retention.bytes=1000000000 --config segment.bytes=100000000 \
   --config remote.storage.enable=true --config local.retention.bytes=1
bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
   --throughput -1 --record-size 1000 \
   --producer-props acks=1 batch.size=100000  bootstrap.servers=localhost:9092
{code}
4. In a separate shell, watch the remote log directory content
{code:bash}
watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
{code}
5. Once all logs have been sent to the remote storage (when the server log output stops, which should take around 2 min), stop the Kafka server.
6. Edit {{LocalTieredStorage}} at line 309, inside {{copyLogSegmentData()}}, so that it throws a {{RemoteStorageException}} and can no longer store new remote segments.
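The edit could look roughly like the following (a hypothetical sketch; the exact signature and line number may differ between versions):
{code:java}
// In LocalTieredStorage#copyLogSegmentData() (the test class used here as the
// RemoteStorageManager implementation). Throwing unconditionally simulates a
// remote store where every upload fails while deletions still succeed.
@Override
public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                                   LogSegmentData logSegmentData) throws RemoteStorageException {
    throw new RemoteStorageException("Simulated copy failure for " +
            remoteLogSegmentMetadata.remoteLogSegmentId());
}
{code}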
7. Rebuild Kafka
{code:bash}
 ./gradlew testJar
{code}
8. Restart the Kafka server
{code:bash}
bin/kafka-server-start.sh config/kraft/server.properties
{code}
9. Send enough data to roll one segment
{code:bash}
bin/kafka-producer-perf-test.sh \
  --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
  --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
{code}
All data in the remote directory will start getting deleted, whereas we would expect only that no new writes happen to the remote storage.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
