Hey Kapil,

I grappled with a similar deployment and put together this repo,
<https://github.com/sambvfx/beam-flink-k8s>, to give others some
nuggets of useful information. We were running cross-language
pipelines on Flink, connecting PubsubIO to other miscellaneous Python
transforms. No promises it will help, but feel free to take a look, as
it's a close approximation of the setup that we had working.

Your particular error seems related to the Kafka transform. Does a
pure-Python pipeline (one with no cross-language transforms) execute as
expected?
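
For reference, here is a minimal sketch of such a check. It assumes the
same job server and worker pool endpoints as in your script; the names
`build_options` and `run` are hypothetical and only for illustration.
If this runs cleanly on the cluster, the Python side of the setup is
probably fine and the problem is more likely in the environment used by
the Java (Kafka) transform.

```python
def build_options(job_endpoint="beam-jobserver:8099",
                  artifact_endpoint="beam-jobserver:8098",
                  worker_pool="beam-worker-pool:50000"):
    """Return PortableRunner flags mirroring the ones in the original script.

    The default endpoints are assumptions copied from the quoted pipeline.
    """
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--artifact_endpoint={artifact_endpoint}",
        "--environment_type=EXTERNAL",
        f"--environment_config={worker_pool}",
    ]


def run(argv=None):
    """Run a pipeline that uses only Python transforms (no expansion service)."""
    # Imported lazily so build_options() can be inspected without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(argv if argv is not None else build_options())
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create" >> beam.Create(["a", "b", "c"])
            | "Upper" >> beam.Map(lambda s: s.upper())
            | "Print" >> beam.Map(print)
        )
```

Calling `run()` from the same pod you submit from should print the
elements in the task manager / worker pool logs rather than locally.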

On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user <user@beam.apache.org> wrote:

> Not the OP, but is it possible to join the slack channel without an
> apache.org email address? I tried joining slack previously for support
> and gave up because it looked like it wasn't.
> On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <k...@apache.org> wrote:
>> There is a slack channel linked from
>> https://beam.apache.org/community/contact-us/ it is #beam on
>> the-asf.slack.com
>> (you find this via beam.apache.org -> Community -> Contact Us)
>> It sounds like an issue with running a multi-language pipeline on the
>> portable flink runner. (something which I am not equipped to help with in
>> detail)
>> Kenn
>> On Wed, Aug 9, 2023 at 2:51 PM kapil singh <kapilsing...@gmail.com>
>> wrote:
>>> Hey,
>>> I've been grappling with this issue for the past five days and, despite
>>> my continuous efforts, I haven't found a resolution. Additionally, I've
>>> been unable to locate a Slack channel for Beam where I might seek
>>> assistance.
>>> issue
>>> *RuntimeError: Pipeline construction environment and pipeline runtime
>>> environment are not compatible. If you use a custom container image, check
>>> that the Python interpreter minor version and the Apache Beam version in
>>> your image match the versions used at pipeline construction time.
>>> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>>> Runtime environment:
>>> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>>> Here is what I am trying to do:
>>> I am submitting the job from a Kubernetes container; it hits the job
>>> server and then the job manager and task manager.
>>> The task manager and job manager use the same container image.
>>> Here is my custom Dockerfile (image name: custom-flink):
>>> # Starting with the base Flink image
>>> FROM apache/flink:1.16-java11
>>> # Install python3.8 and its associated dependencies, followed by pyflink
>>> RUN set -ex; \
>>>     apt-get update && \
>>>     apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
>>>     wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>>>     tar -xvf Python-3.8.0.tgz && \
>>>     cd Python-3.8.0 && \
>>>     ./configure --without-tests --enable-shared && \
>>>     make -j4 && \
>>>     make install && \
>>>     ldconfig /usr/local/lib && \
>>>     cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>>>     ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>>>     ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>>>     apt-get clean && \
>>>     rm -rf /var/lib/apt/lists/* && \
>>>     python -m pip install --upgrade pip; \
>>>     pip install apache-flink==${FLINK_VERSION}; \
>>>     pip install kafka-python
>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>>> # Copy files from official SDK image, including script/dependencies.
>>> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
>>> # java SDK
>>> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/
>>> RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*
>>> # Give permissions to the /opt/apache/beam-venv directory
>>> RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
>>> Here is my Deployment file for Job manager,Task manager plus worker-pool
>>> and job server
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: flink-jobmanager
>>>   namespace: flink
>>> spec:
>>>   type: ClusterIP
>>>   ports:
>>>   - name: rpc
>>>     port: 6123
>>>   - name: blob-server
>>>     port: 6124
>>>   - name: webui
>>>     port: 8081
>>>   selector:
>>>     app: flink
>>>     component: jobmanager
>>> ---
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: beam-worker-pool
>>>   namespace: flink
>>> spec:
>>>   selector:
>>>     app: flink
>>>     component: taskmanager
>>>   ports:
>>>   - protocol: TCP
>>>     port: 50000
>>>     targetPort: 50000
>>>     name: pool
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: flink-jobmanager
>>>   namespace: flink
>>> spec:
>>>   replicas: 1
>>>   selector:
>>>     matchLabels:
>>>       app: flink
>>>       component: jobmanager
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: flink
>>>         component: jobmanager
>>>     spec:
>>>       containers:
>>>       - name: jobmanager
>>>         image: custom-flink:latest
>>>         imagePullPolicy: IfNotPresent
>>>         args: ["jobmanager"]
>>>         ports:
>>>         - containerPort: 6123
>>>           name: rpc
>>>         - containerPort: 6124
>>>           name: blob-server
>>>         - containerPort: 8081
>>>           name: webui
>>>         livenessProbe:
>>>           tcpSocket:
>>>             port: 6123
>>>           initialDelaySeconds: 30
>>>           periodSeconds: 60
>>>         volumeMounts:
>>>         - name: flink-config-volume
>>>           mountPath: /opt/flink/conf
>>>         - name: flink-staging
>>>           mountPath: /tmp/beam-artifact-staging
>>>         securityContext:
>>>           runAsUser: 9999
>>>         resources:
>>>           requests:
>>>             memory: "1Gi"
>>>             cpu: "1"
>>>           limits:
>>>             memory: "1Gi"
>>>             cpu: "1"
>>>       volumes:
>>>       - name: flink-config-volume
>>>         configMap:
>>>           name: flink-config
>>>           items:
>>>           - key: flink-conf.yaml
>>>             path: flink-conf.yaml
>>>           - key: log4j-console.properties
>>>             path: log4j-console.properties
>>>       - name: flink-staging
>>>         persistentVolumeClaim:
>>>           claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: flink-taskmanager
>>>   namespace: flink
>>> spec:
>>>   replicas: 1
>>>   selector:
>>>     matchLabels:
>>>       app: flink
>>>       component: taskmanager
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: flink
>>>         component: taskmanager
>>>     spec:
>>>       containers:
>>>       - name: taskmanager-beam-worker
>>>         image: custom-flink:latest
>>>         imagePullPolicy: IfNotPresent
>>>         args:
>>>         - /bin/bash
>>>         - -c
>>>         - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
>>>         ports:
>>>         - containerPort: 6122
>>>           name: rpc
>>>         - containerPort: 6125
>>>           name: query-state
>>>         - containerPort: 50000
>>>           name: pool
>>>         livenessProbe:
>>>           tcpSocket:
>>>             port: 6122
>>>           initialDelaySeconds: 30
>>>           periodSeconds: 60
>>>         volumeMounts:
>>>         - name: flink-config-volume
>>>           mountPath: /opt/flink/conf/
>>>         - name: flink-staging
>>>           mountPath: /tmp/beam-artifact-staging
>>>         securityContext:
>>>           runAsUser: 9999
>>>         resources:
>>>           requests:
>>>             memory: "4Gi"
>>>             cpu: "4"
>>>           limits:
>>>             memory: "4Gi"
>>>             cpu: "4"
>>>       volumes:
>>>       - name: flink-config-volume
>>>         configMap:
>>>           name: flink-config
>>>           items:
>>>           - key: flink-conf.yaml
>>>             path: flink-conf.yaml
>>>           - key: log4j-console.properties
>>>             path: log4j-console.properties
>>>       - name: flink-staging
>>>         persistentVolumeClaim:
>>>           claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: beam-jobserver
>>>   namespace: flink
>>> spec:
>>>   replicas: 1
>>>   selector:
>>>     matchLabels:
>>>       app: beam
>>>       component: jobserver
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: beam
>>>         component: jobserver
>>>     spec:
>>>       containers:
>>>       - name: beam-jobserver
>>>         image: apache/beam_flink1.16_job_server:2.48.0
>>>         args: ["--flink-master=flink-jobmanager:8081"]
>>>         ports:
>>>         - containerPort: 8097
>>>         - containerPort: 8098
>>>         - containerPort: 8099
>>>         volumeMounts:
>>>         - name: beam-staging
>>>           mountPath: /tmp/beam-artifact-staging
>>>         resources:
>>>           requests:
>>>             memory: "2Gi"
>>>             cpu: "2"
>>>           limits:
>>>             memory: "2Gi"
>>>             cpu: "2"
>>>       volumes:
>>>       - name: beam-staging
>>>         persistentVolumeClaim:
>>>           claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: beam-jobserver
>>>   namespace: flink
>>> spec:
>>>   type: ClusterIP
>>>   ports:
>>>   - name: grpc-port
>>>     port: 8097
>>>     targetPort: 8097
>>>   - name: expansion-port
>>>     port: 8098
>>>     targetPort: 8098
>>>   - name: job-manage-port
>>>     port: 8099
>>>     targetPort: 8099
>>>   selector:
>>>     app: beam
>>>     component: jobserver
>>> pvc
>>> apiVersion: v1
>>> kind: PersistentVolumeClaim
>>> metadata:
>>>   name: staging-artifacts-claim
>>>   namespace: flink
>>> spec:
>>>   accessModes:
>>>   - ReadWriteOnce
>>>   resources:
>>>     requests:
>>>       storage: 5Gi
>>>   storageClassName: standard
>>> Then I run a Pod with apache/beam_python3.8_sdk:2.48.0
>>> and install Java in it, because the expansion service requires it.
>>> Here is the code that I run from that container:
>>> ```
>>> import json
>>> import logging
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>
>>> def run_beam_pipeline():
>>>     logging.getLogger().setLevel(logging.INFO)
>>>     consumer_config = {
>>>         'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
>>>         'group.id': 'beamgrouptest',
>>>         'auto.offset.reset': 'earliest',
>>>         'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>>         'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>>     }
>>>     topic = 'locations'
>>>     flink_options = PipelineOptions([
>>>         "--runner=PortableRunner",
>>>         "--artifact_endpoint=beam-jobserver:8098",
>>>         "--job_endpoint=beam-jobserver:8099",
>>>         "--environment_type=EXTERNAL",
>>>         "--environment_config=beam-worker-pool:50000",
>>>         # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
>>>     ])
>>>     with beam.Pipeline(options=flink_options) as pipeline:
>>>         messages = (
>>>             pipeline
>>>             | "Read from Kafka" >> ReadFromKafka(
>>>                 consumer_config=consumer_config,
>>>                 topics=[topic],
>>>                 with_metadata=False,
>>>                 expansion_service=default_io_expansion_service(
>>>                     append_args=[
>>>                         '--defaultEnvironmentType=PROCESS',
>>>                         "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
>>>                     ]
>>>                 )
>>>             )
>>>             | "Print messages" >> beam.Map(print)
>>>         )
>>>     logging.info("Pipeline execution completed.")
>>>
>>> if __name__ == '__main__':
>>>     run_beam_pipeline()
>>> ```
>>> When I start the job, here are the logs; it downloads the Java expansion
>>> service.
>>> python3 testing.py
>>> <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
>>> <jemalloc>: (This is the expected behaviour if you are running under
>>> QEMU)
>>> INFO:apache_beam.utils.subprocess_server:Using cached job server jar
>>> from
>>> https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
>>> INFO:root:Starting a JAR-based expansion service from JAR
>>> /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
>>> INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
>>> '-jar'
>>> '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
>>> '35371'
>>> '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
>>> '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command":
>>> "/opt/apache/beam/boot"}']
>>> INFO:apache_beam.utils.subprocess_server:Starting expansion service at
>>> localhost:35371
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService
>>> loadRegisteredTransforms
>>> INFO:apache_beam.utils.subprocess_server:INFO: Registering external
>>> transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
>>> beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
>>> beam:transform:org.apache.beam:kafka_write:v1,
>>> beam:external:java:generate_sequence:v1]
>>> INFO:apache_beam.utils.subprocess_server:
>>> INFO:apache_beam.utils.subprocess_server:Registered transforms:
>>> INFO:apache_beam.utils.subprocess_server: 
>>> beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
>>> INFO:apache_beam.utils.subprocess_server: 
>>> beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
>>> INFO:apache_beam.utils.subprocess_server: 
>>> beam:transform:org.apache.beam:kafka_write:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
>>> INFO:apache_beam.utils.subprocess_server: 
>>> beam:external:java:generate_sequence:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
>>> INFO:apache_beam.utils.subprocess_server:
>>> INFO:apache_beam.utils.subprocess_server:Registered
>>> SchemaTransformProviders:
>>> INFO:apache_beam.utils.subprocess_server:
>>> beam:schematransform:org.apache.beam:kafka_read:v1
>>> INFO:apache_beam.utils.subprocess_server:
>>> beam:schematransform:org.apache.beam:kafka_write:v1
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>> INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from
>>> Kafka' with URN
>>> 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
>>> INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>>> payloadToConfig
>>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
>>> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
>>> schema registered. Attempting to construct with setter approach.
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>>> payloadToConfig
>>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
>>> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
>>> schema registered. Attempting to construct with setter approach.
>>> INFO:root:Default Python SDK image for environment is
>>> apache/beam_python3.8_sdk:2.48.0
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>>> <function pack_combiners at 0x402e7a95e0> ====================
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>>> <function lift_combiners at 0x402e7a9670> ====================
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>>> <function sort_stages at 0x402e7a9dc0> ====================
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed
>>> to STOPPED
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed
>>> to RUNNING
>>> *Error*
>>> 2023-08-09 10:25:37,146 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration
>>>           [] - Loading configuration property: taskmanager.rpc.port,
>>> 6122
>>> 2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                   [] - Source: Impulse -> [3]Read from
>>> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
>>> KafkaIO.ReadSourceDescriptors} (1/1)#0
>>> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
>>> switched from INITIALIZING to RUNNING.
>>> 2023-08-09 10:25:37,724 INFO  
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
>>> [] - Finished to build heap keyed state-backend.
>>> 2023-08-09 10:25:37,731 INFO
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
>>> Initializing heap keyed state backend with stream factory.
>>> 2023-08-09 10:25:37,771 INFO  
>>> org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
>>> [] - getProcessBundleDescriptor request with id 1-4
>>> 2023-08-09 10:25:37,876 INFO  
>>> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889
>>> [] - Creating insecure state channel for localhost:45429.
>>> 2023-08-09 10:25:37,877 INFO  
>>> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896
>>> [] - State channel established.
>>> 2023-08-09 10:25:37,918 INFO  
>>> /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376
>>> [] - Default Python SDK image for environment is
>>> apache/beam_python3.8_sdk:2.48.0
>>> 2023-08-09 10:25:37,928 ERROR
>>> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299
>>> [] - Error processing instruction 1. Original traceback is
>>> Traceback (most recent call last):
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 295, in _execute
>>>     response = task()
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 370, in <lambda>
>>>     lambda: self.create_worker().do_instruction(request), request)
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 629, in do_instruction
>>>     return getattr(self, request_type)(
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 660, in process_bundle
>>>     bundle_processor = self.bundle_processor_cache.get(
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 491, in get
>>>     processor = bundle_processor.BundleProcessor(
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 876, in __init__
>>> _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 839, in _verify_descriptor_created_in_a_compatible_env
>>>     raise RuntimeError(
>>> *RuntimeError: Pipeline construction environment and pipeline runtime
>>> environment are not compatible. If you use a custom container image, check
>>> that the Python interpreter minor version and the Apache Beam version in
>>> your image match the versions used at pipeline construction time.
>>> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>>> Runtime environment:
>>> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>>> 2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                   [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka
>>> Metadata} -> [1]Print messages (1/1)#0
>>> (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0)
>>> switched from INITIALIZING to RUNNING.
>>> 2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task
>>>                   [] - Source: Impulse -> [3]Read from
>>> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
>>> KafkaIO.ReadSourceDescriptors} (1/1)#0
>>> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
>>> switched from RUNNING to FAILED with failure cause:
>>> java.lang.RuntimeException: Failed to start remote bundle
>>> Thanks
>>> kapil Dev

