Re: Set parallelism for each operator

2020-05-11 Thread Maximilian Michels
Beam and its Flink Runner do not allow setting the parallelism at the
operator level. The wish to configure parallelism per operator has come up
numerous times over the years. I'm not opposed to allowing it for special
cases, e.g. via a pipeline option.

It doesn't look like it is necessary for the use case discussed here.

-Max
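
For illustration, a minimal Java sketch combining the two approaches discussed
in this thread: job-wide parallelism via the Flink runner's --parallelism
option, plus a Reshuffle after the read (as Alexey suggests below). The broker
address, topic name, and HeavyProcessingFn are placeholders, not taken from
this thread.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Run with: --runner=FlinkRunner --parallelism=8  (applies to the whole job)
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")            // placeholder
            .withTopic("low-partition-topic")               // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // Redistribute elements so the heavy step is not limited to the
        // parallelism of the Kafka partitions.
        .apply(Reshuffle.viaRandomKey())
        .apply(ParDo.of(new HeavyProcessingFn()));          // hypothetical heavy DoFn
    p.run();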

On 06.05.20 18:27, Alexey Romanenko wrote:
> One option when reading from topics with a small number of
> partitions could be to do a Reshuffle right after the read transform to
> better parallelize the other pipeline steps.
> 
> We had a discussion in this Jira about that a while ago:
> https://issues.apache.org/jira/browse/BEAM-8121
> 
>> On 30 Apr 2020, at 03:56, Eleanore Jin wrote:
>>
>> Thanks all for the information! 
>>
>> Eleanore 
>>
>> On Wed, Apr 29, 2020 at 6:36 PM Ankur Goenka wrote:
>>
>> Beam does support setting the parallelism for the whole job, which applies
>> to all the transforms in the job, when executing on Flink using the
>> "--parallelism" flag.
>>
>> For the use case you mentioned, the Kafka read operations will be
>> over-parallelized, but that should be OK as they will only have a small
>> memory impact from loading some state for the Kafka client etc.
>> Also, Flink can run multiple operations of the same job in a
>> single task slot, so having a higher parallelism for lightweight
>> operations should not be a problem.
>>
>> On Wed, Apr 29, 2020 at 6:28 PM Luke Cwik wrote:
>>
>> Beam doesn't expose such a thing directly but the FlinkRunner
>> may be able to take some pipeline options to configure this.
>>
>> On Wed, Apr 29, 2020 at 5:51 PM Eleanore Jin wrote:
>>
>> Hi Kyle, 
>>
>> I am using Flink Runner (v1.8.2)
>>
>> Thanks!
>> Eleanore
>>
>> On Wed, Apr 29, 2020 at 10:33 AM Kyle Weaver wrote:
>>
>> Which runner are you using?
>>
>> On Wed, Apr 29, 2020 at 1:32 PM Eleanore Jin wrote:
>>
>> Hi all, 
>>
>> I just wonder, does Beam allow setting the
>> parallelism for each operator (PTransform)
>> separately? Flink provides such a feature.
>>
>> The use case I have is that the source is Kafka topics
>> with few partitions, while we have a heavy
>> PTransform that we would like to scale with more
>> parallelism.
>>
>> Thanks a lot!
>> Eleanore
>>
> 


Re: GC overhead limit exceeded

2020-05-11 Thread Maximilian Michels
Generally, it is to be expected that the main input is buffered until
the side input is available. We really have no other option to correctly
process the data.

Have you tried using RocksDB as the state backend to prevent too much GC
churn?

-Max
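
For anyone who wants to try that, a minimal sketch of the Flink configuration
(assuming a Flink cluster whose flink-conf.yaml you control; the checkpoint
path is a placeholder):

    state.backend: rocksdb
    state.checkpoints.dir: file:///tmp/flink-checkpoints   # placeholder; use durable storage in production

The Beam pipeline itself does not change; only the Flink state backend does.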

On 07.05.20 06:27, Eleanore Jin wrote:
> Please see: https://issues.apache.org/jira/browse/BEAM-9914
> 
> Thanks a lot!
> Eleanore
> 
> On Wed, May 6, 2020 at 9:17 PM Ankur Goenka wrote:
> 
> Thanks for sharing the response. It makes sense to me.
> Please file a jira in Beam so that we can prioritize it.
> 
> Thanks,
> Ankur
> 
> On Wed, May 6, 2020 at 9:08 PM Eleanore Jin wrote:
> 
> Hi Ankur, 
> 
> Thanks for your response. 
> 
> I also checked with the Flink community; here is their response. In
> short, Flink does not cache the main input data if there is no
> data available in the side input (Flink broadcast stream).
> 
> - quote from flink community:
> 
> Coming back to your question, Flink's Broadcast stream does
> *not* block or collect events from the non-broadcasted side if
> the broadcast side doesn't serve events.
> However, the user-implemented operators (Beam or your code in
> this case) often put non-broadcasted events into state to wait
> for input from the other side.
> Since the error is not about lack of memory, the buffering in
> Flink state might not be the problem here.
> 
> Thanks a lot for the help!
> Eleanore
> 
> On Wed, May 6, 2020 at 8:59 PM Ankur Goenka wrote:
> 
> The relevant code should be here:
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>  
> 
> Given that the problem goes away after publishing to the side input,
> this suggests that it might be a problem with synchronizing two
> streams of data on Flink using Beam.
>
> I am not sure if the Flink optimizer waits for the side input to be
> available before processing the main input. We might
> potentially handle this on the Beam side as well, or use a
> different set of Flink APIs to let us do better optimization
> if possible. In any case this would require a new SDK
> release if we decide to fix it.
> 
> On Wed, May 6, 2020 at 7:54 PM Eleanore Jin wrote:
> 
> Hi Ankur, 
> 
> Thanks for the answer! Can you please point me to the
> source code where the buffering happens? I would like to
> learn how Beam works, thanks!
> 
> To your question, in my case the side input does not have
> any data, meaning no one is publishing to the side input
> topic.
> 
> After publishing some data into the side input topic,
> the OOM goes away. 
> 
> Thanks! 
> Eleanore 
> 
> On Wed, May 6, 2020 at 6:37 PM Ankur Goenka wrote:
> 
> Hi Eleanore, 
> 
> The operation requires buffering the data until the
> data from the side input is available, which might
> be causing the OOM issue.
> You mention that the OOM happens when there is no data
> in the side input. Does that mean that the side input is
> not yet ready, or does the side input have no data at all?
> 
> Thanks,
> Ankur
> 
> On Tue, May 5, 2020 at 5:15 PM Pablo Estrada wrote:
> 
> +Ankur Goenka  by any
> chance do you know what could be causing this?
> 
> Thanks Eleanore for the detailed debugging : )
> 
> On Tue, May 5, 2020 at 9:34 AM Eleanore Jin wrote:
> 
> Hi Community, 
> 
> Just wondering, does the side input feature buffer
> the messages from the main source if there is no
> data available from the side input?
> 
> Thanks!
> Eleanore
> 
> On Sat, May 2, 2020 at 6:28 PM Eleanore Jin wrote:
> 
> After some more experience, I observed
> 

Re: Beam + Flink + Docker - Write to host system

2020-05-11 Thread Maximilian Michels
Hey Robbe,

The issue with a higher parallelism is likely due to the single Python
process which processes the data.

You may want to use the `sdk_worker_parallelism` pipeline option, which
brings up multiple Python SDK workers.

Best,
Max
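
As a hedged sketch, a submission command for the portable Flink runner with
that option set (the pipeline file, job endpoint, and values are placeholders):

    python my_pipeline.py \
      --runner=PortableRunner \
      --job_endpoint=localhost:8099 \
      --environment_type=DOCKER \
      --parallelism=10 \
      --sdk_worker_parallelism=4

sdk_worker_parallelism controls how many Python SDK worker processes are
started to serve a task manager, so heavy Python work is no longer funneled
through a single process.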

On 30.04.20 23:56, Robbe Sneyders wrote:
> Yes, the task manager has one task slot per CPU core available, and the
> dashboard shows that the work is parallelized across multiple subtasks.
> 
> However when using parallelism, the pipeline stalls, the Task Manager
> starts throwing 'Output channel stalled' warnings, and high back
> pressure is created at the Partition step as is shown in the tables below.
> 
> The Task Manager should have more than enough memory.
> JVM Heap Size: 30.0 GB
> Flink Managed Memory: 21.0 GB
> 
> Any idea what could cause this and how I could resolve it?
> 
> Parallelism = 1:
>
> * Read chain (MapPartition at [1]Read
>   input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions
>   -> FlatMap at ExtractOutput[0]):
>   RUNNING | in: 43.0 KB / 52 records | out: 831 MB / 33060 records |
>   parallelism 1 | duration 3m 2s | 1 task
> * Partition:
>   RUNNING | in: 831 MB / 33059 records | out: 831 MB / 33059 records |
>   parallelism 1 | duration 2m 58s | 1 task
> * Process chain (MapPartition at [4]{Discard array, Load json, Process
>   element, Dump json} -> FlatMap at ExtractOutput[0]):
>   RUNNING | in: 831 MB / 33057 records | out: 641 MB / 32439 records |
>   parallelism 1 | duration 2m 58s | 1 task
> * Write chain (MapPartition at [3]Write output/Write/WriteImpl/{WriteBundles,
>   Pair, WindowInto(WindowIntoFn)} -> FlatMap at ExtractOutput[0] -> Map (Key
>   Extractor) -> GroupCombine at Write output/Write/WriteImpl/GroupByKey ->
>   Map (Key Extractor)):
>   RUNNING | in: 641 MB / 32438 records | out: 0 B / 0 records |
>   parallelism 1 | duration 2m 58s | 1 task
> 
> 
> Parallelism = 10:
>
> * Read chain (MapPartition at [1]Read
>   input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions
>   -> FlatMap at ExtractOutput[0]):
>   RUNNING | in: 43.1 KB / 52 records | out: 493 MB / 19625 records |
>   parallelism 10 | duration 7m 15s | 19 tasks
> * Partition:
>   RUNNING | in: 486 MB / 19363 records | out: 486 MB / 19363 records |
>   parallelism 10 | duration 7m 14s | 10 tasks
> * Process chain (MapPartition at [4]{Discard array, Load json, Process
>   element, Dump json} -> FlatMap at ExtractOutput[0]):
>   RUNNING | in: 477 MB / 18987 records | out: 0 B / 0 records |
>   parallelism 10 | duration 7m 14s | 10 tasks
> * Write chain (MapPartition at [3]Write output/Write/WriteImpl/{WriteBundles,
>   Pair, WindowInto(WindowIntoFn)} -> FlatMap at ExtractOutput[0] -> Map (Key
>   Extractor) -> GroupCombine at Write output/Write/WriteImpl/GroupByKey ->
>   Map (Key Extractor)):
>   RUNNING | in: 1.16 KB / 0 records | out: 0 B / 0 records |
>   parallelism 10 | duration 7m 14s | 10 tasks
> 
> 
> Robbe Sneyders
> ML6 Gent | https://ml6.eu
> M: +32 474 71 31 08
> 
> On Thu, 30 Apr 2020 at 22:35, Kyle Weaver wrote:
> 
> If you are using only a single task manager but want to get
> parallelism > 1, you will need to
> increase taskmanager.numberOfTaskSlots in
> your flink-conf.yaml. 
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
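
A one-line flink-conf.yaml sketch of that setting (the value is just an
example; one slot per available core is a common starting point):

    taskmanager.numberOfTaskSlots: 10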
> 
> On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders wrote:
> 
> Hi Kyle,
> 
> Thanks for the quick response.
> The problem was that the pipeline could not access the input
> file. The Task Manager errors seem unrelated indeed.
> 
> I'm now able to run the pipeline completely, but I'm running
> into problems when using parallelism. 
> The pipeline can be summarized as:
> read file -> shuffle -> process -> write files
> 
> When using parallelism > 1, the pipeline stalls and the Task
> Manager outputs the following warnings:
> flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO
>  org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output
> channel stalled for 255s, outbound thread CHAIN MapPartition
> (MapPartition at [4]{Discard array, Load json, Process element,
> Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) (7/10).
> See: https://issues.apache.org/jira/browse/BEAM-4280 for the
> history for this issue.
> 
> The referen

Re: Running NexMark Tests

2020-05-11 Thread Sruthi Sree Kumar
I have opened a PR with the documentation change.
https://github.com/apache/beam/pull/11662

Regards,
Sruthi
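
For reference, a sketch of the full invocation with the flag Ismaël suggests
below added to the command quoted in this thread (Flink 1.10, SMOKE suite):

    ./gradlew :sdks:java:testing:nexmark:run \
        -Pnexmark.runner=":runners:flink:1.10" \
        -Pnexmark.args="
            --runner=FlinkRunner
            --suite=SMOKE
            --streamTimeout=60
            --streaming=true
            --manageResources=false
            --monitorJobs=true
            --flinkMaster=[local]
            --shutdownSourcesOnFinalWatermark=true"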

On 2020/04/21 20:22:17, Ismaël Mejía  wrote: 
> You need to instruct the Flink runner to shut down the source,
> otherwise it will stay waiting.
> You can do this by adding the extra
> argument `--shutdownSourcesOnFinalWatermark=true`.
> And if that works and you want to open a PR to update our
> documentation, that would be greatly appreciated.
> 
> Regards,
> Ismaël
> 
> 
> On Tue, Apr 21, 2020 at 10:04 PM Sruthi Sree Kumar
>  wrote:
> >
> > Hello,
> >
> > I am trying to run nexmark queries using flink runner streaming. Followed 
> > the documentation and used the command
> > ./gradlew :sdks:java:testing:nexmark:run \
> >
> > -Pnexmark.runner=":runners:flink:1.10" \
> > -Pnexmark.args="
> > --runner=FlinkRunner
> > --suite=SMOKE
> > --streamTimeout=60
> > --streaming=true
> > --manageResources=false
> > --monitorJobs=true
> > --flinkMaster=[local]"
> >
> >
> > But after the events are read from the source, there is no further progress 
> > and the job is always stuck at 99%. Is there any configuration that I am 
> > missing?
> >
> > Regards,
> > Sruthi
> 


Re: KafkaIO BackLog Elements Metrics

2020-05-11 Thread Luke Cwik
I think you are looking for https://issues.apache.org/jira/browse/BEAM-3310
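
For anyone reading these metrics from code rather than the Dataflow UI, a
hedged Java sketch of querying gauges off the PipelineResult; it only shows
metrics the runner actually reports, which for Dataflow is exactly what
BEAM-3310 tracks:

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.GaugeResult;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    PipelineResult result = pipeline.run();
    // Query all metrics; the Kafka source gauges (e.g. per-split backlog) appear
    // here only on runners that report them.
    MetricQueryResults metrics =
        result.metrics().queryMetrics(MetricsFilter.builder().build());
    for (MetricResult<GaugeResult> gauge : metrics.getGauges()) {
      System.out.println(gauge.getName() + " = " + gauge.getAttempted().getValue());
    }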

On Sat, May 9, 2020 at 6:23 PM Talat Uyarer 
wrote:

> Hi,
>
> I want to get Kafka's backlog metrics. In the Apache Beam code I saw that Beam
> is collecting those metrics here [1] as source metrics. However, I cannot see
> those metrics in Dataflow's Metrics Explorer. Do you know if there is
> any way to get those metrics?
>
> Also, I saw there is a MetricsSink, but based on the Beam documentation it is
> not supported by Dataflow. Is there any ticket to add MetricsSink
> support to the Dataflow runner?
>
> Thanks
>
> [1]
> https://github.com/apache/beam/blob/d309c1b7c39ba78aa0cbd5cad9cc7a256e3caa9f/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L673
>
>


resources management at worker machine or how to debug hanging execution on worker machine

2020-05-11 Thread OrielResearch Eila Arich-Landkof
Hi all,

I am trying to run the Kallisto package command on an Apache Beam worker.
Below is a table that describes my steps in the Apache Beam pipeline code
and on a local compute Debian machine (a new machine). I used both of them
for debugging and comparison.
On the local machine, the execution completes with no issues. On Apache Beam,
I am having issues with no error, which is very challenging to debug.

The only issue that I am familiar with for the Kallisto package is when there
is not enough disk for the input and the output. I have added the resource
settings for the local and remote machine. Please let me know if there is
another way to manage the resources.

Thank you,
Eila


task: resources

Local:
  n1-standard-8 (8 vCPUs, 30 GB memory)
  60 GB persistent disk

Apache worker:
  GoogleCloudOptions.disk_size_gb = 60
  GoogleCloudOptions.worker_machine_type = 'n1-standard-4'

task: anaconda

Local:
  created a base environment with the Kallisto package

Apache worker:
  created a base environment with the Kallisto package

task: command (identical on both sides except for the conda.sh path)

Local:
  script = "/home/eila_orielresearch_org/etc/profile.d/conda.sh"

Apache worker:
  script = "/opt/userowned/etc/profile.d/conda.sh"

Shared code:

  from subprocess import Popen, PIPE, STDOUT
  import logging

  cmd1 = ". {}; env".format(script)
  cmd2 = "echo finished kallisto"
  cmd3 = "echo before init"
  cmd4 = "conda init --all"
  cmd5 = "conda activate"
  cmd6 = ("kallisto quant -t 2 -i release-99_transcripts.idx --single "
          "-l 200 -s 20 -o srr SRR2144345.fastq")
  cmd7 = "conda deactivate"

  final = Popen("{}; {}; {}; {}; {}; {}; {}".format(cmd1, cmd2, cmd3, cmd4,
                                                    cmd5, cmd6, cmd7),
                shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                close_fds=True)
  stdout, nothing = final.communicate()
  stdout

task: output

Local:
  eila_orielresearch_org@instance-1:~/srr$ ls -lt
  total 8548
  -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org 2174869 May 11 16:19 abundance.h5
  -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org 6570911 May 11 16:19 abundance.tsv
  -rw-r--r-- 1 eila_orielresearch_org eila_orielresearch_org     371 May 11 16:19 run_info.json

Apache worker:
  No output. It hangs on the highlighted command (the color highlighting is
  lost in this plain-text archive) with no error, and the DoFn execution
  keeps restarting.


-- 
Eila

Meetup 


Re: KafkaIO BackLog Elements Metrics

2020-05-11 Thread Reuven Lax
I think this is also because gauge metrics are not well supported.

On Mon, May 11, 2020 at 9:03 AM Luke Cwik  wrote:

> I think you are looking for
> https://issues.apache.org/jira/browse/BEAM-3310
>
> On Sat, May 9, 2020 at 6:23 PM Talat Uyarer 
> wrote:
>
>> Hi,
>>
>> I want to get Kafka's backlog metrics. In the Apache Beam code I saw that
>> Beam is collecting those metrics here [1] as source metrics. However, I
>> cannot see those metrics in Dataflow's Metrics Explorer. Do you know if
>> there is any way to get those metrics?
>>
>> Also, I saw there is a MetricsSink, but based on the Beam documentation it
>> is not supported by Dataflow. Is there any ticket to add MetricsSink
>> support to the Dataflow runner?
>>
>> Thanks
>>
>> [1]
>> https://github.com/apache/beam/blob/d309c1b7c39ba78aa0cbd5cad9cc7a256e3caa9f/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L673
>>
>>
>


Re: GC overhead limit exceeded

2020-05-11 Thread Eleanore Jin
Hi Max,

No, I did not introduce RocksDB at this point, since the pipeline is
stateless apart from the Kafka offsets.

So what we do is ensure there is a dummy message in the side input to
avoid this situation.

Thanks!
Eleanore
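
In case it helps others, a sketch of that workaround from the command line
(the broker address, topic name, and payload are placeholders for your
side-input topic):

    echo '{}' | kafka-console-producer.sh --broker-list broker:9092 --topic side-input-topic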

On Mon, May 11, 2020 at 2:57 AM Maximilian Michels  wrote:

> Generally, it is to be expected that the main input is buffered until
> the side input is available. We really have no other option to correctly
> process the data.
>
> Have you tried using RocksDB as the state backend to prevent too much GC
> churn?
>
> -Max
>
> On 07.05.20 06:27, Eleanore Jin wrote:
> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka wrote:
> >
> > Thanks for sharing the response. It makes sense to me.
> > Please file a jira in Beam so that we can prioritize it.
> >
> > Thanks,
> > Ankur
> >
> > On Wed, May 6, 2020 at 9:08 PM Eleanore Jin wrote:
> >
> > Hi Ankur,
> >
> > Thanks for your response.
> >
> > I also checked with the Flink community; here is their response. In
> > short, Flink does not cache the main input data if there is no
> > data available in the side input (Flink broadcast stream).
> >
> > - quote from flink community:
> >
> > Coming back to your question, Flink's Broadcast stream does
> > *not* block or collect events from the non-broadcasted side if
> > the broadcast side doesn't serve events.
> > However, the user-implemented operators (Beam or your code in
> > this case) often puts non-broadcasted events into state to wait
> > for input from the other side.
> > Since the error is not about lack of memory, the buffering in
> > Flink state might not be the problem here.
> >
> > Thanks a lot for the help!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 8:59 PM Ankur Goenka wrote:
> >
> > The relevant code should be here:
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>
> >
> > Given that the problem goes away after publishing Side input
> > suggests that this might be a problem with synchronizing 2
> > streams of data on Flink using Beam.
> >
> > I am not sure if the Flink optimizer waits for the side input to be
> > available before processing the main input. We might
> > potentially handle this on the Beam side as well or use a
> > different set of flink apis to let us do better optimization
> > if possible. In any case this would require a new sdk
> > release if we decide to fix.
> >
> > On Wed, May 6, 2020 at 7:54 PM Eleanore Jin wrote:
> >
> > Hi Ankur,
> >
> > Thanks for the answer! Can you please point to me the
> > source code where the buffering is? I would like to
> > learn how beam works, thanks!
> >
> > To your question, in my case, side input does not have
> > any data, meaning no one publishing to the side input
> > topic.
> >
> > After publishing some data into the side input topic,
> > the OOM goes away.
> >
> > Thanks!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 6:37 PM Ankur Goenka wrote:
> >
> > Hi Eleanore,
> >
> > The operation requires buffering the data until the
> > data from the side input is available, which might
> > be causing the OOM issue.
> > You mention that OOM happens when there is no data
> > in side input. Does it mean that the side input is
> > not yet ready or does side input have no data at all?
> >
> > Thanks,
> > Ankur
> >
> > On Tue, May 5, 2020 at 5:15 PM Pablo Estrada wrote:
> >
> > +Ankur Goenka  by any
> > chance do you know what could be causing this?
> >
> > Thanks Eleanore for the detailed debugging : )
> >
> > On Tue, May 5, 2020 at 9:34 AM Eleanore Jin wrote:
> >
> > Hi Community,
> >
> > Just wonder does side input feature buffer
> >   

Behavior of KafkaIO

2020-05-11 Thread Eleanore Jin
Hi community,

In my pipeline, I am using KafkaIO to read and write. The source topic has
4 partitions and pipeline parallelism is 1.

I noticed from the consumer lag metrics that it will consume from 1 partition
until all the messages from that partition are processed, then it will consume
from another partition.

Is this the expected behavior?

Runner is Flink.

Thanks a lot!
Eleanore


Re: Behavior of KafkaIO

2020-05-11 Thread Chamikara Jayalath
The number of partitions assigned to a given split depends on the
desiredNumSplits value provided by the runner.
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54

(This is assuming that you are using Beam Kafka source not a native Flink
override).

Do you see the same behavior when you increase the number of workers of
your Flink cluster ?

On Mon, May 11, 2020 at 5:28 PM Eleanore Jin  wrote:

> Hi community,
>
> In my pipeline, I am using KafkaIO to read and write. The source topic has
> 4 partitions and pipeline parallelism is 1.
>
> I noticed from consumer lag metrics, it will consume from 1 partition
> until all the messages from that partition is processed then it will
> consume from another partition.
>
> Is this the expected behavior?
>
> Runner is Flink.
>
> Thanks a lot!
> Eleanore
>


Re: Behavior of KafkaIO

2020-05-11 Thread Heejong Lee
If we assume that there's only one reader, all partitions are assigned to a
single KafkaConsumer. I think the order of reading each partition depends
on the KafkaConsumer implementation, i.e. how KafkaConsumer.poll() returns
messages.

Reference:
assigning partitions:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
polling records:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
creating a record batch:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
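
To make the single-reader case concrete, an illustrative Java sketch of the
round-robin partition-to-split assignment the first link performs (simplified,
not the actual Beam code):

    import java.util.ArrayList;
    import java.util.List;

    int desiredNumSplits = 1;   // what the runner requests at pipeline parallelism 1
    int numPartitions = 4;      // the topic discussed in this thread
    List<List<Integer>> splits = new ArrayList<>();
    for (int i = 0; i < desiredNumSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int partition = 0; partition < numPartitions; partition++) {
      // Round-robin assignment: with one split, all four partitions land in split 0.
      splits.get(partition % desiredNumSplits).add(partition);
    }
    // Each split becomes one reader holding a single KafkaConsumer assigned those
    // partitions, so the order in which partitions drain follows KafkaConsumer.poll().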

On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath 
wrote:

> The number of partitions assigned to a given split depends on the
> desiredNumSplits value provided by the runner.
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>
> (This is assuming that you are using Beam Kafka source not a native Flink
> override).
>
> Do you see the same behavior when you increase the number of workers of
> your Flink cluster ?
>
> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin 
> wrote:
>
>> Hi community,
>>
>> In my pipeline, I am using KafkaIO to read and write. The source topic
>> has 4 partitions and pipeline parallelism is 1.
>>
>> I noticed from consumer lag metrics, it will consume from 1 partition
>> until all the messages from that partition is processed then it will
>> consume from another partition.
>>
>> Is this the expected behavior?
>>
>> Runner is Flink.
>>
>> Thanks a lot!
>> Eleanore
>>
>


Re: Behavior of KafkaIO

2020-05-11 Thread Eleanore Jin
Hi Chamikara and Lee,

Thanks for the information. I did more experiments on my local laptop
(Flink runner local mode, Job Manager and Task Manager run in the same JVM).
Setup: input topic with 4 partitions
1. with parallelism 1: the KafkaIO read will drain 1 partition down to 0
lag, then move on to another partition
2. with parallelism 2: the KafkaIO read will read 2 partitions together, then
move on to the rest of the partitions
3. with parallelism 4: the KafkaIO read will read all 4 partitions together.

In production, we run multiple Flink Task Managers; from the consumer lag
reported, we also see some partitions go to 0 while other
partitions remain at a high lag.

Thanks!
Eleanore

On Mon, May 11, 2020 at 8:19 PM Heejong Lee  wrote:

> If we assume that there's only one reader, all partitions are assigned to
> a single KafkaConsumer. I think the order of reading each partition depends
> on KafkaConsumer implementation i.e. how KafkaConsumer.poll() returns
> messages.
>
> Reference:
> assigning partitions:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
> polling records:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
> creating a record batch:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>
> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath 
> wrote:
>
>> The number of partitions assigned to a given split depends on the
>> desiredNumSplits value provided by the runner.
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> (This is assuming that you are using Beam Kafka source not a native Flink
>> override).
>>
>> Do you see the same behavior when you increase the number of workers of
>> your Flink cluster ?
>>
>> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin 
>> wrote:
>>
>>> Hi community,
>>>
>>> In my pipeline, I am using KafkaIO to read and write. The source topic
>>> has 4 partitions and pipeline parallelism is 1.
>>>
>>> I noticed from consumer lag metrics, it will consume from 1 partition
>>> until all the messages from that partition is processed then it will
>>> consume from another partition.
>>>
>>> Is this the expected behavior?
>>>
>>> Runner is Flink.
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>