Thanks. I looked at some job-level system metrics. I see minimal latency
within the Dataflow pipeline itself, so you might not see much improvement
from Reshuffle() (maybe ~0.25 seconds).

Do you have control over the publisher? Publishers might be batching
hundreds of messages, which adds latency, so you can try to reduce that.
Even then, Pub/Sub itself does some batching internally, which might limit
overall latency. Still, removing publisher batching is worth a try.
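
If you happen to be publishing with the google-cloud-pubsub Java client,
batching can be effectively turned off with something like this (an
untested sketch; the project and topic names are placeholders):

import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;
import org.threeten.bp.Duration;

Publisher publisher =
    Publisher.newBuilder(TopicName.of("my-project", "my-topic"))
        .setBatchingSettings(BatchingSettings.newBuilder()
            .setElementCountThreshold(1L)  // publish each message on its own
            .setRequestByteThreshold(1L)   // don't wait to fill a batch by size
            .setDelayThreshold(Duration.ofMillis(1))  // flush almost immediately
            .build())
        .build();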

On Wed, May 24, 2017 at 11:46 AM, Josh <[email protected]> wrote:

> Hi Raghu,
>
> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking
> a look!
>
> Yes, I'm using BigtableIO for the sink, and I am measuring the end-to-end
> latency. It typically takes 3-6 seconds; I would like to get it down to
> ~1s.
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <[email protected]> wrote:
>
>> Josh,
>>
>> Can you share your job_id? I could take a look. Are you measuring latency
>> end-to-end (from the publisher to when it appears in BT)? Are you using
>> BigtableIO for the sink?
>>
>> There is no easy way to use more workers when auto-scaling is enabled. It
>> thinks your backlog and CPU are low enough that it does not need to scale.
>> Raghu.
>>
>> On Wed, May 24, 2017 at 10:14 AM, Josh <[email protected]> wrote:
>>
>>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>>
>>> About the "No operations completed" message - there are a few of these
>>> in the logs (but very few, like one an hour) - so there's probably no
>>> need to scale up Bigtable.
>>> I did, however, see a lot of INFO messages like "Wrote 0 records" in the
>>> logs. About 50% of the "Wrote n records" messages are zero, while the
>>> other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that
>>> could indicate a bad setting?
>>>
>>> Josh
>>>
>>>
>>>
>>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <[email protected]>
>>> wrote:
>>>
>>>> There are a few main things to look at here:
>>>>
>>>> * In the logs, are there any messages like "No operations completed
>>>> within the last 61 seconds. There are still 1 simple operations and 1
>>>> complex operations in progress."? This means you are underscaled on the
>>>> Bigtable side and would benefit from increasing the node count.
>>>> * We also saw some improvement in performance (workload dependent) by
>>>> going to a bigger worker machine type.
>>>> * Another optimization that worked for our use case:
>>>>
>>>> // Streaming Dataflow has larger machines with smaller bundles, so we
>>>> // can queue up a lot more without blowing up.
>>>> private static BigtableOptions createStreamingBTOptions(
>>>>         AnalyticsPipelineOptions opts) {
>>>>     return new BigtableOptions.Builder()
>>>>             .setProjectId(opts.getProject())
>>>>             .setInstanceId(opts.getBigtableInstanceId())
>>>>             .setUseCachedDataPool(true)
>>>>             .setDataChannelCount(32)
>>>>             .setBulkOptions(new BulkOptions.Builder()
>>>>                     .setUseBulkApi(true)
>>>>                     .setBulkMaxRowKeyCount(2048)
>>>>                     .setBulkMaxRequestSize(8_388_608L)
>>>>                     .setAsyncMutatorWorkerCount(32)
>>>>                     .build())
>>>>             .build();
>>>> }
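>>>>
>>>> For reference, this gets wired into the sink roughly like so (a sketch;
>>>> "mutations" and the table id are placeholders):
>>>>
>>>> // mutations is a PCollection<KV<ByteString, Iterable<Mutation>>>
>>>> mutations.apply(BigtableIO.write()
>>>>         .withBigtableOptions(createStreamingBTOptions(opts))
>>>>         .withTableId("my-table"));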
>>>>
>>>>
>>>> There is a lot of trial and error involved in getting the end-to-end
>>>> latency down, so I would suggest enabling profiling using the
>>>> --saveProfilesToGcs option to get a sense of what exactly is happening.
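>>>>
>>>> For example, when launching the job (the bucket is just a placeholder):
>>>>
>>>>     --saveProfilesToGcs=gs://my-bucket/dataflow-profiles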
>>>>
>>>> — Ankur Chauhan
>>>>
>>>> On May 24, 2017, at 9:09 AM, Josh <[email protected]> wrote:
>>>>
>>>> Ah ok - I am using the Dataflow runner. I didn't realise a custom
>>>> implementation was provided at runtime...
>>>>
>>>> Any ideas on how to tweak my job to either lower the latency consuming
>>>> from PubSub or lower the latency in writing to Bigtable?
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>>>> ...)?
>>>>>
>>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Sorry, that was an autocorrect error. I meant to ask: what Dataflow
>>>>>> runner are you using? If you are using Google Cloud Dataflow, then the
>>>>>> PubsubIO class is not the one doing the reading from the Pub/Sub topic.
>>>>>> They provide a custom implementation at run time.
>>>>>>
>>>>>> Ankur Chauhan
>>>>>> Sent from my iPhone
>>>>>>
>>>>>> On May 24, 2017, at 07:52, Josh <[email protected]> wrote:
>>>>>>
>>>>>> Hi Ankur,
>>>>>>
>>>>>> What do you mean by runner address?
>>>>>> Would you be able to link me to the comment you're referring to?
>>>>>>
>>>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> What runner address you using. Google Cloud Dataflow uses a
>>>>>>> closed-source version of the Pub/Sub reader, as noted in a comment on
>>>>>>> the Read class.
>>>>>>> Ankur Chauhan
>>>>>>> Sent from my iPhone
>>>>>>>
>>>>>>> On May 24, 2017, at 04:05, Josh <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>>>>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>>>>>> seconds between the messages being published and being written to 
>>>>>>> Bigtable.
>>>>>>>
>>>>>>> I want to try and decrease the latency to <1s if possible - does
>>>>>>> anyone have any tips for doing this?
>>>>>>>
>>>>>>> I noticed that there is a PubsubGrpcClient:
>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>>>>> However, the PubsubUnboundedSource is initialised with a
>>>>>>> PubsubJsonClient, so the Grpc client doesn't appear to be used. Is
>>>>>>> there a way to switch to the Grpc client - as perhaps that would give
>>>>>>> better performance?
>>>>>>>
>>>>>>> Also, I am running my job on Dataflow using autoscaling, which has
>>>>>>> allocated only one n1-standard-4 instance to the job, running at ~50%
>>>>>>> CPU. Could forcing a higher number of nodes help improve latency?
>>>>>>>
>>>>>>> Thanks for any advice,
>>>>>>> Josh
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
