On Wed, May 24, 2017 at 10:14 AM, Josh <jof...@gmail.com> wrote:

> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>
> About the "No operations completed" message - there are a few of these in
> the logs (but very few, like 1 an hour or something) - so probably no need
> to scale up Bigtable.
> I did however see a lot of INFO messages "Wrote 0 records" in the logs.
> Probably about 50% of the "Wrote n records" messages are zero, while the
> other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that could
> indicate a bad setting?
>

If this is a single-stage pipeline (no GroupByKey()), a high number probably
reflects the bundle size in Dataflow. A larger bundle could increase latency.
You can try a workaround by adding a Reshuffle
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L49>.
Something like:
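  .apply(...)
  // Add a random key. MyMessage is a placeholder for your element type.
  .apply(WithKeys.of((MyMessage m) -> ThreadLocalRandom.current().nextLong())
      .withKeyType(TypeDescriptors.longs()))
  .apply(Reshuffle.of())
  .apply(Values.create()) // strip key
  .apply(sink)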

This would bring 'n' down to 1 or so. I don't know why there are messages
that say '0' records.
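
To make the key / reshuffle / strip-key steps concrete, here is a minimal
plain-Java sketch with no Beam dependency. It only mimics the data flow: each
message in a pretend bundle of 80 gets a random long key, the messages are
grouped by that key (the regrouping point a reshuffle introduces), and the
keys are dropped again. The class and method names (RandomKeySketch,
keyAndGroup, stripKeys) are made up for illustration, and the sketch does not
model how Dataflow actually forms bundles.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration only; not Beam code and not Dataflow's bundling logic.
class RandomKeySketch {

    // Steps 1 and 2: pair every message with a random key, then group by key.
    static Map<Long, List<String>> keyAndGroup(List<String> bundle, Random random) {
        Map<Long, List<String>> groups = new HashMap<>();
        for (String message : bundle) {
            groups.computeIfAbsent(random.nextLong(), k -> new ArrayList<>()).add(message);
        }
        return groups;
    }

    // Step 3: strip the keys again, leaving only the messages.
    static List<String> stripKeys(Map<Long, List<String>> groups) {
        List<String> messages = new ArrayList<>();
        for (List<String> group : groups.values()) {
            messages.addAll(group);
        }
        return messages;
    }

    public static void main(String[] args) {
        // A pretend bundle of 80 records, matching the "Wrote 80 records" log line.
        List<String> bundle = new ArrayList<>();
        for (int i = 0; i < 80; i++) {
            bundle.add("message-" + i);
        }

        Map<Long, List<String>> groups = keyAndGroup(bundle, new Random());

        int largest = 0;
        for (List<String> group : groups.values()) {
            largest = Math.max(largest, group.size());
        }
        System.out.println("groups: " + groups.size()
                + ", largest group: " + largest
                + ", messages after stripping keys: " + stripKeys(groups).size());
    }
}
```

Because the keys are random longs, collisions are very unlikely, so nearly
every group holds a single message and no message is lost when the keys are
stripped. Whether the sink then writes in batches of about 1 depends on the
runner, so treat this as intuition rather than a guarantee.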

Raghu.


> Josh
>
>
>
> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <an...@malloc64.com> wrote:
>
>> There are a few things to look at here:
>>
>> * In the logs, are there any messages like "No operations completed
>> within the last 61 seconds. There are still 1 simple operations and 1
>> complex operations in progress."? This means you are underscaled on the
>> Bigtable side and would benefit from increasing the node count.
>> * We also saw some improvement in performance (workload dependent) by
>> going to a bigger worker machine type.
>> * Another optimization that worked for our use case:
>>
>> // streaming dataflow has larger machines with smaller bundles, so we can
>> // queue up a lot more without blowing up
>> private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>     return new BigtableOptions.Builder()
>>             .setProjectId(opts.getProject())
>>             .setInstanceId(opts.getBigtableInstanceId())
>>             .setUseCachedDataPool(true)
>>             .setDataChannelCount(32)
>>             .setBulkOptions(new BulkOptions.Builder()
>>                     .setUseBulkApi(true)
>>                     .setBulkMaxRowKeyCount(2048)
>>                     .setBulkMaxRequestSize(8_388_608L)
>>                     .setAsyncMutatorWorkerCount(32)
>>                     .build())
>>             .build();
>> }
>>
>>
>> There is a lot of trial and error involved in getting the end-to-end
>> latency down, so I would suggest enabling profiling with the
>> --saveProfilesToGcs option to get a sense of what exactly is happening.
>>
>> — Ankur Chauhan
>>
>> On May 24, 2017, at 9:09 AM, Josh <jof...@gmail.com> wrote:
>>
>> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
>> implementation being provided at runtime...
>>
>> Any ideas of how to tweak my job to either lower the latency consuming
>> from PubSub or to lower the latency in writing to Bigtable?
>>
>>
>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>> ...)?
>>>
>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com>
>>> wrote:
>>>
>>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>>> runner are you using? If you are using google cloud dataflow then the
>>>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>>>> provide a custom implementation at run time.
>>>>
>>>> Ankur Chauhan
>>>> Sent from my iPhone
>>>>
>>>> On May 24, 2017, at 07:52, Josh <jof...@gmail.com> wrote:
>>>>
>>>> Hi Ankur,
>>>>
>>>> What do you mean by runner address?
>>>> Would you be able to link me to the comment you're referring to?
>>>>
>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com>
>>>> wrote:
>>>>
>>>>> What runner address you using. Google cloud dataflow uses a closed
>>>>> source version of the pubsub reader as noted in a comment on Read class.
>>>>>
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>>
>>>>> On May 24, 2017, at 04:05, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>>>> seconds between the messages being published and being written to Bigtable.
>>>>>
>>>>> I want to try and decrease the latency to <1s if possible - does
>>>>> anyone have any tips for doing this?
>>>>>
>>>>> I noticed that there is a PubsubGrpcClient
>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>>> however the PubsubUnboundedSource
>>>>> is initialised with a PubsubJsonClient, so the Grpc client doesn't appear
>>>>> to be being used. Is there a way to switch to the Grpc client - as perhaps
>>>>> that would give better performance?
>>>>>
>>>>> Also, I am running my job on Dataflow using autoscaling, which has
>>>>> only allocated one n1-standard-4 instance to the job, which is
>>>>> running at ~50% CPU. Could forcing a higher number of nodes help improve
>>>>> latency?
>>>>>
>>>>> Thanks for any advice,
>>>>> Josh
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
