Hey Kapil,

I grappled with a similar deployment and put together this repo <https://github.com/sambvfx/beam-flink-k8s> [1] to capture some of what I learned for others. We were running cross-language pipelines on Flink, connecting PubsubIO <https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171> [2] to various other Python transforms. No promises it will help, but feel free to take a look; it's a close approximation of the setup we had working.
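For reference, this is the kind of minimal pure-Python pipeline I'd use to sanity-check the portable runner setup first. It's only a sketch: the endpoint values are taken from the manifests later in this thread and may need adjusting for your cluster, and `run()` must be invoked from a pod that can actually reach the job server.

```python
# Sketch of a minimal pure-Python pipeline for sanity-checking the portable
# Flink runner setup, with no cross-language (Kafka / expansion service)
# transforms involved. Endpoints below are assumptions taken from the
# Kubernetes manifests in this thread.
PIPELINE_ARGS = [
    "--runner=PortableRunner",
    "--job_endpoint=beam-jobserver:8099",
    "--artifact_endpoint=beam-jobserver:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
]


def run():
    # Imports are kept inside run() so the module can be loaded and
    # inspected even where Beam is not installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(PIPELINE_ARGS)) as p:
        (p
         | "Create" >> beam.Create([1, 2, 3])
         | "Square" >> beam.Map(lambda x: x * x)
         | "Print" >> beam.Map(print))
```

If this fails with the same environment-compatibility error, the problem is not the Kafka transform itself.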
Your particular error seems related to the Kafka transform. Does a pure-Python pipeline execute as expected?

[1] https://github.com/sambvfx/beam-flink-k8s
[2] https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171

On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user <user@beam.apache.org> wrote:

> Not the OP, but is it possible to join the Slack channel without an apache.org email address? I tried joining Slack previously for support and gave up because it looked like it wasn't possible.
>
> On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> There is a Slack channel linked from https://beam.apache.org/community/contact-us/: it is #beam on the-asf.slack.com.
>>
>> (You find this via beam.apache.org -> Community -> Contact Us.)
>>
>> It sounds like an issue with running a multi-language pipeline on the portable Flink runner (something I am not equipped to help with in detail).
>>
>> Kenn
>>
>> On Wed, Aug 9, 2023 at 2:51 PM kapil singh <kapilsing...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> I've been grappling with this issue for the past five days and, despite my continuous efforts, I haven't found a resolution. I've also been unable to locate a Slack channel for Beam where I might seek assistance.
>>>
>>> The issue:
>>>
>>> *RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>>> Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>>>
>>> Here is what I am trying to do:
>>>
>>> I am running the job from a Kubernetes container that hits the job server, and then the job manager and task manager. The task manager and job manager run in one container.
>>>
>>> Here is my custom Dockerfile (image name: custom-flink):
>>>
>>> # Starting with the base Flink image
>>> FROM apache/flink:1.16-java11
>>> ARG FLINK_VERSION=1.16
>>> ARG KAFKA_VERSION=2.8.0
>>>
>>> # Install python3.8 and its associated dependencies, followed by pyflink
>>> RUN set -ex; \
>>>     apt-get update && \
>>>     apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
>>>     wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>>>     tar -xvf Python-3.8.0.tgz && \
>>>     cd Python-3.8.0 && \
>>>     ./configure --without-tests --enable-shared && \
>>>     make -j4 && \
>>>     make install && \
>>>     ldconfig /usr/local/lib && \
>>>     cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>>>     ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>>>     ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>>>     apt-get clean && \
>>>     rm -rf /var/lib/apt/lists/* && \
>>>     python -m pip install --upgrade pip; \
>>>     pip install apache-flink==${FLINK_VERSION}; \
>>>     pip install kafka-python
>>>
>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>>>
>>> # Copy files from official SDK image, including script/dependencies.
>>> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
>>>
>>> # java SDK
>>> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/
>>>
>>> RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*
>>>
>>> # Give permissions to the /opt/apache/beam-venv directory
>>> RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
>>>
>>> Here is my deployment file for the job manager, the task manager plus worker pool, and the job server:
>>>
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: flink-jobmanager
>>>   namespace: flink
>>> spec:
>>>   type: ClusterIP
>>>   ports:
>>>   - name: rpc
>>>     port: 6123
>>>   - name: blob-server
>>>     port: 6124
>>>   - name: webui
>>>     port: 8081
>>>   selector:
>>>     app: flink
>>>     component: jobmanager
>>> ---
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: beam-worker-pool
>>>   namespace: flink
>>> spec:
>>>   selector:
>>>     app: flink
>>>     component: taskmanager
>>>   ports:
>>>   - protocol: TCP
>>>     port: 50000
>>>     targetPort: 50000
>>>     name: pool
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: flink-jobmanager
>>>   namespace: flink
>>> spec:
>>>   replicas: 1
>>>   selector:
>>>     matchLabels:
>>>       app: flink
>>>       component: jobmanager
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: flink
>>>         component: jobmanager
>>>     spec:
>>>       containers:
>>>       - name: jobmanager
>>>         image: custom-flink:latest
>>>         imagePullPolicy: IfNotPresent
>>>         args: ["jobmanager"]
>>>         ports:
>>>         - containerPort: 6123
>>>           name: rpc
>>>         - containerPort: 6124
>>>           name: blob-server
>>>         - containerPort: 8081
>>>           name: webui
>>>         livenessProbe:
>>>           tcpSocket:
>>>             port: 6123
>>>           initialDelaySeconds: 30
>>>           periodSeconds: 60
>>>         volumeMounts:
>>>         - name: flink-config-volume
>>>           mountPath: /opt/flink/conf
>>>         - name: flink-staging
>>>           mountPath: /tmp/beam-artifact-staging
>>>         securityContext:
>>>           runAsUser: 9999
>>>         resources:
>>>           requests:
>>>             memory: "1Gi"
>>>             cpu: "1"
>>>           limits:
>>>             memory: "1Gi"
>>>             cpu: "1"
>>>       volumes:
>>>       - name: flink-config-volume
>>>         configMap:
>>>           name: flink-config
>>>           items:
>>>           - key: flink-conf.yaml
>>>             path: flink-conf.yaml
>>>           - key: log4j-console.properties
>>>             path: log4j-console.properties
>>>       - name: flink-staging
>>>         persistentVolumeClaim:
>>>           claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: flink-taskmanager
>>>   namespace: flink
>>> spec:
>>>   replicas: 1
>>>   selector:
>>>     matchLabels:
>>>       app: flink
>>>       component: taskmanager
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: flink
>>>         component: taskmanager
>>>     spec:
>>>       containers:
>>>       - name: taskmanager-beam-worker
>>>         image: custom-flink:latest
>>>         imagePullPolicy: IfNotPresent
>>>         args:
>>>         - /bin/bash
>>>         - -c
>>>         - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
>>>         ports:
>>>         - containerPort: 6122
>>>           name: rpc
>>>         - containerPort: 6125
>>>           name: query-state
>>>         - containerPort: 50000
>>>           name: pool
>>>         livenessProbe:
>>>           tcpSocket:
>>>             port: 6122
>>>           initialDelaySeconds: 30
>>>           periodSeconds: 60
>>>         volumeMounts:
>>>         - name: flink-config-volume
>>>           mountPath: /opt/flink/conf/
>>>         - name: flink-staging
>>>           mountPath: /tmp/beam-artifact-staging
>>>         securityContext:
>>>           runAsUser: 9999
>>>         resources:
>>>           requests:
>>>             memory: "4Gi"
>>>             cpu: "4"
>>>           limits:
>>>             memory: "4Gi"
>>>             cpu: "4"
>>>       volumes:
>>>       - name: flink-config-volume
>>>         configMap:
>>>           name: flink-config
>>>           items:
>>>           - key: flink-conf.yaml
>>>             path: flink-conf.yaml
>>>           - key: log4j-console.properties
>>>             path: log4j-console.properties
>>>       - name: flink-staging
>>>         persistentVolumeClaim:
>>>           claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: beam-jobserver
>>>   namespace: flink
>>> spec:
>>>   replicas: 1
>>>   selector:
>>>     matchLabels:
>>>       app: beam
>>>       component: jobserver
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: beam
>>>         component: jobserver
>>>     spec:
>>>       containers:
>>>       - name: beam-jobserver
>>>         image: apache/beam_flink1.16_job_server:2.48.0
>>>         args: ["--flink-master=flink-jobmanager:8081"]
>>>         ports:
>>>         - containerPort: 8097
>>>         - containerPort: 8098
>>>         - containerPort: 8099
>>>         volumeMounts:
>>>         - name: beam-staging
>>>           mountPath: /tmp/beam-artifact-staging
>>>         resources:
>>>           requests:
>>>             memory: "2Gi"
>>>             cpu: "2"
>>>           limits:
>>>             memory: "2Gi"
>>>             cpu: "2"
>>>       volumes:
>>>       - name: beam-staging
>>>         persistentVolumeClaim:
>>>           claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: beam-jobserver
>>>   namespace: flink
>>> spec:
>>>   type: ClusterIP
>>>   ports:
>>>   - name: grpc-port
>>>     port: 8097
>>>     targetPort: 8097
>>>   - name: expansion-port
>>>     port: 8098
>>>     targetPort: 8098
>>>   - name: job-manage-port
>>>     port: 8099
>>>     targetPort: 8099
>>>   selector:
>>>     app: beam
>>>     component: jobserver
>>>
>>> And the PVC:
>>>
>>> apiVersion: v1
>>> kind: PersistentVolumeClaim
>>> metadata:
>>>   name: staging-artifacts-claim
>>>   namespace: flink
>>> spec:
>>>   accessModes:
>>>   - ReadWriteOnce
>>>   resources:
>>>     requests:
>>>       storage: 5Gi
>>>   storageClassName: standard
>>>
>>> Then I am running a pod with apache/beam_python3.8_sdk:2.48.0.
>>> and installing Java in it, because the expansion service requires it. Here is the code that runs in that container:
>>>
>>> ```
>>> import json
>>> import logging
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>
>>> def run_beam_pipeline():
>>>     logging.getLogger().setLevel(logging.INFO)
>>>
>>>     consumer_config = {
>>>         'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
>>>         'group.id': 'beamgrouptest',
>>>         'auto.offset.reset': 'earliest',
>>>         'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>>         'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>>     }
>>>
>>>     topic = 'locations'
>>>
>>>     flink_options = PipelineOptions([
>>>         "--runner=PortableRunner",
>>>         "--artifact_endpoint=beam-jobserver:8098",
>>>         "--job_endpoint=beam-jobserver:8099",
>>>         "--environment_type=EXTERNAL",
>>>         "--environment_config=beam-worker-pool:50000",
>>>         # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
>>>     ])
>>>
>>>     with beam.Pipeline(options=flink_options) as pipeline:
>>>         messages = (
>>>             pipeline
>>>             | "Read from Kafka" >> ReadFromKafka(
>>>                 consumer_config=consumer_config,
>>>                 topics=[topic],
>>>                 with_metadata=False,
>>>                 expansion_service=default_io_expansion_service(
>>>                     append_args=[
>>>                         '--defaultEnvironmentType=PROCESS',
>>>                         '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
>>>                     ]
>>>                 )
>>>             )
>>>             | "Print messages" >> beam.Map(print)
>>>         )
>>>
>>>     logging.info("Pipeline execution completed.")
>>>
>>> if __name__ == '__main__':
>>>     run_beam_pipeline()
>>> ```
>>>
>>> When starting the job, here are the logs; it downloads the Java expansion service.
>>>
>>> python3 testing.py
>>>
>>> <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
>>> <jemalloc>: (This is the expected behaviour if you are running under QEMU)
>>> INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
>>> INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
>>> INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '35371' '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}']
>>> INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:35371
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>> INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
>>> INFO:apache_beam.utils.subprocess_server:Registered transforms:
>>> INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
>>> INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
>>> INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
>>> INFO:apache_beam.utils.subprocess_server:  beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
>>> INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
>>> INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_read:v1
>>> INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_write:v1
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>> INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
>>> INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
>>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
>>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
>>> INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x402e7a95e0> ====================
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x402e7a9670> ====================
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x402e7a9dc0> ====================
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
>>>
>>> *Error*
>>>
>>> 2023-08-09 10:25:37,146 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
>>> 2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
>>> 2023-08-09 10:25:37,724 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
>>> 2023-08-09 10:25:37,731 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
>>> 2023-08-09 10:25:37,771 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-4
>>> 2023-08-09 10:25:37,876 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889 [] - Creating insecure state channel for localhost:45429.
>>> 2023-08-09 10:25:37,877 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896 [] - State channel established.
>>> 2023-08-09 10:25:37,918 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376 [] - Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
>>> 2023-08-09 10:25:37,928 ERROR /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299 [] - Error processing instruction 1. Original traceback is
>>>
>>> Traceback (most recent call last):
>>>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
>>>     response = task()
>>>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
>>>     lambda: self.create_worker().do_instruction(request), request)
>>>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
>>>     return getattr(self, request_type)(
>>>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
>>>     bundle_processor = self.bundle_processor_cache.get(
>>>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
>>>     processor = bundle_processor.BundleProcessor(
>>>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 876, in __init__
>>>     _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
>>>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 839, in _verify_descriptor_created_in_a_compatible_env
>>>     raise RuntimeError(
>>>
>>> *RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>>>
>>> 2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from INITIALIZING to RUNNING.
>>> 2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Failed to start remote bundle
>>>
>>> Thanks,
>>> Kapil Dev