Re: Problem when trying to specify the runner in examples

2017-05-24 Thread Jean-Baptiste Onofré

Hi,

you have to provide the profile: -Pflink-runner.
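For example, a sketch combining that profile with the quickstart command (the
args are illustrative):

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
 -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" \
 -Pflink-runner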

Regards
JB

On 05/24/2017 11:35 PM, Claire Yuan wrote:

Hi all,
   When I tried to specify the runner in the command for the examples:
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--output=wordcount.txt --runner=FlinkRunner"

   The build failed with the error message:
Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.5.0:java 
(default-cli) on project beam-examples-java: An exception occured while 
executing the Java class. null: InvocationTargetException: Unknown 'runner' 
specified 'FlinkRunner', supported pipeline runners [DirectRunner] -> [Help 1]

   I am wondering if anyone has hit the same issue and how you solved it.


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: question about running examples in command

2017-05-24 Thread Jean-Baptiste Onofré

Hi,

You can take a look at the documentation:

https://beam.apache.org/get-started/quickstart-java/

There you will find how to run the WordCount example, for instance on the 
direct runner:


mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
 -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner

It's the same for all examples. You have one profile per runner.
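For instance, a sketch of the Spark variant (assuming a spark-runner profile 
analogous to the direct and Flink ones):

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
 -Dexec.args="--inputFile=pom.xml --output=counts --runner=SparkRunner" \
 -Pspark-runner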

Regards
JB

On 05/24/2017 10:59 PM, Claire Yuan wrote:

Hi,
   Would anyone please show the correct command to run examples like 
WordCount and TfIdf with Maven in the terminal?


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Make runner implementation smoother?

2017-05-24 Thread Jean-Baptiste Onofré

Hi Romain,

1. A runner has to implement the 5 (or 6, depending how we count) primitives. 
That's the purpose of the Runner API (the Fn API is more about dealing with 
SDKs in the runner). So the Runner API is a kind of adapter, and it implements 
all other functions based on the primitives. Would you like a formal 
RunnerAdapter in the Runner API?


2. A PipelineVisitor allows you to browse the DAG and get the PInput/POutput 
(representing PCollections and side inputs/outputs) for each PTransform/step in 
the pipeline. You can use this PipelineVisitor in a pre-step to get the complete 
graph. I see in your code that you use it. What would you like instead? Some 
kind of "object" representing the DAG?


I'm going to take a look at the code. FYI, I also have PoCs for Hazelcast (with 
Cellar), Ignite and MapReduce if you want to take a look.


Regards
JB

On 05/24/2017 10:54 PM, Romain Manni-Bucau wrote:

Hi guys,

congrats for the 2.0!

I have a few questions regarding a custom runner implementation, in no 
particular order, but numbered for later reference:


1. Why doesn't Beam have (yet?) a RunnerAdapter with all primitives listed and 
enforced by the API? If I read the code right (shout if not), each runner 
creates its own processing context and conversion at the moment, which kind of 
means Beam doesn't abstract the runtime. That makes it harder to get into the 
Beam model IMHO (vs defining all operations by contract - potentially with 
defaults when compositions are possible).
2. Close to the previous point (still around runners): I find it quite hard to 
browse Beam's DAG compared to a plain DAG. Is it intended to be so low level, 
or can't we get a simple graph model?


Maybe I'm just hitting not-yet-extended/defined land and therefore a 
user-friendly API is missing, or I missed a central concept - in that case 
shout :p.


Any pointers would be very welcome on how to implement a runner without 
redefining a full transpiler/converter - or is that outside Beam's scope?



FYI, here is what I have: https://github.com/rmannibucau/beam-hazelcast-runner, 
and here is where I'm stuck (hesitating to redefine a full transpiler since I 
expected Beam to help): 
https://github.com/rmannibucau/beam-hazelcast-runner/blob/master/src/main/java/com/github/rmannibucau/beam/runner/hazelcast/HazelcastPipelineVisitor.java


Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | JavaEE Factory



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Problem when trying to specify the runner in examples

2017-05-24 Thread Dan Halperin
Hi Claire,

Have you seen the Quickstarts on https://beam.apache.org? Specifically, we
hope that https://beam.apache.org/get-started/quickstart-java/ answers
these questions and has explicit instructions for running WordCount on all
runners.

Dan

On Wed, May 24, 2017 at 2:45 PM, Thomas Groh  wrote:

> You should specify the Flink profile when you execute, with
> '-Pflink-runner'. That will add the Flink dependency to your classpath when
> you execute the pipeline.
>
> On Wed, May 24, 2017 at 2:35 PM, Claire Yuan 
> wrote:
>
>> Hi all,
>>   When I tried to specify the runner in the command for the examples:
>> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount
>> -Dexec.args="--output=wordcount.txt --runner=FlinkRunner"
>>
>>   The build failed with the error message:
>> Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.5.0:java
>> (default-cli) on project beam-examples-java: An exception occured while
>> executing the Java class. null: InvocationTargetException: Unknown 'runner'
>> specified 'FlinkRunner', supported pipeline runners [DirectRunner] -> [Help
>> 1]
>>
>>   I am wondering if anyone has hit the same issue and how you solved it.
>>
>
>


Re: Make runner implementation smoother?

2017-05-24 Thread Kenneth Knowles
Hi Romain,

These are great topics for discussion, and very good timing.

I strongly recommend joining dev@beam, as that is where most discussion
about implementing Beam runners takes place. On that list, I recently
shared a guide for implementing a Beam runner at
https://s.apache.org/beam-runner-guide. It will be improved and appear on
the Beam site fairly soon.

I have also taken the liberty of moving this discussion to dev@beam and
moved user@beam to BCC.

Kenn

On Wed, May 24, 2017 at 1:54 PM, Romain Manni-Bucau 
wrote:

> Hi guys,
>
> congrats for the 2.0!
>
> I have a few question regarding a custom runner implementation, there is
> no particular order but adding numbers for later references:
>
> 1. why beam doesn't have (yet?) a RunnerAdapter with all primivite listed
> and enforced by API, if I got right the code (shout if not) each runner is
> creating its own processing context and convertion at the moment which kind
> of means beam doesn't abstract the runtime which makes it harder to enter
> into beam model IMHO (vs defining by contract all operations - potentially
> with defaults when compositions are possible)
> 2. close to previous point (still around runner): i find quite hard to
> browse the DAG of beam compared to a plain DAG, is it intended to be so low
> level, can't we get a simple graph model?
>
> Maybe I'm just hitting a not yet extended/defined land and therefore an
> user friendly API is missing or I missed a central concept - in this case
> shout :p.
>
> Any pointers would be very welcomed on how to implement a runner without
> redefining a full transpiler/converter - or is it outside beam scope?
>
>
> FYI here what I have https://github.com/rmannibucau/beam-hazelcast-runner
> and here where I'm stucked (hesitating to redefine a full transpiler since
> I expected beam to help): https://github.com/rmannibucau/beam-hazelcast-
> runner/blob/master/src/main/java/com/github/rmannibucau/
> beam/runner/hazelcast/HazelcastPipelineVisitor.java
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | JavaEE Factory
> 
>


Re: Problem when trying to specify the runner in examples

2017-05-24 Thread Thomas Groh
You should specify the Flink profile when you execute, with
'-Pflink-runner'. That will add the Flink dependency to your classpath when
you execute the pipeline.

On Wed, May 24, 2017 at 2:35 PM, Claire Yuan 
wrote:

> Hi all,
>   When I tried to specify the runner in the command for the examples:
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount
> -Dexec.args="--output=wordcount.txt --runner=FlinkRunner"
>
>   The build failed with the error message:
> Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.5.0:java
> (default-cli) on project beam-examples-java: An exception occured while
> executing the Java class. null: InvocationTargetException: Unknown 'runner'
> specified 'FlinkRunner', supported pipeline runners [DirectRunner] -> [Help
> 1]
>
>   I am wondering if anyone has hit the same issue and how you solved it.
>


Problem when trying to specify the runner in examples

2017-05-24 Thread Claire Yuan
Hi all,
   When I tried to specify the runner in the command for the examples:
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--output=wordcount.txt --runner=FlinkRunner"

   The build failed with the error message:
Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.5.0:java 
(default-cli) on project beam-examples-java: An exception occured while 
executing the Java class. null: InvocationTargetException: Unknown 'runner' 
specified 'FlinkRunner', supported pipeline runners [DirectRunner] -> [Help 1]

   I am wondering if anyone has hit the same issue and how you solved it.

Re: question about running examples in command

2017-05-24 Thread Romain Manni-Bucau
Hi,

Did you try this from examples/java:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount
-Dexec.args="--output=/tmp/out.txt"




Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | JavaEE Factory


2017-05-24 22:59 GMT+02:00 Claire Yuan :

> Hi,
>   Would anyone please show the correct command to run examples like
> WordCount and TfIdf with Maven in the terminal?
>


question about running examples in command

2017-05-24 Thread Claire Yuan
Hi,
   Would anyone please show the correct command to run examples like 
WordCount and TfIdf with Maven in the terminal?

Make runner implementation smoother?

2017-05-24 Thread Romain Manni-Bucau
Hi guys,

congrats for the 2.0!

I have a few questions regarding a custom runner implementation, in no
particular order, but numbered for later reference:

1. Why doesn't Beam have (yet?) a RunnerAdapter with all primitives listed
and enforced by the API? If I read the code right (shout if not), each runner
creates its own processing context and conversion at the moment, which kind
of means Beam doesn't abstract the runtime. That makes it harder to get into
the Beam model IMHO (vs defining all operations by contract - potentially
with defaults when compositions are possible).
2. Close to the previous point (still around runners): I find it quite hard to
browse Beam's DAG compared to a plain DAG. Is it intended to be so low
level, or can't we get a simple graph model?

Maybe I'm just hitting not-yet-extended/defined land and therefore a
user-friendly API is missing, or I missed a central concept - in that case
shout :p.

Any pointers would be very welcome on how to implement a runner without
redefining a full transpiler/converter - or is that outside Beam's scope?


FYI, here is what I have: https://github.com/rmannibucau/beam-hazelcast-runner
and here is where I'm stuck (hesitating to redefine a full transpiler since
I expected Beam to help):
https://github.com/rmannibucau/beam-hazelcast-runner/blob/master/src/main/java/com/github/rmannibucau/beam/runner/hazelcast/HazelcastPipelineVisitor.java

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | JavaEE Factory



Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread jofo90
Hi Raghu,

Thanks for the suggestions and for having a look at the metrics!
I will try the reshuffle and tweak the publisher batching over the next 
couple of days, along with Ankur's suggestions, and will report back if any of 
these make a significant difference.

Best regards,
Josh

> On 24 May 2017, at 21:17, Raghu Angadi  wrote:
> 
> Josh,
> 
> Reshuffle might also be worth trying. To clarify, ~1s end-to-end is not 
> always simple given the number of systems and services involved between the 
> publisher and the eventual sink.
> 
> Raghu.
> 
>> On Wed, May 24, 2017 at 12:32 PM, Raghu Angadi  wrote:
>> Thanks. I looked at some job system-level metrics. I see minimal latency within 
>> the Dataflow pipeline itself, so you might not see much improvement from 
>> Reshuffle() (maybe ~0.25 seconds).
>> 
>> Do you have control over the publisher? Publishers might be batching hundreds of 
>> messages, which adds latency. You can try to reduce that. Even then PubSub 
>> itself does some batching internally which might limit overall latency. 
>> Removing publisher batching is worth a try. 
>> 
>> 
>>> On Wed, May 24, 2017 at 11:46 AM, Josh  wrote:
>>> Hi Raghu,
>>> 
>>> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a 
>>> look!
>>> 
>>> Yes I'm using BigtableIO for the sink and I am measuring the end-to-end 
>>> latency. It seems to take 3-6 seconds typically, I would like to get it 
>>> down to ~1s.
>>> 
>>> Thanks,
>>> Josh
>>> 
 On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi  wrote:
 Josh,
 
 Can you share your job_id? I could take a look. Are you measuring latency 
 end-to-end (publisher to when it appears on BT?). Are you using BigtableIO 
 for sink?
 
 There is no easy way to use more workers when auto-scaling is enabled. It 
 thinks your backlog and CPU are low enough and does not need to scale.
 Raghu.
 
> On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:
> Thanks Ankur, that's super helpful! I will give these optimisations a go.
> 
> About the "No operations completed" message - there are a few of these in 
> the logs (but very few, like 1 an hour or something) - so probably no 
> need to scale up Bigtable.
> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
> Probably about 50% of the "Wrote n records" messages are zero. While the 
> other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that 
> could indicate a bad setting?
> 
> Josh
> 
> 
> 
>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan  
>> wrote:
>> There are two main things to see here:
>> 
>> * In the logs, are there any messages like "No operations completed 
>> within the last 61 seconds. There are still 1 simple operations and 1 
>> complex operations in progress.” This means you are underscaled on the 
>> bigtable side and would benefit from  increasing the node count.
>> * We also saw some improvement in performance (workload dependent) by 
>> going to a bigger worker machine type.
>> * Another optimization that worked for our use case:
>> 
>> // streaming dataflow has larger machines with smaller bundles, so we 
>> can queue up a lot more without blowing up
>> private static BigtableOptions 
>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>> return new BigtableOptions.Builder()
>> .setProjectId(opts.getProject())
>> .setInstanceId(opts.getBigtableInstanceId())
>> .setUseCachedDataPool(true)
>> .setDataChannelCount(32)
>> .setBulkOptions(new BulkOptions.Builder()
>> .setUseBulkApi(true)
>> .setBulkMaxRowKeyCount(2048)
>> .setBulkMaxRequestSize(8_388_608L)
>> .setAsyncMutatorWorkerCount(32)
>> .build())
>> .build();
>> }
>> 
>> There is a lot of trial and error involved in getting the end-to-end 
>> latency down so I would suggest enabling the profiling using the 
>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>> 
>> — Ankur Chauhan
>> 
>>> On May 24, 2017, at 9:09 AM, Josh  wrote:
>>> 
>>> Ah ok - I am using the Dataflow runner. I didn't realise about the 
>>> custom implementation being provided at runtime...
>>> 
>>> Any ideas of how to tweak my job to either lower the latency consuming 
>>> from PubSub or to lower the latency in writing to Bigtable?
>>> 
>>> 
 On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
 What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, 
 ...)?
 
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan  
> wrote:
> Sorry that was an autocorrect error. I meant to ask - what dataflow 
> runner are you using? If you are using google cloud dataflow then the 
> PubsubIO class is not the one doing the reading from the pubsub topic. They 
> provide a custom implementation at run time.

Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Raghu Angadi
Josh,

Reshuffle might also be worth trying. To clarify, ~1s end-to-end is not
always simple given the number of systems and services involved between
the publisher and the eventual sink.

Raghu.

On Wed, May 24, 2017 at 12:32 PM, Raghu Angadi  wrote:

> Thanks. I looked at some job system-level metrics. I see minimal latency
> within the Dataflow pipeline itself, so you might not see much improvement from
> Reshuffle() (maybe ~0.25 seconds).
>
> Do you have control over the publisher? Publishers might be batching hundreds of
> messages, which adds latency. You can try to reduce that. Even then PubSub
> itself does some batching internally which might limit overall latency.
> Removing publisher batching is worth a try.
>
>
> On Wed, May 24, 2017 at 11:46 AM, Josh  wrote:
>
>> Hi Raghu,
>>
>> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for
>> taking a look!
>>
>> Yes I'm using BigtableIO for the sink and I am measuring the end-to-end
>> latency. It seems to take 3-6 seconds typically, I would like to get it
>> down to ~1s.
>>
>> Thanks,
>> Josh
>>
>> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi  wrote:
>>
>>> Josh,
>>>
>>> Can you share your job_id? I could take a look. Are you measuring latency
>>> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
>>> for sink?
>>>
>>> There is no easy way to use more workers when auto-scaling is enabled.
>>> It thinks your backlog and CPU are low enough and does not need to scale.
>>> Raghu.
>>>
>>> On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:
>>>
 Thanks Ankur, that's super helpful! I will give these optimisations a
 go.

 About the "No operations completed" message - there are a few of these
 in the logs (but very few, like 1 an hour or something) - so probably no
 need to scale up Bigtable.
 I did however see a lot of INFO messages "Wrote 0 records" in the
 logs. Probably about 50% of the "Wrote n records" messages are zero.
 While the other 50% are quite high (e.g. "Wrote 80 records"). Not sure if
 that could indicate a bad setting?

 Josh



 On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan 
 wrote:

> There are two main things to see here:
>
> * In the logs, are there any messages like "No operations completed
> within the last 61 seconds. There are still 1 simple operations and 1
> complex operations in progress.” This means you are underscaled on the
> bigtable side and would benefit from  increasing the node count.
> * We also saw some improvement in performance (workload dependent) by
> going to a bigger worker machine type.
> * Another optimization that worked for our use case:
>
> // streaming dataflow has larger machines with smaller bundles, so we can 
> queue up a lot more without blowing up
> private static BigtableOptions 
> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
> return new BigtableOptions.Builder()
> .setProjectId(opts.getProject())
> .setInstanceId(opts.getBigtableInstanceId())
> .setUseCachedDataPool(true)
> .setDataChannelCount(32)
> .setBulkOptions(new BulkOptions.Builder()
> .setUseBulkApi(true)
> .setBulkMaxRowKeyCount(2048)
> .setBulkMaxRequestSize(8_388_608L)
> .setAsyncMutatorWorkerCount(32)
> .build())
> .build();
> }
>
>
> There is a lot of trial and error involved in getting the end-to-end
> latency down so I would suggest enabling the profiling using the
> —saveProfilesToGcs option and get a sense of what is exactly happening.
>
> — Ankur Chauhan
>
> On May 24, 2017, at 9:09 AM, Josh  wrote:
>
> Ah ok - I am using the Dataflow runner. I didn't realise about the
> custom implementation being provided at runtime...
>
> Any ideas of how to tweak my job to either lower the latency consuming
> from PubSub or to lower the latency in writing to Bigtable?
>
>
> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>
>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>> ...)?
>>
>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
>> wrote:
>>
>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>> runner are you using? If you are using google cloud dataflow then the
>>> PubsubIO class is not the one doing the reading from the pubsub topic. 
>>> They
>>> provide a custom implementation at run time.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 24, 2017, at 07:52, Josh  wrote:
>>>
>>> Hi Ankur,
>>>
>>> What do you mean by runner address?
>>> Would you be able to link me to the comment you're referring to?
>>>
>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Raghu Angadi
Thanks. I looked at some job system-level metrics. I see minimal latency
within the Dataflow pipeline itself, so you might not see much improvement from
Reshuffle() (maybe ~0.25 seconds).

Do you have control over the publisher? Publishers might be batching hundreds of
messages, which adds latency. You can try to reduce that. Even then PubSub
itself does some batching internally which might limit overall latency.
Removing publisher batching is worth a try.

On Wed, May 24, 2017 at 11:46 AM, Josh  wrote:

> Hi Raghu,
>
> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking
> a look!
>
> Yes I'm using BigtableIO for the sink and I am measuring the end-to-end
> latency. It seems to take 3-6 seconds typically, I would like to get it
> down to ~1s.
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi  wrote:
>
>> Josh,
>>
>> Can you share your job_id? I could take a look. Are you measuring latency
>> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
>> for sink?
>>
>> There is no easy way to use more workers when auto-scaling is enabled. It
>> thinks your backlog and CPU are low enough and does not need to scale.
>> Raghu.
>>
>> On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:
>>
>>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>>
>>> About the "No operations completed" message - there are a few of these
>>> in the logs (but very few, like 1 an hour or something) - so probably no
>>> need to scale up Bigtable.
>>> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
>>> Probably
>>> about 50% of the "Wrote n records" messages are zero. While the other 50%
>>> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
>>> bad setting?
>>>
>>> Josh
>>>
>>>
>>>
>>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan 
>>> wrote:
>>>
 There are two main things to see here:

 * In the logs, are there any messages like "No operations completed
 within the last 61 seconds. There are still 1 simple operations and 1
 complex operations in progress.” This means you are underscaled on the
 bigtable side and would benefit from  increasing the node count.
 * We also saw some improvement in performance (workload dependent) by
 going to a bigger worker machine type.
 * Another optimization that worked for our use case:

 // streaming dataflow has larger machines with smaller bundles, so we can 
 queue up a lot more without blowing up
 private static BigtableOptions 
 createStreamingBTOptions(AnalyticsPipelineOptions opts) {
 return new BigtableOptions.Builder()
 .setProjectId(opts.getProject())
 .setInstanceId(opts.getBigtableInstanceId())
 .setUseCachedDataPool(true)
 .setDataChannelCount(32)
 .setBulkOptions(new BulkOptions.Builder()
 .setUseBulkApi(true)
 .setBulkMaxRowKeyCount(2048)
 .setBulkMaxRequestSize(8_388_608L)
 .setAsyncMutatorWorkerCount(32)
 .build())
 .build();
 }


 There is a lot of trial and error involved in getting the end-to-end
 latency down so I would suggest enabling the profiling using the
 —saveProfilesToGcs option and get a sense of what is exactly happening.

 — Ankur Chauhan

 On May 24, 2017, at 9:09 AM, Josh  wrote:

 Ah ok - I am using the Dataflow runner. I didn't realise about the
 custom implementation being provided at runtime...

 Any ideas of how to tweak my job to either lower the latency consuming
 from PubSub or to lower the latency in writing to Bigtable?


 On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:

> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
> ...)?
>
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
> wrote:
>
>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>> runner are you using? If you are using google cloud dataflow then the
>> PubsubIO class is not the one doing the reading from the pubsub topic. 
>> They
>> provide a custom implementation at run time.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 07:52, Josh  wrote:
>>
>> Hi Ankur,
>>
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>>
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>> o/gcp/pubsub/PubsubIO.java
>>
>> Thanks,
>> Josh
>>
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
>> wrote:
>>
>>> What runner address you using. Google cloud dataflow uses a closed
>>> source version of the pubsub reader as noted in a comment on Read class.

Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Raghu Angadi
On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:

> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>
> About the "No operations completed" message - there are a few of these in
> the logs (but very few, like 1 an hour or something) - so probably no need
> to scale up Bigtable.
> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
> Probably
> about 50% of the "Wrote n records" messages are zero. While the other 50%
> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
> bad setting?
>

If this is a single-stage pipeline (no GroupByKey()), a high number could
indicate the bundle size in Dataflow. A larger bundle could increase latency.
You can try a workaround by adding a reshuffle. Something like:
  .apply(...)
  .apply(message -> (random.nextLong(), message)) // add a random key.
  .apply(Reshuffle.of())
  .apply((i, message) -> message) // strip key
  .apply(sink)
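For reference, a runnable shape of that sketch (hedged; the helper name is
made up, and in older Beam versions Reshuffle lives under sdk.util rather
than sdk.transforms):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class ReshuffleUtil {
  // Key each element randomly, reshuffle to break up large bundles, strip the key.
  static <T> PCollection<T> randomKeyReshuffle(PCollection<T> input) {
    return input
        .apply("AddRandomKey", WithKeys.of(
                (SerializableFunction<T, Long>) x -> ThreadLocalRandom.current().nextLong())
            .withKeyType(TypeDescriptors.longs()))
        .apply("Reshuffle", Reshuffle.<Long, T>of())
        .apply("StripKey", Values.<T>create());
  }
}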

This would bring 'n' down to 1 or so. I don't know why there are messages
that say '0' records.

Raghu.


> Josh
>
>
>
> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan  wrote:
>
>> There are two main things to see here:
>>
>> * In the logs, are there any messages like "No operations completed
>> within the last 61 seconds. There are still 1 simple operations and 1
>> complex operations in progress.” This means you are underscaled on the
>> bigtable side and would benefit from  increasing the node count.
>> * We also saw some improvement in performance (workload dependent) by
>> going to a bigger worker machine type.
>> * Another optimization that worked for our use case:
>>
>> // streaming dataflow has larger machines with smaller bundles, so we can 
>> queue up a lot more without blowing up
>> private static BigtableOptions 
>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>> return new BigtableOptions.Builder()
>> .setProjectId(opts.getProject())
>> .setInstanceId(opts.getBigtableInstanceId())
>> .setUseCachedDataPool(true)
>> .setDataChannelCount(32)
>> .setBulkOptions(new BulkOptions.Builder()
>> .setUseBulkApi(true)
>> .setBulkMaxRowKeyCount(2048)
>> .setBulkMaxRequestSize(8_388_608L)
>> .setAsyncMutatorWorkerCount(32)
>> .build())
>> .build();
>> }
>>
>>
>> There is a lot of trial and error involved in getting the end-to-end
>> latency down so I would suggest enabling the profiling using the
>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>
>> — Ankur Chauhan
>>
>> On May 24, 2017, at 9:09 AM, Josh  wrote:
>>
>> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
>> implementation being provided at runtime...
>>
>> Any ideas of how to tweak my job to either lower the latency consuming
>> from PubSub or to lower the latency in writing to Bigtable?
>>
>>
>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>>
>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>> ...)?
>>>
>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
>>> wrote:
>>>
 Sorry that was an autocorrect error. I meant to ask - what dataflow
 runner are you using? If you are using google cloud dataflow then the
 PubsubIO class is not the one doing the reading from the pubsub topic. They
 provide a custom implementation at run time.

 Ankur Chauhan
 Sent from my iPhone

 On May 24, 2017, at 07:52, Josh  wrote:

 Hi Ankur,

 What do you mean by runner address?
 Would you be able to link me to the comment you're referring to?

 I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
 https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
 io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
 o/gcp/pubsub/PubsubIO.java

 Thanks,
 Josh

 On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
 wrote:

> What runner address you using. Google cloud dataflow uses a closed
> source version of the pubsub reader as noted in a comment on Read class.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 04:05, Josh  wrote:
>
> Hi all,
>
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
> seconds between the messages being published and being written to 
> Bigtable.
>
> I want to try and decrease the latency to <1s if possible - does
> anyone have any tips for doing this?
>
> I noticed that there is a PubsubGrpcClient
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java

Re: Question regarding on how to run Beam locally

2017-05-24 Thread Claire Yuan
Hi,
   I right-clicked in IntelliJ and chose "Run" on the main method of TfIdf in 
the examples, but it fails to find the main method. Has anyone run the method 
manually?


On Tuesday, May 23, 2017 10:56 PM, Jean-Baptiste Onofré  
wrote:

Hi,

Just tried with one of my samples, and it runs fine locally (tried Flink, Spark 
and direct). Let me try the quickstart TfIdf example.

Regards
JB

On 05/23/2017 11:56 PM, Claire Yuan wrote:
> To whom it may concern,
>    I am trying to run Apache Beam with the Java SDK on my computer. I already 
> cloned the code from GitHub and was trying to run the "TfIdf" example. When 
> I opened it in IntelliJ and right-clicked to run the main() in TfIdf, it 
> says:
> Error: Could not find or load main class 
> org.apache.beam.examples.complete.TfIdf
> Also, when I tried using the command I found in the quickstart and ran it in 
> the folder containing the example source code:
> 
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.TfIdf \ 
> -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" 
> -Pflink-runner
> 
> It got stuck in the terminal. I am wondering if anyone could help with how 
> to run the program locally on Ubuntu.
> 
> Sincerely, a user
> 
> 
> 
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
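(For reference: the mvn command quoted above passes 
-Dexec.mainClass=org.apache.beam.examples.TfIdf, while the IntelliJ error 
mentions org.apache.beam.examples.complete.TfIdf. A sketch of the invocation 
with the fully qualified class name, with illustrative args since TfIdf's own 
pipeline options may differ from WordCount's:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.complete.TfIdf \
 -Dexec.args="--runner=FlinkRunner --output=counts" -Pflink-runner)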



Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi Raghu,

My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a
look!

Yes I'm using BigtableIO for the sink and I am measuring the end-to-end
latency. It seems to take 3-6 seconds typically; I would like to get it
down to ~1s.

Thanks,
Josh

On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi  wrote:

> Josh,
>
> Can you share your job_id? I could take a look. Are you measuring latency
> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
> for sink?
>
> There is no easy way to use more workers when auto-scaling is enabled. It
> thinks your backlog and CPU are low enough and does not need to scale.
> Raghu.
>
> On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:
>
>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>
>> About the "No operations completed" message - there are a few of these in
>> the logs (but very few, like 1 an hour or something) - so probably no need
>> to scale up Bigtable.
>> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
>> Probably
>> about 50% of the "Wrote n records" messages are zero. While the other 50%
>> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
>> bad setting?
>>
>> Josh
>>
>>
>>
>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan 
>> wrote:
>>
>>> There are two main things to see here:
>>>
>>> * In the logs, are there any messages like "No operations completed
>>> within the last 61 seconds. There are still 1 simple operations and 1
>>> complex operations in progress.” This means you are underscaled on the
>>> bigtable side and would benefit from  increasing the node count.
>>> * We also saw some improvement in performance (workload dependent) by
>>> going to a bigger worker machine type.
>>> * Another optimization that worked for our use case:
>>>
>>> // streaming dataflow has larger machines with smaller bundles, so we can 
>>> queue up a lot more without blowing up
>>> private static BigtableOptions 
>>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>> return new BigtableOptions.Builder()
>>> .setProjectId(opts.getProject())
>>> .setInstanceId(opts.getBigtableInstanceId())
>>> .setUseCachedDataPool(true)
>>> .setDataChannelCount(32)
>>> .setBulkOptions(new BulkOptions.Builder()
>>> .setUseBulkApi(true)
>>> .setBulkMaxRowKeyCount(2048)
>>> .setBulkMaxRequestSize(8_388_608L)
>>> .setAsyncMutatorWorkerCount(32)
>>> .build())
>>> .build();
>>> }
>>>
>>>
>>> There is a lot of trial and error involved in getting the end-to-end
>>> latency down so I would suggest enabling the profiling using the
>>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>>
>>> — Ankur Chauhan
>>>
>>> On May 24, 2017, at 9:09 AM, Josh  wrote:
>>>
>>> Ah ok - I am using the Dataflow runner. I didn't realise about the
>>> custom implementation being provided at runtime...
>>>
>>> Any ideas of how to tweak my job to either lower the latency consuming
>>> from PubSub or to lower the latency in writing to Bigtable?
>>>
>>>
>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>>>
 What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
 ...)?

 On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
 wrote:

> Sorry that was an autocorrect error. I meant to ask - what dataflow
> runner are you using? If you are using google cloud dataflow then the
> PubsubIO class is not the one doing the reading from the pubsub topic. 
> They
> provide a custom implementation at run time.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 07:52, Josh  wrote:
>
> Hi Ankur,
>
> What do you mean by runner address?
> Would you be able to link me to the comment you're referring to?
>
> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
> o/gcp/pubsub/PubsubIO.java
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
> wrote:
>
>> What runner address you using. Google cloud dataflow uses a closed
>> source version of the pubsub reader as noted in a comment on Read class.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 04:05, Josh  wrote:
>>
>> Hi all,
>>
>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>> seconds between the messages being published and being written to 
>> Bigtable.
>>
>> I want to try and decrease the latency to <1s if possible - does
>> anyone have any tips for doing this?
>>
>> I noticed that there is a PubsubGrpcClient
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>> however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so
>> the Grpc client doesn't appear to be being used. Is there a way to switch to
>> the Grpc client - as perhaps that would give better performance?

Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Raghu Angadi
Josh,

Can you share your job_id? I could take a look. Are you measuring latency
end-to-end (publisher to when it appears on BT)? Are you using BigtableIO
for the sink?

There is no easy way to use more workers when auto-scaling is enabled. It
thinks your backlog and CPU are low enough and does not need to scale.
Raghu.

On Wed, May 24, 2017 at 10:14 AM, Josh  wrote:

> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>
> About the "No operations completed" message - there are a few of these in
> the logs (but very few, like 1 an hour or something) - so probably no need
> to scale up Bigtable.
> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
> Probably
> about 50% of the "Wrote n records" messages are zero. While the other 50%
> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
> bad setting?
>
> Josh
>
>
>
> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan  wrote:
>
>> There are two main things to see here:
>>
>> * In the logs, are there any messages like "No operations completed
>> within the last 61 seconds. There are still 1 simple operations and 1
>> complex operations in progress.” This means you are underscaled on the
>> bigtable side and would benefit from  increasing the node count.
>> * We also saw some improvement in performance (workload dependent) by
>> going to a bigger worker machine type.
>> * Another optimization that worked for our use case:
>>
>> // streaming dataflow has larger machines with smaller bundles, so we can 
>> queue up a lot more without blowing up
>> private static BigtableOptions 
>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>> return new BigtableOptions.Builder()
>> .setProjectId(opts.getProject())
>> .setInstanceId(opts.getBigtableInstanceId())
>> .setUseCachedDataPool(true)
>> .setDataChannelCount(32)
>> .setBulkOptions(new BulkOptions.Builder()
>> .setUseBulkApi(true)
>> .setBulkMaxRowKeyCount(2048)
>> .setBulkMaxRequestSize(8_388_608L)
>> .setAsyncMutatorWorkerCount(32)
>> .build())
>> .build();
>> }
>>
>>
>> There is a lot of trial and error involved in getting the end-to-end
>> latency down so I would suggest enabling the profiling using the
>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>
>> — Ankur Chauhan
>>
>> On May 24, 2017, at 9:09 AM, Josh  wrote:
>>
>> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
>> implementation being provided at runtime...
>>
>> Any ideas of how to tweak my job to either lower the latency consuming
>> from PubSub or to lower the latency in writing to Bigtable?
>>
>>
>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>>
>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>> ...)?
>>>
>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
>>> wrote:
>>>
 Sorry that was an autocorrect error. I meant to ask - what dataflow
 runner are you using? If you are using google cloud dataflow then the
 PubsubIO class is not the one doing the reading from the pubsub topic. They
 provide a custom implementation at run time.

 Ankur Chauhan
 Sent from my iPhone

 On May 24, 2017, at 07:52, Josh  wrote:

 Hi Ankur,

 What do you mean by runner address?
 Would you be able to link me to the comment you're referring to?

 I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
 https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
 io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
 o/gcp/pubsub/PubsubIO.java

 Thanks,
 Josh

 On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
 wrote:

> What runner address you using. Google cloud dataflow uses a closed
> source version of the pubsub reader as noted in a comment on Read class.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 04:05, Josh  wrote:
>
> Hi all,
>
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
> seconds between the messages being published and being written to 
> Bigtable.
>
> I want to try and decrease the latency to <1s if possible - does
> anyone have any tips for doing this?
>
> I noticed that there is a PubsubGrpcClient
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
> o/gcp/pubsub/PubsubGrpcClient.java however the PubsubUnboundedSource
> is initialised with a PubsubJsonClient, so the Grpc client doesn't appear
> to be being used. Is there a way to switch to the Grpc client - as perhaps
> that would give better performance?
>
> Also, I am running my job on Dataflow using autoscaling, which has only
> allocated one n1-standard-4 instance to the job, which is running at
> ~50% CPU. Could forcing a higher number of nodes help improve latency?

Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!

On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik  wrote:

> Google Cloud Dataflow won't override your setting. The dynamic sharding
> occurs if you don't explicitly set a numShard value.
>
> On Wed, May 24, 2017 at 9:14 AM, Josh  wrote:
>
>> Hi Lukasz,
>>
>> Thanks for the example. That sounds like a nice solution -
>> I am running on Dataflow though, which dynamically sets numShards - so if
>> I set numShards to 1 on each of those AvroIO writers, I can't be sure that
>> Dataflow isn't going to override my setting right? I guess this should work
>> fine as long as I partition my stream into a large enough number of
>> partitions so that Dataflow won't override numShards.
>>
>> Josh
>>
>>
>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik  wrote:
>>
>>> Since you're using a small number of shards, add a Partition transform
>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>> Write each partition with a single shard.
>>>
>>> (Fixed width diagram below)
>>> Pipeline -> AvroIO(numShards = 4)
>>> Becomes:
>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>   |-> AvroIO(numShards = 1)
>>>   |-> AvroIO(numShards = 1)
>>>   \-> AvroIO(numShards = 1)
>>>
>>> On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:
>>>
 Hi,

 I am using a FileBasedSink (AvroIO.write) on an unbounded stream
 (withWindowedWrites, hourly windows, numShards=4).

 I would like to partition the stream by some key in the element, so
 that all elements with the same key will get processed by the same shard
 writer, and therefore written to the same file. Is there a way to do this?
 Note that in my stream the number of keys is very large (most elements have
 a unique key, while a few elements share a key).

 Thanks,
 Josh

>>>
>>>
>>
>
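A minimal sketch of the partition-then-write idea from this thread (Beam Java;
the schema, output path, and upstream windowing are illustrative assumptions):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

class PartitionedWrite {
  // Deterministically route each key to one of N partitions, then write each
  // partition with a single shard so same-key elements land in the same file.
  static void write(PCollection<KV<String, GenericRecord>> input, Schema schema) {
    final int numPartitions = 4;
    PCollectionList<KV<String, GenericRecord>> parts = input.apply(
        Partition.of(numPartitions,
            (KV<String, GenericRecord> kv, int n) ->
                Math.floorMod(kv.getKey().hashCode(), n)));
    for (int i = 0; i < numPartitions; i++) {
      parts.get(i)
          .apply("StripKey" + i, Values.<GenericRecord>create())
          .apply("Write" + i, AvroIO.writeGenericRecords(schema)
              .to("gs://my-bucket/out/part-" + i + "/data")
              .withWindowedWrites()
              .withNumShards(1));
    }
  }
}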


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Thanks Ankur, that's super helpful! I will give these optimisations a go.

About the "No operations completed" message - there are a few of these in
the logs (but very few, like one an hour or something) - so probably no need
to scale up Bigtable.
I did however see a lot of INFO messages "Wrote 0 records" in the logs.
Probably about 50% of the "Wrote n records" messages are zero, while the
other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that could
indicate a bad setting?

Josh



On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan  wrote:

> There are two main things to see here:
>
> * In the logs, are there any messages like "No operations completed within
> the last 61 seconds. There are still 1 simple operations and 1 complex
> operations in progress.” This means you are underscaled on the bigtable
> side and would benefit from  increasing the node count.
> * We also saw some improvement in performance (workload dependent) by
> going to a bigger worker machine type.
> * Another optimization that worked for our use case:
>
> // streaming dataflow has larger machines with smaller bundles, so we can 
> queue up a lot more without blowing up
> private static BigtableOptions 
> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
> return new BigtableOptions.Builder()
> .setProjectId(opts.getProject())
> .setInstanceId(opts.getBigtableInstanceId())
> .setUseCachedDataPool(true)
> .setDataChannelCount(32)
> .setBulkOptions(new BulkOptions.Builder()
> .setUseBulkApi(true)
> .setBulkMaxRowKeyCount(2048)
> .setBulkMaxRequestSize(8_388_608L)
> .setAsyncMutatorWorkerCount(32)
> .build())
> .build();
> }
>
>
> There is a lot of trial and error involved in getting the end-to-end
> latency down so I would suggest enabling the profiling using the
> —saveProfilesToGcs option and get a sense of what is exactly happening.
>
> — Ankur Chauhan
>
> On May 24, 2017, at 9:09 AM, Josh  wrote:
>
> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
> implementation being provided at runtime...
>
> Any ideas of how to tweak my job to either lower the latency consuming
> from PubSub or to lower the latency in writing to Bigtable?
>
>
> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
>
>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>> ...)?
>>
>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan 
>> wrote:
>>
>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>> runner are you using? If you are using google cloud dataflow then the
>>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>>> provide a custom implementation at run time.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 24, 2017, at 07:52, Josh  wrote:
>>>
>>> Hi Ankur,
>>>
>>> What do you mean by runner address?
>>> Would you be able to link me to the comment you're referring to?
>>>
>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>> o/gcp/pubsub/PubsubIO.java
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
>>> wrote:
>>>
 What runner address you using. Google cloud dataflow uses a closed
 source version of the pubsub reader as noted in a comment on Read class.

 Ankur Chauhan
 Sent from my iPhone

 On May 24, 2017, at 04:05, Josh  wrote:

 Hi all,

 I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
 writes the data out to Bigtable. I'm currently seeing a latency of 3-5
 seconds between the messages being published and being written to Bigtable.

 I want to try and decrease the latency to <1s if possible - does anyone
 have any tips for doing this?

 I noticed that there is a PubsubGrpcClient
 https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
 io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
 o/gcp/pubsub/PubsubGrpcClient.java however the PubsubUnboundedSource
 is initialised with a PubsubJsonClient, so the Grpc client doesn't appear
 to be being used. Is there a way to switch to the Grpc client - as perhaps
 that would give better performance?

 Also, I am running my job on Dataflow using autoscaling, which has only
 allocated one n1-standard-4 instance to the job, which is running at
 ~50% CPU. Could forcing a higher number of nodes help improve latency?

 Thanks for any advice,
 Josh


>>>
>>
>
>


Re: Best way to load heavy object into memory on nodes (python sdk)

2017-05-24 Thread Ahmet Altay
You can see an example implementation of Luke's suggestion in the
tensorflow-transform project [1]. Thread-local storage is used in that case;
this will work for runners that re-use the same thread to execute bundles.

[1] https://github.com/tensorflow/transform/blob/master/tensorflow_transform/beam/impl.py#L253

On Wed, May 24, 2017 at 8:00 AM, Lukasz Cwik  wrote:

> Why not use a singleton like pattern and have a function which either
> loads and caches the ML model from a side input or returns the singleton if
> it has been loaded.
> You'll want to use some form of locking to ensure that you really only
> load the ML model once.
>
> On Wed, May 24, 2017 at 6:18 AM, Vilhelm von Ehrenheim <
> vonehrenh...@gmail.com> wrote:
>
>> Hi all!
>> I would like to load a heavy object (think ML model) into memory that
>> should be available in a ParDo for quick predictions.
>>
>> What is the preferred way of doing this without loading the model for
>> each ParDo call (slow and will flood memory on the nodes). I don't seem to
>> be able to do it in the DoFn's __init__ block either as this is only done
>> once for all nodes (my guess here though) and then it breaks when
>> replicated internally (even on the DirectRunner, I suspect it is pickled
>> and this object cannot be pickled). If I load it as a side input it seems
>> to still be loaded into memory separately for each ParDo.
>>
>> If there is a better way to handle it in Java I'm happy to do it there
>> instead. It was just easier to attack the problem with Python as the models
>> were developed in Python.
>>
>> Any sort of pointers or tips are welcome!
>>
>> Thanks!
>> Vilhelm von Ehrenheim
>>
>
>
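Since the original question mentions Java as an acceptable fallback, here is a
sketch of the lazy-singleton pattern Luke describes (all names are illustrative
stand-ins, not a Beam API):

// Loads the heavy model at most once per JVM/worker and shares it across
// DoFn instances; double-checked locking keeps the common path lock-free.
class ModelHolder {
  private static volatile Model model;

  static Model get(String path) {
    if (model == null) {                  // fast path: already loaded
      synchronized (ModelHolder.class) {  // slow path: load under a lock
        if (model == null) {
          model = Model.load(path);       // expensive load happens exactly once
        }
      }
    }
    return model;
  }

  // Stand-in for the real model type.
  static class Model {
    static Model load(String path) {
      // read and deserialize the model from 'path'
      return new Model();
    }
  }
}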


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Ankur Chauhan
There are two main things to see here:

* In the logs, are there any messages like "No operations completed within the 
last 61 seconds. There are still 1 simple operations and 1 complex operations 
in progress." This means you are underscaled on the Bigtable side and would 
benefit from increasing the node count.
* We also saw some improvement in performance (workload dependent) by going to 
a bigger worker machine type.
* Another optimization that worked for our use case:

// Streaming Dataflow has larger machines with smaller bundles, so we can
// queue up a lot more without blowing up.
private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
    return new BigtableOptions.Builder()
        .setProjectId(opts.getProject())
        .setInstanceId(opts.getBigtableInstanceId())
        .setUseCachedDataPool(true)
        .setDataChannelCount(32)
        .setBulkOptions(new BulkOptions.Builder()
            .setUseBulkApi(true)
            .setBulkMaxRowKeyCount(2048)
            .setBulkMaxRequestSize(8_388_608L)
            .setAsyncMutatorWorkerCount(32)
            .build())
        .build();
}

There is a lot of trial and error involved in getting the end-to-end latency 
down, so I would suggest enabling profiling using the --saveProfilesToGcs 
option and getting a sense of what exactly is happening.
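(A sketch of that option, assuming it takes a GCS path; the bucket name is 
illustrative: --saveProfilesToGcs=gs://your-bucket/profiles)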

— Ankur Chauhan

> On May 24, 2017, at 9:09 AM, Josh  wrote:
> 
> Ah ok - I am using the Dataflow runner. I didn't realise about the custom 
> implementation being provided at runtime...
> 
> Any ideas of how to tweak my job to either lower the latency consuming from 
> PubSub or to lower the latency in writing to Bigtable?
> 
> 
> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:
> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
> 
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan  wrote:
> Sorry that was an autocorrect error. I meant to ask - what dataflow runner 
> are you using? If you are using google cloud dataflow then the PubsubIO class 
> is not the one doing the reading from the pubsub topic. They provide a custom 
> implementation at run time.
> 
> Ankur Chauhan 
> Sent from my iPhone
> 
> On May 24, 2017, at 07:52, Josh  wrote:
> 
>> Hi Ankur,
>> 
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>> 
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>  
>> 
>> 
>> Thanks,
>> Josh
>> 
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan  wrote:
>> What runner address you using. Google cloud dataflow uses a closed source 
>> version of the pubsub reader as noted in a comment on Read class. 
>> 
>> Ankur Chauhan
>> Sent from my iPhone
>> 
>> On May 24, 2017, at 04:05, Josh  wrote:
>> 
>>> Hi all,
>>> 
>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes 
>>> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds 
>>> between the messages being published and being written to Bigtable.
>>> 
>>> I want to try and decrease the latency to <1s if possible - does anyone 
>>> have any tips for doing this? 
>>> 
>>> I noticed that there is a PubsubGrpcClient 
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>  however the PubsubUnboundedSource is initialised with a PubsubJsonClient, 
>>> so the Grpc client doesn't appear to be being used. Is there a way to 
>>> switch to the Grpc client - as perhaps that would give better performance?
>>> 
>>> Also, I am running my job on Dataflow using autoscaling, which has only 
>>> allocated one n1-standard-4 instance to the job, which is running at ~50% 
>>> CPU. Could forcing a higher number of nodes help improve latency?
>>> 
>>> Thanks for any advice,
>>> Josh
>> 
> 
> 



Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Lukasz Cwik
Google Cloud Dataflow won't override your setting. The dynamic sharding
occurs if you don't explicitly set a numShards value.

On Wed, May 24, 2017 at 9:14 AM, Josh  wrote:

> Hi Lukasz,
>
> Thanks for the example. That sounds like a nice solution -
> I am running on Dataflow though, which dynamically sets numShards - so if
> I set numShards to 1 on each of those AvroIO writers, I can't be sure that
> Dataflow isn't going to override my setting right? I guess this should work
> fine as long as I partition my stream into a large enough number of
> partitions so that Dataflow won't override numShards.
>
> Josh
>
>
> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik  wrote:
>
>> Since you're using a small number of shards, add a Partition transform
>> which uses a deterministic hash of the key to choose one of 4 partitions.
>> Write each partition with a single shard.
>>
>> (Fixed width diagram below)
>> Pipeline -> AvroIO(numShards = 4)
>> Becomes:
>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>   |-> AvroIO(numShards = 1)
>>   |-> AvroIO(numShards = 1)
>>   \-> AvroIO(numShards = 1)
>>
>> On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:
>>
>>> Hi,
>>>
>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>> (withWindowedWrites, hourly windows, numShards=4).
>>>
>>> I would like to partition the stream by some key in the element, so that
>>> all elements with the same key will get processed by the same shard writer,
>>> and therefore written to the same file. Is there a way to do this? Note
>>> that in my stream the number of keys is very large (most elements have a
>>> unique key, while a few elements share a key).
>>>
>>> Thanks,
>>> Josh
>>>
>>
>>
>


Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Hi Lukasz,

Thanks for the example. That sounds like a nice solution -
I am running on Dataflow though, which dynamically sets numShards - so if I
set numShards to 1 on each of those AvroIO writers, I can't be sure that
Dataflow isn't going to override my setting right? I guess this should work
fine as long as I partition my stream into a large enough number of
partitions so that Dataflow won't override numShards.

Josh

On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik  wrote:

> Since you're using a small number of shards, add a Partition transform which
> uses a deterministic hash of the key to choose one of 4 partitions. Write
> each partition with a single shard.
>
> (Fixed width diagram below)
> Pipeline -> AvroIO(numShards = 4)
> Becomes:
> Pipeline -> Partition --> AvroIO(numShards = 1)
>   |-> AvroIO(numShards = 1)
>   |-> AvroIO(numShards = 1)
>   \-> AvroIO(numShards = 1)
>
> On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:
>
>> Hi,
>>
>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>> (withWindowedWrites, hourly windows, numShards=4).
>>
>> I would like to partition the stream by some key in the element, so that
>> all elements with the same key will get processed by the same shard writer,
>> and therefore written to the same file. Is there a way to do this? Note
>> that in my stream the number of keys is very large (most elements have a
>> unique key, while a few elements share a key).
>>
>> Thanks,
>> Josh
>>
>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Ah ok - I am using the Dataflow runner. I didn't realise about the custom
implementation being provided at runtime...

Any ideas of how to tweak my job to either lower the latency consuming from
PubSub or to lower the latency in writing to Bigtable?


On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik  wrote:

> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
>
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan  wrote:
>
>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>> runner are you using? If you are using google cloud dataflow then the
>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>> provide a custom implementation at run time.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 07:52, Josh  wrote:
>>
>> Hi Ankur,
>>
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>>
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>
>> Thanks,
>> Josh
>>
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan 
>> wrote:
>>
>>> What runner address you using. Google cloud dataflow uses a closed
>>> source version of the pubsub reader as noted in a comment on Read class.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 24, 2017, at 04:05, Josh  wrote:
>>>
>>> Hi all,
>>>
>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>> seconds between the messages being published and being written to Bigtable.
>>>
>>> I want to try and decrease the latency to <1s if possible - does anyone
>>> have any tips for doing this?
>>>
>>> I noticed that there is a PubsubGrpcClient
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>> however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so
>>> the Grpc client doesn't appear to be in use. Is there a way to switch to the
>>> Grpc client - as perhaps that would give better performance?
>>>
>>> Also, I am running my job on Dataflow using autoscaling, which has only
>>> allocated one n1-standard-4 instance to the job, which is running at
>>> ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>>
>>> Thanks for any advice,
>>> Josh
>>>
>>>
>>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Lukasz Cwik
What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?

On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan  wrote:

> Sorry that was an autocorrect error. I meant to ask - what dataflow runner
> are you using? If you are using google cloud dataflow then the PubsubIO
> class is not the one doing the reading from the pubsub topic. They provide
> a custom implementation at run time.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 07:52, Josh  wrote:
>
> Hi Ankur,
>
> What do you mean by runner address?
> Would you be able to link me to the comment you're referring to?
>
> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan  wrote:
>
>> What runner address you using. Google cloud dataflow uses a closed source
>> version of the pubsub reader as noted in a comment on Read class.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 04:05, Josh  wrote:
>>
>> Hi all,
>>
>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>> seconds between the messages being published and being written to Bigtable.
>>
>> I want to try and decrease the latency to <1s if possible - does anyone
>> have any tips for doing this?
>>
>> I noticed that there is a PubsubGrpcClient
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>> however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so
>> the Grpc client doesn't appear to be in use. Is there a way to switch to the
>> Grpc client - as perhaps that would give better performance?
>>
>> Also, I am running my job on Dataflow using autoscaling, which has only
>> allocated one n1-standard-4 instance to the job, which is running at
>> ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>
>> Thanks for any advice,
>> Josh
>>
>>
>


Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Lukasz Cwik
Since you're using a small number of shards, add a Partition transform which
uses a deterministic hash of the key to choose one of 4 partitions. Write
each partition with a single shard.

(Fixed width diagram below)
Pipeline -> AvroIO(numShards = 4)
Becomes:
Pipeline -> Partition --> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      \-> AvroIO(numShards = 1)

On Wed, May 24, 2017 at 1:05 AM, Josh  wrote:

> Hi,
>
> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
> (withWindowedWrites, hourly windows, numShards=4).
>
> I would like to partition the stream by some key in the element, so that
> all elements with the same key will get processed by the same shard writer,
> and therefore written to the same file. Is there a way to do this? Note
> that in my stream the number of keys is very large (most elements have a
> unique key, while a few elements share a key).
>
> Thanks,
> Josh
>
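
For reference, a minimal Java sketch of the partition-then-write approach
described above. This assumes a PCollection<MyRecord> named input, where
MyRecord and its getKey() accessor are hypothetical stand-ins for your own
element type; the windowing and filename-policy configuration that windowed
writes also need is elided:

import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollectionList;

int numPartitions = 4;

// Deterministic hash of the key, so elements that share a key always land
// in the same partition (Math.floorMod avoids negative indices).
PCollectionList<MyRecord> partitions = input.apply(
    Partition.of(numPartitions,
        (MyRecord record, int n) ->
            Math.floorMod(record.getKey().hashCode(), n)));

// One single-shard writer per partition keeps each key in a single file.
for (int i = 0; i < numPartitions; i++) {
  partitions.get(i).apply("WritePartition" + i,
      AvroIO.write(MyRecord.class)
          .to("gs://my-bucket/output/partition-" + i)
          .withWindowedWrites()
          .withNumShards(1));
}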


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Ankur Chauhan
Sorry that was an autocorrect error. I meant to ask - what dataflow runner are 
you using? If you are using google cloud dataflow then the PubsubIO class is 
not the one doing the reading from the pubsub topic. They provide a custom 
implementation at run time.

Ankur Chauhan 
Sent from my iPhone

> On May 24, 2017, at 07:52, Josh  wrote:
> 
> Hi Ankur,
> 
> What do you mean by runner address?
> Would you be able to link me to the comment you're referring to?
> 
> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
> 
> Thanks,
> Josh
> 
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan  wrote:
>> What runner address you using. Google cloud dataflow uses a closed source 
>> version of the pubsub reader as noted in a comment on Read class. 
>> 
>> Ankur Chauhan
>> Sent from my iPhone
>> 
>>> On May 24, 2017, at 04:05, Josh  wrote:
>>> 
>>> Hi all,
>>> 
>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes 
>>> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds 
>>> between the messages being published and being written to Bigtable.
>>> 
>>> I want to try and decrease the latency to <1s if possible - does anyone 
>>> have any tips for doing this? 
>>> 
>>> I noticed that there is a PubsubGrpcClient 
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>  however the PubsubUnboundedSource is initialised with a PubsubJsonClient, 
>>> so the Grpc client doesn't appear to be in use. Is there a way to
>>> switch to the Grpc client - as perhaps that would give better performance?
>>> 
>>> Also, I am running my job on Dataflow using autoscaling, which has only 
>>> allocated one n1-standard-4 instance to the job, which is running at ~50% 
>>> CPU. Could forcing a higher number of nodes help improve latency?
>>> 
>>> Thanks for any advice,
>>> Josh
> 


Re: Best way to load heavy object into memory on nodes (python sdk)

2017-05-24 Thread Lukasz Cwik
Why not use a singleton-like pattern and have a function which either loads
and caches the ML model from a side input or returns the singleton if it
has been loaded.
You'll want to use some form of locking to ensure that you really only load
the ML model once.

On Wed, May 24, 2017 at 6:18 AM, Vilhelm von Ehrenheim <
vonehrenh...@gmail.com> wrote:

> Hi all!
> I would like to load a heavy object (think ML model) into memory that
> should be available in a ParDo for quick predictions.
>
> What is the preferred way of doing this without loading the model for each
> ParDo call (slow and will flood memory on the nodes). I don't seem to be
> able to do it in the DoFn's __init__ block either as this is only done once
> for all nodes (my guess here though) and then it breaks when replicated
> internally (even on the DirectRunner, I suspect it is pickled and this
> object cannot be pickled). If I load it as a side input it seems to still
> be loaded into memory separately for each ParDo.
>
> If there is a better way to handle it in Java I'm happy to do it there
> instead. It was just easier to attack the problem with Python as the models
> were developed in Python.
>
> Any sort of pointers or tips are welcome!
>
> Thanks!
> Vilhelm von Ehrenheim
>
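
Since the question mentions being open to Java: a minimal Java sketch of the
singleton-with-locking idea above, loading from a file path rather than a
side input for brevity. MyModel, its load() factory and the model path are
hypothetical stand-ins for your own ML library:

import org.apache.beam.sdk.transforms.DoFn;

public class PredictFn extends DoFn<MyInput, MyPrediction> {

  // Shared by every instance of this DoFn in the same worker JVM; static
  // fields are not serialized with the DoFn, so each worker loads its own copy.
  private static volatile MyModel model;

  private static MyModel getModel() {
    if (model == null) {
      synchronized (PredictFn.class) {
        if (model == null) {  // double-checked locking: load at most once per JVM
          model = MyModel.load("gs://my-bucket/model.bin");
        }
      }
    }
    return model;
  }

  @Setup
  public void setup() {
    getModel();  // pay the loading cost before the first element arrives
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(getModel().predict(c.element()));
  }
}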


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi Ankur,

What do you mean by runner address?
Would you be able to link me to the comment you're referring to?

I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Thanks,
Josh

On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan  wrote:

> What runner address you using. Google cloud dataflow uses a closed source
> version of the pubsub reader as noted in a comment on Read class.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 04:05, Josh  wrote:
>
> Hi all,
>
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes
> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds
> between the messages being published and being written to Bigtable.
>
> I want to try and decrease the latency to <1s if possible - does anyone
> have any tips for doing this?
>
> I noticed that there is a PubsubGrpcClient
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
> however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so
> the Grpc client doesn't appear to be in use. Is there a way to switch to the
> Grpc client - as perhaps that would give better performance?
>
> Also, I am running my job on Dataflow using autoscaling, which has only
> allocated one n1-standard-4 instance to the job, which is running at ~50%
> CPU. Could forcing a higher number of nodes help improve latency?
>
> Thanks for any advice,
> Josh
>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Ankur Chauhan
What runner address you using. Google cloud dataflow uses a closed source 
version of the pubsub reader as noted in a comment on Read class. 

Ankur Chauhan
Sent from my iPhone

> On May 24, 2017, at 04:05, Josh  wrote:
> 
> Hi all,
> 
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes 
> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds 
> between the messages being published and being written to Bigtable.
> 
> I want to try and decrease the latency to <1s if possible - does anyone have 
> any tips for doing this? 
> 
> I noticed that there is a PubsubGrpcClient 
> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>  however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so 
> the Grpc client doesn't appear to be in use. Is there a way to switch to 
> the Grpc client - as perhaps that would give better performance?
> 
> Also, I am running my job on Dataflow using autoscaling, which has only 
> allocated one n1-standard-4 instance to the job, which is running at ~50% 
> CPU. Could forcing a higher number of nodes help improve latency?
> 
> Thanks for any advice,
> Josh


Re: Sample Data

2017-05-24 Thread Prabeesh K.
Hi Vilhelm,

This is exactly what I am looking for. Thank you so much.

Regards,
Prabeesh K.

On 24 May 2017 at 17:26, Vilhelm von Ehrenheim 
wrote:

> Hi!
> Have you tried beam.combiners.Sample.FixedSizeGlobally(n)? There is also
> a Sample function per key.
> They are defined here:
> https://github.com/apache/beam/blob/v2.0.0/sdks/python/apache_beam/transforms/combiners.py#L368
>
> Hope it helps. If not could you explain in more detail what you mean?
>
> Br,
> Vilhelm von Ehrenheim
>
> On Wed, May 24, 2017 at 8:37 AM, Prabeesh K.  wrote:
>
>>
>> How can we take sample data in Python?
>>
>
>


Re: Sample Data

2017-05-24 Thread Vilhelm von Ehrenheim
Hi!
Have you tried beam.combiners.Sample.FixedSizeGlobally(n)? There is also a
Sample function per key.
They are defined here:
https://github.com/apache/beam/blob/v2.0.0/sdks/python/apache_beam/transforms/combiners.py#L368

Hope it helps. If not could you explain in more detail what you mean?

Br,
Vilhelm von Ehrenheim

On Wed, May 24, 2017 at 8:37 AM, Prabeesh K.  wrote:

>
> How can we take sample data in Python?
>
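
The combiners linked above are from the Python SDK; for readers on the Java
SDK, the core SDK has an analogous transform. A minimal sketch, assuming a
PCollection<String> named lines:

import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.values.PCollection;

// Uniform random sample of at most 100 elements from the whole collection.
PCollection<Iterable<String>> sampled =
    lines.apply(Sample.fixedSizeGlobally(100));

// A per-key variant also exists: Sample.fixedSizePerKey(10) on a
// PCollection<KV<K, V>>.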


Best way to load heavy object into memory on nodes (python sdk)

2017-05-24 Thread Vilhelm von Ehrenheim
Hi all!
I would like to load a heavy object (think ML model) into memory that
should be available in a ParDo for quick predictions.

What is the preferred way of doing this without loading the model for each
ParDo call (slow and will flood memory on the nodes). I don't seem to be
able to do it in the DoFn's __init__ block either as this is only done once
for all nodes (my guess here though) and then it breaks when replicated
internally (even on the DirectRunner, I suspect it is pickled and this
object cannot be pickled). If I load it as a side input it seems to still
be loaded into memory separately for each ParDo.

If there is a better way to handle it in Java I'm happy to do it there
instead. It was just easier to attack the problem with Python as the models
were developed in Python.

Any sort of pointers or tips are welcome!

Thanks!
Vilhelm von Ehrenheim


How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi all,

I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes
the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds
between the messages being published and being written to Bigtable.

I want to try and decrease the latency to <1s if possible - does anyone
have any tips for doing this?

I noticed that there is a PubsubGrpcClient
https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so
the Grpc client doesn't appear to be in use. Is there a way to switch to the
Grpc client - as perhaps that would give better performance?

Also, I am running my job on Dataflow using autoscaling, which has only
allocated one n1-standard-4 instance to the job, which is running at ~50%
CPU. Could forcing a higher number of nodes help improve latency?

Thanks for any advice,
Josh
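
On the last point: with the Dataflow runner you can take autoscaling out of
the equation and pin the worker pool size yourself, then measure whether more
workers actually lowers the latency (the single worker is only at ~50% CPU,
so it may not). A minimal sketch, assuming the standard Dataflow pipeline
options:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);

// Disable autoscaling and fix the worker pool, so the service cannot scale
// the job back down to a single instance.
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
options.setNumWorkers(4);
options.setWorkerMachineType("n1-standard-4");

The same settings can be passed on the command line as
--autoscalingAlgorithm=NONE --numWorkers=4.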


How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Hi,

I am using a FileBasedSink (AvroIO.write) on an unbounded stream
(withWindowedWrites, hourly windows, numShards=4).

I would like to partition the stream by some key in the element, so that
all elements with the same key will get processed by the same shard writer,
and therefore written to the same file. Is there a way to do this? Note
that in my stream the number of keys is very large (most elements have a
unique key, while a few elements share a key).

Thanks,
Josh