Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
Hi,

How many keys do you have flowing through that global window? If the key space is wide, is there any chance you have a few very hot keys?

Cheers,
Rez
Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
I would recommend going to the Compute Engine service and checking the VMs where the pipeline is running; from there you may get more insight into whether there is a bottleneck (CPU, I/O, network) in your pipeline that is preventing it from processing faster.
Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
How big is your bounded-source?
- 16.01 GiB of data in total from AVRO files, but it can be anywhere between 10 and 100s of GBs.

How much pressure (messages per second) is your unbounded source receiving?
- Initially no pressure, to prime the Beam state, but later there will be data flowing through PubSub.

I also added parameters on the mvn command, as below, and could get 7 (105/15) workers, as per the guide:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#autoscaling

mvn compile exec:java -pl \
  -Dexec.mainClass= \
  -Dexec.args="--runner=DataflowRunner --project= \
    --stagingLocation=gs:// \
    --maxNumWorkers=105 \
    --autoscalingAlgorithm=THROUGHPUT_BASED \
    --templateLocation=gs://"

Even though I got 7 worker nodes, when processing the GCS data (bounded source) and adding it to Beam state, I think the work was being performed on just 1 node, as it took more than 16 hours.

Can someone point me to documentation on how to figure out how much data is being processed by each worker node (e.g. which GCS part AVRO files are read, input counts, etc.), rather than just the high-level counts of input and output elements from each ParDo?

Thanks.
- Maulik
Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
I think I now understand this more clearly: there are a couple of things going on. Let me re-explain what I am trying to achieve.

- I am reading from 2 sources:
  - Bounded (AVRO from GCS)
  - Unbounded (AVRO from PubSub)
- I want to prime the Beam pipeline state with data from GCS (the bounded source), using UserId as the key, even while data from PubSub is not yet flowing through.
- Later, when data from PubSub (the unbounded source) starts flowing, I would update/add to the Beam state, using the (same) UserId as the key.

How big is your bounded-source?
> 16.01 GiB of data in total from AVRO files, but it can be anywhere between 10 and 100s of GBs.

How much pressure (messages per second) is your unbounded source receiving?
> Initially no pressure, to prime the Beam state, but later there will be data flowing through PubSub.

I also added parameters on the mvn command, as below, and could get 7 (105/15) workers, as per the guide:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#autoscaling

mvn compile exec:java -pl \
  -Dexec.mainClass= \
  -Dexec.args="--runner=DataflowRunner --project= \
    --stagingLocation=gs:// \
    --maxNumWorkers=105 \
    --autoscalingAlgorithm=THROUGHPUT_BASED \
    --templateLocation=gs://"

Thanks.
- Maulik
Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
Your autoscaling algorithm is THROUGHPUT_BASED; it will kick in only when it decides the pipeline is not able to keep up with the incoming source. How big is your bounded-source, and how much pressure (messages per second) is your unbounded source receiving?
Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
Hi Juan,

Thanks for replying. I believe I am using the correct configuration.

I have posted more details, with a code snippet and the Dataflow job template configuration, in a Stack Overflow post:
https://stackoverflow.com/q/55242684/11226631

Thanks.
- Maulik
Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
Hi Maulik,

Have you submitted your job with the correct configuration to enable autoscaling?

--autoscalingAlgorithm=
--maxWorkers=

I am on my phone right now and can't tell if the flag names are 100% correct.
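For reference, the Dataflow runner's documented option names are --autoscalingAlgorithm and --maxNumWorkers (not --maxWorkers). A sketch of how they are typically passed when launching with Maven; the main class, project, and bucket values here are placeholders, not real values from this thread:

```shell
# Sketch only: option names per the Dataflow autoscaling guide linked
# elsewhere in this thread; all concrete values below are illustrative.
mvn compile exec:java \
  -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=DataflowRunner \
    --project=my-gcp-project \
    --stagingLocation=gs://my-bucket/staging \
    --autoscalingAlgorithm=THROUGHPUT_BASED \
    --maxNumWorkers=105"
```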
Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source
Hi Beam Community,

I am working on a Beam processing pipeline which reads data from both an unbounded and a bounded source, and I want to leverage Beam state management in my pipeline. To put data into Beam state, I have to transform the data into key-value form (e.g. KV<K, V>). As I am reading from both an unbounded and a bounded source, I am forced to perform windowing + triggering before grouping data by key. I have chosen to use GlobalWindows().

I am able to kick off the Dataflow job, which runs my Beam pipeline. I have noticed that Dataflow uses only 1 worker node to perform the work and does not scale the job out to more worker nodes, thus not leveraging the benefit of distributed processing.

I have posted the question on Stack Overflow:
https://stackoverflow.com/questions/55242684/join-bounded-and-non-bounded-source-data-flow-job-not-scaling
but I am reaching out on the mailing list to get some help, or to learn what I am missing.

Any help would be appreciated.

Thanks.
- Maulik
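The keyed prime-then-update idea described in this thread can be sketched in plain Java, outside Beam: a bounded batch seeds per-UserId state, and later unbounded events update the same keys. This is not Beam API code; primeState and applyEvent are illustrative names, and an in-memory map stands in for Beam's per-key state:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of keyed state: seed it from a bounded batch,
// then update the same keys as unbounded events arrive.
public class KeyedStateSketch {

    // Seed per-UserId counts from the bounded batch (e.g. AVRO from GCS).
    static Map<String, Long> primeState(List<String> batchUserIds) {
        Map<String, Long> state = new HashMap<>();
        for (String userId : batchUserIds) {
            state.merge(userId, 1L, Long::sum);
        }
        return state;
    }

    // Update/add state for a single unbounded event (e.g. from PubSub),
    // keyed by the same UserId.
    static void applyEvent(Map<String, Long> state, String userId) {
        state.merge(userId, 1L, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> state = primeState(List.of("u1", "u1", "u2"));
        applyEvent(state, "u1"); // streaming event for an existing key
        applyEvent(state, "u3"); // streaming event for a new key
        System.out.println("u1 -> " + state.get("u1")); // u1 -> 3
        System.out.println("u3 -> " + state.get("u3")); // u3 -> 1
    }
}
```

In the real pipeline, the map would be replaced by Beam's per-key state inside a stateful DoFn, which is why the elements must be keyed as KV and why a window/trigger setup (here GlobalWindows) is needed before grouping by key.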