Hey Robbe,
The issue with higher parallelism is likely due to the single Python
process which processes the data.
You may want to use the `sdk_worker_parallelism` pipeline option, which
brings up multiple Python worker processes.
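For example, a minimal sketch of the submission options (the value 10 and the exact flag list are assumptions based on the setup described earlier in this thread; `sdk_worker_parallelism` itself is a standard portable pipeline option):

```python
# Options for submitting the pipeline to the portable Flink runner.
# --sdk_worker_parallelism starts that many SDK worker processes
# instead of a single Python process per environment.
pipeline_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=DOCKER",
    "--sdk_worker_parallelism=10",  # assumed value, e.g. one per task slot
]

# These args would then be passed to
# beam.Pipeline(options=PipelineOptions(pipeline_args)).
```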
Best,
Max
On 30.04.20 23:56, Robbe Sneyders wrote:
> Yes, the task manager has one task slot per CPU core available, and the
> dashboard shows that the work is parallelized across multiple subtasks.
>
> However, when using parallelism > 1, the pipeline stalls, the Task
> Manager starts throwing 'Output channel stalled' warnings, and high back
> pressure builds up at the Partition step, as shown in the tables below.
>
> The Task Manager should have more than enough memory.
> JVM Heap Size: 30.0 GB
> Flink Managed Memory: 21.0 GB
>
> Any idea what could cause this and how I could resolve it?
>
> Parallelism = 1:
>
> 1. CHAIN MapPartition (MapPartition at [1]Read
>    input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
>    -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 43.0 KB / 52 records | Sent: 831 MB / 33060 records
>    Parallelism: 1 | Duration: 3m 2s | Tasks: 1
>
> 2. Partition
>    Status: RUNNING | Received: 831 MB / 33059 records | Sent: 831 MB / 33059 records
>    Parallelism: 1 | Duration: 2m 58s | Tasks: 1
>
> 3. CHAIN MapPartition (MapPartition at [4]{Discard array, Load json,
>    Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 831 MB / 33057 records | Sent: 641 MB / 32439 records
>    Parallelism: 1 | Duration: 2m 58s | Tasks: 1
>
> 4. CHAIN MapPartition (MapPartition at [3]Write
>    output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)})
>    -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor)
>    -> GroupCombine (GroupCombine at GroupCombine: Write
>    output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
>    Status: RUNNING | Received: 641 MB / 32438 records | Sent: 0 B / 0 records
>    Parallelism: 1 | Duration: 2m 58s | Tasks: 1
>
>
> Parallelism = 10:
>
> 1. CHAIN MapPartition (MapPartition at [1]Read
>    input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
>    -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 43.1 KB / 52 records | Sent: 493 MB / 19625 records
>    Parallelism: 10 | Duration: 7m 15s | Tasks: 19
>
> 2. Partition
>    Status: RUNNING | Received: 486 MB / 19363 records | Sent: 486 MB / 19363 records
>    Parallelism: 10 | Duration: 7m 14s | Tasks: 10
>
> 3. CHAIN MapPartition (MapPartition at [4]{Discard array, Load json,
>    Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 477 MB / 18987 records | Sent: 0 B / 0 records
>    Parallelism: 10 | Duration: 7m 14s | Tasks: 10
>
> 4. CHAIN MapPartition (MapPartition at [3]Write
>    output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)})
>    -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor)
>    -> GroupCombine (GroupCombine at GroupCombine: Write
>    output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
>    Status: RUNNING | Received: 1.16 KB / 0 records | Sent: 0 B / 0 records
>    Parallelism: 10 | Duration: 7m 14s | Tasks: 10
>
>
> https://ml6.eu
>
> Robbe Sneyders
> ML6 Gent
> M: +32 474 71 31 08
>
>
> On Thu, 30 Apr 2020 at 22:35, Kyle Weaver <[email protected]> wrote:
>
> If you are using only a single task manager but want to get
> parallelism > 1, you will need to
> increase taskmanager.numberOfTaskSlots in
> your flink-conf.yaml.
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
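>     For example, the relevant line in flink-conf.yaml might look like
>     this (the value 10 is just an illustration, sized to a 10-core
>     machine):
>
>     ```
>     taskmanager.numberOfTaskSlots: 10
>     ```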
>
> On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders <[email protected]> wrote:
>
> Hi Kyle,
>
> Thanks for the quick response.
> The problem was that the pipeline could not access the input
> file. The Task Manager errors seem unrelated indeed.
>
> I'm now able to run the pipeline completely, but I'm running
> into problems when using parallelism.
> The pipeline can be summarized as:
> read file -> shuffle -> process -> write files
>
> When using parallelism > 1, the pipeline stalls and the Task
> Manager outputs the following warning:
> flink-taskmanager_1 | 2020-04-30 09:24:46,272 INFO
> org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output
> channel stalled for 255s, outbound thread CHAIN MapPartition
> (MapPartition at [4]{Discard array, Load json, Process element,
> Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) (7/10).
> See: https://issues.apache.org/jira/browse/BEAM-4280 for the
> history for this issue.
>
> The referenced issue [1] doesn't contain much information and is
> marked resolved. There is a Flink issue [2] that seems related,
> although I'm not seeing the reported stack trace. I guess this
> problem occurs because I'm reading from and writing to the same
> disk in parallel.
>
> Increasing the Task Manager memory seems to partially resolve the
> issue. I'm still getting the stalled channel warnings, but the
> pipeline now does proceed, step-wise but slowly.
>
> Using BATCH_FORCED execution mode removes the warnings, but the
> pipeline still runs a lot slower than with parallelism=1.
>
> The pipeline shouldn't be I/O bound, so I would expect to still
> get some benefit from running tasks in parallel?
>
> 1. https://issues.apache.org/jira/browse/BEAM-4280
> 2.
> https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>
> Kind regards,
> Robbe
>
>
>
> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <[email protected]> wrote:
>
> > This seems to have worked, as the output file is created
> on the host system. However the pipeline silently fails, and
> the output file remains empty.
>
> Have you checked the SDK container logs? They are most
> likely to contain relevant failure information.
>
> > I don't know if this is a result of me rebuilding the Job
> Server, or caused by another issue.
>
> Looks like there is an old but unresolved bug with the same
> error: https://issues.apache.org/jira/browse/BEAM-5397
>
> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders <[email protected]> wrote:
>
> Hi all,
>
> We're working on a project where we're limited to one
> big development machine for now. We want to start
> developing data processing pipelines in Python, which
> should eventually be ported to a currently unknown setup
> on a separate cluster or cloud, so we went with Beam for
> its portability.
>
> For the development setup, we wanted as little overhead as
> possible, so we deployed a one-node Flink cluster with
> docker-compose. The whole setup is defined by the following
> docker-compose.yml:
>
> ```
> version: "2.1"
> services:
>   flink-jobmanager:
>     image: flink:1.9
>     network_mode: host
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=localhost
>
>   flink-taskmanager:
>     image: flink:1.9
>     network_mode: host
>     depends_on:
>       - flink-jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=localhost
>     volumes:
>       - staging-dir:/tmp/beam-artifact-staging
>       - /usr/bin/docker:/usr/bin/docker
>       - /var/run/docker.sock:/var/run/docker.sock
>     user: flink:${DOCKER_GID}
>
>   beam-jobserver:
>     image: apache/beam_flink1.9_job_server:2.20.0
>     network_mode: host
>     command: --flink-master=localhost:8081
>     volumes:
>       - staging-dir:/tmp/beam-artifact-staging
>
> volumes:
>   staging-dir:
> ```
>
> We can submit and run pipelines with the following options:
> ```
> 'runner': 'PortableRunner',
> 'job_endpoint': 'localhost:8099',
> ```
> The environment type for the SDK Harness is configured
> to the default 'docker'.
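> Written out in the same style as the options above, with that
> default made explicit (only the 'environment_type' entry is
> added here for clarity):
> ```
> 'runner': 'PortableRunner',
> 'job_endpoint': 'localhost:8099',
> 'environment_type': 'DOCKER',
> ```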
>
> However, we cannot write output files to the host system. To fix
> this, I tried to mount a host directory into the Beam SDK
> container (I had to rebuild the Beam Job Server jar and image to
> do this). This seems to have worked, as the output file is
> created on the host system. However, the pipeline silently fails
> and the output file remains empty. Running the pipeline with the
> DirectRunner confirms that the pipeline itself works.
>
> Looking at the output logs, the following error is
> thrown in the Flink Task Manager:
> flink-taskmanager_1 | java.lang.NoClassDefFoundError:
>
> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
> I don't know if this is a result of me rebuilding the
> Job Server, or caused by another issue.
>
> We currently do not have a distributed file system
> available. Is there any way to make writing to the host
> system possible?
>
> Kind regards,
> Robbe
>