Hi Junrui,

I think you understood correctly. What I'm seeing is that each vertex has a
single subtask, but multiple vertices are started in parallel in different
slots. That is not a problem in my case; I _want_ to parallelize the work.
It's just that this mechanism is very different from streaming jobs, where
the number of slots a job needs equals its maximum vertex parallelism. In
other words, a streaming job won't use additional free slots regardless of
how many vertices there are, whereas a batch job needs one slot per subtask
in order to parallelize. I was not aware that batch jobs interact with task
manager slots like this.
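
A rough sketch of the slot arithmetic described above. It is a toy model,
not Flink code: the class and method names are hypothetical, the streaming
case assumes default slot sharing, and the batch case assumes every listed
vertex already has its inputs available.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of the Flink API.
final class SlotSketch {

    // Streaming (assuming default slot sharing): one slot can host one
    // subtask of every vertex, so the job needs as many slots as its
    // highest vertex parallelism.
    static int streamingSlotsNeeded(int[] vertexParallelism) {
        return Arrays.stream(vertexParallelism).max().orElse(0);
    }

    // Batch (as observed in this thread): each ready subtask can occupy
    // its own slot, limited only by how many slots are free.
    static int batchSlotsUsed(int[] readyVertexParallelism, int freeSlots) {
        int readySubtasks = Arrays.stream(readyVertexParallelism).sum();
        return Math.min(readySubtasks, freeSlots);
    }

    public static void main(String[] args) {
        // Six identical transformations, each with parallelism 1.
        int[] sixTransformations = {1, 1, 1, 1, 1, 1};
        System.out.println(streamingSlotsNeeded(sixTransformations)); // 1
        System.out.println(batchSlotsUsed(sixTransformations, 1));    // 1
        System.out.println(batchSlotsUsed(sixTransformations, 6));    // 6
    }
}
```

With "-p 1" the streaming estimate stays at one slot no matter how many
vertices exist, while the batch estimate grows with the number of free
slots, which matches what I see.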

The other thing I mentioned is what really remains a problem: although the
different vertices do start in parallel, some of them finish in seconds and
others take more than a minute, even though all of them do the exact same
transformation, just with different operator IDs. I'm using Flink 1.18.1,
by the way.
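
For context on the "parked" operators from my earlier message (quoted
below), here is a minimal plain-Java sketch of how a bounded
ArrayBlockingQueue parks a thread. It does not use Flink; the class and
method names are hypothetical, and the capacity of 16 is only taken from
what the flame graph pointed at. It is not established which side of the
queue the slow operators are actually waiting on.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only illustrates java.util.concurrent
// behavior and says nothing about Flink internals.
final class BoundedQueueSketch {

    // Fills the queue, then tries to add one more element. With no
    // consumer, the caller is parked until the timeout expires.
    static boolean offerBeyondCapacity(int capacity, long timeoutMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put(i); // does not block: there is still room
            }
            return queue.offer(capacity, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Tries to take from an empty queue. With no producer, the caller is
    // parked until the timeout expires and then gets null.
    static boolean pollFromEmpty(int capacity, long timeoutMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerBeyondCapacity(16, 100)); // false
        System.out.println(pollFromEmpty(16, 100));       // false
    }
}
```

In both cases the waiting thread shows up as parked rather than busy, so a
flame graph alone does not say whether the slow side is the producer or
the consumer.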

Regards,
Alexis.

On Sat, 6 Jul 2024 at 12:09, Junrui Lee <jrlee....@gmail.com> wrote:

> Hi Alexis,
>
> Could you clarify what you mean by "If I add more slots to the task
> manager, I see the transformations actually start in parallel even though I
> submit the job with 'flink run -p 1'"?
> Are you asking if multiple slots are working simultaneously, or if a
> single JobVertex contains multiple subtasks?
>
> In fact, the number of slots and the parallelism are not the same concept.
> Flink batch jobs can run even with only a single slot, and when more
> slots become available, Flink will schedule and deploy more parallelizable
> tasks (provided their upstream tasks have finished). If you want only one
> slot to be active at a time, you can limit the resources of the cluster,
> for instance by setting "slotmanager.number-of-slots.max" to 1.
>
> If you intend for each JobVertex to have a parallelism of 1 and you find
> that this isn't being enforced when using the "flink run -p 1" command,
> it would be helpful to have more detailed information to assist with
> troubleshooting, including the version of Flink in use and the
> JobManager logs.
>
> On Sat, 6 Jul 2024 at 15:35, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
>> Hi Junrui,
>>
>> Thanks for the confirmation. I tested some more and I'm seeing a strange
>> behavior.
>>
>> I'm currently testing a single source stream that is fed to 6 identical
>> transformations. The State Processor API requires batch mode and, from what
>> I can tell, I must specify a parallelism of 1 in the job; otherwise it
>> freezes. However, if I add more slots to the task manager, I see the
>> transformations actually start in parallel even though I submit the job
>> with "flink run -p 1". Is this expected of batch mode?
>>
>> Additionally, regardless of how much memory I give to the task manager,
>> some transformations finish in around 6 seconds, and the others need
>> more than 1 minute even though it's the same transformation and each one
>> writes around 70MB to my local disk. The flame graph shows the slow
>> operators are just parked on an ArrayBlockingQueue whose size is
>> hard-coded as 16 in the Flink sources. Am I missing something crucial for
>> tuning such jobs?
>>
>> Regards,
>> Alexis.
>>
>> On Sat, 6 Jul 2024, 03:29 Junrui Lee, <jrlee....@gmail.com> wrote:
>>
>>> Hi Alexis,
>>>
>>> For the SavepointWriter, I've briefly looked over the code and the write
>>> operation is enforced as non-parallel.
>>>
>>> Best,
>>> Junrui
>>>
>>> On Sat, 6 Jul 2024 at 01:27, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> Thanks for the quick response. What about SavepointWriter? In my case
>>>> I'm actually writing a job that will read from an existing savepoint and
>>>> modify some of its data to write a new one.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>> On Fri, 5 Jul 2024 at 17:37, Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Alexis,
>>>>>
>>>>> It depends. When one uses SavepointLoader to read only the metadata,
>>>>> it's non-parallel.
>>>>> SavepointReader, however, is basically a normal batch job with all its
>>>>> features.
>>>>>
>>>>> G
>>>>>
>>>>>
>>>>> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
>>>>> sarda.espin...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Really quick question: when using the State Processor API, are all
>>>>>> transformations performed in a non-parallel fashion?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>>
