There is a Slack channel linked from
https://beam.apache.org/community/contact-us/: it is #beam on
the-asf.slack.com.

(You can find this via beam.apache.org -> Community -> Contact Us.)

It sounds like an issue with running a multi-language pipeline on the
portable Flink runner (something I am not equipped to help with in
detail).
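
One observation, for what it's worth: in the error text the Beam versions agree (2.48.0 on both sides) and only the SDK base image differs (java8 vs. python3.8), so it is the Java expansion's environment being checked against the Python runtime, not a version skew. A quick sketch of how to read those environment strings (plain string handling, not a Beam API):

```python
def parse_sdk_env(env: str):
    """Split a 'beam:version:sdk_base:<image>:<version>' string into (image, version)."""
    image_and_version = env.split("sdk_base:", 1)[1]
    image, version = image_and_version.rsplit(":", 1)
    return image, version

submission = "beam:version:sdk_base:apache/beam_java8_sdk:2.48.0"
runtime = "beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0"

sub_image, sub_version = parse_sdk_env(submission)
run_image, run_version = parse_sdk_env(runtime)
print(sub_version == run_version)  # True: the Beam versions match
print(sub_image == run_image)      # False: java8 vs. python3.8 base images
```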

Kenn

On Wed, Aug 9, 2023 at 2:51 PM kapil singh <kapilsing...@gmail.com> wrote:

> Hey,
>
> I've been grappling with this issue for the past five days and, despite my
> continuous efforts, I haven't found a resolution. Additionally, I've been
> unable to locate a Slack channel for Beam where I might seek assistance.
>
> The issue:
>
> *RuntimeError: Pipeline construction environment and pipeline runtime
> environment are not compatible. If you use a custom container image, check
> that the Python interpreter minor version and the Apache Beam version in
> your image match the versions used at pipeline construction time.
> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
> Runtime environment:
> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>
>
> Here is what I am trying to do:
>
> I am running the job from a Kubernetes container that hits the job server
> and then the job manager and task manager. The task manager and job
> manager run in one container.
>
> Here is my custom Dockerfile (image name: custom-flink):
>
> # Start from the base Flink image
> FROM apache/flink:1.16-java11
> ARG FLINK_VERSION=1.16
> ARG KAFKA_VERSION=2.8.0
>
> # Install Python 3.8 and its build dependencies, then PyFlink
> RUN set -ex; \
>     apt-get update && \
>     apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
>     wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>     tar -xvf Python-3.8.0.tgz && \
>     cd Python-3.8.0 && \
>     ./configure --without-tests --enable-shared && \
>     make -j4 && \
>     make install && \
>     ldconfig /usr/local/lib && \
>     cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>     ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>     ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>     apt-get clean && \
>     rm -rf /var/lib/apt/lists/* && \
>     python -m pip install --upgrade pip; \
>     pip install apache-flink==${FLINK_VERSION}; \
>     pip install kafka-python
>
> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>
> # Copy files from the official Python SDK image, including the boot script and dependencies
> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
>
> # Java SDK
> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/
>
> RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*
>
> # Give permissions to the /opt/apache/beam-venv directory
> RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
>
> Here is my deployment file for the job manager, the task manager (plus
> the Beam worker pool), and the job server:
>
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   type: ClusterIP
>   ports:
>     - name: rpc
>       port: 6123
>     - name: blob-server
>       port: 6124
>     - name: webui
>       port: 8081
>   selector:
>     app: flink
>     component: jobmanager
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-worker-pool
>   namespace: flink
> spec:
>   selector:
>     app: flink
>     component: taskmanager
>   ports:
>     - protocol: TCP
>       port: 50000
>       targetPort: 50000
>       name: pool
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: jobmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       containers:
>         - name: jobmanager
>           image: custom-flink:latest
>           imagePullPolicy: IfNotPresent
>           args: ["jobmanager"]
>           ports:
>             - containerPort: 6123
>               name: rpc
>             - containerPort: 6124
>               name: blob-server
>             - containerPort: 8081
>               name: webui
>           livenessProbe:
>             tcpSocket:
>               port: 6123
>             initialDelaySeconds: 30
>             periodSeconds: 60
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf
>             - name: flink-staging
>               mountPath: /tmp/beam-artifact-staging
>           securityContext:
>             runAsUser: 9999
>           resources:
>             requests:
>               memory: "1Gi"
>               cpu: "1"
>             limits:
>               memory: "1Gi"
>               cpu: "1"
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>         - name: flink-staging
>           persistentVolumeClaim:
>             claimName: staging-artifacts-claim
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>         - name: taskmanager-beam-worker
>           image: custom-flink:latest
>           imagePullPolicy: IfNotPresent
>           args:
>             - /bin/bash
>             - -c
>             - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
>           ports:
>             - containerPort: 6122
>               name: rpc
>             - containerPort: 6125
>               name: query-state
>             - containerPort: 50000
>               name: pool
>           livenessProbe:
>             tcpSocket:
>               port: 6122
>             initialDelaySeconds: 30
>             periodSeconds: 60
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf/
>             - name: flink-staging
>               mountPath: /tmp/beam-artifact-staging
>           securityContext:
>             runAsUser: 9999
>           resources:
>             requests:
>               memory: "4Gi"
>               cpu: "4"
>             limits:
>               memory: "4Gi"
>               cpu: "4"
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>         - name: flink-staging
>           persistentVolumeClaim:
>             claimName: staging-artifacts-claim
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: beam-jobserver
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: beam
>       component: jobserver
>   template:
>     metadata:
>       labels:
>         app: beam
>         component: jobserver
>     spec:
>       containers:
>         - name: beam-jobserver
>           image: apache/beam_flink1.16_job_server:2.48.0
>           args: ["--flink-master=flink-jobmanager:8081"]
>           ports:
>             - containerPort: 8097
>             - containerPort: 8098
>             - containerPort: 8099
>           volumeMounts:
>             - name: beam-staging
>               mountPath: /tmp/beam-artifact-staging
>           resources:
>             requests:
>               memory: "2Gi"
>               cpu: "2"
>             limits:
>               memory: "2Gi"
>               cpu: "2"
>       volumes:
>         - name: beam-staging
>           persistentVolumeClaim:
>             claimName: staging-artifacts-claim
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-jobserver
>   namespace: flink
> spec:
>   type: ClusterIP
>   ports:
>     - name: grpc-port
>       port: 8097
>       targetPort: 8097
>     - name: expansion-port
>       port: 8098
>       targetPort: 8098
>     - name: job-manage-port
>       port: 8099
>       targetPort: 8099
>   selector:
>     app: beam
>     component: jobserver
>
> And the PVC:
>
> apiVersion: v1
> kind: PersistentVolumeClaim
> metadata:
>   name: staging-artifacts-claim
>   namespace: flink
> spec:
>   accessModes:
>     - ReadWriteOnce
>   resources:
>     requests:
>       storage: 5Gi
>   storageClassName: standard
>
>
>
> Then I am running a pod with apache/beam_python3.8_sdk:2.48.0 and
> installing Java in it, because the expansion service requires it. Here is
> the code that runs in that container:
> ```
> import json
> import logging
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>
>
> def run_beam_pipeline():
>     logging.getLogger().setLevel(logging.INFO)
>
>     consumer_config = {
>         'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
>         'group.id': 'beamgrouptest',
>         'auto.offset.reset': 'earliest',
>         'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>         'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>     }
>
>     topic = 'locations'
>
>     flink_options = PipelineOptions([
>         "--runner=PortableRunner",
>         "--artifact_endpoint=beam-jobserver:8098",
>         "--job_endpoint=beam-jobserver:8099",
>         "--environment_type=EXTERNAL",
>         "--environment_config=beam-worker-pool:50000",
>         # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
>     ])
>
>     with beam.Pipeline(options=flink_options) as pipeline:
>         messages = (
>             pipeline
>             | "Read from Kafka" >> ReadFromKafka(
>                 consumer_config=consumer_config,
>                 topics=[topic],
>                 with_metadata=False,
>                 expansion_service=default_io_expansion_service(
>                     append_args=[
>                         '--defaultEnvironmentType=PROCESS',
>                         "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
>                     ]
>                 )
>             )
>             | "Print messages" >> beam.Map(print)
>         )
>
>     logging.info("Pipeline execution completed.")
>
>
> if __name__ == '__main__':
>     run_beam_pipeline()
> ```
>
> When I start the job, here are the logs; it downloads the Java expansion
> service.
>
>
> python3 testing.py
>
>
>
>
>
> <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
>
> <jemalloc>: (This is the expected behaviour if you are running under QEMU)
>
> INFO:apache_beam.utils.subprocess_server:Using cached job server jar from
> https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
>
> INFO:root:Starting a JAR-based expansion service from JAR
> /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
>
>
> INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
> '-jar'
> '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
> '35371'
> '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
> '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command":
> "/opt/apache/beam/boot"}']
>
> INFO:apache_beam.utils.subprocess_server:Starting expansion service at
> localhost:35371
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM
> org.apache.beam.sdk.expansion.service.ExpansionService
> loadRegisteredTransforms
>
> INFO:apache_beam.utils.subprocess_server:INFO: Registering external
> transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
> beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
> beam:transform:org.apache.beam:kafka_write:v1,
> beam:external:java:generate_sequence:v1]
>
> INFO:apache_beam.utils.subprocess_server:
>
> INFO:apache_beam.utils.subprocess_server:Registered transforms:
>
> INFO:apache_beam.utils.subprocess_server: 
> beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
>
> INFO:apache_beam.utils.subprocess_server: 
> beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
>
> INFO:apache_beam.utils.subprocess_server: 
> beam:transform:org.apache.beam:kafka_write:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
>
> INFO:apache_beam.utils.subprocess_server: 
> beam:external:java:generate_sequence:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
>
> INFO:apache_beam.utils.subprocess_server:
>
> INFO:apache_beam.utils.subprocess_server:Registered
> SchemaTransformProviders:
>
> INFO:apache_beam.utils.subprocess_server:
> beam:schematransform:org.apache.beam:kafka_read:v1
>
> INFO:apache_beam.utils.subprocess_server:
> beam:schematransform:org.apache.beam:kafka_write:v1
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
>
>
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM
> org.apache.beam.sdk.expansion.service.ExpansionService expand
>
> INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka'
> with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
>
> INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
> payloadToConfig
>
> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
> schema registered. Attempting to construct with setter approach.
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
> payloadToConfig
>
> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
> schema registered. Attempting to construct with setter approach.
>
> INFO:root:Default Python SDK image for environment is
> apache/beam_python3.8_sdk:2.48.0
>
> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
> <function pack_combiners at 0x402e7a95e0> ====================
>
> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
> <function lift_combiners at 0x402e7a9670> ====================
>
> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
> <function sort_stages at 0x402e7a9dc0> ====================
>
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
> STOPPED
>
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
> STARTING
>
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
> RUNNING
>
>
>
>
>
>
>
> *Error*
>
> 2023-08-09 10:25:37,146 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>           [] - Loading configuration property: taskmanager.rpc.port, 6122
>
> 2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task
>                   [] - Source: Impulse -> [3]Read from
> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
> KafkaIO.ReadSourceDescriptors} (1/1)#0
> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from INITIALIZING to RUNNING.
>
> 2023-08-09 10:25:37,724 INFO  
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> [] - Finished to build heap keyed state-backend.
>
> 2023-08-09 10:25:37,731 INFO
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
> Initializing heap keyed state backend with stream factory.
>
> 2023-08-09 10:25:37,771 INFO  
> org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
> [] - getProcessBundleDescriptor request with id 1-4
>
> 2023-08-09 10:25:37,876 INFO  
> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889
> [] - Creating insecure state channel for localhost:45429.
>
> 2023-08-09 10:25:37,877 INFO  
> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896
> [] - State channel established.
>
> 2023-08-09 10:25:37,918 INFO  
> /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376
> [] - Default Python SDK image for environment is
> apache/beam_python3.8_sdk:2.48.0
>
> 2023-08-09 10:25:37,928 ERROR
> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299
> [] - Error processing instruction 1. Original traceback is
>
> Traceback (most recent call last):
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 295, in _execute
>
>     response = task()
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 370, in <lambda>
>
>     lambda: self.create_worker().do_instruction(request), request)
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 629, in do_instruction
>
>     return getattr(self, request_type)(
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 660, in process_bundle
>
>     bundle_processor = self.bundle_processor_cache.get(
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 491, in get
>
>     processor = bundle_processor.BundleProcessor(
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 876, in __init__
>
>
> _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 839, in _verify_descriptor_created_in_a_compatible_env
>
>     raise RuntimeError(
>
> *RuntimeError: Pipeline construction environment and pipeline runtime
> environment are not compatible. If you use a custom container image, check
> that the Python interpreter minor version and the Apache Beam version in
> your image match the versions used at pipeline construction time.
> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
> Runtime environment:
> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>
>
>
>
> 2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task
>                   [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka
> Metadata} -> [1]Print messages (1/1)#0
> (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0)
> switched from INITIALIZING to RUNNING.
>
> 2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task
>                   [] - Source: Impulse -> [3]Read from
> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
> KafkaIO.ReadSourceDescriptors} (1/1)#0
> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from RUNNING to FAILED with failure cause:
> java.lang.RuntimeException: Failed to start remote bundle
>
>
>
>
> Thanks,
> Kapil Dev
>
>
>
