Hi Kapil,

if you don't have a special reason for running the jobserver manually, you can let Beam Python SDK to run it for you (and let it configure accordingly). You just need to pass `--runner=flink` (and --flink_master) to your flink_options (or via command-line). As Sam suggested, it would be good to try to run minimal pipeline without Kafka (which brings the complexity of cross-language pipeline) to see if the problem is with the jobserver or the expansion service.

I also have a working repo with k8s (minikube), flink and kafka - dockerfile: [1] (somewhat older versions), deployment: [2], example pipeline: [3].

Best,

 Jan


[1] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile

[2] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/manifests/flink.yaml

[3] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/max_word_length.py

On 8/14/23 22:12, Sam Bourne wrote:
Hey Kapil,

I grappled with a similar deployment and created this repo <https://github.com/sambvfx/beam-flink-k8s> [1] to attempt to provide others with some nuggets of useful information. We were running cross language pipelines on flink connecting PubsubIO <https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171> [2] to other misc python transforms. No promises it will help, but feel free to take a look as it's a close approximation to the setup that we had working.

Your particular error seems related to the Kafka transform. Does a pure python pipeline execute as expected?

[1] https://github.com/sambvfx/beam-flink-k8s
[2] https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171


On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user <user@beam.apache.org> wrote:

    Not the OP, but is it possible to join the slack channel without
    an apache.org <http://apache.org> email address? I tried joining
    slack previously for support and gave up because it looked like it
    wasn't.

    On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <k...@apache.org>
    wrote:

        There is a slack channel linked from
        https://beam.apache.org/community/contact-us/ it is #beam on
        the-asf.slack.com <http://the-asf.slack.com>

        (you find this via beam.apache.org <http://beam.apache.org> ->
        Community -> Contact Us)

        It sounds like an issue with running a multi-language pipeline
        on the portable flink runner. (something which I am not
        equipped to help with in detail)

        Kenn

        On Wed, Aug 9, 2023 at 2:51 PM kapil singh
        <kapilsing...@gmail.com> wrote:

            Hey,

            I've been grappling with this issue for the past five days
            and, despite my continuous efforts, I haven't found a
            resolution. Additionally, I've been unable to locate a
            Slack channel for Beam where I might seek assistance.

            issue

            *RuntimeError: Pipeline construction environment and
            pipeline runtime environment are not compatible. If you
            use a custom container image, check that the Python
            interpreter minor version and the Apache Beam version in
            your image match the versions used at pipeline
            construction time. Submission environment:
            beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
            Runtime environment:
            beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*



            Here what i am trying to do

             i am running job from kubernetes container that hits on
            job server and then job manager and task manager
            task manager and job manager is one Container

            Here is  My custom Dockerfile. name:custom-flink

            # Starting with the base Flink image
            FROM apache/flink:1.16-java11
            ARG FLINK_VERSION=1.16
            ARG KAFKA_VERSION=2.8.0

            # Install python3.8 and its associated dependencies,
            followed by pyflink
            RUN set -ex; \
            apt-get update && \
            apt-get install -y build-essential libssl-dev zlib1g-dev
            libbz2-dev libffi-dev lzma liblzma-dev && \
            wget
            https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
            tar -xvf Python-3.8.0.tgz && \
            cd Python-3.8.0 && \
            ./configure --without-tests --enable-shared && \
            make -j4 && \
            make install && \
            ldconfig /usr/local/lib && \
            cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
            ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
            ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
            apt-get clean && \
            rm -rf /var/lib/apt/lists/* && \
            python -m pip install --upgrade pip; \
            pip install apache-flink==${FLINK_VERSION}; \
            pip install kafka-python

            RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0

            # Copy files from official SDK image, including
            script/dependencies.
            COPY --from=apache/beam_python3.8_sdk:2.48.0
            /opt/apache/beam/ /opt/apache/beam/

            # java SDK
            COPY --from=apache/beam_java11_sdk:2.48.0
            /opt/apache/beam/ /opt/apache/beam_java/

            RUN apt-get update && apt-get install -y python3-venv &&
            rm -rf /var/lib/apt/lists/*

            # Give permissions to the /opt/apache/beam-venv directory
            RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999
            /opt/apache/beam-venv

            Here is my Deployment file for Job manager,Task manager
            plus worker-pool and job server


            apiVersion: v1
            kind: Service
            metadata:
            name: flink-jobmanager
            namespace: flink
            spec:
            type: ClusterIP
            ports:
            - name: rpc
            port: 6123
            - name: blob-server
            port: 6124
            - name: webui
            port: 8081
            selector:
            app: flink
            component: jobmanager
            ---
            apiVersion: v1
            kind: Service
            metadata:
            name: beam-worker-pool
            namespace: flink
            spec:
            selector:
            app: flink
            component: taskmanager
            ports:
            - protocol: TCP
            port: 50000
            targetPort: 50000
            name: pool
            ---
            apiVersion: apps/v1
            kind: Deployment
            metadata:
            name: flink-jobmanager
            namespace: flink
            spec:
            replicas: 1
            selector:
            matchLabels:
            app: flink
            component: jobmanager
            template:
            metadata:
            labels:
            app: flink
            component: jobmanager
            spec:
            containers:
            - name: jobmanager
            image: custom-flink:latest
            imagePullPolicy: IfNotPresent
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
            name: rpc
            - containerPort: 6124
            name: blob-server
            - containerPort: 8081
            name: webui
            livenessProbe:
            tcpSocket:
            port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
            mountPath: /opt/flink/conf
            - name: flink-staging
            mountPath: /tmp/beam-artifact-staging
            securityContext:
            runAsUser: 9999
            resources:
            requests:
            memory: "1Gi"
            cpu: "1"
            limits:
            memory: "1Gi"
            cpu: "1"
            volumes:
            - name: flink-config-volume
            configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
            path: flink-conf.yaml
            - key: log4j-console.properties
            path: log4j-console.properties
            - name: flink-staging
            persistentVolumeClaim:
            claimName: staging-artifacts-claim
            ---
            apiVersion: apps/v1
            kind: Deployment
            metadata:
            name: flink-taskmanager
            namespace: flink
            spec:
            replicas: 1
            selector:
            matchLabels:
            app: flink
            component: taskmanager
            template:
            metadata:
            labels:
            app: flink
            component: taskmanager
            spec:
            containers:
            - name: taskmanager-beam-worker
            image: custom-flink:latest
            imagePullPolicy: IfNotPresent
            args:
            - /bin/bash
            - -c
            - "/opt/flink/bin/taskmanager.sh start-foreground & python
            -m apache_beam.runners.worker.worker_pool_main
            --container_executable=/opt/apache/beam/boot
            --service_port=50000 & tail -f /dev/null"
            ports:
            - containerPort: 6122
            name: rpc
            - containerPort: 6125
            name: query-state
            - containerPort: 50000
            name: pool
            livenessProbe:
            tcpSocket:
            port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
            mountPath: /opt/flink/conf/
            - name: flink-staging
            mountPath: /tmp/beam-artifact-staging
            securityContext:
            runAsUser: 9999
            resources:
            requests:
            memory: "4Gi"
            cpu: "4"
            limits:
            memory: "4Gi"
            cpu: "4"
            volumes:
            - name: flink-config-volume
            configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
            path: flink-conf.yaml
            - key: log4j-console.properties
            path: log4j-console.properties
            - name: flink-staging
            persistentVolumeClaim:
            claimName: staging-artifacts-claim
            ---
            apiVersion: apps/v1
            kind: Deployment
            metadata:
            name: beam-jobserver
            namespace: flink
            spec:
            replicas: 1
            selector:
            matchLabels:
            app: beam
            component: jobserver
            template:
            metadata:
            labels:
            app: beam
            component: jobserver
            spec:
            containers:
            - name: beam-jobserver
            image: apache/beam_flink1.16_job_server:2.48.0
            args: ["--flink-master=flink-jobmanager:8081"]
            ports:
            - containerPort: 8097
            - containerPort: 8098
            - containerPort: 8099
            volumeMounts:
            - name: beam-staging
            mountPath: /tmp/beam-artifact-staging
            resources:
            requests:
            memory: "2Gi"
            cpu: "2"
            limits:
            memory: "2Gi"
            cpu: "2"
            volumes:
            - name: beam-staging
            persistentVolumeClaim:
            claimName: staging-artifacts-claim
            ---
            apiVersion: v1
            kind: Service
            metadata:
            name: beam-jobserver
            namespace: flink
            spec:
            type: ClusterIP
            ports:
            - name: grpc-port
            port: 8097
            targetPort: 8097
            - name: expansion-port
            port: 8098
            targetPort: 8098
            - name: job-manage-port
            port: 8099
            targetPort: 8099
            selector:
            app: beam
            component: jobserver

            pvc

            apiVersion: v1
            kind: PersistentVolumeClaim
            metadata:
            name: staging-artifacts-claim
            namespace: flink
            spec:
            accessModes:
            - ReadWriteOnce
            resources:
            requests:
            storage: 5Gi
            storageClassName: standard



            Then I am running a Pod  with
            apache/beam_python3.8_sdk:2.48.0.
            and installing java in it because expansion required to
            run the code here is my code that is running from above
            container
            ```
            import json
            import logging
            import apache_beam as beam
            from apache_beam.options.pipeline_options import
            PipelineOptions
            from apache_beam.io.kafka import ReadFromKafka,
            default_io_expansion_service

            def run_beam_pipeline():
            logging.getLogger().setLevel(logging.INFO)

            consumer_config = {
            'bootstrap.servers':
            'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
            'group.id <http://group.id>': 'beamgrouptest',
            'auto.offset.reset': 'earliest',
            'key.deserializer':
            'org.apache.kafka.common.serialization.StringDeserializer',
            'value.deserializer':
            'org.apache.kafka.common.serialization.StringDeserializer',
            }

            topic = 'locations'

            flink_options = PipelineOptions([
            "--runner=PortableRunner",
            "--artifact_endpoint=beam-jobserver:8098",
            "--job_endpoint=beam-jobserver:8099",
            "--environment_type=EXTERNAL",
            "--environment_config=beam-worker-pool:50000",
            # "--environment_config={\"command\":
            \"/opt/apache/beam/boot\"}",
            ])

            with beam.Pipeline(options=flink_options) as pipeline:
            messages = (
            pipeline
            | "Read from Kafka" >> ReadFromKafka(
            consumer_config=consumer_config,
            topics=[topic],
            with_metadata=False,
            expansion_service=default_io_expansion_service(
            append_args=[
            '--defaultEnvironmentType=PROCESS',
            "--defaultEnvironmentConfig={\"command\":
            \"/opt/apache/beam/boot\"}",
            ]
            )
            )
            | "Print messages" >> beam.Map(print)
            )

            logging.info("Pipeline execution completed.")

            if __name__ == '__main__':
            run_beam_pipeline()

            ```

            When starting a job here is logs, it is downloading java
            expansion service.


            python3 testing.py





            <jemalloc>: MADV_DONTNEED does not work (memset will be
            used instead)

            <jemalloc>: (This is the expected behaviour if you are
            running under QEMU)

            INFO:apache_beam.utils.subprocess_server:Using cached job
            server jar from
            
https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar

            INFO:root:Starting a JAR-based expansion service from JAR
            
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar

            INFO:apache_beam.utils.subprocess_server:Starting service
            with ['java' '-jar'
            
'/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
            '35371'
            
'--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
            '--defaultEnvironmentType=PROCESS'
            '--defaultEnvironmentConfig={"command":
            "/opt/apache/beam/boot"}']

            INFO:apache_beam.utils.subprocess_server:Starting
            expansion service at localhost:35371

            INFO:apache_beam.utils.subprocess_server:Aug 09, 2023
            8:35:00 AM
            org.apache.beam.sdk.expansion.service.ExpansionService
            loadRegisteredTransforms

            INFO:apache_beam.utils.subprocess_server:INFO: Registering
            external transforms:
            [beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
            beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
            beam:transform:org.apache.beam:kafka_write:v1,
            beam:external:java:generate_sequence:v1]

            INFO:apache_beam.utils.subprocess_server:

            INFO:apache_beam.utils.subprocess_server:Registered
            transforms:

            
INFO:apache_beam.utils.subprocess_server:beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
            
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0

            
INFO:apache_beam.utils.subprocess_server:beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
            
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7

            
INFO:apache_beam.utils.subprocess_server:beam:transform:org.apache.beam:kafka_write:v1:
            
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125

            
INFO:apache_beam.utils.subprocess_server:beam:external:java:generate_sequence:v1:
            
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484

            INFO:apache_beam.utils.subprocess_server:

            INFO:apache_beam.utils.subprocess_server:Registered
            SchemaTransformProviders:

            
INFO:apache_beam.utils.subprocess_server:beam:schematransform:org.apache.beam:kafka_read:v1

            
INFO:apache_beam.utils.subprocess_server:beam:schematransform:org.apache.beam:kafka_write:v1

            WARNING:root:Waiting for grpc channel to be ready at
            localhost:35371.




            WARNING:root:Waiting for grpc channel to be ready at
            localhost:35371.

            WARNING:root:Waiting for grpc channel to be ready at
            localhost:35371.

            WARNING:root:Waiting for grpc channel to be ready at
            localhost:35371.

            INFO:apache_beam.utils.subprocess_server:Aug 09, 2023
            8:35:06 AM
            org.apache.beam.sdk.expansion.service.ExpansionService expand

            INFO:apache_beam.utils.subprocess_server:INFO: Expanding
            'Read from Kafka' with URN
            'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'

            INFO:apache_beam.utils.subprocess_server:Dependencies list: {}

            INFO:apache_beam.utils.subprocess_server:Aug 09, 2023
            8:35:07 AM
            
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
            payloadToConfig

            INFO:apache_beam.utils.subprocess_server:WARNING:
            Configuration class
            'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration'
            has no schema registered. Attempting to construct with
            setter approach.

            INFO:apache_beam.utils.subprocess_server:Aug 09, 2023
            8:35:08 AM
            
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
            payloadToConfig

            INFO:apache_beam.utils.subprocess_server:WARNING:
            Configuration class
            'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration'
            has no schema registered. Attempting to construct with
            setter approach.

            INFO:root:Default Python SDK image for environment is
            apache/beam_python3.8_sdk:2.48.0

            
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
            <function pack_combiners at 0x402e7a95e0> ====================

            
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
            <function lift_combiners at 0x402e7a9670> ====================

            
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
            <function sort_stages at 0x402e7a9dc0> ====================

            INFO:apache_beam.runners.portability.portable_runner:Job
            state changed to STOPPED

            INFO:apache_beam.runners.portability.portable_runner:Job
            state changed to STARTING

            INFO:apache_beam.runners.portability.portable_runner:Job
            state changed to RUNNING








            *Error*

            2023-08-09 10:25:37,146
            INFOorg.apache.flink.configuration.GlobalConfiguration []
            - Loading configuration property: taskmanager.rpc.port, 6122

            2023-08-09 10:25:37,260
            INFOorg.apache.flink.runtime.taskmanager.Task[] - Source:
            Impulse -> [3]Read from
            
Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
            KafkaIO.ReadSourceDescriptors} (1/1)#0
            
(3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
            switched from INITIALIZING to RUNNING.

            2023-08-09 10:25:37,724
            INFOorg.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
            [] - Finished to build heap keyed state-backend.

            2023-08-09 10:25:37,731
            INFOorg.apache.flink.runtime.state.heap.HeapKeyedStateBackend[]
            - Initializing heap keyed state backend with stream factory.

            2023-08-09 10:25:37,771
            
INFOorg.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
            [] - getProcessBundleDescriptor request with id 1-4

            2023-08-09 10:25:37,876
            
INFO/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889
            [] - Creating insecure state channel for localhost:45429.

            2023-08-09 10:25:37,877
            
INFO/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896
            [] - State channel established.

            2023-08-09 10:25:37,918
            
INFO/usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376
            [] - Default Python SDK image for environment is
            apache/beam_python3.8_sdk:2.48.0

            2023-08-09 10:25:37,928 ERROR
            
/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299
            [] - Error processing instruction 1. Original traceback is

            Traceback (most recent call last):

            File
            
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
            line 295, in _execute

            response = task()

            File
            
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
            line 370, in <lambda>

            lambda: self.create_worker().do_instruction(request), request)

            File
            
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
            line 629, in do_instruction

            return getattr(self, request_type)(

            File
            
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
            line 660, in process_bundle

            bundle_processor = self.bundle_processor_cache.get(

            File
            
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
            line 491, in get

            processor = bundle_processor.BundleProcessor(

            File
            
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
            line 876, in __init__

            
_verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)

            File
            
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
            line 839, in _verify_descriptor_created_in_a_compatible_env

            raise RuntimeError(

            *RuntimeError: Pipeline construction environment and
            pipeline runtime environment are not compatible. If you
            use a custom container image, check that the Python
            interpreter minor version and the Apache Beam version in
            your image match the versions used at pipeline
            construction time. Submission environment:
            beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
            Runtime environment:
            beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*


            2023-08-09 10:25:37,931
            INFOorg.apache.flink.runtime.taskmanager.Task[] - [3]Read
            from Kafka/{KafkaIO.Read, Remove Kafka Metadata} ->
            [1]Print messages (1/1)#0
            
(3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0)
            switched from INITIALIZING to RUNNING.

            2023-08-09 10:28:37,784
            WARNorg.apache.flink.runtime.taskmanager.Task[] - Source:
            Impulse -> [3]Read from
            
Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
            KafkaIO.ReadSourceDescriptors} (1/1)#0
            
(3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
            switched from RUNNING to FAILED with failure cause:
            java.lang.RuntimeException: Failed to start remote bundle





            Thanks
            kapil Dev

Reply via email to