Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Raghu Angadi
Thanks. I looked at some job system-level metrics. I see minimal latency
within the Dataflow pipeline itself, so you might not see much improvement
from Reshuffle() (maybe ~0.25 seconds).

Do you have control over the publisher? Publishers might be batching hundreds
of messages, which adds latency; you can try to reduce that. Even then, PubSub
itself does some batching internally, which might limit overall latency.
Removing publisher batching is still worth a try.
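If you do control the publisher and it happens to use the google-cloud-pubsub
Java client, client-side batching can be turned down via BatchingSettings. A
minimal sketch, assuming the current client API (project and topic names are
placeholders, not from your job):

import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import org.threeten.bp.Duration;

public class LowLatencyPublisher {
  public static void main(String[] args) throws Exception {
    // Effectively disable batching: flush after every message, every byte,
    // and after at most 1 ms of buffering.
    BatchingSettings noBatching = BatchingSettings.newBuilder()
        .setElementCountThreshold(1L)
        .setRequestByteThreshold(1L)
        .setDelayThreshold(Duration.ofMillis(1))
        .build();

    Publisher publisher =
        Publisher.newBuilder(TopicName.of("my-project", "my-topic"))
            .setBatchingSettings(noBatching)
            .build();
    try {
      publisher.publish(PubsubMessage.newBuilder()
          .setData(ByteString.copyFromUtf8("event"))
          .build()).get();  // block until this publish is acknowledged
    } finally {
      publisher.shutdown();
    }
  }
}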

On Wed, May 24, 2017 at 11:46 AM, Josh  wrote:

> Hi Raghu,
>
> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking
> a look!
>
> Yes I'm using BigtableIO for the sink and I am measuring the end-to-end
> latency. It seems to take 3-6 seconds typically, I would like to get it
> down to ~1s.
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi  wrote:
>
>> Josh,
>>
>> Can you share your job_id? I could take look. Are you measuring latency
>> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
>> for sink?
>>
>> There is no easy way to use more workers when auto-scaling is enabled. It
>> thinks your backlog and CPU are low enough and does not need to scale.
>> Raghu.
>>
>> On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:
>>
>>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>>
>>> About the "No operations completed" message - there are a few of these
>>> in the logs (but very few, like 1 an hour or something) - so probably no
>>> need to scale up Bigtable.
>>> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
>>> Probably
>>> about 50% of the "Wrote n records" messages are zero. While the other 50%
>>> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
>>> bad setting?
>>>
>>> Josh
>>>
>>>
>>>
>>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan 
>>> wrote:
>>>
 There are two main things to see here:

 * In the logs, are there any messages like "No operations completed
 within the last 61 seconds. There are still 1 simple operations and 1
 complex operations in progress.” This means you are underscaled on the
 bigtable side and would benefit from  increasing the node count.
 * We also saw some improvement in performance (workload dependent) by
 going to a bigger worker machine type.
 * Another optimization that worked for our use case:

 // streaming dataflow has larger machines with smaller bundles, so we can 
 queue up a lot more without blowing up
 private static BigtableOptions 
 createStreamingBTOptions(AnalyticsPipelineOptions opts) {
 return new BigtableOptions.Builder()
 .setProjectId(opts.getProject())
 .setInstanceId(opts.getBigtableInstanceId())
 .setUseCachedDataPool(true)
 .setDataChannelCount(32)
 .setBulkOptions(new BulkOptions.Builder()
 .setUseBulkApi(true)
 .setBulkMaxRowKeyCount(2048)
 .setBulkMaxRequestSize(8_388_608L)
 .setAsyncMutatorWorkerCount(32)
 .build())
 .build();
 }


 There is a lot of trial and error involved in getting the end-to-end
 latency down so I would suggest enabling the profiling using the
 —saveProfilesToGcs option and get a sense of what is exactly happening.

 — Ankur Chauhan

 On May 24, 2017, at 9:09 AM, Josh  wrote:

 Ah ok - I am using the Dataflow runner. I didn't realise about the
 custom implementation being provided at runtime...

 Any ideas of how to tweak my job to either lower the latency consuming
 from PubSub or to lower the latency in writing to Bigtable?


 On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:

> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
> ...)?
>
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
> wrote:
>
>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>> runner are you using? If you are using google cloud dataflow then the
>> PubsubIO class is not the one doing the reading from the pubsub topic. 
>> They
>> provide a custom implementation at run time.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 07:52, Josh  wrote:
>>
>> Hi Ankur,
>>
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>>
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>> o/gcp/pubsub/PubsubIO.java
>>
>> Thanks,

Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Raghu Angadi
On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:

> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>
> About the "No operations completed" message - there are a few of these in
> the logs (but very few, like 1 an hour or something) - so probably no need
> to scale up Bigtable.
> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
> Probably
> about 50% of the "Wrote n records" messages are zero. While the other 50%
> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
> bad setting?
>

If this is a single-stage pipeline (no GroupByKey()), a high number could
indicate the bundle size in Dataflow. A larger bundle could increase latency.
You can try a workaround by adding a Reshuffle. Something like:
  .apply(...)
  .apply(message -> (random.nextLong(), message)) // add a random key.
  .apply(Reshuffle.of())
  .apply((i, message) -> message) // strip key
  .apply(sink)

This would bring 'n' down to 1 or so. I don't know why there are messages
that say '0' records.
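For reference, a rough Beam 2.0 Java version of that sketch, assuming the
Reshuffle transform from org.apache.beam.sdk.transforms is available in your
Beam version (the PubsubMessage element type and transform names are
illustrative, not taken from the job above):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

/** Spreads elements across the shuffle under random keys, then drops the keys. */
static PCollection<PubsubMessage> breakUpBundles(PCollection<PubsubMessage> input) {
  return input
      // Attach a random key so elements are redistributed evenly.
      .apply("AddRandomKey",
          WithKeys.<Long, PubsubMessage>of(m -> ThreadLocalRandom.current().nextLong())
              .withKeyType(TypeDescriptors.longs()))
      // The shuffle acts as a bundle/fusion break in Dataflow.
      .apply("Reshuffle", Reshuffle.<Long, PubsubMessage>of())
      // Strip the temporary key before handing elements to the sink.
      .apply("StripKey", Values.<PubsubMessage>create());
}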

Raghu.


> Josh
>
>
>
> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan  wrote:
>
>> There are two main things to see here:
>>
>> * In the logs, are there any messages like "No operations completed
>> within the last 61 seconds. There are still 1 simple operations and 1
>> complex operations in progress.” This means you are underscaled on the
>> bigtable side and would benefit from  increasing the node count.
>> * We also saw some improvement in performance (workload dependent) by
>> going to a bigger worker machine type.
>> * Another optimization that worked for our use case:
>>
>> // streaming dataflow has larger machines with smaller bundles, so we can 
>> queue up a lot more without blowing up
>> private static BigtableOptions 
>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>> return new BigtableOptions.Builder()
>> .setProjectId(opts.getProject())
>> .setInstanceId(opts.getBigtableInstanceId())
>> .setUseCachedDataPool(true)
>> .setDataChannelCount(32)
>> .setBulkOptions(new BulkOptions.Builder()
>> .setUseBulkApi(true)
>> .setBulkMaxRowKeyCount(2048)
>> .setBulkMaxRequestSize(8_388_608L)
>> .setAsyncMutatorWorkerCount(32)
>> .build())
>> .build();
>> }
>>
>>
>> There is a lot of trial and error involved in getting the end-to-end
>> latency down so I would suggest enabling the profiling using the
>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>
>> — Ankur Chauhan
>>
>> On May 24, 2017, at 9:09 AM, Josh  wrote:
>>
>> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
>> implementation being provided at runtime...
>>
>> Any ideas of how to tweak my job to either lower the latency consuming
>> from PubSub or to lower the latency in writing to Bigtable?
>>
>>
>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>>
>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>> ...)?
>>>
>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
>>> wrote:
>>>
 Sorry that was an autocorrect error. I meant to ask - what dataflow
 runner are you using? If you are using google cloud dataflow then the
 PubsubIO class is not the one doing the reading from the pubsub topic. They
 provide a custom implementation at run time.

 Ankur Chauhan
 Sent from my iPhone

 On May 24, 2017, at 07:52, Josh  wrote:

 Hi Ankur,

 What do you mean by runner address?
 Would you be able to link me to the comment you're referring to?

 I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
 https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
 io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
 o/gcp/pubsub/PubsubIO.java

 Thanks,
 Josh

 On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
 wrote:

> What runner address you using. Google cloud dataflow uses a closed
> source version of the pubsub reader as noted in a comment on Read class.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 04:05, Josh  wrote:
>
> Hi all,
>
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
> seconds between the messages being published and being written to 
> Bigtable.
>
> I want to try and decrease the latency to <1s if possible - does
> anyone have any tips for doing this?
>
> I noticed that there is a 

Re: Question regarding on how to run Beam locally

2017-05-24 Thread Claire Yuan
Hi, I right-clicked in IntelliJ and tried "Run" on the main method of TfIdf in
the examples, but it says it failed to find the main method. Has anyone run
this method manually?

 

On Tuesday, May 23, 2017 10:56 PM, Jean-Baptiste Onofré  
wrote:
 

 Hi,

I just tried with one of my samples, and it runs fine locally (tried Flink,
Spark, and direct). Let me try the quickstart TfIdf example.

Regards
JB

On 05/23/2017 11:56 PM, Claire Yuan wrote:
> To who may concern,
>    I am trying to run the Apache Beam with Java SDK on my computer. I already 
> cloned the codes from github and was trying to run the example in "TfIdf". 
> When 
> I opened in IntelliJ and tried right click on it to run the main() in TfIdf, 
> it 
> says :
> Error: Could not find or load main class 
> org.apache.beam.examples.complete.TfIdf
> Also when I tried using the command I found in quickstart and run it in the 
> folder containing src codes for examples:
> 
> |mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.TfIdf \ 
> -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" 
> -Pflink-runner|
> 
> It stuck on the terminal. I am wondering if anyone could help in how to run 
> program locally in Ubuntu.
> 
> Sincerely, a user
> 
> 
> 
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


   

Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi Raghu,

My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a
look!

Yes, I'm using BigtableIO for the sink, and I am measuring the end-to-end
latency. It typically takes 3-6 seconds; I would like to get it down to ~1s.

Thanks,
Josh

On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi  wrote:

> Josh,
>
> Can you share your job_id? I could take look. Are you measuring latency
> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
> for sink?
>
> There is no easy way to use more workers when auto-scaling is enabled. It
> thinks your backlog and CPU are low enough and does not need to scale.
> Raghu.
>
> On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:
>
>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>
>> About the "No operations completed" message - there are a few of these in
>> the logs (but very few, like 1 an hour or something) - so probably no need
>> to scale up Bigtable.
>> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
>> Probably
>> about 50% of the "Wrote n records" messages are zero. While the other 50%
>> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
>> bad setting?
>>
>> Josh
>>
>>
>>
>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan 
>> wrote:
>>
>>> There are two main things to see here:
>>>
>>> * In the logs, are there any messages like "No operations completed
>>> within the last 61 seconds. There are still 1 simple operations and 1
>>> complex operations in progress.” This means you are underscaled on the
>>> bigtable side and would benefit from  increasing the node count.
>>> * We also saw some improvement in performance (workload dependent) by
>>> going to a bigger worker machine type.
>>> * Another optimization that worked for our use case:
>>>
>>> // streaming dataflow has larger machines with smaller bundles, so we can 
>>> queue up a lot more without blowing up
>>> private static BigtableOptions 
>>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>> return new BigtableOptions.Builder()
>>> .setProjectId(opts.getProject())
>>> .setInstanceId(opts.getBigtableInstanceId())
>>> .setUseCachedDataPool(true)
>>> .setDataChannelCount(32)
>>> .setBulkOptions(new BulkOptions.Builder()
>>> .setUseBulkApi(true)
>>> .setBulkMaxRowKeyCount(2048)
>>> .setBulkMaxRequestSize(8_388_608L)
>>> .setAsyncMutatorWorkerCount(32)
>>> .build())
>>> .build();
>>> }
>>>
>>>
>>> There is a lot of trial and error involved in getting the end-to-end
>>> latency down so I would suggest enabling the profiling using the
>>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>>
>>> — Ankur Chauhan
>>>
>>> On May 24, 2017, at 9:09 AM, Josh  wrote:
>>>
>>> Ah ok - I am using the Dataflow runner. I didn't realise about the
>>> custom implementation being provided at runtime...
>>>
>>> Any ideas of how to tweak my job to either lower the latency consuming
>>> from PubSub or to lower the latency in writing to Bigtable?
>>>
>>>
>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>>>
 What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
 ...)?

 On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
 wrote:

> Sorry that was an autocorrect error. I meant to ask - what dataflow
> runner are you using? If you are using google cloud dataflow then the
> PubsubIO class is not the one doing the reading from the pubsub topic. 
> They
> provide a custom implementation at run time.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 07:52, Josh  wrote:
>
> Hi Ankur,
>
> What do you mean by runner address?
> Would you be able to link me to the comment you're referring to?
>
> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
> o/gcp/pubsub/PubsubIO.java
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
> wrote:
>
>> What runner address you using. Google cloud dataflow uses a closed
>> source version of the pubsub reader as noted in a comment on Read class.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 04:05, Josh  wrote:
>>
>> Hi all,
>>
>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>> seconds between the messages being published and being 

Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!

On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik  wrote:

> Google Cloud Dataflow won't override your setting. The dynamic sharding
> occurs if you don't explicitly set a numShard value.
>
> On Wed, May 24, 2017 at 9:14 AM, Josh  wrote:
>
>> Hi Lukasz,
>>
>> Thanks for the example. That sounds like a nice solution -
>> I am running on Dataflow though, which dynamically sets numShards - so if
>> I set numShards to 1 on each of those AvroIO writers, I can't be sure that
>> Dataflow isn't going to override my setting right? I guess this should work
>> fine as long as I partition my stream into a large enough number of
>> partitions so that Dataflow won't override numShards.
>>
>> Josh
>>
>>
>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik  wrote:
>>
>>> Since your using a small number of shards, add a Partition transform
>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>> Write each partition with a single shard.
>>>
>>> (Fixed width diagram below)
>>> Pipeline -> AvroIO(numShards = 4)
>>> Becomes:
>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>   |-> AvroIO(numShards = 1)
>>>   |-> AvroIO(numShards = 1)
>>>   \-> AvroIO(numShards = 1)
>>>
>>> On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:
>>>
 Hi,

 I am using a FileBasedSink (AvroIO.write) on an unbounded stream
 (withWindowedWrites, hourly windows, numShards=4).

 I would like to partition the stream by some key in the element, so
 that all elements with the same key will get processed by the same shard
 writer, and therefore written to the same file. Is there a way to do this?
 Note that in my stream the number of keys is very large (most elements have
 a unique key, while a few elements share a key).

 Thanks,
 Josh

>>>
>>>
>>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Thanks Ankur, that's super helpful! I will give these optimisations a go.

About the "No operations completed" message - there are a few of these in
the logs (but very few, like 1 an hour or something) - so probably no need
to scale up Bigtable.
I did however see a lot of INFO messages "Wrote 0 records" in the
logs. Probably
about 50% of the "Wrote n records" messages are zero. While the other 50%
are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
bad setting?

Josh



On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan  wrote:

> There are two main things to see here:
>
> * In the logs, are there any messages like "No operations completed within
> the last 61 seconds. There are still 1 simple operations and 1 complex
> operations in progress.” This means you are underscaled on the bigtable
> side and would benefit from  increasing the node count.
> * We also saw some improvement in performance (workload dependent) by
> going to a bigger worker machine type.
> * Another optimization that worked for our use case:
>
> // streaming dataflow has larger machines with smaller bundles, so we can 
> queue up a lot more without blowing up
> private static BigtableOptions 
> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
> return new BigtableOptions.Builder()
> .setProjectId(opts.getProject())
> .setInstanceId(opts.getBigtableInstanceId())
> .setUseCachedDataPool(true)
> .setDataChannelCount(32)
> .setBulkOptions(new BulkOptions.Builder()
> .setUseBulkApi(true)
> .setBulkMaxRowKeyCount(2048)
> .setBulkMaxRequestSize(8_388_608L)
> .setAsyncMutatorWorkerCount(32)
> .build())
> .build();
> }
>
>
> There is a lot of trial and error involved in getting the end-to-end
> latency down so I would suggest enabling the profiling using the
> —saveProfilesToGcs option and get a sense of what is exactly happening.
>
> — Ankur Chauhan
>
> On May 24, 2017, at 9:09 AM, Josh  wrote:
>
> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
> implementation being provided at runtime...
>
> Any ideas of how to tweak my job to either lower the latency consuming
> from PubSub or to lower the latency in writing to Bigtable?
>
>
> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>
>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>> ...)?
>>
>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
>> wrote:
>>
>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>> runner are you using? If you are using google cloud dataflow then the
>>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>>> provide a custom implementation at run time.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 24, 2017, at 07:52, Josh  wrote:
>>>
>>> Hi Ankur,
>>>
>>> What do you mean by runner address?
>>> Would you be able to link me to the comment you're referring to?
>>>
>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>> o/gcp/pubsub/PubsubIO.java
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
>>> wrote:
>>>
 What runner address you using. Google cloud dataflow uses a closed
 source version of the pubsub reader as noted in a comment on Read class.

 Ankur Chauhan
 Sent from my iPhone

 On May 24, 2017, at 04:05, Josh  wrote:

 Hi all,

 I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
 writes the data out to Bigtable. I'm currently seeing a latency of 3-5
 seconds between the messages being published and being written to Bigtable.

 I want to try and decrease the latency to <1s if possible - does anyone
 have any tips for doing this?

 I noticed that there is a PubsubGrpcClient
 https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
 io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
 o/gcp/pubsub/PubsubGrpcClient.java however the PubsubUnboundedSource
 is initialised with a PubsubJsonClient, so the Grpc client doesn't appear
 to be being used. Is there a way to switch to the Grpc client - as perhaps
 that would give better performance?

 Also, I am running my job on Dataflow using autoscaling, which has only
 allocated one n1-standard-4 instance to the job, which is running at
 ~50% CPU. Could forcing a higher number of nodes help improve latency?

 Thanks for any advice,
 Josh


>>>
>>
>
>


Re: Best way to load heavy object into memory on nodes (python sdk)

2017-05-24 Thread Ahmet Altay
You can see an example implementation of Luke's suggestion in the
tensorflow-transform project [1]. A thread-local is used in that case; this
will work for runners that re-use the same thread to execute bundles.

[1] https://github.com/tensorflow/transform/blob/master/tensorflow_transform/beam/impl.py#L253

On Wed, May 24, 2017 at 8:00 AM, Lukasz Cwik  wrote:

> Why not use a singleton like pattern and have a function which either
> loads and caches the ML model from a side input or returns the singleton if
> it has been loaded.
> You'll want to use some form of locking to ensure that you really only
> load the ML model once.
>
> On Wed, May 24, 2017 at 6:18 AM, Vilhelm von Ehrenheim <
> vonehrenh...@gmail.com> wrote:
>
>> Hi all!
>> I would like to load a heavy object (think ML model) into memory that
>> should be available in a ParDo for quick predictions.
>>
>> What is the preferred way of doing this without loading the model for
>> each ParDo call (slow and will flood memory on the nodes). I don't seem to
>> be able to do it in the DoFn's __init__ block either as this is only done
>> once for all nodes (my guess here though) and then it breaks when
>> replicated internally (even on the DirectRunner, I suspect it is pickled
>> and this object cannot be pickled). If I load it as a side input it seems
>> to still be loaded into memory separately for each ParDo.
>>
>> If there is a better way to handle it in Java I'm happy to do it there
>> instead. It was just easier to attack the problem w python as the models
>> were developed in python.
>>
>> Any sort of pointers or tips are welcome!
>>
>> Thanks!
>> Vilhelm von Ehrenheim
>>
>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Ankur Chauhan
There are a few things to look at here:

* In the logs, are there any messages like "No operations completed within the
last 61 seconds. There are still 1 simple operations and 1 complex operations
in progress."? This means you are underscaled on the Bigtable side and would
benefit from increasing the node count.
* We also saw some improvement in performance (workload dependent) by going to
a bigger worker machine type.
* Another optimization that worked for our use case:

// Streaming Dataflow has larger machines with smaller bundles, so we can
// queue up a lot more without blowing up.
private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
    return new BigtableOptions.Builder()
        .setProjectId(opts.getProject())
        .setInstanceId(opts.getBigtableInstanceId())
        .setUseCachedDataPool(true)
        .setDataChannelCount(32)
        .setBulkOptions(new BulkOptions.Builder()
            .setUseBulkApi(true)
            .setBulkMaxRowKeyCount(2048)
            .setBulkMaxRequestSize(8_388_608L)
            .setAsyncMutatorWorkerCount(32)
            .build())
        .build();
}
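
For completeness, a hedged sketch of how options like these would be handed to
the sink, assuming Beam's BigtableIO and building on the snippet above (the
table name and the upstream mutation collection are placeholders):

import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Wires the tuned options into the Bigtable sink; "events" is a placeholder table id. */
static void writeToBigtable(PCollection<KV<ByteString, Iterable<Mutation>>> mutations,
                            AnalyticsPipelineOptions opts) {
  mutations.apply("WriteToBigtable",
      BigtableIO.write()
          .withBigtableOptions(createStreamingBTOptions(opts))
          .withTableId("events"));
}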

There is a lot of trial and error involved in getting the end-to-end latency
down, so I would suggest enabling profiling using the --saveProfilesToGcs
option to get a sense of what exactly is happening.

— Ankur Chauhan

> On May 24, 2017, at 9:09 AM, Josh  wrote:
> 
> Ah ok - I am using the Dataflow runner. I didn't realise about the custom 
> implementation being provided at runtime...
> 
> Any ideas of how to tweak my job to either lower the latency consuming from 
> PubSub or to lower the latency in writing to Bigtable?
> 
> 
> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  > wrote:
> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
> 
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan  > wrote:
> Sorry that was an autocorrect error. I meant to ask - what dataflow runner 
> are you using? If you are using google cloud dataflow then the PubsubIO class 
> is not the one doing the reading from the pubsub topic. They provide a custom 
> implementation at run time.
> 
> Ankur Chauhan 
> Sent from my iPhone
> 
> On May 24, 2017, at 07:52, Josh > 
> wrote:
> 
>> Hi Ankur,
>> 
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>> 
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>  
>> 
>> 
>> Thanks,
>> Josh
>> 
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan > > wrote:
>> What runner address you using. Google cloud dataflow uses a closed source 
>> version of the pubsub reader as noted in a comment on Read class. 
>> 
>> Ankur Chauhan
>> Sent from my iPhone
>> 
>> On May 24, 2017, at 04:05, Josh > 
>> wrote:
>> 
>>> Hi all,
>>> 
>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes 
>>> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds 
>>> between the messages being published and being written to Bigtable.
>>> 
>>> I want to try and decrease the latency to <1s if possible - does anyone 
>>> have any tips for doing this? 
>>> 
>>> I noticed that there is a PubsubGrpcClient 
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>  
>>> 
>>>  however the PubsubUnboundedSource is initialised with a PubsubJsonClient, 
>>> so the Grpc client doesn't appear to be being used. Is there a way to 
>>> switch to the Grpc client - as perhaps that would give better performance?
>>> 
>>> Also, I am running my job on Dataflow using autoscaling, which has only 
>>> allocated one n1-standard-4 instance to the job, which is running at ~50% 
>>> CPU. Could forcing a higher number of nodes help improve latency?
>>> 
>>> Thanks for any advice,
>>> Josh
>> 
> 
> 



Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Lukasz Cwik
Google Cloud Dataflow won't override your setting. The dynamic sharding
occurs if you don't explicitly set a numShard value.

On Wed, May 24, 2017 at 9:14 AM, Josh  wrote:

> Hi Lukasz,
>
> Thanks for the example. That sounds like a nice solution -
> I am running on Dataflow though, which dynamically sets numShards - so if
> I set numShards to 1 on each of those AvroIO writers, I can't be sure that
> Dataflow isn't going to override my setting right? I guess this should work
> fine as long as I partition my stream into a large enough number of
> partitions so that Dataflow won't override numShards.
>
> Josh
>
>
> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik  wrote:
>
>> Since your using a small number of shards, add a Partition transform
>> which uses a deterministic hash of the key to choose one of 4 partitions.
>> Write each partition with a single shard.
>>
>> (Fixed width diagram below)
>> Pipeline -> AvroIO(numShards = 4)
>> Becomes:
>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>   |-> AvroIO(numShards = 1)
>>   |-> AvroIO(numShards = 1)
>>   \-> AvroIO(numShards = 1)
>>
>> On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:
>>
>>> Hi,
>>>
>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>> (withWindowedWrites, hourly windows, numShards=4).
>>>
>>> I would like to partition the stream by some key in the element, so that
>>> all elements with the same key will get processed by the same shard writer,
>>> and therefore written to the same file. Is there a way to do this? Note
>>> that in my stream the number of keys is very large (most elements have a
>>> unique key, while a few elements share a key).
>>>
>>> Thanks,
>>> Josh
>>>
>>
>>
>


Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Hi Lukasz,

Thanks for the example. That sounds like a nice solution -
I am running on Dataflow though, which dynamically sets numShards - so if I
set numShards to 1 on each of those AvroIO writers, I can't be sure that
Dataflow isn't going to override my setting right? I guess this should work
fine as long as I partition my stream into a large enough number of
partitions so that Dataflow won't override numShards.

Josh

On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik  wrote:

> Since your using a small number of shards, add a Partition transform which
> uses a deterministic hash of the key to choose one of 4 partitions. Write
> each partition with a single shard.
>
> (Fixed width diagram below)
> Pipeline -> AvroIO(numShards = 4)
> Becomes:
> Pipeline -> Partition --> AvroIO(numShards = 1)
>   |-> AvroIO(numShards = 1)
>   |-> AvroIO(numShards = 1)
>   \-> AvroIO(numShards = 1)
>
> On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:
>
>> Hi,
>>
>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>> (withWindowedWrites, hourly windows, numShards=4).
>>
>> I would like to partition the stream by some key in the element, so that
>> all elements with the same key will get processed by the same shard writer,
>> and therefore written to the same file. Is there a way to do this? Note
>> that in my stream the number of keys is very large (most elements have a
>> unique key, while a few elements share a key).
>>
>> Thanks,
>> Josh
>>
>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Ah ok - I am using the Dataflow runner. I didn't realise the custom
implementation was provided at runtime...

Any ideas of how to tweak my job to either lower the latency consuming from
PubSub or to lower the latency in writing to Bigtable?


On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:

> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
>
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan  wrote:
>
>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>> runner are you using? If you are using google cloud dataflow then the
>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>> provide a custom implementation at run time.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 07:52, Josh  wrote:
>>
>> Hi Ankur,
>>
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>>
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/
>> io/gcp/pubsub/PubsubIO.java
>>
>> Thanks,
>> Josh
>>
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
>> wrote:
>>
>>> What runner address you using. Google cloud dataflow uses a closed
>>> source version of the pubsub reader as noted in a comment on Read class.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 24, 2017, at 04:05, Josh  wrote:
>>>
>>> Hi all,
>>>
>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>> seconds between the messages being published and being written to Bigtable.
>>>
>>> I want to try and decrease the latency to <1s if possible - does anyone
>>> have any tips for doing this?
>>>
>>> I noticed that there is a PubsubGrpcClient
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>> o/gcp/pubsub/PubsubGrpcClient.java however the PubsubUnboundedSource is
>>> initialised with a PubsubJsonClient, so the Grpc client doesn't appear to
>>> be being used. Is there a way to switch to the Grpc client - as perhaps
>>> that would give better performance?
>>>
>>> Also, I am running my job on Dataflow using autoscaling, which has only
>>> allocated one n1-standard-4 instance to the job, which is running at
>>> ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>>
>>> Thanks for any advice,
>>> Josh
>>>
>>>
>>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Lukasz Cwik
What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?

On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan  wrote:

> Sorry that was an autocorrect error. I meant to ask - what dataflow runner
> are you using? If you are using google cloud dataflow then the PubsubIO
> class is not the one doing the reading from the pubsub topic. They provide
> a custom implementation at run time.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 07:52, Josh  wrote:
>
> Hi Ankur,
>
> What do you mean by runner address?
> Would you be able to link me to the comment you're referring to?
>
> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
> https://github.com/apache/beam/blob/release-2.0.0/sdks/
> java/io/google-cloud-platform/src/main/java/org/apache/beam/
> sdk/io/gcp/pubsub/PubsubIO.java
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan  wrote:
>
>> What runner address you using. Google cloud dataflow uses a closed source
>> version of the pubsub reader as noted in a comment on Read class.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 04:05, Josh  wrote:
>>
>> Hi all,
>>
>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>> seconds between the messages being published and being written to Bigtable.
>>
>> I want to try and decrease the latency to <1s if possible - does anyone
>> have any tips for doing this?
>>
>> I noticed that there is a PubsubGrpcClient https://github.com/apache/beam
>> /blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/m
>> ain/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java however
>> the PubsubUnboundedSource is initialised with a PubsubJsonClient, so the
>> Grpc client doesn't appear to be being used. Is there a way to switch to
>> the Grpc client - as perhaps that would give better performance?
>>
>> Also, I am running my job on Dataflow using autoscaling, which has only
>> allocated one n1-standard-4 instance to the job, which is running at
>> ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>
>> Thanks for any advice,
>> Josh
>>
>>
>


Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Lukasz Cwik
Since you're using a small number of shards, add a Partition transform which
uses a deterministic hash of the key to choose one of 4 partitions. Write
each partition with a single shard.

(Fixed width diagram below)
Pipeline -> AvroIO(numShards = 4)
Becomes:
Pipeline -> Partition --> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      \-> AvroIO(numShards = 1)
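
A rough sketch of that wiring, where MyRecord, getKey(), and the output path
are hypothetical placeholders rather than anything from the original pipeline:

import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

/** Routes each element to a partition by a deterministic key hash, one writer per partition. */
static void writeInFourPartitions(PCollection<MyRecord> records) {
  int numPartitions = 4;
  PCollectionList<MyRecord> parts = records.apply("PartitionByKeyHash",
      Partition.<MyRecord>of(numPartitions,
          // floorMod keeps the partition index non-negative.
          (r, n) -> Math.floorMod(r.getKey().hashCode(), n)));

  for (int i = 0; i < numPartitions; i++) {
    parts.get(i).apply("WriteAvroPartition" + i,
        AvroIO.write(MyRecord.class)
            .to("gs://my-bucket/output/partition-" + i + "/part")
            // Keep whatever windowed-write / filename settings the original writer already uses.
            .withWindowedWrites()
            .withNumShards(1));
  }
}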

On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:

> Hi,
>
> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
> (withWindowedWrites, hourly windows, numShards=4).
>
> I would like to partition the stream by some key in the element, so that
> all elements with the same key will get processed by the same shard writer,
> and therefore written to the same file. Is there a way to do this? Note
> that in my stream the number of keys is very large (most elements have a
> unique key, while a few elements share a key).
>
> Thanks,
> Josh
>


Re: Best way to load heavy object into memory on nodes (python sdk)

2017-05-24 Thread Lukasz Cwik
Why not use a singleton-like pattern and have a function which either loads
and caches the ML model (from a side input) or returns the singleton if it
has already been loaded?
You'll want to use some form of locking to ensure that you really only load
the ML model once. A rough sketch of the idea is below.
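
Since the Java route was also mentioned as acceptable, here is a rough Java
sketch of that singleton-with-locking idea (the Model type and its load and
predict methods are hypothetical placeholders); in Python the same shape is a
module-level variable guarded by a threading.Lock.

import org.apache.beam.sdk.transforms.DoFn;

/** Loads a heavy model at most once per worker JVM and shares it across threads. */
class PredictFn extends DoFn<String, String> {
  // Static: one reference per JVM, not per DoFn instance; volatile for safe publication.
  private static volatile Model model;

  private static Model getModel() {
    if (model == null) {
      synchronized (PredictFn.class) {
        if (model == null) {
          // Hypothetical placeholder: load from wherever the model actually lives.
          model = Model.load("/path/to/model");
        }
      }
    }
    return model;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(getModel().predict(c.element()));
  }
}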

On Wed, May 24, 2017 at 6:18 AM, Vilhelm von Ehrenheim <
vonehrenh...@gmail.com> wrote:

> Hi all!
> I would like to load a heavy object (think ML model) into memory that
> should be available in a ParDo for quick predictions.
>
> What is the preferred way of doing this without loading the model for each
> ParDo call (slow and will flood memory on the nodes). I don't seem to be
> able to do it in the DoFn's __init__ block either as this is only done once
> for all nodes (my guess here though) and then it breaks when replicated
> internally (even on the DirectRunner, I suspect it is pickled and this
> object cannot be pickled). If I load it as a side input it seems to still
> be loaded into memory separately for each ParDo.
>
> If there is a better way to handle it in Java I'm happy to do it there
> instead. It was just easier to attack the problem w python as the models
> were developed in python.
>
> Any sort of pointers or tips are welcome!
>
> Thanks!
> Vilhelm von Ehrenheim
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi Ankur,

What do you mean by runner address?
Would you be able to link me to the comment you're referring to?

I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Thanks,
Josh

On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan  wrote:

> What runner address you using. Google cloud dataflow uses a closed source
> version of the pubsub reader as noted in a comment on Read class.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 04:05, Josh  wrote:
>
> Hi all,
>
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes
> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds
> between the messages being published and being written to Bigtable.
>
> I want to try and decrease the latency to <1s if possible - does anyone
> have any tips for doing this?
>
> I noticed that there is a PubsubGrpcClient https://github.com/apache/beam
> /blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/
> main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java however
> the PubsubUnboundedSource is initialised with a PubsubJsonClient, so the
> Grpc client doesn't appear to be being used. Is there a way to switch to
> the Grpc client - as perhaps that would give better performance?
>
> Also, I am running my job on Dataflow using autoscaling, which has only
> allocated one n1-standard-4 instance to the job, which is running at ~50%
> CPU. Could forcing a higher number of nodes help improve latency?
>
> Thanks for any advice,
> Josh
>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Ankur Chauhan
What runner address you using. Google cloud dataflow uses a closed source 
version of the pubsub reader as noted in a comment on Read class. 

Ankur Chauhan
Sent from my iPhone

> On May 24, 2017, at 04:05, Josh  wrote:
> 
> Hi all,
> 
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes 
> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds 
> between the messages being published and being written to Bigtable.
> 
> I want to try and decrease the latency to <1s if possible - does anyone have 
> any tips for doing this? 
> 
> I noticed that there is a PubsubGrpcClient 
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>  however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so 
> the Grpc client doesn't appear to be being used. Is there a way to switch to 
> the Grpc client - as perhaps that would give better performance?
> 
> Also, I am running my job on Dataflow using autoscaling, which has only 
> allocated one n1-standard-4 instance to the job, which is running at ~50% 
> CPU. Could forcing a higher number of nodes help improve latency?
> 
> Thanks for any advice,
> Josh


Re: Sample Data

2017-05-24 Thread Prabeesh K.
Hi Vilhelm,

This is exactly what I am looking for. Thank you so much.

Regards,
Prabeesh K.

On 24 May 2017 at 17:26, Vilhelm von Ehrenheim 
wrote:

> Hi!
> Have you tried beam.combiners.Sample.FixedSizeGlobally(n)? There is also
> a Sample function per Key.
> They are defined here: https://github.com/apache/
> beam/blob/v2.0.0/sdks/python/apache_beam/transforms/combiners.py#L368
>
> Hope it helps. If not could you explain in more detail what you mean?
>
> Br,
> Vilhelm von Ehrenheim
> ​
>
> On Wed, May 24, 2017 at 8:37 AM, Prabeesh K.  wrote:
>
>>
>> How to we can take sample data in Python?
>>
>
>


Re: Sample Data

2017-05-24 Thread Vilhelm von Ehrenheim
Hi!
Have you tried beam.combiners.Sample.FixedSizeGlobally(n)? There is also a
Sample function per Key.
They are defined here:
https://github.com/apache/beam/blob/v2.0.0/sdks/python/apache_beam/transforms/combiners.py#L368

Hope it helps. If not, could you explain in more detail what you mean?

Br,
Vilhelm von Ehrenheim

On Wed, May 24, 2017 at 8:37 AM, Prabeesh K.  wrote:

>
> How to we can take sample data in Python?
>


How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi all,

I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes
the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds
between the messages being published and being written to Bigtable.

I want to try and decrease the latency to <1s if possible - does anyone
have any tips for doing this?

I noticed that there is a PubsubGrpcClient
(https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java);
however, the PubsubUnboundedSource is initialised with a PubsubJsonClient,
so the gRPC client doesn't appear to be used. Is there a way to switch to
the gRPC client, as perhaps that would give better performance?

Also, I am running my job on Dataflow using autoscaling, which has only
allocated one n1-standard-4 instance to the job, which is running at ~50%
CPU. Could forcing a higher number of nodes help improve latency?

Thanks for any advice,
Josh


How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Hi,

I am using a FileBasedSink (AvroIO.write) on an unbounded stream
(withWindowedWrites, hourly windows, numShards=4).

I would like to partition the stream by some key in the element, so that
all elements with the same key will get processed by the same shard writer,
and therefore written to the same file. Is there a way to do this? Note
that in my stream the number of keys is very large (most elements have a
unique key, while a few elements share a key).

Thanks,
Josh


Sample Data

2017-05-24 Thread Prabeesh K.
How can we take sample data in Python?