Hey Robbe,

The issue with higher parallelism is likely due to the single Python SDK
process which processes all of the data. You may want to set the
`sdk_worker_parallelism` pipeline option, which brings up multiple Python
SDK worker processes per task manager.
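For example, when submitting the pipeline (a sketch only; the endpoint is
taken from your earlier mail, and the worker count is something you would
tune to your machine):

```
from apache_beam.options.pipeline_options import PipelineOptions

# Options for the portable Flink runner; sdk_worker_parallelism controls
# how many Python SDK worker processes are brought up per task manager.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',
    '--sdk_worker_parallelism=10',
])
```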
Best,
Max

On 30.04.20 23:56, Robbe Sneyders wrote:
> Yes, the task manager has one task slot per CPU core available, and the
> dashboard shows that the work is parallelized across multiple subtasks.
>
> However, when using parallelism, the pipeline stalls, the Task Manager
> starts throwing 'Output channel stalled' warnings, and high back
> pressure builds up at the Partition step, as shown in the tables below.
>
> The Task Manager should have more than enough memory:
> JVM Heap Size: 30.0 GB
> Flink Managed Memory: 21.0 GB
>
> Any idea what could cause this and how I could resolve it?
>
> Parallelism = 1 (all tasks RUNNING):
>
> CHAIN MapPartition (MapPartition at [1]Read
> input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
> -> FlatMap (FlatMap at ExtractOutput[0])
>     received 43.0 KB / 52 records, sent 831 MB / 33060 records,
>     parallelism 1, duration 3m 2s, 1 task
>
> Partition
>     received 831 MB / 33059 records, sent 831 MB / 33059 records,
>     parallelism 1, duration 2m 58s, 1 task
>
> CHAIN MapPartition (MapPartition at [4]{Discard array, Load json,
> Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
>     received 831 MB / 33057 records, sent 641 MB / 32439 records,
>     parallelism 1, duration 2m 58s, 1 task
>
> CHAIN MapPartition (MapPartition at [3]Write
> output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)})
> -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) ->
> GroupCombine (GroupCombine at GroupCombine: Write
> output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
>     received 641 MB / 32438 records, sent 0 B / 0 records,
>     parallelism 1, duration 2m 58s, 1 task
>
> Parallelism = 10 (all tasks RUNNING):
>
> CHAIN MapPartition (MapPartition at [1]Read
> input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
> -> FlatMap (FlatMap at ExtractOutput[0])
>     received 43.1 KB / 52 records, sent 493 MB / 19625 records,
>     parallelism 10, duration 7m 15s, 19 tasks
>
> Partition
>     received 486 MB / 19363 records, sent 486 MB / 19363 records,
>     parallelism 10, duration 7m 14s, 10 tasks
>
> CHAIN MapPartition (MapPartition at [4]{Discard array, Load json,
> Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
>     received 477 MB / 18987 records, sent 0 B / 0 records,
>     parallelism 10, duration 7m 14s, 10 tasks
>
> CHAIN MapPartition (MapPartition at [3]Write
> output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)})
> -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) ->
> GroupCombine (GroupCombine at GroupCombine: Write
> output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
>     received 1.16 KB / 0 records, sent 0 B / 0 records,
>     parallelism 10, duration 7m 14s, 10 tasks
>
> Robbe Sneyders
> ML6 Gent
> M: +32 474 71 31 08
> https://ml6.eu
>
> On Thu, 30 Apr 2020 at 22:35, Kyle Weaver <kcwea...@google.com> wrote:
>
> If you are using only a single task manager but want to get
> parallelism > 1, you will need to increase
> taskmanager.numberOfTaskSlots in your flink-conf.yaml.
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
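> A minimal sketch of that change (assuming you want one slot per core on,
> say, a 10-core machine; match the value to your CPU count and restart
> the task manager afterwards):
>
> ```
> # flink-conf.yaml
> taskmanager.numberOfTaskSlots: 10
> ```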
>
> On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders <robbe.sneyd...@ml6.eu> wrote:
>
> Hi Kyle,
>
> Thanks for the quick response.
> The problem was that the pipeline could not access the input file. The
> Task Manager errors indeed seem unrelated.
>
> I'm now able to run the pipeline completely, but I'm running into
> problems when using parallelism. The pipeline can be summarized as:
>
> read file -> shuffle -> process -> write files
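>
> In Beam terms, roughly (a hypothetical sketch; `process` and the file
> paths stand in for my actual code):
>
> ```
> import apache_beam as beam
>
> with beam.Pipeline(options=options) as p:
>     (p
>      | 'Read input' >> beam.io.ReadFromText('/path/to/input')
>      | 'Shuffle' >> beam.Reshuffle()           # redistribute elements across workers
>      | 'Process element' >> beam.Map(process)  # load json, process, dump json
>      | 'Write output' >> beam.io.WriteToText('/path/to/output'))
> ```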
>
> When using parallelism > 1, the pipeline stalls and the Task Manager
> outputs the following warning:
>
> flink-taskmanager_1 | 2020-04-30 09:24:46,272 INFO
> org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel
> stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at
> [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap
> (FlatMap at ExtractOutput[0]) (7/10). See:
> https://issues.apache.org/jira/browse/BEAM-4280 for the history for
> this issue.
>
> The referenced issue [1] doesn't contain a lot of information and is
> resolved. There is a Flink issue [2] that seems related, although I'm
> not seeing the reported stack trace. I guess this problem occurs
> because I'm reading from and writing to the same disk in parallel.
>
> Increasing the Task Manager memory seems to resolve the issue
> partially. I'm still getting the stalled channel warnings, but the
> pipeline does proceed, step-wise but slowly.
>
> Using the BATCH_FORCED execution mode removes the warnings, but the
> pipeline still runs a lot slower than with parallelism=1.
>
> The pipeline shouldn't be I/O bound, so I guess I should still be able
> to get some benefit out of running tasks in parallel?
>
> 1. https://issues.apache.org/jira/browse/BEAM-4280
> 2. https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>
> Kind regards,
> Robbe
>
> Robbe Sneyders
> ML6 Gent
> M: +32 474 71 31 08
> https://ml6.eu
>
> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <kcwea...@google.com> wrote:
>
> > This seems to have worked, as the output file is created on the host
> > system. However the pipeline silently fails, and the output file
> > remains empty.
>
> Have you checked the SDK container logs? They are most likely to
> contain relevant failure information.
>
> > I don't know if this is a result of me rebuilding the Job Server, or
> > caused by another issue.
>
> Looks like there is an old but unresolved bug with the same error:
> https://issues.apache.org/jira/browse/BEAM-5397
>
> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders <robbe.sneyd...@ml6.eu> wrote:
>
> Hi all,
>
> We're working on a project where we're limited to one big development
> machine for now. We want to start developing data processing pipelines
> in Python, which should eventually be ported to a currently unknown
> setup on a separate cluster or cloud, so we went with Beam for its
> portability.
>
> For the development setup, we wanted to have the least amount of
> overhead possible, so we deployed a one-node Flink cluster with
> docker-compose. The whole setup is defined by the following
> docker-compose.yml:
>
> ```
> version: "2.1"
> services:
>   flink-jobmanager:
>     image: flink:1.9
>     network_mode: host
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=localhost
>
>   flink-taskmanager:
>     image: flink:1.9
>     network_mode: host
>     depends_on:
>       - flink-jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=localhost
>     volumes:
>       - staging-dir:/tmp/beam-artifact-staging
>       - /usr/bin/docker:/usr/bin/docker
>       - /var/run/docker.sock:/var/run/docker.sock
>     user: flink:${DOCKER_GID}
>
>   beam-jobserver:
>     image: apache/beam_flink1.9_job_server:2.20.0
>     network_mode: host
>     command: --flink-master=localhost:8081
>     volumes:
>       - staging-dir:/tmp/beam-artifact-staging
>
> volumes:
>   staging-dir:
> ```
>
> We can submit and run pipelines with the following options:
>
> ```
> 'runner': 'PortableRunner',
> 'job_endpoint': 'localhost:8099',
> ```
>
> The environment type for the SDK Harness is configured to the default
> 'docker'.
>
> However, we cannot write output files to the host system. To fix this,
> I tried to mount a host directory into the Beam SDK container (I had
> to rebuild the Beam Job Server jar and image to do this). This seems
> to have worked, as the output file is created on the host system.
> However, the pipeline silently fails, and the output file remains
> empty. Running the pipeline with the DirectRunner confirms that the
> pipeline is working.
>
> Looking at the output logs, the following error is thrown in the Flink
> Task Manager:
>
> flink-taskmanager_1 | java.lang.NoClassDefFoundError:
> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>
> I don't know if this is a result of me rebuilding the Job Server, or
> caused by another issue.
>
> We currently do not have a distributed file system available. Is there
> any way to make writing to the host system possible?
>
> Kind regards,
> Robbe
>
> Robbe Sneyders
> ML6 Gent
> M: +32 474 71 31 08
> https://ml6.eu