On Wed, May 24, 2017 at 10:14 AM, Josh <jof...@gmail.com> wrote:

> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>
> About the "No operations completed" message - there are a few of these in
> the logs (but very few, like one an hour) - so probably no need to scale
> up Bigtable.
>
> I did however see a lot of INFO messages saying "Wrote 0 records" in the
> logs. About 50% of the "Wrote n records" messages are zero, while the
> other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that
> could indicate a bad setting?
If this is a single-stage pipeline (no GroupByKey), a high number could
indicate the bundle size in Dataflow. A larger bundle can increase latency.
You can try a workaround by adding a Reshuffle
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L49>.
Something like:

    .apply(...)
    .apply(WithKeys.of(message -> random.nextLong()))  // add a random key
    .apply(Reshuffle.of())
    .apply(Values.create())                            // strip the key
    .apply(sink)

This would bring 'n' down to 1 or so. I don't know why there are messages
that say '0' records.

Raghu.

> Josh
>
> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <an...@malloc64.com> wrote:
>
>> There are two main things to see here:
>>
>> * In the logs, are there any messages like "No operations completed
>> within the last 61 seconds. There are still 1 simple operations and 1
>> complex operations in progress.”? This means you are underscaled on the
>> Bigtable side and would benefit from increasing the node count.
>>
>> * We also saw some improvement in performance (workload dependent) by
>> going to a bigger worker machine type.
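[Editor's note: the reshuffle trick above works because a fresh 64-bit random
key per element essentially never collides, so grouping by those keys splits
one large bundle into many single-element groups. A minimal plain-Java sketch
of that effect, outside Beam - the class and variable names here are
hypothetical, not from the thread:]

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ReshuffleSketch {
    public static void main(String[] args) {
        Random random = new Random(42);

        // One "bundle" of 80 messages, as in the log line "Wrote 80 records".
        List<String> bundle = IntStream.range(0, 80)
                .mapToObj(i -> "msg-" + i)
                .collect(Collectors.toList());

        // Attach a random 64-bit key to each message and group by that key.
        // This mimics WithKeys(random.nextLong()) followed by Reshuffle.
        Map<Long, List<String>> redistributed = bundle.stream()
                .collect(Collectors.groupingBy(m -> random.nextLong()));

        // With 2^64 possible keys and only 80 messages, collisions are
        // astronomically unlikely: every message lands in its own group.
        System.out.println(redistributed.size());
    }
}
```

Each of the 80 groups can then be processed (and written to the sink)
independently, rather than waiting on one 80-record bundle.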
>> * Another optimization that worked for our use case:
>>
>> // streaming dataflow has larger machines with smaller bundles, so we can
>> // queue up a lot more without blowing up
>> private static BigtableOptions
>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>     return new BigtableOptions.Builder()
>>         .setProjectId(opts.getProject())
>>         .setInstanceId(opts.getBigtableInstanceId())
>>         .setUseCachedDataPool(true)
>>         .setDataChannelCount(32)
>>         .setBulkOptions(new BulkOptions.Builder()
>>             .setUseBulkApi(true)
>>             .setBulkMaxRowKeyCount(2048)
>>             .setBulkMaxRequestSize(8_388_608L)
>>             .setAsyncMutatorWorkerCount(32)
>>             .build())
>>         .build();
>> }
>>
>> There is a lot of trial and error involved in getting the end-to-end
>> latency down, so I would suggest enabling profiling using the
>> --saveProfilesToGcs option and getting a sense of what exactly is
>> happening.
>>
>> -- Ankur Chauhan
>>
>> On May 24, 2017, at 9:09 AM, Josh <jof...@gmail.com> wrote:
>>
>> Ah ok - I am using the Dataflow runner. I didn't realise the custom
>> implementation was provided at runtime...
>>
>> Any ideas of how to tweak my job to either lower the latency consuming
>> from PubSub or lower the latency writing to Bigtable?
>>
>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>> ...)?
>>>
>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com>
>>> wrote:
>>>
>>>> Sorry, that was an autocorrect error. I meant to ask - what Dataflow
>>>> runner are you using? If you are using Google Cloud Dataflow then the
>>>> PubsubIO class is not the one doing the reading from the Pub/Sub
>>>> topic. They provide a custom implementation at run time.
>>>>
>>>> Ankur Chauhan
>>>> Sent from my iPhone
>>>>
>>>> On May 24, 2017, at 07:52, Josh <jof...@gmail.com> wrote:
>>>>
>>>> Hi Ankur,
>>>>
>>>> What do you mean by runner address?
>>>> Would you be able to link me to the comment you're referring to?
>>>>
>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com>
>>>> wrote:
>>>>
>>>>> What runner address you using. Google Cloud Dataflow uses a
>>>>> closed-source version of the Pub/Sub reader, as noted in a comment on
>>>>> the Read class.
>>>>>
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>>
>>>>> On May 24, 2017, at 04:05, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm using PubsubIO.Read to consume a Pub/Sub stream, and my job then
>>>>> writes the data out to Bigtable. I'm currently seeing a latency of
>>>>> 3-5 seconds between messages being published and being written to
>>>>> Bigtable.
>>>>>
>>>>> I want to try to decrease the latency to <1s if possible - does
>>>>> anyone have any tips for doing this?
>>>>>
>>>>> I noticed that there is a PubsubGrpcClient
>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>>> however the PubsubUnboundedSource is initialised with a
>>>>> PubsubJsonClient, so the gRPC client doesn't appear to be being used.
>>>>> Is there a way to switch to the gRPC client - as perhaps that would
>>>>> give better performance?
>>>>>
>>>>> Also, I am running my job on Dataflow using autoscaling, which has
>>>>> only allocated one n1-standard-4 instance to the job, which is
>>>>> running at ~50% CPU. Could forcing a higher number of nodes help
>>>>> improve latency?
>>>>>
>>>>> Thanks for any advice,
>>>>> Josh