Not the OP, but is it possible to join the Slack channel without an
apache.org email address? I tried joining Slack for support previously and
gave up because it looked like it wasn't possible.

On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <k...@apache.org> wrote:

> There is a Slack channel, #beam on the-asf.slack.com, linked from
> https://beam.apache.org/community/contact-us/
>
> (you find this via beam.apache.org -> Community -> Contact Us)
>
> It sounds like an issue with running a multi-language pipeline on the
> portable Flink runner (something I am not equipped to help with in
> detail).
>
> Kenn
>
> On Wed, Aug 9, 2023 at 2:51 PM kapil singh <kapilsing...@gmail.com> wrote:
>
>> Hey,
>>
>> I've been grappling with this issue for the past five days and, despite
>> my continuous efforts, I haven't found a resolution. Additionally, I've
>> been unable to locate a Slack channel for Beam where I might seek
>> assistance.
>>
>> The issue:
>>
>> *RuntimeError: Pipeline construction environment and pipeline runtime
>> environment are not compatible. If you use a custom container image, check
>> that the Python interpreter minor version and the Apache Beam version in
>> your image match the versions used at pipeline construction time.
>> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>> Runtime environment:
>> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
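>>
>> As I understand the message, Beam compares the SDK environment recorded
>> when the pipeline was constructed against the SDK harness that actually
>> executes the bundle. A quick way to see what the construction side
>> reports (a sketch, assuming apache_beam is installed in the submission
>> pod):
>>
>> ```
>> python -c 'import sys, apache_beam; print(apache_beam.__version__, sys.version_info[:3])'
>> ```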
>>
>>
>> Here is what I am trying to do:
>>
>> I am running the job from a Kubernetes container that hits the job
>> server, which then hands the job to the job manager and task manager.
>> The task manager and job manager use the same custom container image.
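>>
>> As a sanity check that the job server is reachable before submitting, I
>> can port-forward its service (a sketch; the ports match the
>> beam-jobserver Service below):
>>
>> ```
>> kubectl -n flink port-forward svc/beam-jobserver 8099:8099 8098:8098
>> ```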
>>
>> Here is my custom Dockerfile (image name: custom-flink):
>>
>> # Starting with the base Flink image
>> FROM apache/flink:1.16-java11
>> ARG FLINK_VERSION=1.16
>> ARG KAFKA_VERSION=2.8.0
>>
>> # Install python3.8 and its associated dependencies, followed by pyflink
>> RUN set -ex; \
>>     apt-get update && \
>>     apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
>>     wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>>     tar -xvf Python-3.8.0.tgz && \
>>     cd Python-3.8.0 && \
>>     ./configure --without-tests --enable-shared && \
>>     make -j4 && \
>>     make install && \
>>     ldconfig /usr/local/lib && \
>>     cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>>     ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>>     ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>>     apt-get clean && \
>>     rm -rf /var/lib/apt/lists/* && \
>>     python -m pip install --upgrade pip; \
>>     pip install apache-flink==${FLINK_VERSION}; \
>>     pip install kafka-python
>>
>> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>>
>> # Copy files from official SDK image, including script/dependencies.
>> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
>>
>> # Java SDK
>> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/
>>
>> RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*
>>
>> # Give permissions to the /opt/apache/beam-venv directory
>> RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
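>>
>> A minimal sketch of how I build and stage the image (the kind load step
>> is only an assumption, relevant if the cluster runs in kind; with
>> imagePullPolicy: IfNotPresent the image has to be present on the node):
>>
>> ```
>> docker build -t custom-flink:latest .
>> # Only if the cluster runs in kind (assumption):
>> kind load docker-image custom-flink:latest
>> ```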
>>
>> Here is my deployment file for the job manager, the task manager (plus
>> the Beam worker pool), and the job server:
>>
>>
>> apiVersion: v1
>> kind: Service
>> metadata:
>>   name: flink-jobmanager
>>   namespace: flink
>> spec:
>>   type: ClusterIP
>>   ports:
>>     - name: rpc
>>       port: 6123
>>     - name: blob-server
>>       port: 6124
>>     - name: webui
>>       port: 8081
>>   selector:
>>     app: flink
>>     component: jobmanager
>> ---
>> apiVersion: v1
>> kind: Service
>> metadata:
>>   name: beam-worker-pool
>>   namespace: flink
>> spec:
>>   selector:
>>     app: flink
>>     component: taskmanager
>>   ports:
>>     - protocol: TCP
>>       port: 50000
>>       targetPort: 50000
>>       name: pool
>> ---
>> apiVersion: apps/v1
>> kind: Deployment
>> metadata:
>>   name: flink-jobmanager
>>   namespace: flink
>> spec:
>>   replicas: 1
>>   selector:
>>     matchLabels:
>>       app: flink
>>       component: jobmanager
>>   template:
>>     metadata:
>>       labels:
>>         app: flink
>>         component: jobmanager
>>     spec:
>>       containers:
>>         - name: jobmanager
>>           image: custom-flink:latest
>>           imagePullPolicy: IfNotPresent
>>           args: ["jobmanager"]
>>           ports:
>>             - containerPort: 6123
>>               name: rpc
>>             - containerPort: 6124
>>               name: blob-server
>>             - containerPort: 8081
>>               name: webui
>>           livenessProbe:
>>             tcpSocket:
>>               port: 6123
>>             initialDelaySeconds: 30
>>             periodSeconds: 60
>>           volumeMounts:
>>             - name: flink-config-volume
>>               mountPath: /opt/flink/conf
>>             - name: flink-staging
>>               mountPath: /tmp/beam-artifact-staging
>>           securityContext:
>>             runAsUser: 9999
>>           resources:
>>             requests:
>>               memory: "1Gi"
>>               cpu: "1"
>>             limits:
>>               memory: "1Gi"
>>               cpu: "1"
>>       volumes:
>>         - name: flink-config-volume
>>           configMap:
>>             name: flink-config
>>             items:
>>               - key: flink-conf.yaml
>>                 path: flink-conf.yaml
>>               - key: log4j-console.properties
>>                 path: log4j-console.properties
>>         - name: flink-staging
>>           persistentVolumeClaim:
>>             claimName: staging-artifacts-claim
>> ---
>> apiVersion: apps/v1
>> kind: Deployment
>> metadata:
>>   name: flink-taskmanager
>>   namespace: flink
>> spec:
>>   replicas: 1
>>   selector:
>>     matchLabels:
>>       app: flink
>>       component: taskmanager
>>   template:
>>     metadata:
>>       labels:
>>         app: flink
>>         component: taskmanager
>>     spec:
>>       containers:
>>         - name: taskmanager-beam-worker
>>           image: custom-flink:latest
>>           imagePullPolicy: IfNotPresent
>>           args:
>>             - /bin/bash
>>             - -c
>>             - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
>>           ports:
>>             - containerPort: 6122
>>               name: rpc
>>             - containerPort: 6125
>>               name: query-state
>>             - containerPort: 50000
>>               name: pool
>>           livenessProbe:
>>             tcpSocket:
>>               port: 6122
>>             initialDelaySeconds: 30
>>             periodSeconds: 60
>>           volumeMounts:
>>             - name: flink-config-volume
>>               mountPath: /opt/flink/conf/
>>             - name: flink-staging
>>               mountPath: /tmp/beam-artifact-staging
>>           securityContext:
>>             runAsUser: 9999
>>           resources:
>>             requests:
>>               memory: "4Gi"
>>               cpu: "4"
>>             limits:
>>               memory: "4Gi"
>>               cpu: "4"
>>       volumes:
>>         - name: flink-config-volume
>>           configMap:
>>             name: flink-config
>>             items:
>>               - key: flink-conf.yaml
>>                 path: flink-conf.yaml
>>               - key: log4j-console.properties
>>                 path: log4j-console.properties
>>         - name: flink-staging
>>           persistentVolumeClaim:
>>             claimName: staging-artifacts-claim
>> ---
>> apiVersion: apps/v1
>> kind: Deployment
>> metadata:
>>   name: beam-jobserver
>>   namespace: flink
>> spec:
>>   replicas: 1
>>   selector:
>>     matchLabels:
>>       app: beam
>>       component: jobserver
>>   template:
>>     metadata:
>>       labels:
>>         app: beam
>>         component: jobserver
>>     spec:
>>       containers:
>>         - name: beam-jobserver
>>           image: apache/beam_flink1.16_job_server:2.48.0
>>           args: ["--flink-master=flink-jobmanager:8081"]
>>           ports:
>>             - containerPort: 8097
>>             - containerPort: 8098
>>             - containerPort: 8099
>>           volumeMounts:
>>             - name: beam-staging
>>               mountPath: /tmp/beam-artifact-staging
>>           resources:
>>             requests:
>>               memory: "2Gi"
>>               cpu: "2"
>>             limits:
>>               memory: "2Gi"
>>               cpu: "2"
>>       volumes:
>>         - name: beam-staging
>>           persistentVolumeClaim:
>>             claimName: staging-artifacts-claim
>> ---
>> apiVersion: v1
>> kind: Service
>> metadata:
>>   name: beam-jobserver
>>   namespace: flink
>> spec:
>>   type: ClusterIP
>>   ports:
>>     - name: grpc-port
>>       port: 8097
>>       targetPort: 8097
>>     - name: expansion-port
>>       port: 8098
>>       targetPort: 8098
>>     - name: job-manage-port
>>       port: 8099
>>       targetPort: 8099
>>   selector:
>>     app: beam
>>     component: jobserver
>>
>> And the PVC:
>>
>> apiVersion: v1
>> kind: PersistentVolumeClaim
>> metadata:
>>   name: staging-artifacts-claim
>>   namespace: flink
>> spec:
>>   accessModes:
>>     - ReadWriteOnce
>>   resources:
>>     requests:
>>       storage: 5Gi
>>   storageClassName: standard
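>>
>> I apply everything with kubectl (a sketch; the manifest file names here
>> are hypothetical):
>>
>> ```
>> kubectl apply -f pvc.yaml
>> kubectl apply -f flink-beam.yaml   # services, deployments, job server
>> kubectl -n flink get pods          # wait until everything is Running
>> ```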
>>
>>
>>
>> Then I am running a pod with apache/beam_python3.8_sdk:2.48.0 and
>> installing Java in it, because the expansion service needs a JVM.
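>>
>> A minimal sketch of the Java install, assuming the SDK image is
>> Debian-based:
>>
>> ```
>> apt-get update && apt-get install -y default-jre
>> ```
>>
>> Here is the code that runs in that container: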
>> ```
>> import json
>> import logging
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>
>>
>> def run_beam_pipeline():
>>     logging.getLogger().setLevel(logging.INFO)
>>
>>     consumer_config = {
>>         'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
>>         'group.id': 'beamgrouptest',
>>         'auto.offset.reset': 'earliest',
>>         'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>         'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>     }
>>
>>     topic = 'locations'
>>
>>     flink_options = PipelineOptions([
>>         "--runner=PortableRunner",
>>         "--artifact_endpoint=beam-jobserver:8098",
>>         "--job_endpoint=beam-jobserver:8099",
>>         "--environment_type=EXTERNAL",
>>         "--environment_config=beam-worker-pool:50000",
>>         # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
>>     ])
>>
>>     with beam.Pipeline(options=flink_options) as pipeline:
>>         messages = (
>>             pipeline
>>             | "Read from Kafka" >> ReadFromKafka(
>>                 consumer_config=consumer_config,
>>                 topics=[topic],
>>                 with_metadata=False,
>>                 expansion_service=default_io_expansion_service(
>>                     append_args=[
>>                         '--defaultEnvironmentType=PROCESS',
>>                         '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
>>                     ]
>>                 )
>>             )
>>             | "Print messages" >> beam.Map(print)
>>         )
>>
>>     logging.info("Pipeline execution completed.")
>>
>>
>> if __name__ == '__main__':
>>     run_beam_pipeline()
>> ```
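>>
>> Before submitting, I can also check that the task manager container
>> reports the same Beam and Python versions as the submission pod (a
>> sketch; exec-ing into the deployment assumes a recent kubectl):
>>
>> ```
>> kubectl -n flink exec deploy/flink-taskmanager -- \
>>   python -c 'import sys, apache_beam; print(apache_beam.__version__, sys.version_info[:3])'
>> ```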
>>
>> When I start the job, here are the logs; it downloads the Java
>> expansion service.
>>
>>
>> python3 testing.py
>>
>> <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
>>
>> <jemalloc>: (This is the expected behaviour if you are running under QEMU)
>>
>> INFO:apache_beam.utils.subprocess_server:Using cached job server jar from
>> https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
>>
>> INFO:root:Starting a JAR-based expansion service from JAR
>> /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
>>
>>
>> INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
>> '-jar'
>> '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
>> '35371'
>> '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
>> '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command":
>> "/opt/apache/beam/boot"}']
>>
>> INFO:apache_beam.utils.subprocess_server:Starting expansion service at
>> localhost:35371
>>
>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM
>> org.apache.beam.sdk.expansion.service.ExpansionService
>> loadRegisteredTransforms
>>
>> INFO:apache_beam.utils.subprocess_server:INFO: Registering external
>> transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
>> beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
>> beam:transform:org.apache.beam:kafka_write:v1,
>> beam:external:java:generate_sequence:v1]
>>
>> INFO:apache_beam.utils.subprocess_server:
>>
>> INFO:apache_beam.utils.subprocess_server:Registered transforms:
>>
>> INFO:apache_beam.utils.subprocess_server: 
>> beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
>>
>> INFO:apache_beam.utils.subprocess_server: 
>> beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
>>
>> INFO:apache_beam.utils.subprocess_server: 
>> beam:transform:org.apache.beam:kafka_write:v1:
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
>>
>> INFO:apache_beam.utils.subprocess_server: 
>> beam:external:java:generate_sequence:v1:
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
>>
>> INFO:apache_beam.utils.subprocess_server:
>>
>> INFO:apache_beam.utils.subprocess_server:Registered
>> SchemaTransformProviders:
>>
>> INFO:apache_beam.utils.subprocess_server:
>> beam:schematransform:org.apache.beam:kafka_read:v1
>>
>> INFO:apache_beam.utils.subprocess_server:
>> beam:schematransform:org.apache.beam:kafka_write:v1
>>
>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>
>>
>>
>>
>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>
>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>
>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>
>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM
>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>
>> INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from
>> Kafka' with URN
>> 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
>>
>> INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
>>
>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>> payloadToConfig
>>
>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
>> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
>> schema registered. Attempting to construct with setter approach.
>>
>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>> payloadToConfig
>>
>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
>> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
>> schema registered. Attempting to construct with setter approach.
>>
>> INFO:root:Default Python SDK image for environment is
>> apache/beam_python3.8_sdk:2.48.0
>>
>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>> <function pack_combiners at 0x402e7a95e0> ====================
>>
>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>> <function lift_combiners at 0x402e7a9670> ====================
>>
>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>> <function sort_stages at 0x402e7a9dc0> ====================
>>
>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
>> STOPPED
>>
>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
>> STARTING
>>
>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
>> RUNNING
>>
>> *Error*
>>
>> 2023-08-09 10:25:37,146 INFO  
>> org.apache.flink.configuration.GlobalConfiguration
>>           [] - Loading configuration property: taskmanager.rpc.port, 6122
>>
>> 2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task
>>                   [] - Source: Impulse -> [3]Read from
>> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
>> KafkaIO.ReadSourceDescriptors} (1/1)#0
>> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
>> switched from INITIALIZING to RUNNING.
>>
>> 2023-08-09 10:25:37,724 INFO  
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
>> [] - Finished to build heap keyed state-backend.
>>
>> 2023-08-09 10:25:37,731 INFO
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
>> Initializing heap keyed state backend with stream factory.
>>
>> 2023-08-09 10:25:37,771 INFO  
>> org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
>> [] - getProcessBundleDescriptor request with id 1-4
>>
>> 2023-08-09 10:25:37,876 INFO  
>> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889
>> [] - Creating insecure state channel for localhost:45429.
>>
>> 2023-08-09 10:25:37,877 INFO  
>> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896
>> [] - State channel established.
>>
>> 2023-08-09 10:25:37,918 INFO  
>> /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376
>> [] - Default Python SDK image for environment is
>> apache/beam_python3.8_sdk:2.48.0
>>
>> 2023-08-09 10:25:37,928 ERROR
>> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299
>> [] - Error processing instruction 1. Original traceback is
>>
>> Traceback (most recent call last):
>>
>>   File
>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 295, in _execute
>>
>>     response = task()
>>
>>   File
>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 370, in <lambda>
>>
>>     lambda: self.create_worker().do_instruction(request), request)
>>
>>   File
>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 629, in do_instruction
>>
>>     return getattr(self, request_type)(
>>
>>   File
>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 660, in process_bundle
>>
>>     bundle_processor = self.bundle_processor_cache.get(
>>
>>   File
>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 491, in get
>>
>>     processor = bundle_processor.BundleProcessor(
>>
>>   File
>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 876, in __init__
>>
>>
>> _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
>>
>>   File
>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 839, in _verify_descriptor_created_in_a_compatible_env
>>
>>     raise RuntimeError(
>>
>> *RuntimeError: Pipeline construction environment and pipeline runtime
>> environment are not compatible. If you use a custom container image, check
>> that the Python interpreter minor version and the Apache Beam version in
>> your image match the versions used at pipeline construction time.
>> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>> Runtime environment:
>> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>>
>>
>>
>>
>> 2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task
>>                   [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka
>> Metadata} -> [1]Print messages (1/1)#0
>> (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0)
>> switched from INITIALIZING to RUNNING.
>>
>> 2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task
>>                   [] - Source: Impulse -> [3]Read from
>> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
>> KafkaIO.ReadSourceDescriptors} (1/1)#0
>> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
>> switched from RUNNING to FAILED with failure cause:
>> java.lang.RuntimeException: Failed to start remote bundle
>>
>>
>>
>>
>> Thanks
>> kapil Dev
>>
>>
>>
>
