Not the OP, but is it possible to join the Slack channel without an apache.org email address? I tried joining Slack for support previously and gave up because it looked like it wasn't possible.
On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <k...@apache.org> wrote:

> There is a Slack channel linked from https://beam.apache.org/community/contact-us/; it is #beam on the-asf.slack.com.
> (You find this via beam.apache.org -> Community -> Contact Us.)
>
> It sounds like an issue with running a multi-language pipeline on the portable Flink runner (something I am not equipped to help with in detail).
>
> Kenn
>
> On Wed, Aug 9, 2023 at 2:51 PM kapil singh <kapilsing...@gmail.com> wrote:

>> Hey,
>>
>> I've been grappling with this issue for the past five days and, despite my continuous efforts, I haven't found a resolution. I've also been unable to locate a Slack channel for Beam where I might seek assistance.
>>
>> The issue:
>>
>> RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.
>>
>> Here is what I am trying to do: I submit the job from a Kubernetes container that hits the job server, which in turn talks to the job manager and task manager. The task manager and job manager run in one container.
>>
>> Here is my custom Dockerfile (image name: custom-flink):

# Starting with the base Flink image
FROM apache/flink:1.16-java11
ARG FLINK_VERSION=1.16
ARG KAFKA_VERSION=2.8.0

# Install Python 3.8 and its associated dependencies, followed by PyFlink
RUN set -ex; \
    apt-get update && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
    wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
    tar -xvf Python-3.8.0.tgz && \
    cd Python-3.8.0 && \
    ./configure --without-tests --enable-shared && \
    make -j4 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
    ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
    ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    python -m pip install --upgrade pip; \
    pip install apache-flink==${FLINK_VERSION}; \
    pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0

# Copy files from the official Python SDK image, including scripts/dependencies.
COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/

# Java SDK
COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/

RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*

# Give permissions to the /opt/apache/beam-venv directory
RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv

>> Here is my deployment file for the job manager, the task manager (plus the Beam worker pool), and the job server:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
  - protocol: TCP
    port: 50000
    targetPort: 50000
    name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging
        securityContext:
          runAsUser: 9999
        resources:
          requests:
            memory: "1Gi"
            cpu: "1"
          limits:
            memory: "1Gi"
            cpu: "1"
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging
        persistentVolumeClaim:
          claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager-beam-worker
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args:
        - /bin/bash
        - -c
        - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging
        securityContext:
          runAsUser: 9999
        resources:
          requests:
            memory: "4Gi"
            cpu: "4"
          limits:
            memory: "4Gi"
            cpu: "4"
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging
        persistentVolumeClaim:
          claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: beam
      component: jobserver
  template:
    metadata:
      labels:
        app: beam
        component: jobserver
    spec:
      containers:
      - name: beam-jobserver
        image: apache/beam_flink1.16_job_server:2.48.0
        args: ["--flink-master=flink-jobmanager:8081"]
        ports:
        - containerPort: 8097
        - containerPort: 8098
        - containerPort: 8099
        volumeMounts:
        - name: beam-staging
          mountPath: /tmp/beam-artifact-staging
        resources:
          requests:
            memory: "2Gi"
            cpu: "2"
          limits:
            memory: "2Gi"
            cpu: "2"
      volumes:
      - name: beam-staging
        persistentVolumeClaim:
          claimName: staging-artifacts-claim
---
apiVersion: v1
kind: Service
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  type: ClusterIP
  ports:
  - name: grpc-port
    port: 8097
    targetPort: 8097
  - name: expansion-port
    port: 8098
    targetPort: 8098
  - name: job-manage-port
    port: 8099
    targetPort: 8099
  selector:
    app: beam
    component: jobserver

>> And the PVC:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard

>> Then I run a Pod from apache/beam_python3.8_sdk:2.48.0 and install Java in it, because Java is required for the expansion service. Here is the code that runs from that container:

```
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service


def run_beam_pipeline():
    logging.getLogger().setLevel(logging.INFO)

    consumer_config = {
        'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
        'group.id': 'beamgrouptest',
        'auto.offset.reset': 'earliest',
        'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
    }

    topic = 'locations'

    flink_options = PipelineOptions([
        "--runner=PortableRunner",
        "--artifact_endpoint=beam-jobserver:8098",
        "--job_endpoint=beam-jobserver:8099",
        "--environment_type=EXTERNAL",
        "--environment_config=beam-worker-pool:50000",
        # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
    ])

    with beam.Pipeline(options=flink_options) as pipeline:
        messages = (
            pipeline
            | "Read from Kafka" >> ReadFromKafka(
                consumer_config=consumer_config,
                topics=[topic],
                with_metadata=False,
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
                    ]
                )
            )
            | "Print messages" >> beam.Map(print)
        )

    logging.info("Pipeline execution completed.")


if __name__ == '__main__':
    run_beam_pipeline()
```
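>> One thing I have not tried yet but am considering: the custom-flink image copies the Java SDK harness into /opt/apache/beam_java/, so the expansion service's default PROCESS environment could point at that Java boot binary instead of the Python one at /opt/apache/beam/boot. A minimal sketch, not verified; it assumes the Java harness launcher ends up at /opt/apache/beam_java/boot inside the task manager container:

```
from apache_beam.io.kafka import default_io_expansion_service

# Sketch only: same expansion service as above, but the PROCESS environment
# launches the Java SDK harness copied from apache/beam_java11_sdk:2.48.0
# (assumed path: /opt/apache/beam_java/boot) rather than the Python harness
# at /opt/apache/beam/boot.
java_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        '--defaultEnvironmentConfig={"command": "/opt/apache/beam_java/boot"}',
    ]
)
```

>> I would then pass this expansion service to ReadFromKafka in place of the one above.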
>> When starting the job, here are the logs; it is downloading the Java expansion service:

python3 testing.py

<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '35371' '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:35371
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
INFO:apache_beam.utils.subprocess_server:  beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_read:v1
INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_write:v1
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x402e7a95e0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x402e7a9670> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x402e7a9dc0> ====================
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

>> Error:

2023-08-09 10:25:37,146 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.rpc.port, 6122
2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2023-08-09 10:25:37,724 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
2023-08-09 10:25:37,731 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
2023-08-09 10:25:37,771 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-4
2023-08-09 10:25:37,876 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889 [] - Creating insecure state channel for localhost:45429.
2023-08-09 10:25:37,877 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896 [] - State channel established.
2023-08-09 10:25:37,918 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376 [] - Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
2023-08-09 10:25:37,928 ERROR /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299 [] - Error processing instruction 1. Original traceback is
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 876, in __init__
    _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 839, in _verify_descriptor_created_in_a_compatible_env
    raise RuntimeError(
RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.

2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from INITIALIZING to RUNNING.
2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Failed to start remote bundle

>> Thanks,
>> kapil Dev
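>> P.S. Following the hint in the error message, this is the check I plan to run in both the submission pod and the beam-worker-pool container to rule out a Python/Beam version skew (nothing custom, just printing the interpreter and SDK versions):

```
# Version check for the submission pod and the beam-worker-pool container:
# the Python interpreter minor version and the Apache Beam version should
# match on both sides.
import sys

import apache_beam

print("python", sys.version_info.major, sys.version_info.minor)
print("apache-beam", apache_beam.__version__)
```

>> Since the error reports the submission environment as apache/beam_java8_sdk and the runtime environment as apache/beam_python3.8_sdk, I suspect the mismatch is between the Java and Python SDK environments rather than a version skew, which is why I am also looking at the expansion-service environment configuration above.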