Re: using python sdk+kafka under k8s

2021-03-04 Thread Kyle Weaver
The problem is that Kafka is a "cross-language" transform that is
implemented in Java. You have configured your Python pipeline to run with
environment_type=EXTERNAL, but the Kafka transform brings its own
environment with environment_type=DOCKER; it does not respect the
environment_type you set for the pipeline. Currently I don't think there's
a way to configure the environment for an external transform; I brought up
this issue in a recent thread [1]. The reason for the error you are seeing
is that environment_type=DOCKER tries to start Docker inside your Flink
workers, which must not have Docker installed.

[1]
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
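
To see this concretely, here is a minimal diagnostic sketch (my own
illustration, not from this thread; it assumes a local Java runtime, since
applying ReadFromKafka launches the Kafka expansion service, and the broker
and worker-pool addresses are placeholders): the expanded pipeline proto
carries a second, Docker-based environment regardless of the
--environment_type flag.

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Build a pipeline with environment_type=EXTERNAL, as in the failing job
# (localhost:50000 is a placeholder worker-pool address).
p = beam.Pipeline(options=PipelineOptions(
    environment_type='EXTERNAL', environment_config='localhost:50000'))
_ = (p
     | ReadFromKafka(
         consumer_config={'bootstrap.servers': 'broker:9092'},  # placeholder
         topics=['test001'])
     | beam.Map(print))

# List every environment in the pipeline proto. Expect both
# beam:env:external:v1 (the pipeline default) and beam:env:docker:v1
# (attached to the expanded Kafka transform).
proto = p.to_runner_api()
for env_id, env in proto.components.environments.items():
    print(env_id, env.urn)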

On Thu, Mar 4, 2021 at 2:28 AM yilun zhang wrote:

> Hmm, it looks like the failure is due to the Docker environment:
>
> RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
> state FAILED: ... java.io.IOException: Cannot run program "docker":
> error=2, No such file or directory
> ...

Re: using python sdk+kafka under k8s

2021-03-04 Thread yilun zhang
Hmm, it looks like the failure is due to the Docker environment:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmp/kafka_test.py", line 26, in <module>
    run_pipeline()
  File "/tmp/kafka_test.py", line 22, in run_pipeline
    |beam.Map(print)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 583, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 581, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in state FAILED:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
        at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
        at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
        ... 14 more
Caused by: java.io.IOException: error=2, No such file or directory
        at java.lang.UNIXProcess.forkAndExec(Native Method)
        at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
        at java.lang.ProcessImpl.start(ProcessImpl.java:134)
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
        ... 28 more


I tried to create a YAML spec to mount the local Docker socket, like:
apiVersion: v1
kind: Pod
metadata:
  name: beam-pod
  namespace: default
spec:
  volumes:
    - name: docker-sock
      hostPath:
        path: "/var/run/docker.sock"
        type: Socket
    - name: 
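
For reference, a minimal hypothetical completion of that idea (my own
sketch, not the original spec): the hostPath volume also needs a matching
volumeMount in the container, and mounting the socket alone is not enough,
since the error above ("Cannot run program \"docker\"") means the docker
CLI binary is missing from the image itself. Container and image names are
placeholders.

apiVersion: v1
kind: Pod
metadata:
  name: beam-pod
  namespace: default
spec:
  containers:
    - name: beam-worker                  # hypothetical container name
      image: my-beam-python-java:latest  # placeholder; must include the docker CLI
      volumeMounts:
        - name: docker-sock
          mountPath: /var/run/docker.sock
  volumes:
    - name: docker-sock
      hostPath:
        path: /var/run/docker.sock
        type: Socket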

Re: using python sdk+kafka under k8s

2021-03-04 Thread yilun zhang
We created a custom Docker image that includes a Java runtime, Python, and
a Docker environment to run our job, but encountered a timeout exception:

root@beam-pod:/tmp# PYTHONPATH='./' python -m kafka_test --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:5
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 10.190.29.80:6122-88ce88 timed out.
        at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Our test code is super simple:


import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.io import WriteToText


def run_pipeline():
  with beam.Pipeline(options=PipelineOptions(
      runner="FlinkRunner",
      flink_master="beam-flink-cluster-jobmanager:8081",
      environment_type="EXTERNAL",
      environment_config="localhost:5")) as p:
    (p
     | 'Read from Kafka' >> ReadFromKafka(
         consumer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090',
                          'auto.offset.reset': 'latest'},
         topics=['test001'])
     | 'Pair with 1' >> beam.Map(lambda word: (word, 1))
     | 'Window of 5 seconds' >> beam.WindowInto(window.FixedWindows(5))
     | 'Group by key' >> beam.GroupByKey()
     | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
     # | 'Write to Kafka' >> WriteToKafka(
     #     producer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'},
     #     topic='test001')
     | 'Write to text' >> WriteToText("/tmp/output2")
    )


if __name__ == '__main__':
  run_pipeline()


Is there any suggestion on a debugging direction? In the Flink UI, it looks
like it failed at the first step, ReadFromKafka.
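
One direction worth checking (a hedged guess on my part, not a confirmed
diagnosis): with --environment_type=EXTERNAL and
--environment_config=localhost:<port>, each Flink taskmanager pod needs a
Beam Python worker pool listening on that port, typically run as a sidecar
container; if none is running, the ExecutableStage can block on startup
until Flink's heartbeat times out. The image tag and port below are
assumptions.

# Hypothetical sidecar for the Flink taskmanager pod; the Beam SDK container
# started with --worker_pool serves SDK workers for environment_type=EXTERNAL.
- name: beam-worker-pool
  image: apache/beam_python3.7_sdk:2.28.0   # assumed version tag
  args: ["--worker_pool"]
  ports:
    - containerPort: 50000   # must match --environment_config=localhost:50000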

Thanks,
Yilun

On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver wrote:

> In Beam, the Kafka connector does not know anything about the underlying
> execution engine (here Flink). ...

Re: using python sdk+kafka under k8s

2021-02-26 Thread Kyle Weaver
In Beam, the Kafka connector does not know anything about the underlying
execution engine (here, Flink). It is instead translated by the runner into
a user-defined function in Flink, so it is expected that the resulting DAG
does not look the same as it would with a native Flink source.

On Fri, Feb 26, 2021 at 5:18 AM yilun zhang wrote:

> So sorry: subscription errors on my side resulted in multiple duplicate
> emails!
> ...


Re: using python sdk+kafka under k8s

2021-02-26 Thread yilun zhang
So sorry: subscription errors on my side resulted in multiple duplicate
emails!

Thanks for the reply; it does help!

I am still confused, though: when submitting a Beam job with the Kafka
connector to Flink, I noticed that the Flink DAG diagram includes
ReadFromKafka as part of the Flink workflow, while if we submit a PyFlink
job (connected to Kafka) directly to Flink, the workflow excludes reading
from Kafka (the source) and only shows the data-processing parts.

Is that how Beam intends Flink to work?

Thanks a lot, and sincere apologies again for the duplicated emails!

Yilun

On Thu, Feb 25, 2021 at 11:58 AM, Sam Bourne wrote:

> Hi Yilun!
>
> I made a quick proof-of-concept repo showcasing how to run a Beam
> pipeline on Flink on k8s:
> https://github.com/sambvfx/beam-flink-k8s
> ...


Re: using python sdk+kafka under k8s

2021-02-24 Thread Sam Bourne
Hi Yilun!

I made a quick proof-of-concept repo showcasing how to run a Beam pipeline
on Flink on k8s. It may be useful to you as a reference.

https://github.com/sambvfx/beam-flink-k8s

On Wed, Feb 24, 2021, 8:13 AM yilun zhang wrote:

> Hey,
>
> Our team is trying to use Beam with the Kafka connector and the Flink
> runner to gather and process data. We adopted the Python SDK and built
> Java 11 into the Python 3.7 SDK image as the Java runtime for the Kafka
> expansion service. So:
> image: Beam Python 3.7 docker image + built-in Java 11
> connector: Kafka
> runner: Flink
> container: Kubernetes
>
> We encountered a "docker not found" error when running:
> python3 -m kafka_test --runner=FlinkRunner
> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
> --environment_type=EXTERNAL --environment_config=localhost:5
>
> We noticed that https://beam.apache.org/roadmap/portability/ mentions
> Docker among the prerequisites. What is Docker used for here? Is there
> any suggested way to run Docker inside a k8s container (maybe something
> like sysbox for Docker-in-Docker)?
>
> Or maybe we should not use the Beam SDK + runner in k8s?
>
> Thanks,
> Yilun
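
On the Docker-in-Docker question above, a minimal hypothetical k8s sketch
of that approach (my own illustration, not from this thread): it uses the
official docker:dind image, which requires privileged mode, and the main
container must still have the docker CLI installed; DOCKER_HOST points that
CLI at the dind daemon. Container and image names are placeholders.

apiVersion: v1
kind: Pod
metadata:
  name: flink-taskmanager-dind       # hypothetical
spec:
  containers:
    - name: taskmanager              # main container; its image needs the docker CLI
      image: my-flink-taskmanager:latest   # placeholder
      env:
        - name: DOCKER_HOST          # point the docker CLI at the dind sidecar
          value: tcp://localhost:2375
    - name: dind
      image: docker:dind
      securityContext:
        privileged: true             # required by docker:dind
      env:
        - name: DOCKER_TLS_CERTDIR   # empty disables TLS, exposing plain TCP on 2375
          value: ""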