Hi Raghu,

Thanks for the suggestions and for having a look at the metrics!
I will try the Reshuffle and tweak the publisher batching over the next 
couple of days, along with Ankur's suggestions, and will report back if any of 
these makes a significant difference.

Best regards,
Josh

> On 24 May 2017, at 21:17, Raghu Angadi <[email protected]> wrote:
> 
> Josh,
> 
> Reshuffle might also be worth trying. To clarify, ~1s end-to-end is not 
> always easy to achieve given the number of systems and services involved 
> between the publisher and the eventual sink.
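> 
> If you do try it, a rough fragment of what I mean (Beam Java; the key range 
> of 1024 and the "messages" collection are illustrative, not tuned values):
> 
> // Break fusion between the Pub/Sub read and downstream stages by keying
> // randomly and reshuffling. (Reshuffle lives at
> // org.apache.beam.sdk.transforms.Reshuffle in recent Beam versions.)
> messages
>     .apply(WithKeys.of((PubsubMessage m) -> ThreadLocalRandom.current().nextInt(1024))
>         .withKeyType(TypeDescriptors.integers()))
>     .apply(Reshuffle.<Integer, PubsubMessage>of())
>     .apply(Values.<PubsubMessage>create());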
> 
> Raghu.
> 
>> On Wed, May 24, 2017 at 12:32 PM, Raghu Angadi <[email protected]> wrote:
>> Thanks. I looked at some system-level metrics for the job. I see minimal 
>> latency within the Dataflow pipeline itself, so you might not see much 
>> improvement from Reshuffle() (maybe ~0.25 seconds).
>> 
>> Do you have control over the publisher? Publishers might be batching 
>> hundreds of messages, which adds latency; you can try to reduce that. Even 
>> then, PubSub itself does some batching internally, which might limit the 
>> overall latency. Still, reducing publisher batching is worth a try.
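>> 
>> For instance, with a recent google-cloud-pubsub Java client, a sketch of 
>> turning the batching thresholds way down (thresholds and topic name are 
>> illustrative placeholders, not tuned values):
>> 
>> import com.google.api.gax.batching.BatchingSettings;
>> import com.google.cloud.pubsub.v1.Publisher;
>> import com.google.pubsub.v1.TopicName;
>> import org.threeten.bp.Duration;
>> 
>> Publisher publisher = Publisher.newBuilder(TopicName.of("my-project", "my-topic"))
>>         .setBatchingSettings(BatchingSettings.newBuilder()
>>                 .setElementCountThreshold(1L)            // publish each message as it arrives...
>>                 .setRequestByteThreshold(1024L)          // ...or once ~1 KB is buffered
>>                 .setDelayThreshold(Duration.ofMillis(1)) // hold messages for at most 1 ms
>>                 .build())
>>         .build();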
>> 
>> 
>>> On Wed, May 24, 2017 at 11:46 AM, Josh <[email protected]> wrote:
>>> Hi Raghu,
>>> 
>>> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a 
>>> look!
>>> 
>>> Yes, I'm using BigtableIO for the sink, and I am measuring the end-to-end 
>>> latency. It typically takes 3-6 seconds; I would like to get it 
>>> down to ~1s.
>>> 
>>> Thanks,
>>> Josh
>>> 
>>>> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <[email protected]> wrote:
>>>> Josh,
>>>> 
>>>> Can you share your job_id? I could take a look. Are you measuring latency 
>>>> end-to-end (from publish to when it appears in BT)? Are you using BigtableIO 
>>>> for the sink?
>>>> 
>>>> There is no easy way to use more workers when auto-scaling is enabled: it 
>>>> thinks your backlog and CPU are low enough that it does not need to scale.
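>>>> 
>>>> If you really want a fixed worker count, the usual way is to disable 
>>>> autoscaling when launching the job, e.g. (worker count is illustrative):
>>>> 
>>>>     --autoscalingAlgorithm=NONE --numWorkers=4
>>>> 
>>>> though with CPU that low it may not buy you much.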
>>>> Raghu.
>>>> 
>>>>> On Wed, May 24, 2017 at 10:14 AM, Josh <[email protected]> wrote:
>>>>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>>>> 
>>>>> About the "No operations completed" message - there are a few of these in 
>>>>> the logs (but very few, like 1 an hour or something) - so probably no 
>>>>> need to scale up Bigtable.
>>>>> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
>>>>> Probably about 50% of the "Wrote n records" messages are zero. While the 
>>>>> other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that 
>>>>> could indicate a bad setting?
>>>>> 
>>>>> Josh
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <[email protected]> 
>>>>>> wrote:
>>>>>> There are a few things to look at here:
>>>>>> 
>>>>>> * In the logs, are there any messages like "No operations completed 
>>>>>> within the last 61 seconds. There are still 1 simple operations and 1 
>>>>>> complex operations in progress."? This means you are underscaled on the 
>>>>>> Bigtable side and would benefit from increasing the node count.
>>>>>> * We also saw some improvement in performance (workload dependent) by 
>>>>>> moving to a bigger worker machine type.
>>>>>> * Another optimization that worked for our use case:
>>>>>> 
>>>>>> // Imports for the snippet (from the bigtable-client-core library):
>>>>>> import com.google.cloud.bigtable.config.BigtableOptions;
>>>>>> import com.google.cloud.bigtable.config.BulkOptions;
>>>>>> 
>>>>>> // Streaming Dataflow has larger machines with smaller bundles, so we
>>>>>> // can queue up a lot more without blowing up.
>>>>>> private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>>>>>     return new BigtableOptions.Builder()
>>>>>>             .setProjectId(opts.getProject())
>>>>>>             .setInstanceId(opts.getBigtableInstanceId())
>>>>>>             .setUseCachedDataPool(true)   // reuse a shared connection pool
>>>>>>             .setDataChannelCount(32)      // more gRPC channels for parallel writes
>>>>>>             .setBulkOptions(new BulkOptions.Builder()
>>>>>>                     .setUseBulkApi(true)
>>>>>>                     .setBulkMaxRowKeyCount(2048)        // rows per bulk request
>>>>>>                     .setBulkMaxRequestSize(8_388_608L)  // 8 MiB per request
>>>>>>                     .setAsyncMutatorWorkerCount(32)
>>>>>>                     .build())
>>>>>>             .build();
>>>>>> }
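>>>>>> 
>>>>>> These options can then be handed to the sink, along these lines (table 
>>>>>> ID is a placeholder):
>>>>>> 
>>>>>> BigtableIO.write()
>>>>>>         .withBigtableOptions(createStreamingBTOptions(opts))
>>>>>>         .withTableId("my-table");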
>>>>>> 
>>>>>> There is a lot of trial and error involved in getting the end-to-end 
>>>>>> latency down, so I would suggest enabling profiling via the 
>>>>>> --saveProfilesToGcs option to get a sense of what exactly is happening.
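>>>>>> 
>>>>>> For example (bucket name is a placeholder):
>>>>>> 
>>>>>> --saveProfilesToGcs=gs://my-bucket/profiles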
>>>>>> 
>>>>>> — Ankur Chauhan
>>>>>> 
>>>>>>> On May 24, 2017, at 9:09 AM, Josh <[email protected]> wrote:
>>>>>>> 
>>>>>>> Ah, OK - I am using the Dataflow runner. I didn't realise a custom 
>>>>>>> implementation was provided at runtime...
>>>>>>> 
>>>>>>> Any ideas on how to tweak my job to either lower the latency of consuming 
>>>>>>> from PubSub or lower the latency of writing to Bigtable?
>>>>>>> 
>>>>>>> 
>>>>>>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, 
>>>>>>>> ...)?
>>>>>>>> 
>>>>>>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <[email protected]> 
>>>>>>>>> wrote:
>>>>>>>>> Sorry, that was an autocorrect error. I meant to ask: what dataflow 
>>>>>>>>> runner are you using? If you are using Google Cloud Dataflow, then the 
>>>>>>>>> PubsubIO class is not the one doing the reading from the Pub/Sub 
>>>>>>>>> topic. They provide a custom implementation at run time.
>>>>>>>>> 
>>>>>>>>> Ankur Chauhan 
>>>>>>>>> Sent from my iPhone
>>>>>>>>> 
>>>>>>>>>> On May 24, 2017, at 07:52, Josh <[email protected]> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Ankur,
>>>>>>>>>> 
>>>>>>>>>> What do you mean by runner address?
>>>>>>>>>> Would you be able to link me to the comment you're referring to?
>>>>>>>>>> 
>>>>>>>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Josh
>>>>>>>>>> 
>>>>>>>>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <[email protected]> 
>>>>>>>>>>> wrote:
>>>>>>>>>>> What runner address you using. Google Cloud Dataflow uses a 
>>>>>>>>>>> closed-source version of the Pub/Sub reader, as noted in a comment 
>>>>>>>>>>> on the Read class.
>>>>>>>>>>> 
>>>>>>>>>>> Ankur Chauhan
>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>> 
>>>>>>>>>>>> On May 24, 2017, at 04:05, Josh <[email protected]> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> 
>>>>>>>>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job 
>>>>>>>>>>>> then writes the data out to Bigtable. I'm currently seeing a 
>>>>>>>>>>>> latency of 3-5 seconds between the messages being published and 
>>>>>>>>>>>> being written to Bigtable.
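>>>>>>>>>>>> 
>>>>>>>>>>>> Roughly, the job looks like this (simplified - the IDs are placeholders, 
>>>>>>>>>>>> and MessageToMutationsFn stands in for my real transform, which outputs 
>>>>>>>>>>>> the KV<ByteString, Iterable<Mutation>> elements BigtableIO.write() expects):
>>>>>>>>>>>> 
>>>>>>>>>>>> Pipeline p = Pipeline.create(options);
>>>>>>>>>>>> p.apply(PubsubIO.readMessages().fromTopic("projects/my-project/topics/my-topic"))
>>>>>>>>>>>>  .apply(ParDo.of(new MessageToMutationsFn()))  // stand-in for the real transform
>>>>>>>>>>>>  .apply(BigtableIO.write()
>>>>>>>>>>>>          .withBigtableOptions(new BigtableOptions.Builder()
>>>>>>>>>>>>                  .setProjectId("my-project")
>>>>>>>>>>>>                  .setInstanceId("my-instance")
>>>>>>>>>>>>                  .build())
>>>>>>>>>>>>          .withTableId("my-table"));
>>>>>>>>>>>> p.run();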
>>>>>>>>>>>> 
>>>>>>>>>>>> I want to try and decrease the latency to <1s if possible - does 
>>>>>>>>>>>> anyone have any tips for doing this? 
>>>>>>>>>>>> 
>>>>>>>>>>>> I noticed that there is a PubsubGrpcClient 
>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>>>>>>>>>> but the PubsubUnboundedSource is initialised with a 
>>>>>>>>>>>> PubsubJsonClient, so the gRPC client doesn't appear to be 
>>>>>>>>>>>> used. Is there a way to switch to the gRPC client? Perhaps 
>>>>>>>>>>>> that would give better performance.
>>>>>>>>>>>> 
>>>>>>>>>>>> Also, I am running my job on Dataflow using autoscaling, which has 
>>>>>>>>>>>> allocated only one n1-standard-4 instance to the job, and that 
>>>>>>>>>>>> instance is running at ~50% CPU. Could forcing a higher number of 
>>>>>>>>>>>> workers help improve latency?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for any advice,
>>>>>>>>>>>> Josh
> 
