Re: Installing non-native Python dependencies in Dataflow

2017-06-06 Thread Ahmet Altay
Pinning setuptools is generally not a good practice. The reason is that, at
installation time, it can cause removal of the very setuptools that is being
used to install the packages.

FWIW, Dataflow workers should have setuptools 33.1.1, which was released on
2017-01-16.
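
For illustration only (versions are hypothetical, not from this thread): if a
transitive dependency drags in an exact setuptools pin, relaxing it to a floor
in requirements.txt avoids asking pip to replace the setuptools it is
currently running with:

# requirements.txt
# setuptools==36.0.1   <- an exact pin like this can force a mid-install swap
setuptools>=33.1.1     # a floor accepts the copy already on the workers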

Ahmet

On Tue, Jun 6, 2017 at 6:53 PM, Dmitry Demeshchuk 
wrote:

> Thanks, Ahmet, it really turned out that Stackdriver had more logs than
> just the Dataflow logs section.
>
> So, I ended up seeing this command that fails constantly:
>
> I  Running setup.py install for dataflow: started
> I  Running setup.py install for dataflow: finished with status 'error'
> I  Complete output from command /usr/bin/python -u -c "import setuptools, 
> tokenize;__file__='/tmp/pip-bXyST4-build/setup.py';f=getattr(tokenize, 
> 'open', open)(__file__);code=f.read().replace('\r\n', 
> '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record 
> /tmp/pip-sHw6oI-record/install-record.txt --single-version-externally-managed 
> --compile:
> I  usage: -c [global_opts] cmd1 [cmd1_opts] [cmd2 [cmd2_opts] ...]
> I or: -c --help [cmd1 cmd2 ...]
> I or: -c --help-commands
> I or: -c cmd --help
> I
> I  error: option --single-version-externally-managed not recognized
> I
> I  
> I  Command "/usr/bin/python -u -c "import setuptools, 
> tokenize;__file__='/tmp/pip-bXyST4-build/setup.py';f=getattr(tokenize, 
> 'open', open)(__file__);code=f.read().replace('\r\n', 
> '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record 
> /tmp/pip-sHw6oI-record/install-record.txt --single-version-externally-managed 
> --compile" failed with error code 1 in /tmp/pip-bXyST4-build/
> I  /usr/local/bin/pip failed with exit status 1
>
>
> This seems to mean that the natively installed setuptools is too old, and
> the command has been generated with a newer version of setuptools
> (specifically, my project has setuptools==36.0.1 as a dependency of some
> package). I'm still digging through the Stackdriver logs, but so far I
> couldn't find the exact reason for the failure.
>
> I'm also talking to the Dataflow folks; maybe they'll have a better idea.
> I'll also try to compare this to the output of successful pipelines and see
> if that gives me any ideas.
>
> Thank you.
>
> On Tue, Jun 6, 2017 at 4:40 PM, Ahmet Altay  wrote:
>
>>
>>
>> On Tue, Jun 6, 2017 at 2:07 PM, Dmitry Demeshchuk 
>> wrote:
>>
>>> Hi Ahmet,
>>>
>>> Thanks a lot for pointing out that doc, I somehow missed it from the
>>> official Python SDK page!
>>>
>>> One thing that comes to my mind is that generally one should probably
>>> use the 'install' command in setuptools, not 'build', like it's done in
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/juliaset/setup.py#L113.
>>> The reason is that the 'build' step seems to be executed on the original
>>> machine, not inside the runner's containers, while 'install' will be
>>> triggered inside of them. If I run a pipeline that uses setup.py with a
>>> "build" step, it fails because it cannot "apt-get install libpq-dev" on a
>>> Mac.
>>>
>>
>> Thank you. This example should work similarly with the install command, I
>> believe. Also, if possible, please file a JIRA issue with your ideas and we
>> can work on improving things.
>>
>>
>>>
>>> I'm still trying to make it work with either build or install steps,
>>> talking to the Dataflow folks in parallel to get more understanding of what
>>> I'm doing wrong (Dataflow doesn't send out installation failure logs to
>>> Stackdriver, only runtime logs, so it seems).
>>>
>>
>> Have you tried looking at the worker-startup logs? All of the logs should
>> be in Stackdriver.
>>
>>
>>>
>>> On Tue, Jun 6, 2017 at 9:21 AM, Ahmet Altay  wrote:
>>>
 Hi,

 Please see Managing Python Pipeline Dependencies [1] for various ways
 on installing additional dependencies. The section on non-python
 dependencies is relevant to your question.

 Thank you,
 Ahmet

 [1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

 On Mon, Jun 5, 2017 at 11:52 PM, Morand, Sebastien <
 sebastien.mor...@veolia.com> wrote:

> Hi,
>
> Interested too. It would be nice, for instance, to add an sftp
> BoundedSource, but that requires compiling paramiko against the ssl library
> (and so installing ssl-dev).
>
> Regards,
>
> *Sébastien MORAND*
> Team Lead Solution Architect
> Technology & Operations / Digital Factory
> Veolia - Group Information Systems & Technology (IS)
> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
> Bureau 0144C (Ouest)
> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
> *www.veolia.com *
> 
> 

Re: Installing non-native Python dependencies in Dataflow

2017-06-06 Thread Ahmet Altay
On Tue, Jun 6, 2017 at 2:07 PM, Dmitry Demeshchuk 
wrote:

> Hi Ahmet,
>
> Thanks a lot for pointing out that doc, I somehow missed it from the
> official Python SDK page!
>
> One thing that comes to my mind is that generally one should probably use
> the 'install' command in setuptools, not 'build', like it's done in
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/juliaset/setup.py#L113.
> The reason is that the 'build' step seems to be executed on the original
> machine, not inside the runner's containers, while 'install' will be
> triggered inside of them. If I run a pipeline that uses setup.py with a
> "build" step, it fails because it cannot "apt-get install libpq-dev" on a Mac.
>

Thank you. This example should work similarly with the install command, I
believe. Also, if possible, please file a JIRA issue with your ideas and we
can work on improving things.


>
> I'm still trying to make it work with either build or install steps,
> talking to the Dataflow folks in parallel to get more understanding of what
> I'm doing wrong (Dataflow doesn't send out installation failure logs to
> Stackdriver, only runtime logs, so it seems).
>

Have you tried looking at the worker-startup logs? All of the logs should be
in Stackdriver.


>
> On Tue, Jun 6, 2017 at 9:21 AM, Ahmet Altay  wrote:
>
>> Hi,
>>
>> Please see Managing Python Pipeline Dependencies [1] for various ways on
>> installing additional dependencies. The section on non-python dependencies
>> is relevant to your question.
>>
>> Thank you,
>> Ahmet
>>
>> [1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
>>
>> On Mon, Jun 5, 2017 at 11:52 PM, Morand, Sebastien <
>> sebastien.mor...@veolia.com> wrote:
>>
>>> Hi,
>>>
>>> Interested too. It would be nice, for instance, to add an sftp
>>> BoundedSource, but that requires compiling paramiko against the ssl
>>> library (and so installing ssl-dev).
>>>
>>> Regards,
>>>
>>> *Sébastien MORAND*
>>> Team Lead Solution Architect
>>> Technology & Operations / Digital Factory
>>> Veolia - Group Information Systems & Technology (IS)
>>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>>> Bureau 0144C (Ouest)
>>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>>> *www.veolia.com *
>>> 
>>> 
>>> 
>>> 
>>> 
>>>
>>> On 6 June 2017 at 08:01, Dmitry Demeshchuk  wrote:
>>>
 Hi again, folks,

 How should I go about installing Python packages that require to be
 built and/or require native dependencies like shared libraries or such?

 I guess, I could potentially build the C-based modules using the same
 version of kernel and glibc that Dataflow is running, but doesn't seem like
 there's any way to install shared libraries at these boxes, right?

 Thanks!

 --
 Best regards,
 Dmitry Demeshchuk.

>>>
>>>
>>>
>>> 
>>> 
>>>
>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>


Re: Installing non-native Python dependencies in Dataflow

2017-06-06 Thread Dmitry Demeshchuk
Hi Ahmet,

Thanks a lot for pointing out that doc, I somehow missed it from the
official Python SDK page!

One thing that comes to my mind is that generally one should probably use
the 'install' command in setuptools, not 'build', like it's done in
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/juliaset/setup.py#L113.
The reason is that the 'build' step seems to be executed on the original
machine, not inside the runner's containers, while 'install' will be
triggered inside of them. If I run a pipeline that uses setup.py with a
"build" step, it fails because it cannot "apt-get install libpq-dev" on a Mac.
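
For illustration, a minimal sketch of such a setup.py, assuming a
juliaset-style custom command hooked into 'install' rather than 'build' (the
package name and the libpq-dev dependency are only placeholders):

import subprocess

import setuptools
from setuptools.command.install import install as _install


class Install(_install):
    """Installs native (apt) dependencies before the usual install step."""

    def run(self):
        # Runs wherever `setup.py install` is executed -- the point of this
        # thread is that this should be the Dataflow worker, not the machine
        # that submits the job.
        subprocess.check_call(['apt-get', 'update'])
        subprocess.check_call(
            ['apt-get', '--assume-yes', 'install', 'libpq-dev'])
        _install.run(self)


setuptools.setup(
    name='my-pipeline-package',  # placeholder name
    version='0.0.1',
    packages=setuptools.find_packages(),
    cmdclass={'install': Install},
)

Whether Dataflow triggers 'install' or 'build' on the worker is exactly the
open question in this thread, so treat this as a sketch to experiment with
rather than confirmed behaviour.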

I'm still trying to make it work with either build or install steps,
talking to the Dataflow folks in parallel to get more understanding of what
I'm doing wrong (Dataflow doesn't send out installation failure logs to
Stackdriver, only runtime logs, so it seems).

On Tue, Jun 6, 2017 at 9:21 AM, Ahmet Altay  wrote:

> Hi,
>
> Please see Managing Python Pipeline Dependencies [1] for various ways on
> installing additional dependencies. The section on non-python dependencies
> is relevant to your question.
>
> Thank you,
> Ahmet
>
> [1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
>
> On Mon, Jun 5, 2017 at 11:52 PM, Morand, Sebastien <
> sebastien.mor...@veolia.com> wrote:
>
>> Hi,
>>
>> Interested too. It would be nice, for instance, to add an sftp BoundedSource,
>> but that requires compiling paramiko against the ssl library (and so
>> installing ssl-dev).
>>
>> Regards,
>>
>> *Sébastien MORAND*
>> Team Lead Solution Architect
>> Technology & Operations / Digital Factory
>> Veolia - Group Information Systems & Technology (IS)
>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>> Bureau 0144C (Ouest)
>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>> *www.veolia.com *
>> 
>> 
>> 
>> 
>> 
>>
>> On 6 June 2017 at 08:01, Dmitry Demeshchuk  wrote:
>>
>>> Hi again, folks,
>>>
>>> How should I go about installing Python packages that require to be
>>> built and/or require native dependencies like shared libraries or such?
>>>
>>> I guess, I could potentially build the C-based modules using the same
>>> version of kernel and glibc that Dataflow is running, but doesn't seem like
>>> there's any way to install shared libraries at these boxes, right?
>>>
>>> Thanks!
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>>
>>
>> 
>> 
>>
>
>


-- 
Best regards,
Dmitry Demeshchuk.


Re: How to partition a stream by key before writing with FileBasedSink?

2017-06-06 Thread Lukasz Cwik
You're right, the window acts as a secondary key within GroupByKey
(KeyA,Window1 != KeyA,Window2), which means that each of those two
composite keys can be scheduled to execute at the same time.

At this point I think you should challenge your limited parallelism
requirement as you'll need to build something outside of Apache Beam to
provide these parallelization limits across windows (e.g. lock within the
same process when limiting yourself to a single machine, distributed lock
service when dealing with multiple machines).

The backlog of data is either going to grow infinitely at the GroupByKey or
grow infinitely at the source if your pipeline can't keep up. It is up to
the Runner to be smart and not produce a giant backlog at the GroupByKey
since it knows how fast work is being completed (unfortunately, I don't know
whether any Runner is yet smart enough to push the backlog back up to the source).

On Tue, Jun 6, 2017 at 11:03 AM, Josh  wrote:

> I see, thanks for the tips!
>
> Last question about this! How could this be adapted to work in an
> unbounded/streaming job? To work in an unbounded job, I need to put a
> Window.into with a trigger before GroupByKey.
> I guess this would mean that the "shard gets processed by a single thread
> in MyDofn" guarantee will only apply to messages within a single window,
> and would not apply across windows?
> If this is the case, is there a better solution? I would like to avoid
> buffering data in windows, and want the shard guarantee to apply across
> windows.
>
>
>
> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik  wrote:
>
>> Your code looks like what I was describing. My only comment would be to
>> use a deterministic hashing function which is stable across JVM versions
>> and JVM instances as it will help in making your pipeline consistent across
>> different runs/environments.
>>
>> Parallelizing across 8 instances instead of 4 would break the contract
>> around GroupByKey (since it didn't group all the elements for a key
>> correctly). Also, each element is the smallest unit of work and
>> specifically in your pipeline you have chosen to reduce all your elements
>> into 4 logical elements (each containing some proportion of your original
>> data).
>>
>> On Tue, Jun 6, 2017 at 9:37 AM, Josh  wrote:
>>
>>> Thanks for the reply, Lukasz.
>>>
>>>
>>> What I meant was that I want to shard my data by a "shard key", and be
>>> sure that any two elements with the same "shard key" are processed by the
>>> same thread on the same worker. (Or if that's not possible, by the same
>>> worker JVM with no thread guarantee would be good enough). It doesn't
>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>>> processing the data.
>>>
>>>
>>> It sounds like what you suggested will work for this, with the downside
>>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>>
>>> It seems a bit long and messy but am I right in thinking it would look
>>> like this? ...
>>>
>>>
>>> PCollection<MyElement> elements = ...;
>>>
>>> elements
>>>     .apply(MapElements
>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>             TypeDescriptor.of(MyElement.class)))
>>>         .via((MyElement e) -> KV.of(
>>>             e.getKey().toString().hashCode() % 4, e)))
>>>     .apply(GroupByKey.create())
>>>     .apply(Partition.of(4,
>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>>>             kv.getKey()))
>>>     .apply(ParDo.of(new MyDofn()));
>>>
>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
>>> // as input instead of just a MyElement
>>>
>>>
>>> I was wondering is there a guarantee that the runner won't parallelise
>>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>>> input elements with the same key are they actually guaranteed to be
>>> processed on the same instance?
>>>
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>>
>>>
>>>
>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik  wrote:
>>>
 I think this is what you're asking for, but your statement about 4
 instances is unclear as to whether that is 4 copies of the same DoFn or 4
 completely different DoFns. It is also unclear what you mean by
 instance/thread; I'm assuming that you want at most 4 instances of a DoFn,
 each being processed by a single thread.

 This is a bad idea because you limit your parallelism but this is
 similar to what the default file sharding logic does. In Apache Beam the
 smallest unit of output for a GroupByKey is a single key+iterable pair. We
 exploit this by assigning all our values to a fixed number of keys and then
 performing a GroupByKey. This is the same trick that powers the file
 sharding logic in AvroIO/TextIO/...

 Your pipeline would look like (fixed width font diagram):
 your data  -> apply shard key   -> GroupByKey->
 partition by key -> your dofn #1

  \> your dofn #2

  \> ...
 a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???

Re: How to partition a stream by key before writing with FileBasedSink?

2017-06-06 Thread Josh
I see, thanks for the tips!

Last question about this! How could this be adapted to work in an
unbounded/streaming job? To work in an unbounded job, I need to put a
Window.into with a trigger before GroupByKey.
I guess this would mean that the "shard gets processed by a single thread
in MyDofn" guarantee will only apply to messages within a single window,
and would not apply across windows?
If this is the case, is there a better solution? I would like to avoid
buffering data in windows, and want the shard guarantee to apply across
windows.



On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik  wrote:

> Your code looks like what I was describing. My only comment would be to
> use a deterministic hashing function which is stable across JVM versions
> and JVM instances as it will help in making your pipeline consistent across
> different runs/environments.
>
> Parallelizing across 8 instances instead of 4 would break the contract
> around GroupByKey (since it didn't group all the elements for a key
> correctly). Also, each element is the smallest unit of work and
> specifically in your pipeline you have chosen to reduce all your elements
> into 4 logical elements (each containing some proportion of your original
> data).
>
> On Tue, Jun 6, 2017 at 9:37 AM, Josh  wrote:
>
>> Thanks for the reply, Lukasz.
>>
>>
>> What I meant was that I want to shard my data by a "shard key", and be
>> sure that any two elements with the same "shard key" are processed by the
>> same thread on the same worker. (Or if that's not possible, by the same
>> worker JVM with no thread guarantee would be good enough). It doesn't
>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>> processing the data.
>>
>>
>> It sounds like what you suggested will work for this, with the downside
>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>
>> It seems a bit long and messy but am I right in thinking it would look
>> like this? ...
>>
>>
>> PCollection<MyElement> elements = ...;
>>
>> elements
>>     .apply(MapElements
>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>             TypeDescriptor.of(MyElement.class)))
>>         .via((MyElement e) -> KV.of(
>>             e.getKey().toString().hashCode() % 4, e)))
>>     .apply(GroupByKey.create())
>>     .apply(Partition.of(4,
>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>>             kv.getKey()))
>>     .apply(ParDo.of(new MyDofn()));
>>
>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
>> // as input instead of just a MyElement
>>
>>
>> I was wondering is there a guarantee that the runner won't parallelise
>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>> input elements with the same key are they actually guaranteed to be
>> processed on the same instance?
>>
>>
>> Thanks,
>>
>> Josh
>>
>>
>>
>>
>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik  wrote:
>>
>>> I think this is what you're asking for, but your statement about 4
>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>> completely different DoFns. It is also unclear what you mean by
>>> instance/thread; I'm assuming that you want at most 4 instances of a DoFn,
>>> each being processed by a single thread.
>>>
>>> This is a bad idea because you limit your parallelism but this is
>>> similar to what the default file sharding logic does. In Apache Beam the
>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>> exploit this by assigning all our values to a fixed number of keys and then
>>> performing a GroupByKey. This is the same trick that powers the file
>>> sharding logic in AvroIO/TextIO/...
>>>
>>> Your pipeline would look like (fixed width font diagram):
>>> your data  -> apply shard key   -> GroupByKey->
>>> partition by key -> your dofn #1
>>>
>>>  \> your dofn #2
>>>
>>>  \> ...
>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>
>>> This is not exactly the same as processing a single DoFn instance/thread
>>> because it relies on the Runner to be able to schedule each key to be
>>> processed on a different machine. For example a Runner may choose to
>>> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
>>> choose to distribute them.
>>>
>>>
>>>
>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh  wrote:
>>>
 Hey Lukasz,

 I have a follow up question about this -

 What if I want to do something very similar, but instead of with 4
 instances of AvroIO following the partition transform, I want 4 instances
 of a DoFn that I've written. I want to ensure that each partition is
 processed by a single DoFn instance/thread. Is this possible with Beam?

 Thanks,
 Josh



 On Wed, May 24, 2017 at 6:15 PM, Josh  wrote:

> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>
> On Wed, May 24, 2017 at 

Re: How to partition a stream by key before writing with FileBasedSink?

2017-06-06 Thread Lukasz Cwik
Your code looks like what I was describing. My only comment would be to use
a deterministic hashing function which is stable across JVM versions and
JVM instances as it will help in making your pipeline consistent across
different runs/environments.

Parallelizing across 8 instances instead of 4 would break the contract
around GroupByKey (since it didn't group all the elements for a key
correctly). Also, each element is the smallest unit of work and
specifically in your pipeline you have chosen to reduce all your elements
into 4 logical elements (each containing some proportion of your original
data).

On Tue, Jun 6, 2017 at 9:37 AM, Josh  wrote:

> Thanks for the reply, Lukasz.
>
>
> What I meant was that I want to shard my data by a "shard key", and be
> sure that any two elements with the same "shard key" are processed by the
> same thread on the same worker. (Or if that's not possible, by the same
> worker JVM with no thread guarantee would be good enough). It doesn't
> actually matter to me whether there's 1 or 4 or 100 DoFn instances
> processing the data.
>
>
> It sounds like what you suggested will work for this, with the downside of
> me needing to choose a number of shards/DoFns (e.g. 4).
>
> It seems a bit long and messy but am I right in thinking it would look
> like this? ...
>
>
> PCollection<MyElement> elements = ...;
>
> elements
>     .apply(MapElements
>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>             TypeDescriptor.of(MyElement.class)))
>         .via((MyElement e) -> KV.of(
>             e.getKey().toString().hashCode() % 4, e)))
>     .apply(GroupByKey.create())
>     .apply(Partition.of(4,
>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>             kv.getKey()))
>     .apply(ParDo.of(new MyDofn()));
>
> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
> // as input instead of just a MyElement
>
>
> I was wondering is there a guarantee that the runner won't parallelise the
> final MyDofn across e.g. 8 instances instead of 4? If there are two input
> elements with the same key are they actually guaranteed to be processed on
> the same instance?
>
>
> Thanks,
>
> Josh
>
>
>
>
> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik  wrote:
>
>> I think this is what you're asking for, but your statement about 4 instances
>> is unclear as to whether that is 4 copies of the same DoFn or 4 completely
>> different DoFns. It is also unclear what you mean by instance/thread; I'm
>> assuming that you want at most 4 instances of a DoFn, each being processed
>> by a single thread.
>>
>> This is a bad idea because you limit your parallelism but this is similar
>> to what the default file sharding logic does. In Apache Beam the smallest
>> unit of output for a GroupByKey is a single key+iterable pair. We exploit
>> this by assigning all our values to a fixed number of keys and then
>> performing a GroupByKey. This is the same trick that powers the file
>> sharding logic in AvroIO/TextIO/...
>>
>> Your pipeline would look like (fixed width font diagram):
>> your data  -> apply shard key   -> GroupByKey-> partition
>> by key -> your dofn #1
>>
>>\> your dofn #2
>>
>>\> ...
>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>
>> This is not exactly the same as processing a single DoFn instance/thread
>> because it relies on the Runner to be able to schedule each key to be
>> processed on a different machine. For example a Runner may choose to
>> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
>> choose to distribute them.
>>
>>
>>
>> On Tue, Jun 6, 2017 at 8:13 AM, Josh  wrote:
>>
>>> Hey Lukasz,
>>>
>>> I have a follow up question about this -
>>>
>>> What if I want to do something very similar, but instead of with 4
>>> instances of AvroIO following the partition transform, I want 4 instances
>>> of a DoFn that I've written. I want to ensure that each partition is
>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>
>>> Thanks,
>>> Josh
>>>
>>>
>>>
>>> On Wed, May 24, 2017 at 6:15 PM, Josh  wrote:
>>>
 Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!

 On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik  wrote:

> Google Cloud Dataflow won't override your setting. The dynamic
> sharding occurs if you don't explicitly set a numShard value.
>
> On Wed, May 24, 2017 at 9:14 AM, Josh  wrote:
>
>> Hi Lukasz,
>>
>> Thanks for the example. That sounds like a nice solution -
>> I am running on Dataflow though, which dynamically sets numShards -
>> so if I set numShards to 1 on each of those AvroIO writers, I can't be 
>> sure
>> that Dataflow isn't going to override my setting right? I guess this 
>> should
>> work fine as long as I partition my stream into a large enough number of
>> partitions so that Dataflow won't override numShards.

Re: Installing non-native Python dependencies in Dataflow

2017-06-06 Thread Ahmet Altay
Hi,

Please see Managing Python Pipeline Dependencies [1] for various ways on
installing additional dependencies. The section on non-python dependencies
is relevant to your question.

Thank you,
Ahmet

[1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

On Mon, Jun 5, 2017 at 11:52 PM, Morand, Sebastien <
sebastien.mor...@veolia.com> wrote:

> Hi,
>
> Interested too. It would be nice, for instance, to add an sftp BoundedSource,
> but that requires compiling paramiko against the ssl library (and so
> installing ssl-dev).
>
> Regards,
>
> *Sébastien MORAND*
> Team Lead Solution Architect
> Technology & Operations / Digital Factory
> Veolia - Group Information Systems & Technology (IS)
> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
> Bureau 0144C (Ouest)
> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
> *www.veolia.com *
> 
> 
> 
> 
> 
>
> On 6 June 2017 at 08:01, Dmitry Demeshchuk  wrote:
>
>> Hi again, folks,
>>
>> How should I go about installing Python packages that require to be built
>> and/or require native dependencies like shared libraries or such?
>>
>> I guess, I could potentially build the C-based modules using the same
>> version of kernel and glibc that Dataflow is running, but doesn't seem like
>> there's any way to install shared libraries at these boxes, right?
>>
>> Thanks!
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>
>
>
> 
> 
>


Re: Practices for running Python projects on Dataflow

2017-06-06 Thread Morand, Sebastien
OK, I made so many changes that I no longer have the problem.

Thanks!

*Sébastien MORAND*
Team Lead Solution Architect
Technology & Operations / Digital Factory
Veolia - Group Information Systems & Technology (IS)
Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
Bureau 0144C (Ouest)
30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
*www.veolia.com *






On 6 June 2017 at 01:54, Ahmet Altay  wrote:

> Sébastien, what kind of issue did you have with using setup.py with
> install_requires?
>
> On Mon, Jun 5, 2017 at 4:44 PM, Morand, Sebastien <
> sebastien.mor...@veolia.com> wrote:
>
>> Hi,
>>
>> I ran into trouble when using setup.py with install_requires. So I
>> basically ended up with a setup.py with no install requirements inside,
>> plus a requirements.txt:
>>
>> PIPELINE_OPTIONS = [
>> '--project={}'.format(projectname),
>> '--runner=DataflowRunner',
>> '--temp_location=gs://dataflow-run/temp',
>> '--staging_location=gs://dataflow-run/staging',
>> '--requirements_file=requirements.txt',
>> '--save_main_session',
>> '--setup_file=./setup.py'
>> ]
>>
>> with setup.py:
>> setup(
>> name='MyProject',
>> version='1.0',
>> description='My Description',
>> author='myself',
>> author_email='m...@whatever.com',
>> url='http://myurl.whatever.com',
>> package_dir={'': 'src'},
>> packages=[
>> 'package1',
>> 'package1.subpackage1',
>> 'package1.subpackage2',
>> 'package2'
>> ]
>> )
>>
>> Regards
>>
>> *Sébastien MORAND*
>> Team Lead Solution Architect
>> Technology & Operations / Digital Factory
>> Veolia - Group Information Systems & Technology (IS)
>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>> Bureau 0144C (Ouest)
>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>> *www.veolia.com *
>> 
>> 
>> 
>> 
>> 
>>
>> On 5 June 2017 at 22:56, Dmitry Demeshchuk  wrote:
>>
>>> Hi list,
>>>
>>> Suppose you have a private Python package that contains some code
>>> people want to share when writing their pipelines.
>>>
>>> So, typically, the installation process of the package would be either
>>>
>>> pip install git+ssh://g...@github.com/mycompany/mypackage#egg=mypackage
>>>
>>> or
>>>
>>> git clone git://g...@github.com/mycompany/mypackage
>>> python setup.py mypackage/setup.py
>>>
>>> Now, the problem starts when we want to get that package into Dataflow.
>>> Right now, to my understanding, DataflowRunner supports 3 approaches:
>>>
>>> 1. Specifying a requirements_file parameter in the pipeline options.
>>>    This basically must be a requirements.txt file.
>>>
>>> 2. Specifying an extra_packages parameter in the pipeline options. This
>>>    must be a list of tarballs, each of which contains a Python package
>>>    packaged using distutils (a sketch follows below).
>>>
>>> 3. Specifying a setup_file parameter in the pipeline options. This will
>>>    just run the python path/to/my/setup.py package command and then
>>>    send the files over the wire.
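
As an illustration of option 2 above (the paths, package name, and buckets
below are hypothetical): build the private package into an sdist tarball with
"python setup.py sdist" and point the pipeline options at the resulting file,
e.g.:

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Tarball produced beforehand with: python setup.py sdist
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--temp_location=gs://my-bucket/temp',
    '--staging_location=gs://my-bucket/staging',
    '--extra_package=dist/mypackage-1.0.tar.gz',
])

# The flag appends to the extra_packages list in SetupOptions; the tarball is
# staged to the workers and pip-installed there.
print(options.view_as(SetupOptions).extra_packages)
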
>>>
>>> The best approach I could come up with was including an *additional*
>>> setup.py into the package itself, so that when we install that package,
>>> the setup.py file gets installed along with it. And then, I’d point the
>>> setup_file option to that file.
>>>
>>> This gist
>>> 
>>> shows the basic approach in code. Both setup.py and options.py are
>>> supposed to be present in the installed package.
>>>
>>> It kind of works for me, with some caveats, but I just wanted to find
>>> out if there's a more decent way to handle my situation. I'm not keen on
>>> specifying that private package as a git dependency, because of having to
>>> worry about git credentials, but maybe there are other ways?
>>>
>>> Thanks!
>>> ​
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>>
>>
>> 
>> 

Re: Installing non-native Python dependencies in Dataflow

2017-06-06 Thread Morand, Sebastien
Hi,

Interested too. It would be nice, for instance, to add an sftp BoundedSource,
but that requires compiling paramiko against the ssl library (and so
installing ssl-dev).

Regards,

*Sébastien MORAND*
Team Lead Solution Architect
Technology & Operations / Digital Factory
Veolia - Group Information Systems & Technology (IS)
Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
Bureau 0144C (Ouest)
30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
*www.veolia.com *






On 6 June 2017 at 08:01, Dmitry Demeshchuk  wrote:

> Hi again, folks,
>
> How should I go about installing Python packages that require to be built
> and/or require native dependencies like shared libraries or such?
>
> I guess, I could potentially build the C-based modules using the same
> version of kernel and glibc that Dataflow is running, but doesn't seem like
> there's any way to install shared libraries at these boxes, right?
>
> Thanks!
>
> --
> Best regards,
> Dmitry Demeshchuk.
>

-- 





Installing non-native Python dependencies in Dataflow

2017-06-06 Thread Dmitry Demeshchuk
Hi again, folks,

How should I go about installing Python packages that require to be built
and/or require native dependencies like shared libraries or such?

I guess, I could potentially build the C-based modules using the same
version of kernel and glibc that Dataflow is running, but doesn't seem like
there's any way to install shared libraries at these boxes, right?

Thanks!

-- 
Best regards,
Dmitry Demeshchuk.