Hi Raghu,

My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a

Yes I'm using BigtableIO for the sink and I am measuring the end-to-end
latency. It seems to take 3-6 seconds typically, I would like to get it
down to ~1s.


On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <rang...@google.com> wrote:

> Josh,
> Can you share your job_id? I could take look. Are you measuring latency
> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
> for sink?
> There is no easy way to use more workers when auto-scaling is enabled. It
> thinks your backlog and CPU are low enough and does not need to scale.
> Raghu.
> On Wed, May 24, 2017 at 10:14 AM, Josh <jof...@gmail.com> wrote:
>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>> About the "No operations completed" message - there are a few of these in
>> the logs (but very few, like 1 an hour or something) - so probably no need
>> to scale up Bigtable.
>> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
>> Probably
>> about 50% of the "Wrote n records" messages are zero. While the other 50%
>> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
>> bad setting?
>> Josh
>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <an...@malloc64.com>
>> wrote:
>>> There are two main things to see here:
>>> * In the logs, are there any messages like "No operations completed
>>> within the last 61 seconds. There are still 1 simple operations and 1
>>> complex operations in progress.” This means you are underscaled on the
>>> bigtable side and would benefit from  increasing the node count.
>>> * We also saw some improvement in performance (workload dependent) by
>>> going to a bigger worker machine type.
>>> * Another optimization that worked for our use case:
>>> // streaming dataflow has larger machines with smaller bundles, so we can 
>>> queue up a lot more without blowing up
>>> private static BigtableOptions 
>>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>>     return new BigtableOptions.Builder()
>>>             .setProjectId(opts.getProject())
>>>             .setInstanceId(opts.getBigtableInstanceId())
>>>             .setUseCachedDataPool(true)
>>>             .setDataChannelCount(32)
>>>             .setBulkOptions(new BulkOptions.Builder()
>>>                     .setUseBulkApi(true)
>>>                     .setBulkMaxRowKeyCount(2048)
>>>                     .setBulkMaxRequestSize(8_388_608L)
>>>                     .setAsyncMutatorWorkerCount(32)
>>>                     .build())
>>>             .build();
>>> }
>>> There is a lot of trial and error involved in getting the end-to-end
>>> latency down so I would suggest enabling the profiling using the
>>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>> — Ankur Chauhan
>>> On May 24, 2017, at 9:09 AM, Josh <jof...@gmail.com> wrote:
>>> Ah ok - I am using the Dataflow runner. I didn't realise about the
>>> custom implementation being provided at runtime...
>>> Any ideas of how to tweak my job to either lower the latency consuming
>>> from PubSub or to lower the latency in writing to Bigtable?
>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>>> ...)?
>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com>
>>>> wrote:
>>>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>>>> runner are you using? If you are using google cloud dataflow then the
>>>>> PubsubIO class is not the one doing the reading from the pubsub topic. 
>>>>> They
>>>>> provide a custom implementation at run time.
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>> On May 24, 2017, at 07:52, Josh <jof...@gmail.com> wrote:
>>>>> Hi Ankur,
>>>>> What do you mean by runner address?
>>>>> Would you be able to link me to the comment you're referring to?
>>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>>>> o/gcp/pubsub/PubsubIO.java
>>>>> Thanks,
>>>>> Josh
>>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com>
>>>>> wrote:
>>>>>> What runner address you using. Google cloud dataflow uses a closed
>>>>>> source version of the pubsub reader as noted in a comment on Read class.
>>>>>> Ankur Chauhan
>>>>>> Sent from my iPhone
>>>>>> On May 24, 2017, at 04:05, Josh <jof...@gmail.com> wrote:
>>>>>> Hi all,
>>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>>>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>>>>> seconds between the messages being published and being written to 
>>>>>> Bigtable.
>>>>>> I want to try and decrease the latency to <1s if possible - does
>>>>>> anyone have any tips for doing this?
>>>>>> I noticed that there is a PubsubGrpcClient
>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>>>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>>>>> o/gcp/pubsub/PubsubGrpcClient.java however the PubsubUnboundedSource
>>>>>> is initialised with a PubsubJsonClient, so the Grpc client doesn't appear
>>>>>> to be being used. Is there a way to switch to the Grpc client - as 
>>>>>> perhaps
>>>>>> that would give better performance?
>>>>>> Also, I am running my job on Dataflow using autoscaling, which has
>>>>>> only allocated one n1-standard-4 instance to the job, which is
>>>>>> running at ~50% CPU. Could forcing a higher number of nodes help improve
>>>>>> latency?
>>>>>> Thanks for any advice,
>>>>>> Josh

Reply via email to