Re: using python sdk+kafka under k8s
The problem is that Kafka is a cross-language transform that is implemented in Java. You have configured your Python pipeline to run with environment_type=EXTERNAL, but the Kafka transform has its own environment with environment_type=DOCKER; it does not respect the environment_type you set for the pipeline. Currently I don't think there's a way to configure the environment for an external transform; I brought up this issue in a recent thread [1]. The reason for the error you are seeing is that environment_type=DOCKER tries to start up Docker inside your Flink workers, which must not have Docker installed.

[1] https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Thu, Mar 4, 2021 at 2:28 AM yilun zhang wrote:
> hmmm, looks like I may fail due to docker environment:
>
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/tmp/kafka_test.py", line 26, in <module>
>     run_pipeline()
>   File "/tmp/kafka_test.py", line 22, in run_pipeline
>     | beam.Map(print)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 583, in __exit__
>     self.result.wait_until_finish()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 581, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in state FAILED:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
>     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>     at ...
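The diagnosis above can be confirmed quickly from inside a Flink task manager container. A stdlib-only sketch (not part of Beam; the function name is illustrative) of the check that is effectively failing with `Cannot run program "docker"`:

```python
import shutil


def docker_available() -> bool:
    """Return True if a `docker` executable can be found on PATH.

    Beam's DockerEnvironmentFactory shells out to the `docker` CLI, so in a
    container where this returns False, starting a DOCKER-environment SDK
    harness will fail with the "Cannot run program \"docker\"" IOException
    seen in the trace above.
    """
    return shutil.which("docker") is not None


if __name__ == "__main__":
    print("docker on PATH:", docker_available())
```

Running this inside the task manager pod should print `docker on PATH: False`, matching the error.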
Re: using python sdk+kafka under k8s
hmmm, looks like I may fail due to docker environment:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmp/kafka_test.py", line 26, in <module>
    run_pipeline()
  File "/tmp/kafka_test.py", line 22, in run_pipeline
    | beam.Map(print)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 583, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 581, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in state FAILED:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    ... 14 more
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 28 more

I tried to create a YAML manifest to mount Docker from the host, like:

apiVersion: v1
kind: Pod
metadata:
  name: beam-pod
  namespace: default
spec:
  volumes:
    - name: docker-sock
      hostPath:
        path: "/var/run/docker.sock"
        type: Socket
    - name:
Re: using python sdk+kafka under k8s
We created a custom Docker image that includes a Java runtime, Python, and a Docker environment to run our job, but encountered a timeout exception:

root@beam-pod:/tmp# PYTHONPATH='./' python -m kafka_test --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:5
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 10.190.29.80:6122-88ce88 timed out.
    at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Our test code is super simple:

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.io import WriteToText

def run_pipeline():
    with beam.Pipeline(options=PipelineOptions(
            runner="FlinkRunner",
            flink_master="beam-flink-cluster-jobmanager:8081",
            environment_type="EXTERNAL",
            environment_config="localhost:5")) as p:
        (p
         | 'Read from Kafka' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090',
                              'auto.offset.reset': 'latest'},
             topics=['test001'])
         | 'Par with 1' >> beam.Map(lambda word: (word, 1))
         | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
         | 'Group by key' >> beam.GroupByKey()
         | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
         # | "Write to Kafka" >> WriteToKafka(producer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'}, topic='test001')
         | 'Write to text' >> WriteToText("/tmp/output2")
        )

if __name__ == '__main__':
    run_pipeline()

Is there any suggestion on debugging direction? In the Flink UI, it looks like it failed at the first step, ReadFromKafka.

Thanks,
Yilun

On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver wrote:
> In Beam, the Kafka connector does not know anything about the underlying execution engine (here Flink). It is instead translated by the runner into a user-defined function in Flink. So it is expected that the resulting DAG does not look the same as it would with a native Flink source.
>
> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang wrote:
>
>> So sorry that subscription errors on my side resulted in multiple duplicate emails!
>>
>> Thanks for the reply; it does help!
>>
>> I am confused: when submitting a Beam job with the Kafka connector to Flink, I noticed that the Flink DAG diagram includes ReadFromKafka as part of the Flink workflow, while if we submit a PyFlink job (connected to Kafka) directly to Flink, the workflow excludes reading from Kafka (the source) and only has the data-processing parts.
>>
>> Is that how Beam intends Flink to work?
>>
>> Thanks a lot, and sincere apologies again for the silly duplicated emails!
>>
>> Yilun
>>
>> Sam Bourne wrote on Thu, Feb 25, 2021 at 11:58 AM:
>>
>>> Hi Yilun!
>>>
>>> I made a quick proof of concept repo showcasing
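As a side note on the test pipeline itself: stripped of Beam, windowing, and I/O, its 'Group by key' and 'Sum word counts' steps compute a per-key sum. A pure-Python sketch of that logic (illustrative only, not the Beam API) can be useful for sanity-checking expected output while the runner issues are being debugged:

```python
from collections import defaultdict


def sum_per_key(pairs):
    """Mimic the GroupByKey + Map(sum) steps of the test pipeline:
    group (key, value) pairs and sum the values for each key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: sum(values) for key, values in grouped.items()}


print(sum_per_key([("a", 1), ("b", 1), ("a", 1)]))  # {'a': 2, 'b': 1}
```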
Re: using python sdk+kafka under k8s
In Beam, the Kafka connector does not know anything about the underlying execution engine (here Flink). It is instead translated by the runner into a user-defined function in Flink. So it is expected that the resulting DAG does not look the same as it would with a native Flink source.

On Fri, Feb 26, 2021 at 5:18 AM yilun zhang wrote:
> So sorry that subscription errors on my side resulted in multiple duplicate emails!
>
> Thanks for the reply; it does help!
>
> I am confused: when submitting a Beam job with the Kafka connector to Flink, I noticed that the Flink DAG diagram includes ReadFromKafka as part of the Flink workflow, while if we submit a PyFlink job (connected to Kafka) directly to Flink, the workflow excludes reading from Kafka (the source) and only has the data-processing parts.
>
> Is that how Beam intends Flink to work?
>
> Thanks a lot, and sincere apologies again for the silly duplicated emails!
>
> Yilun
>
> Sam Bourne wrote on Thu, Feb 25, 2021 at 11:58 AM:
>
>> Hi Yilun!
>>
>> I made a quick proof of concept repo showcasing how to run a Beam pipeline in Flink on k8s. It may be useful for you as a reference.
>>
>> https://github.com/sambvfx/beam-flink-k8s
>>
>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang wrote:
>>
>>> Hey,
>>>
>>> Our team is trying to use Beam with the Kafka connector and the Flink runner to gather and process data. We use the Python SDK and build Java 11 into the Python 3.7 SDK image as the Java runtime for the Kafka expansion service. So:
>>>
>>> image: Beam Python 3.7 Docker image + built-in Java 11
>>> connector: Kafka
>>> runner: Flink
>>> container: Kubernetes
>>>
>>> We encounter a "docker not found" error when running:
>>>
>>> python3 -m kafka_test --runner=FlinkRunner --flink_master=flink-job-manager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:5
>>>
>>> We noticed that https://beam.apache.org/roadmap/portability/ mentions that the prerequisites also include Docker. We wonder what Docker is used for here. Is there any suggested way to run Docker in a k8s container (something like Sysbox for Docker-in-Docker)?
>>>
>>> Or maybe we should not use the Beam SDK + runner in k8s?
>>>
>>> Thanks,
>>> Yilun
Re: using python sdk+kafka under k8s
So sorry that subscription errors on my side resulted in multiple duplicate emails!

Thanks for the reply; it does help!

I am confused: when submitting a Beam job with the Kafka connector to Flink, I noticed that the Flink DAG diagram includes ReadFromKafka as part of the Flink workflow, while if we submit a PyFlink job (connected to Kafka) directly to Flink, the workflow excludes reading from Kafka (the source) and only has the data-processing parts.

Is that how Beam intends Flink to work?

Thanks a lot, and sincere apologies again for the silly duplicated emails!

Yilun

Sam Bourne wrote on Thu, Feb 25, 2021 at 11:58 AM:
> Hi Yilun!
>
> I made a quick proof of concept repo showcasing how to run a Beam pipeline in Flink on k8s. It may be useful for you as a reference.
>
> https://github.com/sambvfx/beam-flink-k8s
>
> On Wed, Feb 24, 2021, 8:13 AM yilun zhang wrote:
>
>> Hey,
>>
>> Our team is trying to use Beam with the Kafka connector and the Flink runner to gather and process data. We use the Python SDK and build Java 11 into the Python 3.7 SDK image as the Java runtime for the Kafka expansion service. So:
>>
>> image: Beam Python 3.7 Docker image + built-in Java 11
>> connector: Kafka
>> runner: Flink
>> container: Kubernetes
>>
>> We encounter a "docker not found" error when running:
>>
>> python3 -m kafka_test --runner=FlinkRunner --flink_master=flink-job-manager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:5
>>
>> We noticed that https://beam.apache.org/roadmap/portability/ mentions that the prerequisites also include Docker. We wonder what Docker is used for here. Is there any suggested way to run Docker in a k8s container (something like Sysbox for Docker-in-Docker)?
>>
>> Or maybe we should not use the Beam SDK + runner in k8s?
>>
>> Thanks,
>> Yilun
Re: using python sdk+kafka under k8s
Hi Yilun!

I made a quick proof of concept repo showcasing how to run a Beam pipeline in Flink on k8s. It may be useful for you as a reference.

https://github.com/sambvfx/beam-flink-k8s

On Wed, Feb 24, 2021, 8:13 AM yilun zhang wrote:
> Hey,
>
> Our team is trying to use Beam with the Kafka connector and the Flink runner to gather and process data. We use the Python SDK and build Java 11 into the Python 3.7 SDK image as the Java runtime for the Kafka expansion service. So:
>
> image: Beam Python 3.7 Docker image + built-in Java 11
> connector: Kafka
> runner: Flink
> container: Kubernetes
>
> We encounter a "docker not found" error when running:
>
> python3 -m kafka_test --runner=FlinkRunner --flink_master=flink-job-manager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:5
>
> We noticed that https://beam.apache.org/roadmap/portability/ mentions that the prerequisites also include Docker. We wonder what Docker is used for here. Is there any suggested way to run Docker in a k8s container (something like Sysbox for Docker-in-Docker)?
>
> Or maybe we should not use the Beam SDK + runner in k8s?
>
> Thanks,
> Yilun