Hi Jan,

This is a batch job, so there are no windows. It is launched by a TFX
component, so I don't have control over the Beam code being executed.
I conclude that the job is stuck because the byte and processed-row
counters do not move for a long time on one specific task and subtask
(always the same one).
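
A minimal sketch of that "stuck" check, assuming the per-subtask record counts are sampled periodically (for example, read off the Flink web UI by hand). The function name, the sample layout, and the threshold are hypothetical and only illustrate the reasoning:

```python
from typing import Dict, List, Tuple

# One sample is (timestamp_seconds, records_processed); samples are oldest first.
Samples = List[Tuple[float, int]]


def stalled_subtasks(history: Dict[int, Samples], min_stall_seconds: float) -> List[int]:
    """Return subtask indexes whose record count has not changed
    for at least min_stall_seconds, judged from the latest sample."""
    stalled = []
    for subtask, samples in sorted(history.items()):
        if len(samples) < 2:
            continue  # not enough data to judge
        last_time, last_count = samples[-1]
        # Walk backwards to the earliest sample that still shows the latest count.
        unchanged_since = last_time
        for ts, count in reversed(samples):
            if count != last_count:
                break
            unchanged_since = ts
        if last_time - unchanged_since >= min_stall_seconds:
            stalled.append(subtask)
    return stalled


# Hypothetical samples: subtask 0 keeps advancing, subtask 7 stops at 150 records.
history = {
    0: [(0, 100), (60, 200), (120, 300)],
    7: [(0, 100), (60, 150), (120, 150), (600, 150)],
}
print(stalled_subtasks(history, min_stall_seconds=300))
```

A subtask that has already finished also shows a flat counter, so the result only means something for subtasks that are still in the RUNNING state.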

Thanks,
Gorjan


On Thu, Jun 2, 2022 at 4:45 PM Jan Lukavský <je...@seznam.cz> wrote:

> -user@flink <http://user@flink.apache.org> as this looks like a purely Beam
> issue
>
> Could you please elaborate more about what "stuck" means? Does the
> watermark stop progressing? Does that happen at any specific instant (e.g.
> end of window or end of window + allowed lateness)?
> On 6/1/22 15:43, Gorjan Todorovski wrote:
>
> Hi Jan,
>
> I have not checked the harness log. I have now checked it (the Apache Beam
> worker log) and found this, but I am currently not sure what it means:
>
> 2022/06/01 13:34:40 Python exited: <nil>
> 2022/06/01 13:34:41 Python exited: <nil>
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/threading.py", line 926, in
> _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.7/threading.py", line 870, in run
>     self._target(*self._args, **self._kwargs)
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
> line 587, in <lambda>
>     target=lambda: self._read_inputs(elements_iterator),
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
> line 570, in _read_inputs
>     for elements in elements_iterator:
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line
> 416, in __next__
>     return self._next()
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line
> 803, in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.CANCELLED
> details = "Multiplexer hanging up"
> debug_error_string =
> "{"created":"@1654090485.252525992","description":"Error received from peer
> ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer
> hanging up","grpc_status":1}"
> >
>
> 2022/06/01 13:34:45 Python exited: <nil>
> 2022/06/01 13:34:46 Python exited: <nil>
> 2022/06/01 13:34:46 Python exited: <nil>
> 2022/06/01 13:34:47 Python exited: <nil>
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-1',
> '--logging_endpoint=localhost:44267',
> '--artifact_endpoint=localhost:36413',
> '--provision_endpoint=localhost:42179',
> '--control_endpoint=localhost:38825']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-3',
> '--logging_endpoint=localhost:38683',
> '--artifact_endpoint=localhost:44867',
> '--provision_endpoint=localhost:34833',
> '--control_endpoint=localhost:44351']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-2',
> '--logging_endpoint=localhost:35391',
> '--artifact_endpoint=localhost:46571',
> '--provision_endpoint=localhost:44073',
> '--control_endpoint=localhost:44133']
> Starting work...
>
> On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Gorjan,
>>
>> +user@beam <u...@beam.apache.org>
>>
>> The trace you posted is just waiting for a bundle to finish in the SDK
>> harness. I would suspect there is a problem in the logs of the harness. Did
>> you look for possible errors there?
>>
>>  Jan
>> On 5/31/22 13:54, Gorjan Todorovski wrote:
>>
>> Hi,
>>
>> I am running a TensorFlow Extended (TFX) pipeline that uses Apache Beam
>> for data processing, which in turn uses the Flink runner (basically a batch
>> job on a Flink session cluster on Kubernetes), version 1.13.6, but the job
>> (for gathering statistics) gets stuck.
>>
>> There is nothing significant in the Job Manager or Task Manager logs. The
>> only thing that possibly might tell why the task is stuck seems to be a
>> thread dump:
>>
>> "MapPartition (MapPartition at [14]{TFXIORead[train],
>> GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
>> java.util.concurrent.CompletableFuture$Signaller@6f078632
>>     at sun.misc.Unsafe.park(Native Method)
>>     - waiting on java.util.concurrent.CompletableFuture$Signaller@
>> 6f078632
>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>     at java.util.concurrent.CompletableFuture$Signaller.block(
>> CompletableFuture.java:1707)
>>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:
>> 3323)
>>     at java.util.concurrent.CompletableFuture.waitingGet(
>> CompletableFuture.java:1742)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:
>> 1908)
>>     at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>>     at org.apache.beam.runners.fnexecution.control.
>> SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient
>> .java:504)
>>     ...
>> I use a parallelism of 32. The task managers are set up so that each TM
>> runs in one container with 1 CPU and a total process memory of 20 GB. Each
>> TM runs 1 task slot.
>> This is failing with ~100 files with a total size of about 100 GB. If I
>> run the pipeline with a smaller number of files to process, it runs OK.
>> I need Flink to be able to process different amounts of data, as it is
>> able to scale by automatically adding pods depending on the parallelism
>> set for the specific job (I set the parallelism to max(number of
>> files, 32)).
>> Thanks,
>> Gorjan
>>
>>
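
The parallelism rule in the original message, max(number of files, 32), could be sketched as below. This is only an illustration: the helper name and the Flink master address are hypothetical, and the resulting list is assumed to be handed to the TFX pipeline as its Beam pipeline arguments.

```python
from typing import List


def beam_args_for(num_files: int, flink_master: str, min_parallelism: int = 32) -> List[str]:
    """Build Beam pipeline arguments for the Flink runner, using
    parallelism = max(number of input files, min_parallelism)."""
    parallelism = max(num_files, min_parallelism)
    return [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",  # address of the Flink session cluster (assumed)
        f"--parallelism={parallelism}",
    ]


# ~100 input files, as in the failing run; the address is a placeholder.
print(beam_args_for(100, "flink-jobmanager:8081"))
```

With fewer files than the floor (for example 10), the same call falls back to a parallelism of 32.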
