Hello!

I am trying to run a Beam pipeline in a local docker-compose environment on top of Flink. I wrote my own Dockerfile for the Flink jobmanager and taskmanager. I need to connect to a secure Kafka cluster through Kerberos.

Dockerfile for my-image-apache-beam/flink:1.16-java11:

FROM flink:1.16-java11

# python SDK
COPY --from=apache/beam_python3.10_sdk /opt/apache/beam/ /opt/apache/beam/

# java SDK
COPY --from=apache/beam_java11_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam_java/

COPY krb5.conf /etc/

My docker-compose.yml:

version: "2.2"
services:
  jobmanager:
    image: my-image-apache-beam/flink:1.16-java11
    ports:
      - "8081:8081"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: registry.kontur.host/srs/apache-beam/flink:1.16-java11
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "8100-8200:8100-8200"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    scale: 1
    extra_hosts:
      - "host.docker.internal:host-gateway"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 2Gb

  beam_job_server:
    image: apache/beam_flink1.16_job_server
    command: --flink-master=jobmanager --job-host=0.0.0.0
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"
    volumes:
      - artifacts:/tmp/beam-artifact-staging

  python-worker-harness:
    image: "apache/beam_python3.10_sdk"
    command: "-worker_pool"
    ports:
      - "50000:50000"
    volumes:
      - artifacts:/tmp/beam-artifact-staging

volumes:
  artifacts:

And finally my pipeline:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka, default_io_expansion_service

import os
import logging

job_server = "localhost"

pipeline_external_environment = [
    "--runner=PortableRunner",
    f"--job_endpoint={job_server}:8099",
    f"--artifact_endpoint={job_server}:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=python-worker-harness:50000"
]

kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam_java/boot\"}"
    ]
)


def run():
    pipeline_options = PipelineOptions(pipeline_external_environment)

    sasl_kerberos_principal = os.getenv('SASL_KERBEROS_PRINCIPAL')
    sasl_kerberos_password = os.getenv('SASL_KERBEROS_PASSWORD')

    source_config = {
        'bootstrap.servers': 'kafka-host1:9093,kafka-host2:9093,kafka-host3:9093',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.kerberos.principal': f'{sasl_kerberos_principal}',
        'sasl.kerberos.kinit.cmd': f'kinit -R || echo {sasl_kerberos_password} | kinit {sasl_kerberos_principal}',
        'sasl.jaas.config': f'com.sun.security.auth.module.Krb5LoginModule required debug=true principal={sasl_kerberos_principal} useTicketCache=true;',
        'group.id': 'test_group_1',
        'auto.offset.reset': 'earliest'}

    source_topic = 'Test_Source2-0_0_0_0.id-0'
    sink_topic = 'Beam.Test'

    with beam.Pipeline(options=pipeline_options) as pipeline:
        outputs = (pipeline
                   | 'Read topic from Kafka' >> ReadFromKafka(consumer_config=source_config,
                                                              topics=[source_topic],
                                                              expansion_service=kafka_process_expansion_service
                                                              )
                   | 'Write topic to Kafka' >> WriteToKafka(producer_config=source_config,
                                                            topic=sink_topic,
                                                            expansion_service=kafka_process_expansion_service
                                                            )
                   )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

But I got stuck with the error below:

INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 2f0a7a3cd89226651c2f84bd11e23321 for job 1dc3e31750be59cab4f2fcd0710b255e from resource manager with leader id 00000000000000000000000000000000.
2023-11-22 12:52:29,065 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,065 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 1dc3e31750be59cab4f2fcd0710b255e for job leader monitoring.
2023-11-22 12:52:29,066 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_20 with leader id 00000000-0000-0000-0000-000000000000.
2023-11-22 12:52:29,073 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2023-11-22 12:52:29,083 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_20 for job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,084 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,084 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,119 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,122 INFO org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - Creating a changelog storage with name 'memory'.
2023-11-22 12:52:29,123 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0), deploy into slot with allocation id 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,124 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from CREATED to DEPLOYING.
2023-11-22 12:52:29,125 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) [DEPLOYING].
2023-11-22 12:52:29,127 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 1dc3e31750be59cab4f2fcd0710b255e/p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a from jobmanager/172.19.0.2:6124
2023-11-22 12:52:29,145 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,149 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0), deploy into slot with allocation id 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,151 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,150 INFO org.apache.flink.runtime.taskmanager.Task [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from CREATED to DEPLOYING.
2023-11-22 12:52:29,151 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) [DEPLOYING].
2023-11-22 12:52:31,693 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@1cce4869
2023-11-22 12:52:31,693 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2023-11-22 12:52:31,696 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2023-11-22 12:52:31,727 INFO org.apache.flink.runtime.taskmanager.Task [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from DEPLOYING to INITIALIZING.
2023-11-22 12:52:33,035 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@7252a43f
2023-11-22 12:52:33,036 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2023-11-22 12:52:33,036 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2023-11-22 12:52:33,038 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2023-11-22 12:52:33,384 WARN org.apache.flink.metrics.MetricGroup [] - The operator name [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} exceeded the 80 characters length limit and was truncated.
2023-11-22 12:52:33,461 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
2023-11-22 12:52:33,473 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
2023-11-22 12:52:34,193 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-11-22 12:52:35,529 WARN org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status.asRuntimeException(Status.java:530) ~[blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
2023-11-22 12:52:38,914 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/opt/apache/beam_java/boot' for worker id 1-1
2023-11-22 12:52:38,925 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Process died with exit code 1
    at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
    at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:253)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
    ... 20 more
2023-11-22 12:52:38,925 WARN org.apache.flink.runtime.taskmanager.Task [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Process died with exit code 1
    at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
    at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:253)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    ... 7 more
2023-11-22 12:52:38,927 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
2023-11-22 12:52:38,927 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0).
2023-11-22 12:52:38,934 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0.
2023-11-22 12:52:38,941 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-11-22 12:52:39,055 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=268.800mb (281857222 bytes), taskOffHeapMemory=0 bytes, managedMemory=317.440mb (332859969 bytes), networkMemory=79.360mb (83214992 bytes)}, allocationId: 2f0a7a3cd89226651c2f84bd11e23321, jobId: 1dc3e31750be59cab4f2fcd0710b255e).

I can't figure out what could cause this. I would be glad to get any help!

Best regards,
Stanislav Porotikov
Web Services Operations Engineer
SRS Team