We created a custom Docker image that includes a Java runtime, Python, and a
Docker environment to run our job, but we encountered a timeout exception:

root@beam-pod:/tmp# PYTHONPATH='./' python -m kafka_test \
    --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081 \
    --flink_submit_uber_jar --environment_type=EXTERNAL \
    --environment_config=localhost:50000
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 10.190.29.80:6122-88ce88  timed out.
    at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Our test code is super simple:


import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.io import WriteToText

def run_pipeline():
  with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
            flink_master="beam-flink-cluster-jobmanager:8081",
            environment_type="EXTERNAL",
            environment_config="localhost:50000")) as p:
    (p
     | 'Read from Kafka' >> ReadFromKafka(
         consumer_config={
             'bootstrap.servers': 'zookeeper.libra.ubiquant:31090',
             'auto.offset.reset': 'latest'},
         topics=['test001'])
     | 'Pair with 1' >> beam.Map(lambda word: (word, 1))
     | 'Window of 5 seconds' >> beam.WindowInto(window.FixedWindows(5))
     | 'Group by key' >> beam.GroupByKey()
     | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
     # | "Write to Kafka" >> WriteToKafka(
     #     producer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'},
     #     topic='test001')
     | 'Write to text' >> WriteToText("/tmp/output2")
    )

if __name__ == '__main__':
  run_pipeline()


Are there any suggestions on where to start debugging? In the Flink UI, it
looks like the job failed at the first step, ReadFromKafka.
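
One check we can think of, as a minimal sketch: as far as we understand, with
--environment_type=EXTERNAL the address in --environment_config has to be
reachable from wherever the Flink TaskManager runs, not only from the pod that
submits the job. That is our assumption, not something we have confirmed. The
helper below uses only the Python standard library; can_connect is a name we
made up for illustration, and the host/port pairs in the usage comment are the
ones from the command above.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


# Example usage, run from inside the pod being checked
# (for example the TaskManager pod):
#
#   for host, port in [("localhost", 50000),
#                      ("beam-flink-cluster-jobmanager", 8081)]:
#       state = "reachable" if can_connect(host, port) else "NOT reachable"
#       print(f"{host}:{port} is {state}")
```

This only tells us whether something is listening on the port. It does not tell
us whether the listener is actually a Beam worker pool, and it would not
explain a TaskManager that died for other reasons (for example running out of
memory).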

Thanks,
Yilun

On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kcwea...@google.com> wrote:

> In Beam, the Kafka connector does not know anything about the underlying
> execution engine (here Flink). It is instead translated by the runner into
> a user defined function in Flink. So it is expected that the resulting DAG
> does not look the same as it would with a native Flink source.
>
> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <ilyak1...@gmail.com> wrote:
>
>> So sorry that subscription errors on my side resulted in multiple duplicate
>> emails!
>>
>> Thanks for the reply, it does help!
>>
>> I am confused: when submitting a Beam job with the Kafka connector to Flink,
>> I noticed that the Flink DAG diagram includes ReadFromKafka as part of the
>> Flink workflow, while if we submit a PyFlink job (connected to Kafka)
>> directly to Flink, the Flink workflow excludes reading from Kafka (which is
>> the source) and only has the data processing parts.
>>
>> Is that how Beam wants Flink to do it?
>>
>> Thanks a lot, and sincere apologies again for the silly duplicate emails!
>>
>> Yilun
>>
>> On Thu, Feb 25, 2021 at 11:58 AM Sam Bourne <samb...@gmail.com> wrote:
>>
>>> Hi Yilun!
>>>
>>> I made a quick proof-of-concept repo showcasing how to run a Beam
>>> pipeline in Flink on k8s. It may be useful for you as a reference.
>>>
>>> https://github.com/sambvfx/beam-flink-k8s
>>>
>>>
>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <ilyak1...@gmail.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> Our team is trying to use Beam with the Kafka connector and the Flink
>>>> runner to gather information and process data. We adopted the Python SDK
>>>> and built Java 11 into the Python 3.7 SDK image as the Java runtime for
>>>> the Kafka expansion service. So:
>>>> image: Beam Python 3.7 Docker image + built-in Java 11
>>>> connector: Kafka
>>>> runner: Flink
>>>> container: Kubernetes
>>>>
>>>> We encounter a "docker not found" error when running:
>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>
>>>> We noticed that https://beam.apache.org/roadmap/portability/ mentions
>>>> that the prerequisites also include Docker. What is Docker used for
>>>> here? Is there any suggested way to set up Docker in a k8s container
>>>> (maybe something like sysbox for Docker-in-Docker)?
>>>>
>>>> Or maybe we should not use the Beam SDK + runner in k8s?
>>>>
>>>> Thanks,
>>>> Yilun
>>>>
>>>
