A quick update:
I exposed docker.sock to the container, which got the Docker part working, but
sadly I ran into a new issue with the Flink runner:
root@beam-pod:/tmp# PYTHONPATH='./' python -m kafka_test
--runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
--flink_submit_uber_jar --environment_type=EXTERNAL
--environment_config=localhost:50000
WARNING:root:Make sure that locally built Python SDK docker image has
Python 3.7 interpreter.
ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 10.190.29.80:6122-88ce88 timed out.
    at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
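For context on our setup: with --environment_type=EXTERNAL the Flink TaskManagers don't launch Docker containers themselves; they expect a Beam SDK worker pool already listening at the configured address (localhost:50000 here). We run it as a sidecar container in the TaskManager pod, roughly like this (a sketch only; the image tag and container names are illustrative, not our exact manifest):

```yaml
# Sketch: TaskManager pod spec fragment with a Beam Python SDK worker-pool sidecar.
# Both containers share the pod network, so the TaskManager reaches the
# worker pool at localhost:50000.
containers:
  - name: flink-taskmanager
    image: flink:1.12
  - name: beam-worker-pool
    image: apache/beam_python3.7_sdk:2.27.0
    args: ["--worker_pool"]
    ports:
      - containerPort: 50000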
Our test code is super simple:
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.io import WriteToText


def run_pipeline():
    with beam.Pipeline(options=PipelineOptions(
            runner="FlinkRunner",
            flink_master="beam-flink-cluster-jobmanager:8081",
            environment_type="EXTERNAL",
            environment_config="localhost:50000")) as p:
        (p
         | 'Read from Kafka' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090',
                              'auto.offset.reset': 'latest'},
             topics=['test001'])
         | 'Pair with 1' >> beam.Map(lambda word: (word, 1))
         | 'Window of 5 seconds' >> beam.WindowInto(window.FixedWindows(5))
         | 'Group by key' >> beam.GroupByKey()
         | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
         # | 'Write to Kafka' >> WriteToKafka(
         #     producer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'},
         #     topic='test001')
         | 'Write to text' >> WriteToText("/tmp/output2")
        )


if __name__ == '__main__':
    run_pipeline()
Confused about how the timeout got triggered >.<
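One thing we plan to try (an assumption on our side, not a confirmed fix): raising Flink's heartbeat settings in flink-conf.yaml, in case the TaskManager is simply slow to respond while the SDK harness starts up. The values below are illustrative; the Flink defaults are a 10 s interval and a 50 s timeout:

```yaml
# flink-conf.yaml fragment (illustrative values;
# Flink defaults: heartbeat.interval 10000 ms, heartbeat.timeout 50000 ms)
heartbeat.interval: 10000
heartbeat.timeout: 120000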
Thanks,
Yilun
On Wed, Feb 24, 2021 at 4:58 PM yilun zhang <[email protected]> wrote:
> Hey,
>
> Our team is trying to use Beam with the Kafka connector and the Flink runner to
> gather and process data. We use the Python SDK, with Java 11 built into the
> Python 3.7 SDK image as the Java runtime for the Kafka expansion service.
>
> So:
> image: Beam Python 3.7 Docker image + self-built Java 11
> connector: Kafka
> runner: Flink
> container: Kubernetes
>
>
> We encountered a "docker not found" error when running (logs below):
> python3 -m kafka_test --runner=FlinkRunner
> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
> --environment_type=EXTERNAL --environment_config=localhost:50000
>
> We noticed that https://beam.apache.org/roadmap/portability/ mentions that the
> prerequisites also include Docker. What is Docker used for here? Is there a
> suggested way to run Docker inside a k8s container (something like sysbox for
> Docker-in-Docker)?
>
> Or is there any recommended way/best practice for using the Beam Python
> SDK (Kafka) + Flink runner in a k8s/container environment? Our logs are
> attached below:
>
>
> Here are our logs:
>
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/tmp/kafka_test.py", line 25, in <module>
>     run_pipeline()
>   File "/tmp/kafka_test.py", line 21, in run_pipeline
>     | 'Write to text' >> WriteToText("/tmp/output2")
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 583, in __exit__
>     self.result.wait_until_finish()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 581, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline job-93bbf6ce-f666-4d46-803d-320983fca3f1 failed in state FAILED:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
>     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
>     ... 14 more
> Caused by: java.io.IOException: error=2, No such file or directory
>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>     ... 28 more
>
> Really appreciate your time and help!
>
> Thanks,
> Yilun
>