Hmm, looks like it may be failing due to the Docker environment:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmp/kafka_test.py", line 26, in <module>
    run_pipeline()
  File "/tmp/kafka_test.py", line 22, in run_pipeline
    |beam.Map(print)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 583, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 581, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in state FAILED:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
  at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
  at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
  at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
  at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
  at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
  at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
  at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
  at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
  at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
  at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
  at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
  at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
  ... 14 more
Caused by: java.io.IOException: error=2, No such file or directory
  at java.lang.UNIXProcess.forkAndExec(Native Method)
  at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
  at java.lang.ProcessImpl.start(ProcessImpl.java:134)
  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
  ... 28 more


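If I read the trace correctly, Beam's DockerEnvironmentFactory is shelling out to the `docker` binary to launch an SDK harness container for the executable stage, so `docker` has to be on the PATH inside the Flink task manager container. A quick way to confirm that from inside the pod (a minimal sketch, nothing Beam-specific):

import shutil
import subprocess

# DockerEnvironmentFactory runs the plain "docker" command, so the CLI
# must be resolvable on PATH; None here reproduces the error=2 above.
docker = shutil.which("docker")
print("docker binary:", docker)
if docker:
    subprocess.run([docker, "version"], check=True)
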
I tried creating a YAML spec to mount the host's Docker socket into the pod, like this:
apiVersion: v1
kind: Pod
metadata:
  name: beam-pod
  namespace: default
spec:
  volumes:
    - name: docker-sock
      hostPath:
        path: "/var/run/docker.sock"
        type: Socket
    - name: docker-directory
      hostPath:
        path: "/var/lib/docker"
        type: Directory
  containers:
  - image: python3.7sdk_java11_beam:2.27.0
    command: ["sleep","3600"]
    name: beam-pod
    volumeMounts:
      - mountPath: "/var/run/docker.sock"
        name: docker-sock
        readOnly: false
      - mountPath: "/var/lib/docker"
        name: docker-directory
        readOnly: false
    securityContext:
      privileged: true
      runAsUser: 0
    imagePullPolicy: Never
  restartPolicy: Never
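
One caveat I'm assuming here (not verified anywhere official): mounting /var/run/docker.sock only hands the pod access to the host's Docker daemon; the docker CLI itself still has to be installed in the image. The daemon can be pinged over the mounted socket with just the standard library, even before the CLI is present (a minimal sketch):

import http.client
import socket

class DockerSocketConnection(http.client.HTTPConnection):
    """HTTP over the mounted unix socket instead of TCP."""
    def connect(self):
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.connect("/var/run/docker.sock")

conn = DockerSocketConnection("localhost")
conn.request("GET", "/_ping")
print(conn.getresponse().read())  # b'OK' when the host daemon is reachable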

And the hello-world example runs fine:
root@beam-pod:/tmp# docker run hello-world

Hello from Docker!


Thanks again!
Yilun

On Thu, Mar 4, 2021 at 4:44 PM yilun zhang <[email protected]> wrote:

> We created a custom Docker image that includes a Java runtime, Python, and
> a Docker environment to run our job, but encountered a timeout exception:
>
> root@beam-pod:/tmp# PYTHONPATH='./' python -m kafka_test --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:50000
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
> ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 10.190.29.80:6122-88ce88 timed out.
> at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> Our test code is super simple:
>
>
> import apache_beam as beam
> import apache_beam.transforms.window as window
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
> from apache_beam.io import WriteToText
>
> def run_pipeline():
>   with beam.Pipeline(options=PipelineOptions(
>           runner="FlinkRunner",
>           flink_master="beam-flink-cluster-jobmanager:8081",
>           environment_type="EXTERNAL",
>           environment_config="localhost:50000")) as p:
>     (p
>      | 'Read from Kafka' >> ReadFromKafka(
>            consumer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090',
>                             'auto.offset.reset': 'latest'},
>            topics=['test001'])
>      | 'Pair with 1' >> beam.Map(lambda word: (word, 1))
>      | 'Window of 5 seconds' >> beam.WindowInto(window.FixedWindows(5))
>      | 'Group by key' >> beam.GroupByKey()
>      | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>      # | 'Write to Kafka' >> WriteToKafka(
>      #       producer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'},
>      #       topic='test001')
>      | 'Write to text' >> WriteToText("/tmp/output2")
>     )
>
> if __name__ == '__main__':
>   run_pipeline()
>
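> One caveat about the code above that I'm not fully sure of (my reading of
> the Python ReadFromKafka, not confirmed anywhere in this thread): with the
> default deserializers it emits (key, value) pairs of bytes, so 'Pair with
> 1' is pairing whole records rather than words. A decode step in between
> would make that explicit, e.g.:
>
> def decode(record):
>     # record is a (key_bytes, value_bytes) tuple from ReadFromKafka
>     _key, value = record
>     return value.decode('utf-8')
>
> # ... | 'Decode' >> beam.Map(decode)
> #     | 'Pair with 1' >> beam.Map(lambda word: (word, 1))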
>
> Is there any suggestion on a debugging direction? In the Flink UI, it looks
> like it failed at the first step, ReadFromKafka.
>
> Thanks,
> Yilun
>
> On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <[email protected]> wrote:
>
>> In Beam, the Kafka connector does not know anything about the underlying
>> execution engine (here Flink). It is instead translated by the runner into
>> a user-defined function in Flink, so it is expected that the resulting DAG
>> does not look the same as it would with a native Flink source.
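>>
>> To make the cross-language part concrete: ReadFromKafka is expanded by a
>> Java expansion service and then run by the runner as an opaque stage,
>> which is why it appears inside the Flink DAG rather than as a native
>> Flink source. A sketch against the 2.27.0 Python API (the explicit
>> address below is hypothetical; by default ReadFromKafka starts a bundled
>> Java expansion service on its own):
>>
>> from apache_beam.io.kafka import ReadFromKafka
>>
>> # Point the cross-language transform at a specific expansion service
>> # instead of the default bundled one.
>> read = ReadFromKafka(
>>     consumer_config={'bootstrap.servers': 'localhost:9092'},
>>     topics=['test001'],
>>     expansion_service='localhost:8097',  # hypothetical address
>> )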
>>
>> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <[email protected]> wrote:
>>
>>> So sorry that a subscription error on my side resulted in multiple
>>> duplicate emails!
>>>
>>> Thanks for the reply; it does help!
>>>
>>> I am confused about one thing: when submitting a Beam job with the Kafka
>>> connector to Flink, I noticed that the Flink DAG diagram includes
>>> ReadFromKafka as part of the Flink workflow, while if we submit a PyFlink
>>> job (connected to Kafka) directly to Flink, the workflow excludes the read
>>> from Kafka (the source) and only shows the data processing parts.
>>>
>>> Is that how Beam intends Flink to handle it?
>>>
>>> Thanks a lot, and sincere apologies again for the duplicate emails!
>>>
>>> Yilun
>>>
>>> On Thu, Feb 25, 2021 at 11:58 AM Sam Bourne <[email protected]> wrote:
>>>
>>>> Hi Yilun!
>>>>
>>>> I made a quick proof-of-concept repo showcasing how to run a Beam
>>>> pipeline on Flink on k8s. It may be useful for you as a reference.
>>>>
>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>
>>>>
>>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <[email protected]> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> Our team is trying to use Beam with the Kafka connector and the Flink
>>>>> runner to gather and process data. We use the Python SDK and build Java
>>>>> 11 into the Python 3.7 SDK image as the Java runtime for the Kafka
>>>>> expansion service. So:
>>>>> image: Beam Python 3.7 docker image with Java 11 built in
>>>>> connector: Kafka
>>>>> runner: Flink
>>>>> container: Kubernetes
>>>>>
>>>>> We encountered a "docker not found" error when running:
>>>>> python3 -m kafka_test --runner=FlinkRunner --flink_master=flink-job-manager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>>
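>>>>> (Side note on the flags: with --environment_type=EXTERNAL, my
>>>>> understanding from the Beam Flink runner docs, not verified in our
>>>>> setup, is that a Python worker pool must already be listening at the
>>>>> environment_config address, e.g. started in a sidecar container with:
>>>>>
>>>>> python -m apache_beam.runners.worker.worker_pool_main --service_port=50000
>>>>> )
>>>>>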
>>>>> We noticed that https://beam.apache.org/roadmap/portability/ mentions
>>>>> Docker among the prerequisites. What is Docker used for here? Is there a
>>>>> suggested way to run Docker inside a k8s container (maybe something like
>>>>> Sysbox for Docker-in-Docker)?
>>>>>
>>>>> Or maybe we should not use the Beam SDK + runner in k8s at all?
>>>>>
>>>>> Thanks,
>>>>> Yilun
>>>>>
>>>>
