Josh, Reshuffle might also be worth trying. To clarify, ~1s end-to-end is not always achievable given the number of systems and services involved between the publisher and the eventual sink.
Raghu.

On Wed, May 24, 2017 at 12:32 PM, Raghu Angadi <[email protected]> wrote:

> Thanks. Looked at some job/system-level metrics. I see minimal latency
> within the Dataflow pipeline itself, so you might not see much improvement
> from Reshuffle() (maybe ~0.25 seconds).
>
> Do you have control over the publisher? Publishers might be batching
> hundreds of messages, which adds latency. You can try to reduce that. Even
> then, PubSub itself does some batching internally, which might limit
> overall latency. Removing publisher batching is worth a try.
>
> On Wed, May 24, 2017 at 11:46 AM, Josh <[email protected]> wrote:
>
>> Hi Raghu,
>>
>> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for
>> taking a look!
>>
>> Yes, I'm using BigtableIO for the sink and I am measuring the end-to-end
>> latency. It typically takes 3-6 seconds; I would like to get it down
>> to ~1s.
>>
>> Thanks,
>> Josh
>>
>> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <[email protected]> wrote:
>>
>>> Josh,
>>>
>>> Can you share your job_id? I could take a look. Are you measuring
>>> latency end-to-end (publisher to when it appears on BT)? Are you using
>>> BigtableIO for the sink?
>>>
>>> There is no easy way to use more workers when auto-scaling is enabled.
>>> It thinks your backlog and CPU are low enough and does not need to
>>> scale.
>>>
>>> Raghu.
>>>
>>> On Wed, May 24, 2017 at 10:14 AM, Josh <[email protected]> wrote:
>>>
>>>> Thanks Ankur, that's super helpful! I will give these optimisations
>>>> a go.
>>>>
>>>> About the "No operations completed" message - there are a few of
>>>> these in the logs (but very few, like one an hour) - so probably no
>>>> need to scale up Bigtable.
>>>>
>>>> I did, however, see a lot of INFO messages saying "Wrote 0 records"
>>>> in the logs. Probably about 50% of the "Wrote n records" messages
>>>> are zero, while the other 50% are quite high (e.g. "Wrote 80
>>>> records"). Not sure if that could indicate a bad setting?
>>>>
>>>> Josh
>>>>
>>>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <[email protected]> wrote:
>>>>
>>>>> There are a few things to check here:
>>>>>
>>>>> * In the logs, are there any messages like "No operations completed
>>>>> within the last 61 seconds. There are still 1 simple operations and
>>>>> 1 complex operations in progress."? This means you are underscaled
>>>>> on the Bigtable side and would benefit from increasing the node
>>>>> count.
>>>>> * We also saw some improvement in performance (workload dependent)
>>>>> by going to a bigger worker machine type.
>>>>> * Another optimization that worked for our use case:
>>>>>
>>>>> // streaming dataflow has larger machines with smaller bundles, so
>>>>> // we can queue up a lot more without blowing up
>>>>> private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>>>>     return new BigtableOptions.Builder()
>>>>>         .setProjectId(opts.getProject())
>>>>>         .setInstanceId(opts.getBigtableInstanceId())
>>>>>         .setUseCachedDataPool(true)
>>>>>         .setDataChannelCount(32)
>>>>>         .setBulkOptions(new BulkOptions.Builder()
>>>>>             .setUseBulkApi(true)
>>>>>             .setBulkMaxRowKeyCount(2048)
>>>>>             .setBulkMaxRequestSize(8_388_608L)
>>>>>             .setAsyncMutatorWorkerCount(32)
>>>>>             .build())
>>>>>         .build();
>>>>> }
>>>>>
>>>>> There is a lot of trial and error involved in getting the end-to-end
>>>>> latency down, so I would suggest enabling profiling using the
>>>>> --saveProfilesToGcs option to get a sense of what exactly is
>>>>> happening.
>>>>>
>>>>> -- Ankur Chauhan
>>>>>
>>>>> On May 24, 2017, at 9:09 AM, Josh <[email protected]> wrote:
>>>>>
>>>>> Ah ok - I am using the Dataflow runner. I didn't realise about the
>>>>> custom implementation being provided at runtime...
>>>>>
>>>>> Any ideas of how to tweak my job to either lower the latency
>>>>> consuming from PubSub or to lower the latency in writing to
>>>>> Bigtable?
>>>>>
>>>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow,
>>>>>> Apex, ...)?
>>>>>>
>>>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <[email protected]> wrote:
>>>>>>
>>>>>>> Sorry, that was an autocorrect error. I meant to ask - what
>>>>>>> dataflow runner are you using? If you are using Google Cloud
>>>>>>> Dataflow, then the PubsubIO class is not the one doing the reading
>>>>>>> from the pubsub topic. They provide a custom implementation at run
>>>>>>> time.
>>>>>>>
>>>>>>> Ankur Chauhan
>>>>>>> Sent from my iPhone
>>>>>>>
>>>>>>> On May 24, 2017, at 07:52, Josh <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi Ankur,
>>>>>>>
>>>>>>> What do you mean by runner address?
>>>>>>> Would you be able to link me to the comment you're referring to?
>>>>>>>
>>>>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Josh
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <[email protected]> wrote:
>>>>>>>
>>>>>>>> What runner address you using. Google Cloud Dataflow uses a
>>>>>>>> closed-source version of the pubsub reader, as noted in a comment
>>>>>>>> on the Read class.
>>>>>>>>
>>>>>>>> Ankur Chauhan
>>>>>>>> Sent from my iPhone
>>>>>>>>
>>>>>>>> On May 24, 2017, at 04:05, Josh <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job
>>>>>>>> then writes the data out to Bigtable. I'm currently seeing a
>>>>>>>> latency of 3-5 seconds between the messages being published and
>>>>>>>> being written to Bigtable.
>>>>>>>>
>>>>>>>> I want to try and decrease the latency to <1s if possible - does
>>>>>>>> anyone have any tips for doing this?
>>>>>>>>
>>>>>>>> I noticed that there is a PubsubGrpcClient:
>>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>>>>>> However, the PubsubUnboundedSource is initialised with a
>>>>>>>> PubsubJsonClient, so the Grpc client doesn't appear to be used.
>>>>>>>> Is there a way to switch to the Grpc client - perhaps that would
>>>>>>>> give better performance?
>>>>>>>>
>>>>>>>> Also, I am running my job on Dataflow using autoscaling, which
>>>>>>>> has only allocated one n1-standard-4 instance to the job, running
>>>>>>>> at ~50% CPU. Could forcing a higher number of nodes help improve
>>>>>>>> latency?
>>>>>>>>
>>>>>>>> Thanks for any advice,
>>>>>>>> Josh
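
Raghu's Reshuffle() suggestion at the top of the thread could be wired in roughly like this. This is a minimal sketch, not code from the thread: the topic, Bigtable identifiers, and the `toMutations()` transform are placeholder names, and `Reshuffle.viaRandomKey()` may require a Beam release newer than 2.0.0 (earlier releases only expose `Reshuffle.of()` over keyed collections).

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Reshuffle;
import com.google.cloud.bigtable.config.BigtableOptions;

public class LowLatencyPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(PubsubIO.readMessages()
            .fromTopic("projects/my-project/topics/my-topic"))  // placeholder topic
        // Break fusion between the Pubsub read and downstream stages so the
        // write step does not wait on the read bundle to complete.
        .apply(Reshuffle.viaRandomKey())
        // Placeholder: a ParDo converting each message into the
        // KV<ByteString, Iterable<Mutation>> elements BigtableIO.write() expects.
        .apply(toMutations())
        .apply(BigtableIO.write()
            .withBigtableOptions(new BigtableOptions.Builder()
                .setProjectId("my-project")      // placeholder
                .setInstanceId("my-instance")    // placeholder
                .build())
            .withTableId("my-table"));           // placeholder
    p.run();
  }
}
```

As Raghu notes above, the gain from this is likely small (~0.25 s) when the in-pipeline latency is already low.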
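
On Raghu's publisher-batching point: if the publisher side uses the google-cloud-pubsub Java client, its batching can effectively be disabled by lowering the batching thresholds. A sketch, assuming that client; the project and topic names are placeholders, and whether this helps depends on how the actual publisher is configured:

```java
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;
import org.threeten.bp.Duration;

Publisher publisher =
    Publisher.newBuilder(TopicName.of("my-project", "my-topic"))  // placeholders
        // Publish each message immediately rather than waiting to fill a batch:
        // a single element or ~1 ms, whichever comes first, flushes the request.
        .setBatchingSettings(
            BatchingSettings.newBuilder()
                .setElementCountThreshold(1L)
                .setDelayThreshold(Duration.ofMillis(1))
                .build())
        .build();
```

The trade-off is throughput: per-message requests cost more at high publish rates, and as noted above PubSub still batches internally, which bounds how low this can push latency.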
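
For the autoscaling question at the bottom of the thread: Dataflow will not add workers while it considers backlog and CPU low, but the worker pool can be pinned explicitly through pipeline options. A sketch against the Beam 2.0.0 DataflowRunner; the worker count and machine type here are illustrative, not recommendations from the thread:

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
// Disable autoscaling and fix the worker pool, instead of letting the
// service settle on a single n1-standard-4.
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
options.setNumWorkers(2);                       // illustrative value
options.setWorkerMachineType("n1-standard-8");  // illustrative value
```

The same settings can be passed as command-line flags (`--autoscalingAlgorithm=NONE`, `--numWorkers=2`, `--workerMachineType=n1-standard-8`), alongside the `--saveProfilesToGcs` profiling option Ankur mentions, to measure whether more or bigger workers actually move the end-to-end latency.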
