Hello Maximilian,
I followed your guide with the wordcount.py example up to the step where
I connect to a remote Flink cluster from my laptop, and got stuck.
On the Flink server side, I saw that there's a connection established
from my laptop to port 8081 of the server (shown in /netstat/), but
nothing showed up in the Flink GUI console.
On my Python console, I could see that my job state changed to
/RUNNING/. On the JobService side, the last log entry is
"/Submitting job to.../".
Are there any logs that could help me debug?
Is there any chance that the Flink job jar file is too big to be
sent to my Flink cluster?
More interestingly, if I change the port in the command that starts the
JobService to an invalid one (e.g. from 8081 to 8082), the output on
my Python console as well as on the JobService console stays the same.
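For what it's worth, here is a quick connectivity check I can run from
my laptop (just a minimal sketch in Python; the hosts and ports are the
ones from my setup above):

  import socket

  # Minimal sketch: verify that both endpoints accept TCP connections.
  # 10.10.64.121:8081 is my Flink REST endpoint, localhost:8099 is the
  # Beam JobService.
  for host, port in [('10.10.64.121', 8081),
                     ('localhost', 8099)]:
      try:
          socket.create_connection((host, port), timeout=5).close()
          print('{}:{} reachable'.format(host, port))
      except OSError as e:
          print('{}:{} NOT reachable: {}'.format(host, port, e))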
I also have another question, regarding writing my stream into Parquet
files. As mentioned on this page,
https://beam.apache.org/documentation/io/built-in/, there are no
file-based connectors for Python streaming yet. Does that mean I also
need to use Java for the IO?
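For context, here is a minimal batch-mode sketch of the kind of Parquet
output I am after, using apache_beam.io.parquetio (the schema, records,
and output path below are just placeholders):

  import pyarrow
  import apache_beam as beam
  from apache_beam.io.parquetio import WriteToParquet

  # Placeholder schema and records, only to illustrate the output
  # shape; this runs in batch mode, not streaming.
  schema = pyarrow.schema([('word', pyarrow.string()),
                           ('count', pyarrow.int64())])
  with beam.Pipeline() as p:
      (p
       | beam.Create([{'word': 'hello', 'count': 1}])
       | WriteToParquet('/tmp/wordcount', schema,
                        file_name_suffix='.parquet'))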
Thanks and best regards,
Averell
Here's the output on my Python console - it is stuck there:
(beam_env) Averell-Macbook:wordcount Averell$ python wordcount.py
INFO:root:Using latest locally built Python SDK docker image.
INFO:root:==================== <function lift_combiners at
0x119935e60> ====================
INFO:root:==================== <function expand_sdf at 0x119935ed8>
====================
INFO:root:Job state changed to RUNNING
And here is the output on the JobService:
(beam_env) Averell-Macbook:beam Averell$ ./gradlew
:beam-runners-flink-1.7-job-server:runShadow
-PflinkMasterUrl=10.10.64.121:8081
Configuration on demand is an incubating feature.
> Task :beam-runners-flink-1.7-job-server:runShadow
Listening for transport dt_socket at address: 5005
[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
ArtifactStagingService started on localhost:8098
[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
Java ExpansionService started on localhost:8097
[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
JobService started on localhost:8099
[grpc-default-executor-0] ERROR
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService
- Encountered Unexpected Exception for Invocation
job_28626ac7-5339-4694-84c5-7e85f3b51a0
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException:
NOT_FOUND
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
at
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
at
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
at
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[grpc-default-executor-0] INFO
org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
[grpc-default-executor-0] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation -
Starting job invocation
your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkPipelineRunner - Translating
pipeline to Flink program.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating
a Batch Execution Environment.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using
Flink Master URL 10.10.64.126:36215.
[flink-runner-job-invoker] WARN
org.apache.beam.runners.flink.FlinkExecutionEnvironments - No
default parallelism could be found. Defaulting to parallelism 1.
Please set an explicit parallelism with --parallelism
[flink-runner-job-invoker] INFO
org.apache.flink.api.java.ExecutionEnvironment - The job has 0
registered types and 0 default Kryo serializers
[flink-runner-job-invoker] WARN
org.apache.flink.configuration.Configuration - Config uses
deprecated configuration key 'jobmanager.rpc.address' instead of
proper key 'rest.address'
[flink-runner-job-invoker] INFO
org.apache.flink.runtime.rest.RestClient - Rest client endpoint
started.
[flink-runner-job-invoker] INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting
job 2ee00e434677116a298351eb77cdfaa4 (detached: false).
<============-> 98% EXECUTING [23m 3s]
> IDLE
> :beam-runners-flink-1.7-job-server:runShadow
> IDLE
> IDLE
On Sat, May 11, 2019 at 9:03 AM Averell Huyen Levan
<lvhu...@gmail.com> wrote:
Hello Maximilian,
Thanks for your help.
The other part of my question was about running a (Python) pipeline on
the Flink-cluster runner. I read the page
https://beam.apache.org/documentation/runners/flink/ but felt
confused. I will try one more time and come back if I am still
stuck with it.
Again, thanks a lot for your help.
Regards,
Averell
On Sat, 11 May 2019, 12:35 am Maximilian Michels, <m...@apache.org> wrote:
Hi Averell,
What you want to do is possible today, but at this point it is an
early experimental feature. The reason is that Kafka is a
cross-language Java transform in a Python pipeline, and we only
recently enabled cross-language pipelines.
1. First of all, until 2.13.0 is released you will have to use the
latest master version, e.g.
$ git clone https://github.com/apache/beam
2. Set up and activate a Python virtual environment:
$ virtualenv ~/beam_environment
$ source ~/beam_environment/bin/activate
3. Build the Python SDK:
$ cd beam
$ ./gradlew :beam-sdks-python:buildSnapshot
$ cd sdks/python/build/
$ unzip apache-beam-2.13.0.dev0.zip
$ cd apache-beam-2.13.0.dev0
$ python setup.py install
4. Start the Flink JobServer / ExpansionService:
$ ./gradlew :beam-runners-flink-1.7-job-server:runShadow
5. Create your Python pipeline and use the ReadFromKafka transform, e.g.

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka

options = ["--runner=PortableRunner",
           "--job_endpoint=localhost:8099"]
p = beam.Pipeline(argv=options)
(p
 | ReadFromKafka(consumer_config={'bootstrap.servers':
                                  'kafka_broker:port'},
                 topics=['myTopic'])
 | beam.Map(lambda kv: ...))  # each element is a (key, value) tuple
p.run()
6. Run your file with the python command :)
Note: If you do not set key_deserializer or value_deserializer for
ReadFromKafka, you will receive the read data as KV[bytes, bytes]. That
means you have to perform decoding inside Python. If you set a Kafka
Deserializer, you can also receive the Kafka data already decoded.
However, you are limited to the coders in ModelCoders. For example, Int,
Long, KV, and Iterable are supported; we also added String recently.
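For example, here is a minimal sketch of decoding the raw bytes in
Python, continuing from the pipeline above (the UTF-8 encoding is just
an assumption about your data):

# Each element arrives as a (key, value) tuple of bytes; decode it in
# Python. The key may be None when the Kafka records carry no key.
decoded = (p
           | ReadFromKafka(consumer_config={'bootstrap.servers':
                                            'kafka_broker:port'},
                           topics=['myTopic'])
           | beam.Map(lambda kv: ((kv[0] or b'').decode('utf-8'),
                                  kv[1].decode('utf-8'))))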
Hope that makes sense. Curious to see how your experiments go.
Cheers,
Max
PS: The best resources are in the doc comments of kafka.py:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py
or here:
https://beam.apache.org/documentation/runners/flink/
https://beam.apache.org/roadmap/portability/
On 10.05.19 14:33, lvhu...@gmail.com wrote:
> Hi everyone,
>
> I am trying to get started with Python on the Flink-cluster
> runner, to build a pipeline that reads data from Kafka and writes
> to S3 in Parquet format.
> I tried to search the Beam website, but could not find any
> example (even for the basic word count). E.g., on this page,
> https://beam.apache.org/get-started/wordcount-example/, all the
> Python - Flink-cluster sections have no content except "This
> runner is not yet available for the Python SDK."
>
> At this point in time, is it possible to create such a
> pipeline? From all the slides / videos it seems feasible, but
> could you please point me to a step-by-step guide?
>
> Thanks and best regards,
> Averell
>