[ https://issues.apache.org/jira/browse/KAFKA-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771065#comment-17771065 ]
Kamal Chandraprakash commented on KAFKA-15525:
----------------------------------------------

While uploading a segment, the RemoteLogManager sends an event to the internal metadata topic; if that topic is unavailable, the segment cannot be uploaded. {{rlmm.config.remote.log.metadata.topic.replication.factor}} is set to 1. Can you try increasing the replication factor to 3 or 4?
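Note that the replication factor of a topic is fixed when the topic is created, so changing {{rlmm.config.remote.log.metadata.topic.replication.factor}} only takes effect where the metadata topic does not exist yet; for an existing cluster the topic has to be reassigned. A rough, untested sketch (assuming the default metadata topic name {{__remote_log_metadata}}; the bootstrap address and broker IDs are placeholders to adjust for your cluster):

{code:bash}
# Inspect the current replication factor and ISR of the metadata topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic __remote_log_metadata

# Build a reassignment with 3 replicas per partition
# (add one entry per partition; this topic has 50)
cat > reassignment.json <<'EOF'
{"version":1,"partitions":[
  {"topic":"__remote_log_metadata","partition":0,"replicas":[0,1,2]}
]}
EOF

# Apply the reassignment to raise the replication factor
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json --execute
{code}

On a fresh cluster, setting the factor before the topic is auto-created avoids this entirely:

{code:java}
rlmm.config.remote.log.metadata.topic.replication.factor=3
{code}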
> Segment uploads stop working following a broker failure
> -------------------------------------------------------
>
>                 Key: KAFKA-15525
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15525
>             Project: Kafka
>          Issue Type: Bug
>          Components: Tiered-Storage
>    Affects Versions: 3.6.0
>            Reporter: Francois Visconte
>            Priority: Major
>
> I have a tiered-storage-enabled cluster where I continuously produce to and consume from a TS-enabled topic.
> Here are the topic settings I’m using:
> {code:java}
> local.retention.ms=120000
> remote.storage.enable=true
> retention.ms=10800000
> segment.bytes=512000000
> {code}
> Here are my broker settings:
> {code:java}
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/*
> remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
> remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
> remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT
> remote.log.manager.task.interval.ms=5000
> remote.log.manager.thread.pool.size=10
> remote.log.reader.threads=10
> remote.log.reader.max.pending.tasks=100
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rlmm.config.remote.log.metadata.topic.num.partitions=50
> rlmm.config.remote.log.metadata.topic.retention.ms=-1
> rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache
> rsm.config.chunk.cache.path=/data/tiered-storage-cache
> rsm.config.chunk.cache.size=1073741824
> rsm.config.metrics.recording.level=DEBUG
> rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
> rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.s3.region=us-east-1
> rsm.config.chunk.size=102400
> rsm.config.storage.s3.multipart.upload.part.size=16777216
> {code}
> When a broker in the cluster gets rotated (replaced or restarted), some brokers start throwing this error repeatedly:
> {code:java}
> [RemoteLogManager=10000 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error occurred while copying log segments of partition: yTypIvtBRY2l3sD4-8M7fA:loadgen-3
> java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer.
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>     at kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728)
>     at kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687)
>     at kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer.
>     at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188)
>     at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer.
>     at org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:121)
>     at org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:89)
>     at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:186)
>     ... 7 more{code}
> I observed a few times that rolling-restarting the cluster resolved the issue.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)