Francis created FLINK-38621:
-------------------------------

             Summary: AsyncKeyedProcessOperator fails to restore with a 
java.io.FileNotFoundException: No such file or directory exception in ForSt
                 Key: FLINK-38621
                 URL: https://issues.apache.org/jira/browse/FLINK-38621
             Project: Flink
          Issue Type: Bug
    Affects Versions: 2.1.0
            Reporter: Francis


I'm currently seeing task manager failures with a root exception of 
{code:java}
java.io.FileNotFoundException: No such file or directory{code}
This issue seems related to https://issues.apache.org/jira/browse/FLINK-38324 
and https://issues.apache.org/jira/browse/FLINK-38433; however, in my case it 
affects an AsyncKeyedCoProcessOperator (and, per the trace below, an 
AsyncKeyedProcessOperator) rather than an AsyncStreamFlatMap function as 
mentioned in the other ticket.

The symptoms are the same, though: my job is in a restart loop, failing to 
restore from a checkpoint attempt that does not exist. I've included the full 
stack trace below.
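
For anyone triaging a similar log, the missing files can be pulled out of the 
task manager output mechanically. The following is a minimal Python sketch, not 
part of Flink: the function name `missing_shared_files` is hypothetical, and 
the path pattern 
(`op_<operator>_<operator id>__<subtask>_<parallelism>__attempt_<n>/db/<file>`) 
is only inferred from the paths in the trace below, including my reading of 
`19_20` as subtask and parallelism.

```python
import re
from typing import Dict, List

# Matches the exception message and the s3a path that follows it. The path may
# be wrapped onto the next line, so \s* is allowed to span a newline.
MISSING = re.compile(
    r"FileNotFoundException: No such file or directory:\s*(s3a://\S+)"
)

# ASSUMPTION: layout inferred from the paths in this ticket, not from Flink docs.
SHARED = re.compile(
    r"/shared/op_(?P<operator>[A-Za-z0-9]+)_(?P<operator_id>[0-9a-f]{32})"
    r"__(?P<subtask>\d+)_(?P<parallelism>\d+)__attempt_(?P<attempt>\d+)"
    r"/db/(?P<file>[^/\s]+)$"
)


def missing_shared_files(log_text: str) -> List[Dict[str, str]]:
    """Return one entry per distinct missing shared-state file in the log."""
    seen = set()
    results = []
    for path in MISSING.findall(log_text):
        if path in seen:
            continue  # the same exception is often logged more than once
        seen.add(path)
        match = SHARED.search(path)
        if match:
            results.append({"path": path, **match.groupdict()})
    return results


# One wrapped log record copied from the trace in this ticket.
sample = (
    '(19/20)#860" java.io.FileNotFoundException: No such file or directory: \n'
    "s3a://conversation-customer-thread-events-state-production/checkpoints/"
    "68e582d9490de1ef330c9d03ff6559d9/shared/"
    "op_AsyncKeyedProcessOperator_08ba072f7d4350d148d1fdf71e53ea7c"
    "__19_20__attempt_2/db/3acf0699-40c1-48ab-a42a-689829409fbf\n"
)

for hit in missing_shared_files(sample):
    print(hit["operator"], hit["subtask"], hit["attempt"], hit["file"])
```

Comparing the `attempt` value in each missing path against the attempt number 
in the task name (`#860` in the trace below) shows at a glance which attempt 
directory the restore is still pointing at.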

 

From a configuration standpoint, I am using Flink 2.1.0 and version 1.13 of the 
K8s operator. I am using the Hadoop S3 filesystem with a state recovery mode 
of CLAIM. There are no lifecycle rules on my S3 bucket that would result in 
files being deleted externally.

 

Here are the potentially relevant parts of my configuration:

 
{code:yaml}
execution:
  state-recovery.claim-mode: CLAIM
  checkpointing:
    dir: "s3a://{{ .Values.state.bucketName }}/checkpoints"
    interval: {{ .Values.checkpoint.interval }}
    timeout: {{ .Values.checkpoint.timeout }}
    incremental: true
    mode: {{ .Values.checkpoint.mode }}
    max-concurrent-checkpoints: 1
    snapshot-compression: true
    savepoint-dir: "s3a://{{ .Values.state.bucketName }}/savepoints"
    externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    min-pause: {{ .Values.checkpoint.interval }}
    num-retained: 20
    tolerable-failed-checkpoints: 2147483647 

job:
  savepointRedeployNonce: 8
  state: running
  upgradeMode: savepoint
  jarURI: local:///opt/flink/usrlib/flink-job.jar
  entryClass: {{ .Values.entryClass }}
  parallelism: {{ .Values.parallelism }}
  allowNonRestoredState: false

state:
  backend:
    type: forst
    forst:
      executor.read-io-parallelism: 2
      sync.enforce-local: false 
      memory.managed: false 
      memory.fixed-per-slot: 1g
      local-dir: {{ .Values.dataMount }}/forst-local
      cache.dir: {{ .Values.dataMount }}/forst-cache
      cache:
        size-based-limit: {{ .Values.state.diskCacheLimit }}

high-availability:
  type: kubernetes
  storageDir: "s3a://{{ .Values.state.bucketName }}/ha"
  kubernetes:
    cluster-id: {{ .Values.global.serviceName }}
    namespace: {{ .Values.global.namespace }}

kubernetes:
  taskmanager.memory.limit-factor: 5
  taskmanager.cpu.limit-factor: 5
  rest-service:
    exposed.type: ClusterIP
  operator:
    job.upgrade.last-state.max.allowed.checkpoint.age: 24h
    savepoint:
      format.type: NATIVE
      history:
        max.count: {{ .Values.savepoint.maxCountToKeep }}
    periodic:
      savepoint.interval: {{ .Values.savepoint.interval }}{code}
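
To make it easier to match the nested YAML above against option names in the 
Flink documentation, here is a minimal Python sketch that flattens nested keys 
into dotted names. It assumes the nesting maps to dotted option names the way 
Flink's standard-YAML `config.yaml` does; the `flatten` helper is hypothetical, 
and the `subset` dictionary is a hand-copied excerpt of the values shown above.

```python
from typing import Any, Dict


def flatten(config: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested mappings into dotted keys, e.g. a.b.c -> value."""
    flat: Dict[str, Any] = {}
    for key, value in config.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, dotted))
        else:
            flat[dotted] = value
    return flat


# Hand-copied excerpt of the configuration in this ticket (templated values omitted).
subset = {
    "execution": {
        "state-recovery.claim-mode": "CLAIM",
        "checkpointing": {"incremental": True, "num-retained": 20},
    },
    "state": {
        "backend": {"type": "forst", "forst": {"sync.enforce-local": False}},
    },
}

for name, value in sorted(flatten(subset).items()):
    print(f"{name}: {value}")
```

The flattened names are the ones most relevant to this failure: incremental 
checkpoints with CLAIM recovery on the ForSt backend, with 
`sync.enforce-local` disabled.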
 

 

And the stack trace:

 
{code:java}
op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/91e56e4d-31ae-4ee4-ab17-704a00a45627
 [5343318 bytes], localPath='002991.sst'}], 
metaStateHandle=ByteStreamStateHandle{handleName='s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/chk-12/da15b537-64d4-4651-800d-2ce8d8d4f5c9',
 dataBytes=1310}, stateHandleId=49cb0a39-a8a1-49fd-9e77-dd35dd3a3713}]).
[2025-11-04  09:44:21]    Exception in thread "processor-thread-conversation-id 
(19/20)#860" java.io.FileNotFoundException: No such file or directory: 
s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedProcessOperator_08ba072f7d4350d148d1fdf71e53ea7c__19_20__attempt_2/db/3acf0699-40c1-48ab-a42a-689829409fbf
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
[2025-11-04  09:44:21]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
[2025-11-04  09:44:21]        at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
[2025-11-04  09:44:21]        at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
[2025-11-04  09:44:21]        at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
[2025-11-04  09:44:21]        at org.forstdb.RocksDB.open(Native Method)
[2025-11-04  09:44:21]        at org.forstdb.RocksDB.open(RocksDB.java:318)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
[2025-11-04  09:44:21]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
[2025-11-04  09:44:21]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
[2025-11-04  09:44:21]        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
[2025-11-04  09:44:21]        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
[2025-11-04  09:44:21]        at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
[2025-11-04  09:44:21]        at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
[2025-11-04  09:44:21]        at java.base/java.lang.Thread.run(Unknown Source)
[2025-11-04  09:44:21]    RocksDBExceptionJni::ThrowNew/StatusJni - Error: 
unexpected exception!
[2025-11-04  09:44:22]    2025-11-04 08:44:22,017 INFO  
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Legacy 
kryo serializer scala extensions are not available.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,017 INFO  
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo 
serializer scala extensions are not available.
[2025-11-04  09:44:22]    Exception in thread 
"processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860" 
java.io.FileNotFoundException: No such file or directory: 
s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/ad742554-d9d8-4a0f-830d-61da66510143
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
[2025-11-04  09:44:22]        at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method)
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
[2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown Source)
[2025-11-04  09:44:22]    Exception in thread 
"processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860" 
java.io.FileNotFoundException: No such file or directory: 
s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_2/db/ad742554-d9d8-4a0f-830d-61da66510143
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
[2025-11-04  09:44:22]        at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:332)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:368)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method)
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
[2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown Source)
[2025-11-04  09:44:22]    2025-11-04 08:44:22,059 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to 
cancel task Source: kafka-source-thread_events_v1 (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,059 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
kafka-source-thread_events_v1 (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860) 
switched from RUNNING to CANCELING.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,059 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
cancellation of task code Source: kafka-source-thread_events_v1 (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
Source Reader.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Shutting down split fetcher 0
[2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer 
clientId=conversation-customer-thread-events-v1-consumer-18, 
groupId=conversation-customer-thread-events-v1-consumer] Resetting generation 
and member id due to: consumer pro-actively leaving the group
[2025-11-04  09:44:22]    2025-11-04 08:44:22,060 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to 
cancel task Source: kafka-source-conversation_customer_v1 (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,061 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer 
clientId=conversation-customer-thread-events-v1-consumer-18, 
groupId=conversation-customer-thread-events-v1-consumer] Request joining group 
due to: consumer pro-actively leaving the group
[2025-11-04  09:44:22]    2025-11-04 08:44:22,061 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
kafka-source-conversation_customer_v1 (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860) 
switched from RUNNING to CANCELING.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,061 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
cancellation of task code Source: kafka-source-conversation_customer_v1 
(19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_01393662c0a19669846621424653c56f_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,062 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to 
cancel task processor-thread-conversation-id (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,062 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - 
processor-thread-conversation-id (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860) 
switched from INITIALIZING to CANCELING.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,062 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
cancellation of task code processor-thread-conversation-id (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_08ba072f7d4350d148d1fdf71e53ea7c_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
scheduler closed
[2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Closing 
reporter org.apache.kafka.common.metrics.JmxReporter
[2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Closing 
reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[2025-11-04  09:44:22]    2025-11-04 08:44:22,063 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
reporters closed
[2025-11-04  09:44:22]    2025-11-04 08:44:22,064 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to 
cancel task processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,064 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - 
processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860) 
switched from INITIALIZING to CANCELING.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,064 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
cancellation of task code processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
org.apache.kafka.common.utils.AppInfoParser                  [] - App info 
kafka.consumer for conversation-customer-thread-events-v1-consumer-18 
unregistered
[2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
fetcher 0 exited.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
kafka-source-thread_events_v1 (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860) 
switched from CANCELING to CANCELED.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task 
resources for Source: kafka-source-thread_events_v1 (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,065 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state CANCELED to JobManager 
for task Source: kafka-source-thread_events_v1 (19/20)#860 
56f77e2196fe7f3ac0662857735727d9_bd9e548cb0c865193602f71910cbd8d6_18_860.
[2025-11-04  09:44:22]    Exception in thread 
"processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860" 
java.io.InterruptedIOException: getFileStatus on 
s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860/db:
 com.amazonaws.AbortedException: 
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3861)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
[2025-11-04  09:44:22]        at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168)
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149)
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.exists(ForStFlinkFileSystem.java:298)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.StringifiedForStFileSystem.exists(StringifiedForStFileSystem.java:44)
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method)
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
[2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown Source)
[2025-11-04  09:44:22]    Caused by: com.amazonaws.AbortedException: 
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593)
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540)
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586)
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832)
[2025-11-04  09:44:22]        ... 43 more
[2025-11-04  09:44:22]    Caused by: 
com.amazonaws.http.timers.client.SdkInterruptedException
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
[2025-11-04  09:44:22]        ... 56 more
[2025-11-04  09:44:22]    RocksDBExceptionJni::ThrowNew/StatusJni - Error: 
unexpected exception!
[2025-11-04  09:44:22]    2025-11-04 08:44:22,069 WARN  
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder   [] - Failed to 
delete ForSt local base path 
/flink-data/forst-local/68e582d9490de1ef330c9d03ff6559d9/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860,
 remote base path 
s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860.
[2025-11-04  09:44:22]    java.io.InterruptedIOException: getFileStatus on 
s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860:
 com.amazonaws.AbortedException: 
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3799) 
~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStResourceContainer.clearDirectories(ForStResourceContainer.java:446)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStResourceContainer.forceClearRemoteDirectories(ForStResourceContainer.java:440)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:303)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
 [flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) 
[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) 
[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown Source) 
[?:?]
[2025-11-04  09:44:22]    Caused by: com.amazonaws.AbortedException: 
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776) 
~[?:?]
[2025-11-04  09:44:22]        ... 32 more
[2025-11-04  09:44:22]    Caused by: 
com.amazonaws.http.timers.client.SdkInterruptedException
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776) 
~[?:?]
[2025-11-04  09:44:22]        ... 32 more
[2025-11-04  09:44:22]    2025-11-04 08:44:22,070 ERROR 
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder   [] - Caught 
unexpected exception.
[2025-11-04  09:44:22]    java.io.InterruptedIOException: getFileStatus on 
s3a://conversation-customer-thread-events-state-production/checkpoints/68e582d9490de1ef330c9d03ff6559d9/shared/op_AsyncKeyedCoProcessOperator_84984c1fa2897a7a40e728e4e87e1da9__19_20__attempt_860/db:
 com.amazonaws.AbortedException: 
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:395)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3861) 
~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:168)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:149)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.exists(SafetyNetWrapperFileSystem.java:117)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.exists(ForStFlinkFileSystem.java:298)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.fs.StringifiedForStFileSystem.exists(StringifiedForStFileSystem.java:44)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(Native Method) 
~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at org.forstdb.RocksDB.open(RocksDB.java:318) 
~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140) 
~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:128) 
~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restoreBaseDBFromMainHandle(ForStIncrementalRestoreOperation.java:379)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.initBaseDBFromSingleStateHandle(ForStIncrementalRestoreOperation.java:301)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.innerRestore(ForStIncrementalRestoreOperation.java:278)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.lambda$restore$1(ForStIncrementalRestoreOperation.java:237)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.runAndReportDuration(ForStIncrementalRestoreOperation.java:438)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation.restore(ForStIncrementalRestoreOperation.java:237)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
 ~[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
 [flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) 
[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) 
[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
[flink-dist-2.1.0.jar:2.1.0]
[2025-11-04  09:44:22]        at java.base/java.lang.Thread.run(Unknown Source) 
[?:?]
[2025-11-04  09:44:22]    Caused by: com.amazonaws.AbortedException: 
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:906)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:781)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586) 
~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832) 
~[?:?]
[2025-11-04  09:44:22]        ... 43 more
[2025-11-04  09:44:22]    Caused by: 
com.amazonaws.http.timers.client.SdkInterruptedException
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:961)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:947)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1141)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
 ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
[2025-11-04  09:44:22]        at 
com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:1007)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$11(S3AFileSystem.java:2595)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:2586) 
~[?:?]
[2025-11-04  09:44:22]        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3832) 
~[?:?]
[2025-11-04  09:44:22]        ... 43 more
[2025-11-04  09:44:22]    2025-11-04 08:44:22,071 INFO  
org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer 
clientId=conversation-customer-thread-events-v1-producer] Closing the Kafka 
producer with timeoutMillis = 0 ms.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,071 INFO  
org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer 
clientId=conversation-customer-thread-events-v1-producer] Proceeding to force 
close the producer since pending requests could not be completed within timeout 
0 ms.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
scheduler closed
[2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Closing 
reporter org.apache.kafka.common.metrics.JmxReporter
[2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Closing 
reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[2025-11-04  09:44:22]    2025-11-04 08:44:22,072 INFO  
org.apache.kafka.common.metrics.Metrics                      [] - Metrics 
reporters closed
[2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
org.apache.kafka.common.utils.AppInfoParser                  [] - App info 
kafka.producer for conversation-customer-thread-events-v1-producer unregistered
[2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - 
processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860) 
switched from CANCELING to CANCELED.
[2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task 
resources for processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860 
(56f77e2196fe7f3ac0662857735727d9_84984c1fa2897a7a40e728e4e87e1da9_18_860).
[2025-11-04  09:44:22]    2025-11-04 08:44:22,073 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state CANCELED to JobManager 
for task processor-thread-conversation-customer -> 
kafka-sink-thread-conversation-customer: Writer -> 
kafka-sink-thread-conversation-customer: Committer (19/20)#860  {code}
 
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
