Hey Kapil,
I grappled with a similar deployment and created this repo
<https://github.com/sambvfx/beam-flink-k8s> [1] to collect some
useful notes for others. We were running cross-language pipelines
on Flink, connecting PubsubIO
<https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171> [2]
to other miscellaneous Python transforms. No promises it will help, but feel
free to take a look, as it's a close approximation to the setup we
had working.
Your particular error seems related to the Kafka transform. Does a
pure Python pipeline execute as expected?
[1] https://github.com/sambvfx/beam-flink-k8s
[2]
https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171
On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user
<user@beam.apache.org> wrote:
Not the OP, but is it possible to join the Slack channel without
an apache.org email address? I tried joining Slack previously for
support and gave up because it looked like it wasn't possible.
On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <k...@apache.org>
wrote:
There is a Slack channel linked from
https://beam.apache.org/community/contact-us/ — it is #beam on
the-asf.slack.com
(you can find this via beam.apache.org -> Community -> Contact Us).
It sounds like an issue with running a multi-language pipeline
on the portable Flink runner (something which I am not
equipped to help with in detail).
Kenn
On Wed, Aug 9, 2023 at 2:51 PM kapil singh
<kapilsing...@gmail.com> wrote:
Hey,
I've been grappling with this issue for the past five days
and, despite my continuous efforts, I haven't found a
resolution. Additionally, I've been unable to locate a
Slack channel for Beam where I might seek assistance.
The issue:
*RuntimeError: Pipeline construction environment and
pipeline runtime environment are not compatible. If you
use a custom container image, check that the Python
interpreter minor version and the Apache Beam version in
your image match the versions used at pipeline
construction time. Submission environment:
beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
Runtime environment:
beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
Here is what I am trying to do:
I am running the job from a Kubernetes container that hits the
job server and then the JobManager and TaskManager.
The TaskManager and JobManager use the same container image.
Here is my custom Dockerfile (image name: custom-flink):
# Starting with the base Flink image
FROM apache/flink:1.16-java11

ARG FLINK_VERSION=1.16
ARG KAFKA_VERSION=2.8.0

# Install Python 3.8 and its associated dependencies, followed by pyflink
RUN set -ex; \
    apt-get update && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
    wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
    tar -xvf Python-3.8.0.tgz && \
    cd Python-3.8.0 && \
    ./configure --without-tests --enable-shared && \
    make -j4 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
    ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
    ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    python -m pip install --upgrade pip; \
    pip install apache-flink==${FLINK_VERSION}; \
    pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0

# Copy files from the official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/

# Java SDK
COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/

RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*

# Give permissions to the /opt/apache/beam-venv directory
RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
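Since the error below complains about a construction-vs-runtime mismatch, one sanity check (a hypothetical snippet of mine, not something from the thread) is to print the interpreter and Beam versions inside each image — the submission container, this custom Flink image, and the worker pool — and confirm they agree:

```python
import sys


def env_fingerprint(beam_version, version_info=sys.version_info):
    # Returns the pair the runner's compatibility check cares about:
    # the Python major.minor version and the Beam SDK version. These
    # must match between pipeline construction and the runtime workers.
    return ("%d.%d" % (version_info[0], version_info[1]), beam_version)


if __name__ == "__main__":
    import apache_beam  # only needed when actually run inside an image
    print(env_fingerprint(apache_beam.__version__))
```

Running this in each container should print the same pair everywhere, e.g. `('3.8', '2.48.0')`; any difference is exactly what the RuntimeError below is about.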
Here is my deployment file for the JobManager, the TaskManager
(plus worker pool), and the job server:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
          resources:
            requests:
              memory: "1Gi"
              cpu: "1"
            limits:
              memory: "1Gi"
              cpu: "1"
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager-beam-worker
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args:
            - /bin/bash
            - -c
            - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            - containerPort: 50000
              name: pool
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
          resources:
            requests:
              memory: "4Gi"
              cpu: "4"
            limits:
              memory: "4Gi"
              cpu: "4"
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: beam
      component: jobserver
  template:
    metadata:
      labels:
        app: beam
        component: jobserver
    spec:
      containers:
        - name: beam-jobserver
          image: apache/beam_flink1.16_job_server:2.48.0
          args: ["--flink-master=flink-jobmanager:8081"]
          ports:
            - containerPort: 8097
            - containerPort: 8098
            - containerPort: 8099
          volumeMounts:
            - name: beam-staging
              mountPath: /tmp/beam-artifact-staging
          resources:
            requests:
              memory: "2Gi"
              cpu: "2"
            limits:
              memory: "2Gi"
              cpu: "2"
      volumes:
        - name: beam-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: v1
kind: Service
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: grpc-port
      port: 8097
      targetPort: 8097
    - name: expansion-port
      port: 8098
      targetPort: 8098
    - name: job-manage-port
      port: 8099
      targetPort: 8099
  selector:
    app: beam
    component: jobserver
And the PVC:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard
Then I am running a pod with apache/beam_python3.8_sdk:2.48.0
and installing Java in it, because the expansion service requires
it. Here is the code that runs from that container:
```
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service


def run_beam_pipeline():
    logging.getLogger().setLevel(logging.INFO)

    consumer_config = {
        'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
        'group.id': 'beamgrouptest',
        'auto.offset.reset': 'earliest',
        'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
    }
    topic = 'locations'

    flink_options = PipelineOptions([
        "--runner=PortableRunner",
        "--artifact_endpoint=beam-jobserver:8098",
        "--job_endpoint=beam-jobserver:8099",
        "--environment_type=EXTERNAL",
        "--environment_config=beam-worker-pool:50000",
        # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
    ])

    with beam.Pipeline(options=flink_options) as pipeline:
        messages = (
            pipeline
            | "Read from Kafka" >> ReadFromKafka(
                consumer_config=consumer_config,
                topics=[topic],
                with_metadata=False,
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
                    ]
                )
            )
            | "Print messages" >> beam.Map(print)
        )

    logging.info("Pipeline execution completed.")


if __name__ == '__main__':
    run_beam_pipeline()
```
When starting a job, here are the logs; it downloads the Java
expansion service:
python3 testing.py
<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '35371' '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:35371
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server:beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
INFO:apache_beam.utils.subprocess_server:beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
INFO:apache_beam.utils.subprocess_server:beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
INFO:apache_beam.utils.subprocess_server:beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
INFO:apache_beam.utils.subprocess_server:beam:schematransform:org.apache.beam:kafka_read:v1
INFO:apache_beam.utils.subprocess_server:beam:schematransform:org.apache.beam:kafka_write:v1
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x402e7a95e0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x402e7a9670> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x402e7a9dc0> ====================
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
*Error*
2023-08-09 10:25:37,146 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
2023-08-09 10:25:37,260 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2023-08-09 10:25:37,724 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
2023-08-09 10:25:37,731 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
2023-08-09 10:25:37,771 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-4
2023-08-09 10:25:37,876 INFO /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889 [] - Creating insecure state channel for localhost:45429.
2023-08-09 10:25:37,877 INFO /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896 [] - State channel established.
2023-08-09 10:25:37,918 INFO /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376 [] - Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
2023-08-09 10:25:37,928 ERROR /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299 [] - Error processing instruction 1. Original traceback is
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 876, in __init__
    _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 839, in _verify_descriptor_created_in_a_compatible_env
    raise RuntimeError(
*RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
2023-08-09 10:25:37,931 INFO org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from INITIALIZING to RUNNING.
2023-08-09 10:28:37,784 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Failed to start remote bundle
Thanks,
Kapil Dev