There is a Slack channel linked from https://beam.apache.org/community/contact-us/: it is #beam on the-asf.slack.com.
(You find this via beam.apache.org -> Community -> Contact Us.)

It sounds like an issue with running a multi-language pipeline on the portable Flink runner (something which I am not equipped to help with in detail).

Kenn

On Wed, Aug 9, 2023 at 2:51 PM kapil singh <kapilsing...@gmail.com> wrote:
> Hey,
>
> I've been grappling with this issue for the past five days and, despite my
> continuous efforts, I haven't found a resolution. Additionally, I've been
> unable to locate a Slack channel for Beam where I might seek assistance.
>
> The issue:
>
> *RuntimeError: Pipeline construction environment and pipeline runtime
> environment are not compatible. If you use a custom container image, check
> that the Python interpreter minor version and the Apache Beam version in
> your image match the versions used at pipeline construction time.
> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
> Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>
> Here is what I am trying to do:
>
> I am running the job from a Kubernetes container that hits the job server,
> which then talks to the job manager and task manager. The task manager and
> job manager run in one container.
>
> Here is my custom Dockerfile (image name: custom-flink):
>
> # Starting with the base Flink image
> FROM apache/flink:1.16-java11
> ARG FLINK_VERSION=1.16
> ARG KAFKA_VERSION=2.8.0
>
> # Install Python 3.8 and its associated dependencies, followed by PyFlink
> RUN set -ex; \
>     apt-get update && \
>     apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
>     wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>     tar -xvf Python-3.8.0.tgz && \
>     cd Python-3.8.0 && \
>     ./configure --without-tests --enable-shared && \
>     make -j4 && \
>     make install && \
>     ldconfig /usr/local/lib && \
>     cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>     ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>     ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>     apt-get clean && \
>     rm -rf /var/lib/apt/lists/* && \
>     python -m pip install --upgrade pip; \
>     pip install apache-flink==${FLINK_VERSION}; \
>     pip install kafka-python
>
> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>
> # Copy files from the official SDK image, including script/dependencies.
> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
>
> # Java SDK
> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/
>
> RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*
>
> # Give permissions to the /opt/apache/beam-venv directory
> RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
>
> Here is my deployment file for the job manager, the task manager plus
> worker pool, and the job server:
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   type: ClusterIP
>   ports:
>     - name: rpc
>       port: 6123
>     - name: blob-server
>       port: 6124
>     - name: webui
>       port: 8081
>   selector:
>     app: flink
>     component: jobmanager
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-worker-pool
>   namespace: flink
> spec:
>   selector:
>     app: flink
>     component: taskmanager
>   ports:
>     - protocol: TCP
>       port: 50000
>       targetPort: 50000
>       name: pool
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: jobmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       containers:
>         - name: jobmanager
>           image: custom-flink:latest
>           imagePullPolicy: IfNotPresent
>           args: ["jobmanager"]
>           ports:
>             - containerPort: 6123
>               name: rpc
>             - containerPort: 6124
>               name: blob-server
>             - containerPort: 8081
>               name: webui
>           livenessProbe:
>             tcpSocket:
>               port: 6123
>             initialDelaySeconds: 30
>             periodSeconds: 60
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf
>             - name: flink-staging
>               mountPath: /tmp/beam-artifact-staging
>           securityContext:
>             runAsUser: 9999
>           resources:
>             requests:
>               memory: "1Gi"
>               cpu: "1"
>             limits:
>               memory: "1Gi"
>               cpu: "1"
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>         - name: flink-staging
>           persistentVolumeClaim:
>             claimName: staging-artifacts-claim
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>         - name: taskmanager-beam-worker
>           image: custom-flink:latest
>           imagePullPolicy: IfNotPresent
>           args:
>             - /bin/bash
>             - -c
>             - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
>           ports:
>             - containerPort: 6122
>               name: rpc
>             - containerPort: 6125
>               name: query-state
>             - containerPort: 50000
>               name: pool
>           livenessProbe:
>             tcpSocket:
>               port: 6122
>             initialDelaySeconds: 30
>             periodSeconds: 60
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf/
>             - name: flink-staging
>               mountPath: /tmp/beam-artifact-staging
>           securityContext:
>             runAsUser: 9999
>           resources:
>             requests:
>               memory: "4Gi"
>               cpu: "4"
>             limits:
>               memory: "4Gi"
>               cpu: "4"
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>         - name: flink-staging
>           persistentVolumeClaim:
>             claimName: staging-artifacts-claim
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: beam-jobserver
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: beam
>       component: jobserver
>   template:
>     metadata:
>       labels:
>         app: beam
>         component: jobserver
>     spec:
>       containers:
>         - name: beam-jobserver
>           image: apache/beam_flink1.16_job_server:2.48.0
>           args: ["--flink-master=flink-jobmanager:8081"]
>           ports:
>             - containerPort: 8097
>             - containerPort: 8098
>             - containerPort: 8099
>           volumeMounts:
>             - name: beam-staging
>               mountPath: /tmp/beam-artifact-staging
>           resources:
>             requests:
>               memory: "2Gi"
>               cpu: "2"
>             limits:
>               memory: "2Gi"
>               cpu: "2"
>       volumes:
>         - name: beam-staging
>           persistentVolumeClaim:
>             claimName: staging-artifacts-claim
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-jobserver
>   namespace: flink
> spec:
>   type: ClusterIP
>   ports:
>     - name: grpc-port
>       port: 8097
>       targetPort: 8097
>     - name: expansion-port
>       port: 8098
>       targetPort: 8098
>     - name: job-manage-port
>       port: 8099
>       targetPort: 8099
>   selector:
>     app: beam
>     component: jobserver
>
> The PVC:
>
> apiVersion: v1
> kind: PersistentVolumeClaim
> metadata:
>   name: staging-artifacts-claim
>   namespace: flink
> spec:
>   accessModes:
>     - ReadWriteOnce
>   resources:
>     requests:
>       storage: 5Gi
>   storageClassName: standard
>
> Then I am running a pod with apache/beam_python3.8_sdk:2.48.0
> and installing Java in it, because the expansion service requires Java to
> run. Here is the code that runs from the above container:
>
> ```
> import json
> import logging
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>
> def run_beam_pipeline():
>     logging.getLogger().setLevel(logging.INFO)
>
>     consumer_config = {
>         'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
>         'group.id': 'beamgrouptest',
>         'auto.offset.reset': 'earliest',
>         'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>         'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>     }
>
>     topic = 'locations'
>
>     flink_options = PipelineOptions([
>         "--runner=PortableRunner",
>         "--artifact_endpoint=beam-jobserver:8098",
>         "--job_endpoint=beam-jobserver:8099",
>         "--environment_type=EXTERNAL",
>         "--environment_config=beam-worker-pool:50000",
>         # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
>     ])
>
>     with beam.Pipeline(options=flink_options) as pipeline:
>         messages = (
>             pipeline
>             | "Read from Kafka" >> ReadFromKafka(
>                 consumer_config=consumer_config,
>                 topics=[topic],
>                 with_metadata=False,
>                 expansion_service=default_io_expansion_service(
>                     append_args=[
>                         '--defaultEnvironmentType=PROCESS',
>                         "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
>                     ]
>                 )
>             )
>             | "Print messages" >> beam.Map(print)
>         )
>
>     logging.info("Pipeline execution completed.")
>
> if __name__ == '__main__':
>     run_beam_pipeline()
> ```
>
> When starting the job, here are the logs; it is downloading the Java
> expansion service.
> python3 testing.py
>
> <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
> <jemalloc>: (This is the expected behaviour if you are running under QEMU)
> INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
> INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
> INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '35371' '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}']
> INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:35371
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
> INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
> INFO:apache_beam.utils.subprocess_server:Registered transforms:
> INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
> INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
> INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
> INFO:apache_beam.utils.subprocess_server:  beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
> INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
> INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_read:v1
> INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_write:v1
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
> INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
> INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
> INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
> INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x402e7a95e0> ====================
> INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x402e7a9670> ====================
> INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x402e7a9dc0> ====================
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
>
> *Error*
>
> 2023-08-09 10:25:37,146 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
> 2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
> 2023-08-09 10:25:37,724 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
> 2023-08-09 10:25:37,731 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
> 2023-08-09 10:25:37,771 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-4
> 2023-08-09 10:25:37,876 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889 [] - Creating insecure state channel for localhost:45429.
> 2023-08-09 10:25:37,877 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896 [] - State channel established.
> 2023-08-09 10:25:37,918 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376 [] - Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
> 2023-08-09 10:25:37,928 ERROR /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299 [] - Error processing instruction 1. Original traceback is
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
>     response = task()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
>     bundle_processor = self.bundle_processor_cache.get(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
>     processor = bundle_processor.BundleProcessor(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 876, in __init__
>     _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 839, in _verify_descriptor_created_in_a_compatible_env
>     raise RuntimeError(
> *RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>
> 2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from INITIALIZING to RUNNING.
> 2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Failed to start remote bundle
>
> Thanks
> kapil Dev
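For intuition about what the quoted RuntimeError is comparing: the traceback shows Beam's `_verify_descriptor_created_in_a_compatible_env` rejecting the bundle because the `beam:version:sdk_base:...` capability recorded at pipeline construction time (here the Java 8 SDK image) differs from the one in the runtime worker (the Python 3.8 SDK image). The sketch below is a simplified illustration of that kind of check, not Beam's actual implementation; the function name and logic are mine.

```python
# Hypothetical sketch of an SDK-base compatibility check, modeled on the
# "beam:version:sdk_base:<image>" capability strings in the error message.
# This is NOT Beam's real code; see bundle_processor.py for the actual check.

def environments_compatible(submission: str, runtime: str) -> bool:
    """Return True only when both capability strings record the same
    SDK base image (e.g. the same apache/beam_*_sdk image and version)."""
    prefix = "beam:version:sdk_base:"
    if not (submission.startswith(prefix) and runtime.startswith(prefix)):
        # No sdk_base capability recorded on one side: nothing to compare.
        return True
    return submission[len(prefix):] == runtime[len(prefix):]

# The pair from the quoted error is rejected: a Java 8 submission
# environment does not match a Python 3.8 runtime environment.
print(environments_compatible(
    "beam:version:sdk_base:apache/beam_java8_sdk:2.48.0",
    "beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0"))  # False
```

This suggests the Java expansion service's PROCESS environment (booting `/opt/apache/beam/boot` from the `beam_java8_sdk`-derived layer) is being matched against the Python worker pool's environment, which is why aligning the SDK images and Beam versions across the expansion service, the custom Flink image, and the submission pod matters.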