Hi Raghu,

My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a look!
Yes, I'm using BigtableIO for the sink and I am measuring the end-to-end latency. It typically takes 3-6 seconds; I would like to get it down to ~1s.

Thanks,
Josh

On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <rang...@google.com> wrote:

> Josh,
>
> Can you share your job_id? I could take a look. Are you measuring latency
> end-to-end (publisher to when it appears on BT)? Are you using BigtableIO
> for the sink?
>
> There is no easy way to use more workers when auto-scaling is enabled. It
> thinks your backlog and CPU are low enough and does not need to scale.
>
> Raghu
>
> On Wed, May 24, 2017 at 10:14 AM, Josh <jof...@gmail.com> wrote:
>
>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>
>> About the "No operations completed" message - there are a few of these in
>> the logs (but very few, like one an hour) - so probably no need to scale
>> up Bigtable.
>> I did, however, see a lot of INFO messages like "Wrote 0 records" in the
>> logs. About 50% of the "Wrote n records" messages are zero, while the
>> other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that
>> could indicate a bad setting?
>>
>> Josh
>>
>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <an...@malloc64.com> wrote:
>>
>>> There are two main things to look at here:
>>>
>>> * In the logs, are there any messages like "No operations completed
>>> within the last 61 seconds. There are still 1 simple operations and 1
>>> complex operations in progress."? This means you are underscaled on the
>>> Bigtable side and would benefit from increasing the node count.
>>> * We also saw some improvement in performance (workload dependent) by
>>> going to a bigger worker machine type.
>>> * Another optimization that worked for our use case:
>>>
>>> // streaming dataflow has larger machines with smaller bundles, so we can
>>> // queue up a lot more without blowing up
>>> private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>>     return new BigtableOptions.Builder()
>>>         .setProjectId(opts.getProject())
>>>         .setInstanceId(opts.getBigtableInstanceId())
>>>         .setUseCachedDataPool(true)
>>>         .setDataChannelCount(32)
>>>         .setBulkOptions(new BulkOptions.Builder()
>>>             .setUseBulkApi(true)
>>>             .setBulkMaxRowKeyCount(2048)
>>>             .setBulkMaxRequestSize(8_388_608L)
>>>             .setAsyncMutatorWorkerCount(32)
>>>             .build())
>>>         .build();
>>> }
>>>
>>> There is a lot of trial and error involved in getting the end-to-end
>>> latency down, so I would suggest enabling profiling using the
>>> --saveProfilesToGcs option to get a sense of what exactly is happening.
>>>
>>> -- Ankur Chauhan
>>>
>>> On May 24, 2017, at 9:09 AM, Josh <jof...@gmail.com> wrote:
>>>
>>> Ah ok - I am using the Dataflow runner. I didn't realise the custom
>>> implementation was provided at runtime...
>>>
>>> Any ideas of how to tweak my job to lower the latency either when
>>> consuming from PubSub or when writing to Bigtable?
>>>
>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>>> ...)?
>>>>
>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com>
>>>> wrote:
>>>>
>>>>> Sorry, that was an autocorrect error. I meant to ask - what dataflow
>>>>> runner are you using? If you are using Google Cloud Dataflow then the
>>>>> PubsubIO class is not the one doing the reading from the pubsub topic.
>>>>> They provide a custom implementation at run time.
>>>>>
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>>
>>>>> On May 24, 2017, at 07:52, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>> Hi Ankur,
>>>>>
>>>>> What do you mean by runner address?
>>>>> Would you be able to link me to the comment you're referring to?
>>>>>
>>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>>
>>>>> Thanks,
>>>>> Josh
>>>>>
>>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com>
>>>>> wrote:
>>>>>
>>>>>> What runner address you using. Google Cloud Dataflow uses a
>>>>>> closed-source version of the pubsub reader, as noted in a comment
>>>>>> on the Read class.
>>>>>>
>>>>>> Ankur Chauhan
>>>>>> Sent from my iPhone
>>>>>>
>>>>>> On May 24, 2017, at 04:05, Josh <jof...@gmail.com> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>>>>> writes the data out to Bigtable. I'm currently seeing a latency of
>>>>>> 3-5 seconds between the messages being published and being written
>>>>>> to Bigtable.
>>>>>>
>>>>>> I want to try and decrease the latency to <1s if possible - does
>>>>>> anyone have any tips for doing this?
>>>>>>
>>>>>> I noticed that there is a PubsubGrpcClient
>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>>>> however the PubsubUnboundedSource is initialised with a
>>>>>> PubsubJsonClient, so the Grpc client doesn't appear to be being used.
>>>>>> Is there a way to switch to the Grpc client, as perhaps that would
>>>>>> give better performance?
>>>>>>
>>>>>> Also, I am running my job on Dataflow using autoscaling, which has
>>>>>> only allocated one n1-standard-4 instance to the job, which is
>>>>>> running at ~50% CPU.
>>>>>> Could forcing a higher number of nodes help improve latency?
>>>>>>
>>>>>> Thanks for any advice,
>>>>>> Josh
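[Editor's note] For readers landing on this thread: the options builder Ankur shares above attaches to the sink through BigtableIO's withBigtableOptions. A minimal sketch against the Beam 2.0.0 API, not a tested configuration - the pipeline variable, table name, and the AnalyticsPipelineOptions instance are placeholders taken from the thread:

```java
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// "mutations" stands for a PCollection<KV<ByteString, Iterable<Mutation>>>
// produced earlier in the pipeline; "events" is a hypothetical table name.
PCollection<KV<ByteString, Iterable<Mutation>>> mutations = /* ... */;
mutations.apply("WriteToBigtable",
    BigtableIO.write()
        .withBigtableOptions(createStreamingBTOptions(opts))
        .withTableId("events"));
```

BigtableIO.write() expects the key/mutations element type shown above, so the earlier transform that builds mutations does the row-key and cell construction.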
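[Editor's note] On Raghu's point that autoscaling decides the worker count for you: Dataflow lets you pin the pool by disabling the autoscaling algorithm in the pipeline options, and the --saveProfilesToGcs profiling Ankur suggests is set the same way. A configuration sketch, assuming the standard Dataflow options interfaces from the Beam 2.0.0 runner (worker count and GCS bucket are placeholder values):

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.runners.dataflow.options.DataflowProfilingOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(DataflowPipelineOptions.class);

// Pin the worker pool instead of letting the service autoscale it down:
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
options.setNumWorkers(4); // placeholder worker count

// Enable the profiler (writes profiles to the given GCS location):
options.as(DataflowProfilingOptions.class)
    .setSaveProfilesToGcs("gs://my-bucket/profiles"); // placeholder bucket
```

Equivalently these can be passed on the command line as --autoscalingAlgorithm=NONE, --numWorkers=4 and --saveProfilesToGcs=gs://..., which avoids hard-coding them in the job.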