Just saw that the malformed master URL was due to HTML formatting. It looks ok.

Please check your Flink JobManager logs. The JobManager might not be reachable, and the submission may just be blocked on it becoming available.
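
If you want to rule out basic connectivity first, a quick probe against the JobManager's REST endpoint can help. A sketch (Python 3; it assumes the web frontend serves its usual /config endpoint at the address from this thread):

    import urllib.request
    # Any JSON response here means the JobManager REST API is reachable.
    print(urllib.request.urlopen('http://10.10.64.121:8081/config').read())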

Thanks,
Max

On 13.05.19 20:05, Maximilian Michels wrote:
Hi Averell,

Your Flink master URL does not look correct. You probably want to try "10.10.64.121:8081".

I also have another question regarding writing my stream into Parquet files. As mentioned on this page, https://beam.apache.org/documentation/io/built-in/, there are no file-based connectors for Python streaming yet. Does that mean I also need to use Java for the IO?

Batch sinks work for streaming pipelines, but all data has to be put into the GlobalWindow before writing.
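
For illustration, here is a minimal sketch of that approach (hedged: it assumes an existing streaming PCollection named `stream`, and the text sink and output path are placeholders; a Parquet sink would slot into the same spot):

    import apache_beam as beam
    from apache_beam.transforms import window

    (stream
     # Collapse all elements back into the single GlobalWindow
     # so that a batch-style sink can write them out.
     | beam.WindowInto(window.GlobalWindows())
     | beam.io.WriteToText('/tmp/output'))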

-Max

On 11.05.19 13:48, Averell Huyen Levan wrote:
Hello Maximilian,

I followed your guide with the wordcount.py example, up to the step where I connect to a remote Flink cluster from my laptop, and got stuck. On the Flink server side, I saw that there was a connection established from my laptop to port 8081 of the server (shown in netstat), but nothing showed up in the Flink GUI console. On my Python console, I could see that my job state changed to RUNNING. On the JobService side, the last log entry is "Submitting job to..."
Are there any logs that could help me debug?
Is there any chance that the Flink job jar file is too big to be sent to my Flink cluster?

More interestingly, if I change the port in the command that starts the JobService to an invalid one (e.g. from 8081 to 8082), the output on my Python console as well as on the JobService console stays the same.

I also have another question regarding writing my stream into Parquet files. As mentioned on this page, https://beam.apache.org/documentation/io/built-in/, there are no file-based connectors for Python streaming yet. Does that mean I also need to use Java for the IO?

Thanks and best regards,
Averell

Here's the output on my Python console - it is stuck there:

    (beam_env) Averell-Macbook:wordcount Averell$ python wordcount.py
    INFO:root:Using latest locally built Python SDK docker image.
    INFO:root:==================== <function lift_combiners at 0x119935e60> ====================
    INFO:root:==================== <function expand_sdf at 0x119935ed8> ====================
    INFO:root:Job state changed to RUNNING


And here is the output on the JobService:

    (beam_env) Averell-Macbook:beam Averell$ ./gradlew
    :beam-runners-flink-1.7-job-server:runShadow
    -PflinkMasterUrl=10.10.64.121:8081
    Configuration on demand is an incubating feature.

     > Task :beam-runners-flink-1.7-job-server:runShadow
    Listening for transport dt_socket at address: 5005
    [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098
    [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097
    [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099
    [grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_28626ac7-5339-4694-84c5-7e85f3b51a0
    org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
             at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
             at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
             at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
             at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
             at java.lang.Thread.run(Thread.java:748)
    [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
    [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
    [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
    [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
    [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL 10.10.64.126:36215.
    [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
    [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
    [flink-runner-job-invoker] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
    [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
    [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 2ee00e434677116a298351eb77cdfaa4 (detached: false).
    <============-> 98% EXECUTING [23m 3s]
     > IDLE
     > :beam-runners-flink-1.7-job-server:runShadow
     > IDLE
     > IDLE


On Sat, May 11, 2019 at 9:03 AM Averell Huyen Levan <lvhu...@gmail.com> wrote:

    Hello Maximilian,

    Thanks for your help.
    The other part of my question was about running a (Python) pipeline on
    the Flink-cluster runner. I read the page
    https://beam.apache.org/documentation/runners/flink/ but felt
    confused. I will try one more time and come back if I am still stuck
    with it.

    Again, thanks a lot for your help.

    Regards,
    Averell

    On Sat, 11 May 2019, 12:35 am Maximilian Michels, <m...@apache.org> wrote:

        Hi Averell,

        What you want to do is possible today, but at this point it is an
        early experimental feature. The reason is that Kafka is a
        cross-language Java transform in a Python pipeline, and we only
        recently enabled cross-language pipelines.

        1. First of all, until 2.13.0 is released you will have to use the
             latest master version, e.g.
             $ git clone https://github.com/apache/beam

        2. Setup and activate a Python virtual environment:
             $ virtualenv ~/beam_environment
             $ source ~/beam_environment/bin/activate

        3. Build the Python SDK:
             $ cd beam
             $ ./gradlew :beam-sdks-python:buildSnapshot
             $ cd sdks/python/build/
             $ unzip apache-beam-2.13.0.dev0.zip
             $ cd apache-beam-2.13.0.dev0
             $ python setup.py install

        4. Start the Flink JobServer / ExpansionService
             $ ./gradlew :beam-runners-flink-1.7-job-server:runShadow
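
        To run against an existing cluster rather than an embedded one,
        point the job server at it, as done elsewhere in this thread:
             $ ./gradlew :beam-runners-flink-1.7-job-server:runShadow \
                 -PflinkMasterUrl=10.10.64.121:8081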

        5. Create your Python pipeline and use the ReadFromKafka
        transform, e.g.

             from apache_beam import Map, Pipeline
             from apache_beam.io.external.kafka import ReadFromKafka
             from apache_beam.options.pipeline_options import PipelineOptions

             options = PipelineOptions(["--runner=PortableRunner",
                                        "--job_endpoint=localhost:8099"])
             p = Pipeline(options=options)
             (p
              | ReadFromKafka(consumer_config={'bootstrap.servers':
                                               'kafka_broker:port'},
                              topics=['myTopic'])
              # Each element is a (key, value) pair; put your logic here.
              | Map(lambda kv: ...))
             p.run()
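
        Note that ReadFromKafka is the cross-language piece: when you run
        the script, it is expanded by the Java ExpansionService that
        runShadow starts alongside the JobService, so step 4 has to be
        running before you execute step 6.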

        6. Run your file with the python command :)

        Note: If you do not set key_deserializer or value_deserializer for
        ReadFromKafka, you will receive the read data as KV[bytes, bytes].
        That means you have to perform the decoding inside Python. If you
        set a Kafka Deserializer, you can also receive the Kafka data
        already decoded. However, you are then limited to the coders in
        ModelCoders. For example, Int, Long, KV, and Iterable are
        supported; we also added String recently.
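
        As a hedged sketch of the decode-in-Python route (assuming each
        record arrives as a (key, value) tuple of bytes, and `records` is
        the output of the ReadFromKafka transform above):

             # Decode the raw KV[bytes, bytes] records inside Python.
             decoded = (records
                        | Map(lambda kv: (kv[0], kv[1].decode('utf-8'))))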

        Hope that makes sense. Curious to see how your experiments go.

        Cheers,
        Max

        PS: The best resources are in the doc comments of kafka.py:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py
        or here:
        https://beam.apache.org/documentation/runners/flink/
        https://beam.apache.org/roadmap/portability/

        On 10.05.19 14:33, lvhu...@gmail.com wrote:
         > Hi everyone,
         >
         > I am trying to get started with Python on the Flink-cluster
         > runner, to build a pipeline that reads data from Kafka and
         > writes to S3 in Parquet format.
         > I tried searching the Beam website, but could not find any
         > example (even for the basic word count). E.g., on this page,
         > https://beam.apache.org/get-started/wordcount-example/, all the
         > Python - Flink-cluster sections have no content but "This
         > runner is not yet available for the Python SDK."
         >
         > At this point in time, is it possible to create such a
         > pipeline? From all the slides / videos, it seems feasible. But
         > could you please point me to some step-by-step guide?
         >
         > Thanks and best regards,
         > Averell
         >
