Writing and serializing a custom WindowFn

2019-06-01 Thread Chad Dombrova
Hi all,
I'm currently investigating Beam for our company's needs, and I have a
question about how to solve a specific windowing problem in python.

I have a stream of elements and I want to gather them until a special
end-of-stream element arrives.

To solve this, I've written a modified version of window.Sessions that
takes an optional callable which is passed an element when it is being
assigned to a window.  If the callable returns True, the window is
immediately closed.  Here's the code:

https://gist.github.com/chadrik/b0dfff8953fed99f99bdd69c7cc870ba
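
For context, here's a rough sketch of the idea (simplified; the class and
argument names below are just illustrative, not the gist's actual API):

# A Sessions variant whose assign() consults a user-supplied callable; the
# element flagged as the "end" gets a minimal-width window, so the merged
# session stops growing at that point.
from apache_beam.transforms.window import IntervalWindow, Sessions

class SessionsWithEndMarker(Sessions):
  def __init__(self, gap_size, is_end=None):
    super(SessionsWithEndMarker, self).__init__(gap_size)
    self.is_end = is_end

  def assign(self, context):
    if self.is_end is not None and self.is_end(context.element):
      # end-of-stream element: don't extend the session past it
      return [IntervalWindow(context.timestamp, context.timestamp + 1)]
    return [IntervalWindow(context.timestamp,
                           context.timestamp + self.gap_size)]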

It works as expected in the Direct runner, but fails in Dataflow, which I'm
pretty sure is due to a serialization problem.

So, I have a few questions:

1) Is there a way to do this in python using stock components? (i.e.
without my custom class)

2) If not, is there any interest in accepting a PR to modify the stock
window.Sessions to do something like what I've written?  It seems very
useful, and I've seen other people attempting to solve this same problem
[1][2].

3) If not, how do I go about serializing my custom WindowFn class?  From
looking at the source code I'm certain I could figure out how to extend the
serialization for a stock object like window.Sessions (modify
standard_window_fns.proto, and update to/from_runner_api_parameter), but
it's very unclear how I would do this for a custom WindowFn, since
serialization of these classes seems to be part of the official beam gRPC
portability protocol.

Thanks in advance for your help!
-chad


1.
https://stackoverflow.com/questions/49011360/using-a-custom-windowfn-on-google-dataflow-which-closes-after-an-element-value
2.
https://stackoverflow.com/questions/43035302/close-window-after-based-on-element-value/43052779#43052779


Re: Writing and serializing a custom WindowFn

2019-06-03 Thread Chad Dombrova
Hi Robert,
Thanks for the response.

As you've discovered, fully custom merging window fns are not yet
> supported portably, though this is on our todo list.
>
> https://issues.apache.org/jira/browse/BEAM-5601
>

Thanks for linking me to that.  I've watched it and voted for it, and maybe
I'll even take a peek at what it would take to implement if it appears to
be the best way forward for us.

Note that it's tricky to get the exact behavior you need as data may
> come in out of order. Consider, for example, three events of
> increasing timestamp e1, e2, e3 where e2 is the "end" event. It could
> happen that e1 and e3 get merged before e2 is seen, and there's no
> "unmerge" capability (the values may already be combined via an
> upstream combiner). How do you handle this?
>

This is something I've been wondering about myself.  I read the excellent
book Streaming Systems and it seems that the preferred way to solve this is
using event timestamps and watermarks, but that raised a few questions for
me.

First, just to clarify, you're correct that your example scenario could
occur _in processing time_, but our system can guarantee that it does not
happen in event time, i.e. the e2 "end" event will always have an event
timestamp later than e1 and e3.

So, can I use event time with watermarks to solve this problem?

IIUC, python does not have support for late arriving data, so that seems
like a pretty big issue.  If it did, would that be the preferred way of
solving this problem, or is that not enough?  If late data is indeed not
currently supported, then the critique of my custom WindowFn would apply to
Sessions in general, would it not?

Is handling of late data in python something that's slated for an upcoming
release?

In the meantime, you could try getting this behavior with StatefulDoFns.
>

Is this fully supported by python now?  I've read some conflicting
information on the subject.
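
For reference, here's roughly what I imagine the StatefulDoFn version would
look like -- a minimal sketch that assumes keyed input and an is_end()
predicate of my own (none of this comes from the docs, and I haven't run it
on a real runner):

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.userstate import BagStateSpec

def is_end(element):
  # placeholder predicate for whatever marks the end-of-stream element
  return element == 'END'

class GatherUntilEnd(beam.DoFn):
  BUFFER = BagStateSpec('buffer', PickleCoder())

  def process(self, kv, buffer=beam.DoFn.StateParam(BUFFER)):
    key, element = kv
    if is_end(element):
      # flush everything gathered so far for this key, then reset
      yield key, list(buffer.read())
      buffer.clear()
    else:
      buffer.add(element)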

thanks again for the feedback!

(btw, I'm still experimenting with the static type checking issue:
https://github.com/python/mypy/issues/6933)

-chad


Re: [ANNOUNCE] Apache Beam 2.13.0 released!

2019-06-07 Thread Chad Dombrova
I saw this and was particularly excited about the new support for
"external" transforms in portable SDKs like python (i.e. the ability to
use the Java KafkaIO transforms, with presumably more to come in the
future).  While the release notes are useful, I will say that it takes a
lot of time and effort to sift through them to find relevant
issues.  They're not grouped by sdk/component, and, for example, not all of
the python issues include the word "python" in their title.  It would be
great to have a blurb on the Beam blog explaining the highlights.  An
example of a project that I think does this very well is mypy:
http://mypy-lang.blogspot.com/

thanks!
chad





On Fri, Jun 7, 2019 at 2:58 PM Kyle Weaver  wrote:

> Awesome! Thanks for leading the release Ankur.
>
> On Fri, Jun 7, 2019 at 2:57 PM Ankur Goenka  wrote:
>
>> The Apache Beam team is pleased to announce the release of version 2.13.0!
>>
>> Apache Beam is an open source unified programming model to define and
>> execute data processing pipelines, including ETL, batch and stream
>> (continuous) processing. See https://beam.apache.org
>>
>> You can download the release here:
>>
>> https://beam.apache.org/get-started/downloads/
>>
>> This release includes bugfixes, features, and improvements detailed on
>> the Beam blog: https://beam.apache.org/blog/2019/05/22/beam-2.13.0.html
>>
>> Thanks to everyone who contributed to this release, and we hope you enjoy
>> using Beam 2.13.0.
>>
>> -- Ankur Goenka, on behalf of The Apache Beam team
>>
> --
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> | +1650203
>


Re: [ANNOUNCE] Apache Beam 2.13.0 released!

2019-06-10 Thread Chad Dombrova
>
>
> @Chad Thanks for the feedback. I agree that we can improve our release
> notes. The particular issue you were looking for was part of the detailed
> list [1] linked in the blog post:
> https://jira.apache.org/jira/browse/BEAM-7029


Just to be clear, I had no idea about the feature ahead of time, but I
spent a while clicking through various issues in the release notes to read
about things that seemed interesting, and it often took clicking all the
way through to the github PRs to actually understand what I was looking
at.  So the release notes are working, they just take some time to sift
through.  I think the simplest improvement you could make would be to
include a component(s) column in the release notes.

thanks!


gRPC method to get a pipeline definition?

2019-06-25 Thread Chad Dombrova
Hi all,
I've been poking around the beam source code trying to determine whether
it's possible to get the definition of a pipeline via beam's gRPC-based
services.  It looks like the message types are there for describing a
Pipeline, but as far as I can tell, they're only used by
JobService.Prepare() for submitting a new job.

If I were to create a PR to add support for a JobService.GetPipeline()
method, would that be interesting to others?  Is it technically feasible?
i.e. is the pipeline definition readily available to the job service after
the job has been prepared and sent to the runner?
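
To make that concrete, here's what I imagine calling it could look like from
python, using the existing job api stubs (the GetPipeline method and its
request/response messages are hypothetical -- they don't exist yet, which is
the point of this email):

import grpc
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc

channel = grpc.insecure_channel('localhost:8099')  # wherever the job service is
stub = beam_job_api_pb2_grpc.JobServiceStub(channel)
# hypothetical new method, alongside the existing Prepare/Run/GetState calls:
# response = stub.GetPipeline(
#     beam_job_api_pb2.GetJobPipelineRequest(job_id='some-job-id'))
# response.pipeline would be the beam_runner_api_pb2.Pipeline proto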

Bigger picture, what I'm thinking about is writing a UI that's designed to
view and monitor Beam pipelines via the portability abstraction, rather
than using the (rather clunky) UIs that come with runners like Flink and
Dataflow.  My thinking is that using beam's abstractions would future-proof
the UI by allowing it to work with any portable runner.  Right now it's
just an idea, so I'd love to know what others think of this.

thanks!
-chad


Re: gRPC method to get a pipeline definition?

2019-06-27 Thread Chad Dombrova
Hi all,
Thanks for all the support!

I put together a rough working version of this already and it was quite
easy, even for a Java newb.

After playing with it a little I was surprised to find that:

A) completed jobs are not cleared from the job service
B) job info is not persisted between restarts of the job service

It seems like this adds up to a memory leak which can only be resolved by
restarting the service and thereby losing information about jobs which may
be actively running.  How are people dealing with this currently?

Note, I'm referring to the InMemoryJobService that's started like this:

./gradlew :beam-runners-flink-1.8-job-server:runShadow
-PflinkMasterUrl=localhost:8081

Thanks for the tip on the .dot graph exporter!  That will come in handy.

-chad


On Wed, Jun 26, 2019 at 6:39 AM Tim Robertson 
wrote:

> Another +1 to support your research into this Chad. Thank you.
>
> Trying to understand where a beam process is in the Spark DAG is... not
> easy. A UI that helped would be a great addition.
>
>
>
> On Wed, Jun 26, 2019 at 3:30 PM Ismaël Mejía  wrote:
>
>> +1 don't hesitate to create a JIRA + PR. You may be interested in [1].
>> This is a simple util class that takes a proto pipeline object and
>> converts it into its graph representation in .dot format. You can
>> easily reuse the code or the idea as a first approach to show what the
>> pipeline is about.
>>
>> [1]
>> https://github.com/apache/beam/blob/2df702a1448fa6cbd22cd225bf16e9ffc4c82595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PortablePipelineDotRenderer.java#L29
>>
>> On Wed, Jun 26, 2019 at 10:27 AM Robert Bradshaw 
>> wrote:
>> >
>> > Yes, offering a way to get a pipeline from the job service directly
>> > would be a completely reasonable thing to do (and likely not hard at
>> > all). We welcome pull requests.
>> >
>> > Alternative UIs built on top of this abstraction would be an
>> > interesting project to explore.
>> >
>> > On Wed, Jun 26, 2019 at 8:44 AM Chad Dombrova 
>> wrote:
>> > >
>> > > Hi all,
>> > > I've been poking around the beam source code trying to determine
>> whether it's possible to get the definition of a pipeline via beam's
>> gRPC-based services.   It looks like the message types are there for
>> describing a Pipeline but as far as I can tell, they're only used by
>> JobService.Prepare() for submitting a new job.
>> > >
>> > > If I were to create a PR to add support for a
>> JobService.GetPipeline() method, would that be interesting to others?  Is
>> it technically feasible?  i.e. is the pipeline definition readily available
>> to the job service after the job has been prepared and sent to the runner?
>> > >
>> > > Bigger picture, what I'm thinking about is writing a UI that's
>> designed to view and monitor Beam pipelines via the portability
>> abstraction, rather than using the (rather clunky) UIs that come with
>> runners like Flink and Dataflow.  My thinking is that using beam's
>> abstractions would future proof the UI by allowing it to work with any
>> portable runner.  Right now it's just an idea, so I'd love to know what
>> others think of this.
>> > >
>> > > thanks!
>> > > -chad
>> > >
>>
>


Re: gRPC method to get a pipeline definition?

2019-06-28 Thread Chad Dombrova
> In reality, a more complex job service is needed that is backed by some
> kind of persistent storage or stateful service.
>

I was afraid you were going to say that :)  So is anything like this
planned or in the works?

I see that there's also ReferenceRunnerJobService, but it seems like a very
similar implementation to InMemoryJobService.  What's the use case for that?

-chad


Re: gRPC method to get a pipeline definition?

2019-06-28 Thread Chad Dombrova
> I think the simplest solution would be to have some kind of override/hook
> that allows Flink/Spark/... to provide storage. They already have a concept
> of a job and know how to store them, so we can piggyback the Beam pipeline
> there.
>

That makes sense to me, since it avoids adding a dependency on a database
like Mongo, which adds complexity to the deployment.  That said, Beam's
definition of a job is different from Flink/Spark/etc.  To support this, a
runner would need to support storing arbitrary metadata, so that the Beam
Job Service could store a copy of each Beam job there (pipeline, pipeline
options, etc), either directly as serialized protobuf messages, or by
converting those to json.  Do you know offhand if Flink and Spark support
that kind of arbitrary storage?

-chad


[python] ReadFromPubSub broken in Flink

2019-07-12 Thread Chad Dombrova
Hi all,
This error came as a bit of a surprise.

Here’s a snippet of the traceback (full traceback below).

  File "apache_beam/runners/common.py", line 751, in
apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 423, in
apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
  File 
"/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py",
line 860, in split_source
AttributeError: '_PubSubSource' object has no attribute
'estimate_size' [while running 'PubSubInflow/Read/Split']


Flink is using _PubSubSource which is, as far as I can tell, a stub that
should be replaced at runtime by an actual streaming source. Is this error
a bug or a known limitation? If the latter, is there a Jira issue and any
momentum to solve this?

I’m pretty confused by this because the Apache Beam Portability Support
Matrix [1] makes it pretty clear that Flink supports streaming, and the
docs for “Built-in I/O Transforms” [2] list Google PubSub and BigQuery as the
only IO transforms that support streaming, so if streaming works with
Flink, PubSub should probably be the thing it works with.

I'm using beam 2.13.0 and flink 1.8.

thanks,
chad

[1]
https://docs.google.com/spreadsheets/d/1KDa_FGn1ShjomGd-UUDOhuh2q73de2tPz6BqHpzqvNI/edit#gid=0
[2] https://beam.apache.org/documentation/io/built-in/

Full traceback:

Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for
instruction 5: Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 157, in _execute
response = task()
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 190, in 
self._execute(lambda: worker.do_instruction(work), work)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 333, in do_instruction
request.instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 359, in process_bundle
bundle_processor.process_bundle(instruction_id))
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 589, in process_bundle
].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 143, in process_encoded
self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 246, in
apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 247, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 583, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 584, in
apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 747, in
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
  File "apache_beam/runners/common.py", line 753, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 807, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 751, in
apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 423, in
apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
  File 
"/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py",
line 860, in split_source
AttributeError: '_PubSubSource' object has no attribute
'estimate_size' [while running 'PubSubInflow/Read/Split']


Re: [python] ReadFromPubSub broken in Flink

2019-07-13 Thread Chad Dombrova
Thanks for the response, Max.  I saw that KafkaIO is now supported in
python via an external transform (
https://jira.apache.org/jira/browse/BEAM-7029), but I thought I read
somewhere that it was only supported in batch mode (though I don't see that
mentioned in the ticket or the PR, so not sure where I got that
impression).

Do I have to modify the source along the lines of the KafkaIO PRs to work
with PubSubIO, or is it already supported via some flag?

-chad


On Sat, Jul 13, 2019 at 8:43 AM Maximilian Michels  wrote:

> Hi Chad,
>
> This stub will only be replaced by the Dataflow service. It's an artifact
> of the pre-portability era.
>
> That said, we now have the option to replace ReadFromPubSub with an
> external transform which would utilize Java's PubSubIO via the new
> cross-language feature.
>
> Thanks,
> Max
>
> On 12.07.19 19:32, Chad Dombrova wrote:
> > Hi all,
> > This error came as a bit of a surprise.
> >
> > Here’s a snippet of the traceback (full traceback below).
> >
> > |File "apache_beam/runners/common.py", line 751, in
> > apache_beam.runners.common.DoFnRunner.process return
> > self.do_fn_invoker.invoke_process(windowed_value) File
> > "apache_beam/runners/common.py", line 423, in
> > apache_beam.runners.common.SimpleInvoker.invoke_process windowed_value,
> > self.process_method(windowed_value.value)) File
> >
> "/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py",
> > line 860, in split_source AttributeError: '_PubSubSource' object has no
> > attribute 'estimate_size' [while running 'PubSubInflow/Read/Split'] |
> >
> > Flink is using _PubSubSource which is, as far as I can tell, a stub that
> > should be replaced at runtime by an actual streaming source. Is this
> > error a bug or a known limitation? If the latter, is there a Jira issue
> > and any momentum to solve this?
> >
> > I’m pretty confused by this because the Apache Beam Portability Support
> > Matrix [1] makes it pretty clear that Flink supports streaming, and the
> > docs for “Built-in I/O Transforms” lists Google PubSub and BigQuery as
> > the only IO transforms that support streaming, so if streaming works
> > with Flink, PubSub should probably be the thing it works with.
> >
> > I'm using beam 2.13.0 and flink 1.8.
> >
> > thanks,
> > chad
> >
> > [1]
> >
> https://docs.google.com/spreadsheets/d/1KDa_FGn1ShjomGd-UUDOhuh2q73de2tPz6BqHpzqvNI/edit#gid=0
> > [2] https://beam.apache.org/documentation/io/built-in/
> >
> > Full traceback:
> >
> > |Caused by: java.util.concurrent.ExecutionException:
> > java.lang.RuntimeException: Error received from SDK harness for
> > instruction 5: Traceback (most recent call last): File
> >
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> > line 157, in _execute response = task() File
> >
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> > line 190, in  self._execute(lambda: worker.do_instruction(work),
> > work) File
> >
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> > line 333, in do_instruction request.instruction_id) File
> >
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> > line 359, in process_bundle
> > bundle_processor.process_bundle(instruction_id)) File
> >
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> > line 589, in process_bundle ].process_encoded(data.data) File
> >
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> > line 143, in process_encoded self.output(decoded_value) File
> > "apache_beam/runners/worker/operations.py", line 246, in
> > apache_beam.runners.worker.operations.Operation.output def output(self,
> > windowed_value, output_index=0): File
> > "apache_beam/runners/worker/operations.py", line 247, in
> > apache_beam.runners.worker.operations.Operation.output
> > cython.cast(Receiver,
> > self.receivers[output_index]).receive(windowed_value) File
> > "apache_beam/runners/worker/operations.py", line 143, in
> > apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> > self.consumer.process(windowed_value) File
> > "apache_beam/runners/worker/operations.py", line 583, in
> > apache_beam.runners.worker.operations.DoOperation.process with
> > s

Re: [python] ReadFromPubSub broken in Flink

2019-07-13 Thread Chad Dombrova
> Cross-language support for PubSub is not yet implemented but it can be
> done similarly to ReadFromKafka. There are still some limitations regarding
> the coders, i.e. only coders can be used which are available in both the
> Java and the Python SDK (standard coders).
>

Yeah, I was just looking through the source and noticed a few things right
off the bat:

   - expansion service needs to be passed as an arg to each external xform
  - why not make this part of the pipeline options?  does it really
  need to vary from transform to transform?
   - explicit coders need to be passed to each external xform for each item
   to be serialized, key and value coders provided separately
   - in python we get auto-detection of coders based on type hints or
   data type, including compound data types (e.g. Tuple[int, str, Dict[str,
   float]]); a quick illustration follows this list
  - in python we also have a fallback to the pickle coder for complex
  types without builtin coders.  is the pickle coder supported by java?
  - is there a way to express compound java coders as a string?
   - why not pass the results in and out of the java xform using
   bytestrings, and then use python-based coders in python?

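Regarding the auto-detection point above, here's the kind of thing I mean
(plain python SDK behavior, nothing external-transform specific):

import apache_beam as beam
from apache_beam import typehints

# compound type hints resolve to portable compound coders,
# e.g. something like TupleCoder[VarIntCoder, BytesCoder]:
print(beam.coders.registry.get_coder(typehints.Tuple[int, bytes]))

# ...while an unknown type falls back to a pickle-based coder,
# which java can't decode:
class Foo(object):
  pass

print(beam.coders.registry.get_coder(Foo))
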
As of now the user experience is a bit rough, but we will be improving that
> very soon. Happy to help out if you want to contribute a cross-language
> ReadFromPubSub.
>

We're pretty locked into Flink, and adopting Kafka or PubSub is going to
be a requirement, so it looks like we're going the external transform route
either way.  I'd love to hear more about A) what the other limitations of
external transforms are, and B) what you have planned to improve the UX.
I'm sure we can find something to contribute!

-chad


Re: [python] ReadFromPubSub broken in Flink

2019-07-13 Thread Chad Dombrova
Hi Chamikara,


>>- why not make this part of the pipeline options?  does it really
>>   need to vary from transform to transform?
>>
>> It's possible for the same pipeline to connect to multiple expansion
> services, to use transforms from more than one SDK language and/or version.
>
There are only 3 languages supported, so excluding the user’s chosen
language we’re only talking about 2 options (i.e. for python, they’re java
and go). The reality is that Java provides a superset of the IO
functionality of Go and Python, and the addition of external transforms is
only going to discourage the addition of more native IO transforms in
python and go (which is ultimately a good thing!). So it seems like a poor
UX choice to make users provide the expansion service to every single
external IO transform when the reality is that 99.9% of the time it’ll be
the same url for any given pipeline. Correct me if I’m wrong, but in a
production scenario the expansion service would not be the current default,
localhost:8097, correct? That means users would need to always specify
this arg.

Here’s an alternate proposal: instead of providing the expansion service as
a URL in a transform’s __init__ arg, i.e. expansion_service='localhost:8097',
make it a symbolic name, like expansion_service='java' (an external
transform is bound to a particular source SDK, e.g. KafkaIO is bound to
Java, so this default seems reasonable to me). Then provide a pipeline
option to specify the url of an expansion service alias in the form
alias@url (e.g. --expansion_service=java@myexpansionservice:8097).
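
To illustrate, a toy sketch of how the alias lookup could work on the SDK
side (everything here is hypothetical -- it's just the shape of the
proposal):

def resolve_expansion_service(alias, expansion_service_args):
  # expansion_service_args: values of a repeated --expansion_service
  # pipeline option, e.g. ['java@myexpansionservice:8097']
  for entry in expansion_service_args:
    name, _, url = entry.partition('@')
    if name == alias:
      return url
  raise ValueError('no expansion service registered for alias %r' % alias)

# resolve_expansion_service('java', ['java@myexpansionservice:8097'])
# -> 'myexpansionservice:8097'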

Are you talking about key/value coders of the Kafka external transform ?
> Story of coders is a bit complicated for cross-language transforms. Even if
> we get a bytestring from Java, how can we make sure that that is
> processable in Python ? For example, it might be a serialized Java object.
>
IIUC, it’s not as if you support that with the current design, do you? If
it’s a Java object that your native IO transform decodes in Java, then how
are you going to get that to Python? Presumably the reason it’s encoded as
a Java object is because it can’t be represented using a cross-language
coder.

On the other hand, if I’m authoring a beam pipeline in python using an
external transform like PubSubIO, then it’s desirable for me to write a
pickled python object to WriteToPubSub and get that back in a
ReadFromPubSub in another python-based pipeline. In other words, when it
comes to coders, it seems we should be favoring the language that is *using*
the external transform, rather than the native language of the transform
itself.
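
That's essentially how the existing python ReadFromPubSub/WriteToPubSub
already behave -- they just move bytes, and the python side owns the
encoding.  Something like this (fragment only; `records` and `p` stand in
for a PCollection/pipeline defined elsewhere, and the topic/subscription
paths are placeholders):

import apache_beam as beam
from apache_beam.coders import PickleCoder

coder = PickleCoder()

# writer pipeline: python object -> bytes -> pubsub
_ = (records
     | beam.Map(coder.encode)
     | beam.io.WriteToPubSub(topic='projects/my-proj/topics/my-topic'))

# reader pipeline: pubsub -> bytes -> python object
decoded = (p
           | beam.io.ReadFromPubSub(
               subscription='projects/my-proj/subscriptions/my-sub')
           | beam.Map(coder.decode))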

All of that said, it occurs to me that for ReadFromPubSub, we do explicit
decoding in a subsequent transform rather than as part of ReadFromPubSub,
so I’m confused why ReadFromKafka needs to know about coders at all. Is
that behavior specific to Kafka?


> This is great and contributions are welcome. BTW Max and others, do you
> think it will help to add an expanded roadmap on cross-language transforms
> to [3] that will better describe the current status and future roadmap of
> cross-language transform support for various SDKs and runners ?
>
More info would be great. I’ve started looking at the changes required to
make KafkaIO work as an external transform and I have a number of questions
already. I’ll probably start asking questions on specific lines of this old
PR unless you’d like me to use a different forum.

thanks,
-chad


Portability framework: multiple environments in one pipeline

2019-07-20 Thread Chad Dombrova
Hi all,
I'm interested to know if others on the list would find value in the
ability to use multiple environments (e.g. docker images) within a single
pipeline, using some mechanism to identify the environment(s) that a
transform should use. It would be quite useful for us, since our transforms
can have conflicting python requirements, or worse, conflicting interpreter
requirements.  Currently to solve this we have to break the pipeline up
into multiple pipelines and use pubsub to communicate between them, which
is not ideal.

-chad


Re: Portability framework: multiple environments in one pipeline

2019-07-23 Thread Chad Dombrova
Our specific situation is pretty unique, but I think it fits a more general
pattern.  We use a number of media applications and each comes with its own
built-in python interpreter (Autodesk Maya and SideFX Houdini, for
example), and the core modules for each application can only be imported
within their respective interpreter.  We want to be able to create
pipelines where certain transforms are hosted within different application
interpreters, so that we can avoid the ugly workarounds that we have to do
now.

I can imagine a similar scenario where a user wants to use a number of
different libraries for different transforms, but the libraries’
requirements conflict with each other, or perhaps some require python3 and
others are stuck on python2.

Where can I find documentation on the expansion service?  I found a design
doc which was helpful, but it seems to hew toward the hypothetical, so I
think there have been a number of concrete steps taken since it was
written:
https://docs.google.com/document/d/1veiDF2dVH_5y56YxCcri2elCtYJgJnqw9aKPWAgddH8/mobilebasic

-chad



On Tue, Jul 23, 2019 at 1:39 PM Chamikara Jayalath 
wrote:

> I think we have primary focussed on the ability run transforms from
> multiple SDK in the same pipeline (cross-language) so far, but as Robert
> mentioned the framework currently in development should also be usable for
> running pipelines that use multiple environments that have the same SDK
> installed as well. I'd love to get more clarity on the exact use-case here
> (for example, details on why you couldn't run all Python transforms in a
> single environment) and to know if others have the same requirement.
>
> Thanks,
> Cham
>
>
> On Mon, Jul 22, 2019 at 12:31 AM Robert Bradshaw 
> wrote:
>
>> Yes, for sure. Support for this is available in some runners (like the
>> Python Universal Local Runner and Flink) and actively being added to
>> others (e.g. Dataflow). There are still some rough edges however--one
>> currently must run an expansion service to define a pipeline step in
>> an alternative environment (e.g. by registering your transforms and
>> running
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/expansion_service_test.py
>> ).
>> We'd like to make this process a lot smoother (and feedback would be
>> appreciated).
>>
>> On Sat, Jul 20, 2019 at 7:57 PM Chad Dombrova  wrote:
>> >
>> > Hi all,
>> > I'm interested to know if others on the list would find value in the
>> ability to use multiple environments (e.g. docker images) within a single
>> pipeline, using some mechanism to identify the environment(s) that a
>> transform should use. It would be quite useful for us, since our transforms
>> can have conflicting python requirements, or worse, conflicting interpreter
>> requirements.  Currently to solve this we have to break the pipeline up
>> into multiple pipelines and use pubsub to communicate between them, which
>> is not ideal.
>> >
>> > -chad
>> >
>>
>


Re: [ANNOUNCE] Beam 2.14.0 Released!

2019-08-01 Thread Chad Dombrova
Nice work all round!  I love the release blog format with the highlights
and links to issues.

-chad


On Thu, Aug 1, 2019 at 4:23 PM Anton Kedin  wrote:

> The Apache Beam team is pleased to announce the release of version 2.14.0.
>
> Apache Beam is an open source unified programming model to define and
> execute data processing pipelines, including ETL, batch and stream
> (continuous) processing. See https://beam.apache.org
>
> You can download the release here:
>
> https://beam.apache.org/get-started/downloads/
>
> This release includes bugfixes, features, and improvements detailed on
> the Beam blog: https://beam.apache.org/blog/2019/07/31/beam-2.14.0.html
>
> Thanks to everyone who contributed to this release, and we hope you enjoy
> using Beam 2.14.0.
>
> -- Anton Kedin, on behalf of The Apache Beam team
>


Re: Beam python pipeline on spark

2019-08-03 Thread Chad Dombrova
It's in the doc that Kyle sent, but it's worth mentioning here that
streaming is not yet supported.

-chad


On Fri, Aug 2, 2019 at 3:25 PM Mahesh Vangala 
wrote:

> Super 👍
> Thank you for the great news!
> I will file issues in this thread accordingly.
>
> Best,
> Mahesh
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com *
>
>
> On Fri, Aug 2, 2019 at 2:51 PM Kyle Weaver  wrote:
>
>> Hi Manesh,
>>
>> I'm happy to say we do support Python pipelines on the spark runner now.
>> Check out instructions here :
>> https://beam.apache.org/documentation/runners/spark/
>> This is a new development, so be sure to let me know if you have any bugs
>> or other feedback.
>>
>> Thanks,
>> Kyle
>>
>> On Fri, Aug 2, 2019 at 11:24 AM Mahesh Vangala 
>> wrote:
>>
>>> Hello all -
>>>
>>> I have enquired about it a while back, and the answer then was "not".
>>> There also stack overflow response here:
>>> https://stackoverflow.com/questions/43923776/python-support-for-sparkrunner-in-apache-beam
>>>
>>> I am wondering if this has changed to date?
>>>
>>> Let me know.
>>>
>>> Thanks,
>>> Mahesh
>>>
>>> *--*
>>> *Mahesh Vangala*
>>> *(Ph) 443-326-1957*
>>> *(web) mvangala.com *
>>>
>> --
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>


Design question regarding streaming and sorting

2019-08-23 Thread Chad Dombrova
Hi all,
Our team is brainstorming how to solve a particular type of problem with
Beam, and it's a bit beyond our experience level, so I thought I'd turn to
the experts for some advice.

Here are the pieces of our puzzle:

   - a data source with the following properties:
  - rows represent work to do
  - each row has an integer priority
  - rows can be added or deleted
  - priorities of a row can be changed
  - <10k rows
   - a slow worker Beam transform (Map/ParDo) that consumes a row at a time


We want a streaming pipeline that delivers rows from our data source to the
worker transform, re-sorting the source based on priority each time a new
row is delivered.  The goal is that last-second changes in priority can
affect the order of the slowly yielding read.  Throughput is not a major
concern since the worker is the bottleneck.

I have a few questions:

   - is this the sort of problem that BeamSQL can solve? I'm not sure how
   sorting and re-sorting are handled there in a streaming context...
   - I'm unclear on how back-pressure in Flink affects streaming reads.
   It's my hope that data/messages are left in the data source until
   back-pressure subsides, rather than read eagerly into memory.  Can someone
   clarify this for me?
   - is there a combination of windowing and triggering that can solve this
   continual re-sorting plus slow-yielding problem?  It's not unreasonable to
   keep all of our rows in memory on Flink, as long as we're snapshotting
   state.


Any advice on how an expert Beamer would solve this is greatly appreciated!

Thanks,
chad


Re: Design question regarding streaming and sorting

2019-08-24 Thread Chad Dombrova
>
> Is there a reason you can't process the rows in parallel?
>

Yes, but only up to a very low limit.  In our current solution (not using
Beam) this is 4.  The workers are doing data sync over a WAN with very high
latency and relatively low bandwidth.  We limit the number of workers (and
thus number of rows processed in parallel) in order to ensure that high
priority rows finish quickly and before low priority rows.  If the limit
were too high, the available bandwidth would be spread over too many tasks
-- both low and high priority -- and thus increase the time to complete any
given task.

Can you pause inflight work on a row arbitrarily, or must whatever work you
> start run to completion?
>

That is not a requirement, but it could be an interesting feature if done
correctly.

-chad


Re: Save state on tear down

2019-08-30 Thread Chad Dombrova
+dev

I read the document on Drain, and it sounds very promising.  I have a few
questions, starting with this statement from the doc:

   "This document proposes a new pipeline action called Drain. Drain can be
implemented by runners by manipulating the watermark of the pipeline."

What is a pipeline "action" and how would this be exposed to the user?  I
assume this is externally and manually initiated.  Is this something that
would be invoked from a PipelineResult object (i.e. akin to "cancel")?

Once a Drain is initiated on a pipeline, does this trigger a loop over all
unbounded sources to set their watermark to infinity, or only certain
ones?

thanks,
-chad


On Fri, Aug 16, 2019 at 2:47 PM Jose Delgado 
wrote:

> I see,  thank you  Lukasz.
>
>
>
> Regards,
> Jose
>
> *From: *Lukasz Cwik 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Monday, August 5, 2019 at 11:11 AM
> *To: *user 
> *Subject: *Re: Save state on tear down
>
>
>
> This is not possible today.
>
>
>
> There have been discussions about pipeline drain, snapshot and update [1,
> 2] which may provide additional details of what is planned and could use
> your feedback.
>
>
>
> 1:
> https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
>
> 2:
> https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY
>
>
>
> On Thu, Aug 1, 2019 at 3:44 PM Jose Delgado 
> wrote:
>
> Hello All,
>
>
>
> I wondering if there is a way or pattern to save state on tear down ?
>
>
>
> In case of a failure or a pipeline we cannot update(due to significant
> changes ) we would like to save the state and re-load it on the next
> creation of the pipeline.
>
>
>
> Note: we are currently using Google Dataflow runner
>
>
>
> Regards,
>
> Jose
>
>


The state of external transforms in Beam

2019-09-16 Thread Chad Dombrova
Hi all,
There was some interest in this topic at the Beam Summit this week (btw,
great job to everyone involved!), so I thought I’d try to summarize the
current state of things.
First, let me explain the idea behind external transforms for the
uninitiated.

Problem:

   - there’s a transform that you want to use, but it’s not available in
   your desired language. IO connectors are a good example: there are many
   available in the Java SDK, but not so much in Python or Go.

Solution (a short code sketch follows the steps):

   1. Create a stub transform in your desired language (e.g. Python) whose
   primary role is to serialize the parameters passed to that transform
   2. When you run your portable pipeline, just prior to it being sent to
   the Job Service for execution, your stub transform’s payload is first sent
   to the “Expansion Service” that’s running in the native language (Java),
   where the payload is used to construct an instance of the native transform,
   which is then expanded and converted to a protobuf and sent back to the
   calling process (Python).
   3. The protobuf representation of the expanded transform gets integrated
   back into the pipeline that you’re submitting
   4. Steps 2-3 are repeated for each external transform in your pipeline
   5. Then the whole pipeline gets sent to the Job Service to be invoked on
   Flink/Spark/etc
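
In code, the python side of steps 1-2 currently looks roughly like this (a
minimal sketch: the URN, the parameters, and the expansion service address
are all made up, and actually running it requires a portable runner plus a
Java expansion service that knows that URN):

import apache_beam as beam
from apache_beam.transforms.external import (
    ExternalTransform, ImplicitSchemaPayloadBuilder)

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(['a', 'b'])
      | ExternalTransform(
          'my:external:urn:v1',                         # URN registered in Java
          ImplicitSchemaPayloadBuilder({'param': 42}),  # serialized stub params
          'localhost:8097'))                            # expansion service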

--

Now on to my journey to get PubsubIO working in python on Flink.

The first issue I encountered was that there was a lot of boilerplate
involved in serializing the stub python transform’s parameters so they can
be sent to the expansion service.

I created a PR to make this simpler, which has just been merged to master:
https://github.com/apache/beam/pull/9098

With this feature in place, if you’re using python 3.7 you can use a
dataclass and the typing module to create your transform and describe your
schema in one go. For example:

@dataclasses.dataclass
class MyAwesomeTransform(beam.ExternalTransform):
  URN = 'beam:external:fakeurn:v1'

  integer_example: int
  string_example: str
  list_of_strings: List[str]
  optional_kv: Optional[Tuple[str, float]] = None
  optional_integer: Optional[int] = None
  expansion_service: dataclasses.InitVar[Optional[str]] = None

For earlier versions of python, you can use typing.NamedTuple to declare
your schema.

MyAwesomeSchema = typing.NamedTuple(
    'MyAwesomeSchema',
    [
        ('integer_example', int),
        ('string_example', unicode),
        ('list_of_strings', List[unicode]),
        ('optional_kv', Optional[Tuple[unicode, float]]),
        ('optional_integer', Optional[int]),
    ]
)

There’s also an option to generate the schema implicitly based on the
value(s) you wish to serialize.

There was a slight tangent in implementing this feature, in that requesting
a coder for typing.List resulted in the pickle coder instead of IterableCoder.
That’s bad because only standard/portable coders can be used for expansion
in Java (for obvious reasons), so as a convenience that was solved here:
https://github.com/apache/beam/pull/9344

The next issue that I encountered was that python did not track the
boundedness of PCollections, which made it impossible to use the expansion
service to create unbounded writes. That’s been solved and merged here:
https://github.com/apache/beam/pull/9426

So that brings us to the actual PR for adding external transform support
for PubsubIO: https://github.com/apache/beam/pull/9268

The PR works, but with one big caveat: in order to use it you must build
your Java containers with this special commit:
https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870

That commit solves 2 problems:

   1. Adds the pubsub Java deps so that they’re available in our portable
   pipeline
   2. Makes the coder for the PubsubIO message-holder type, PubsubMessage,
   available as a standard coder. This is required because both PubsubIO.Read
   and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
   objects, but only “standard” (i.e. portable) coders can be used, so we have
   to hack it to make PubsubMessage appear as a standard coder.

More details:

   - There’s a similar magic commit required for Kafka external transforms
   - The Jira issue for this problem is here:
   https://jira.apache.org/jira/browse/BEAM-7870
   - For problem #2 above there seems to be some consensus forming around
   using Avro or schema/row coders to send compound types in a portable way.
   Here’s the PR for making row coders portable
   https://github.com/apache/beam/pull/9188
   - I don’t really have any ideas for problem #1

So the portability expansion system works, and now it’s time to sand off
some of the rough corners. I’d love to hear others’ thoughts on how to
resolve some of these remaining issues.

-chad


Re: Python Portable Runner Issues

2019-09-18 Thread Chad Dombrova
Just note that while Dataflow does have robust python support, it does not
fully support the portability framework.  It’s a bit of a blurry
distinction, and honestly I’m not crystal clear on this, as I get the
impression that Dataflow may be a bit of a Portability hybrid.  It does not
use the job service or the expansion service, but I have heard that it uses
the external worker pool, and maybe it uses the protobuf pipeline
definitions?  It’d be great if a Googler could clarify.

-chad


On Wed, Sep 18, 2019 at 7:51 AM Holden Karau  wrote:

> Probably the most stable is running on Dataflow still. But I’m excited to
> see the progress towards a Spark runner, can’t wait to try TFT on it :)
>
> On Tue, Sep 17, 2019 at 4:37 PM Kyle Weaver  wrote:
>
>> The Flink runner is definitely more stable, as it's been around for
>> longer and has more developers and users on it. But a lot of the code is
>> shared, so for example some of the issues above would also happen on the
>> Flink runner.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Tue, Sep 17, 2019 at 4:34 PM Benjamin Tan 
>> wrote:
>>
>>> Thanks for all the replies Kyle! You've been super helpful :D.
>>>
>>> Would you say that the Flink runner more stable than the Spark one? Or
>>> which combo is the most stable for now?
>>>
>>> On 2019/09/17 19:43:54, Tom Barber  wrote:
>>> > Thanks Kyle,
>>> >
>>> > From my pov Alpha is fine, I’m just trying to test out some of the
>>> > capabilities currently, but trying to dig around the website doesn’t
>>> > explain a great deal. Luckily Benjamin seems a step ahead of me… I
>>> hope it
>>> > stays that way!  ;)
>>> >
>>> >
>>> > On 17 September 2019 at 19:33:40, Kyle Weaver (kcwea...@google.com)
>>> wrote:
>>> >
>>> > > The amount of issues I've encountered as a newbie is indeed
>>> troubling.
>>> > Spark portability is very much "alpha" quality software, a point we
>>> should
>>> > maybe emphasize on the website more. Anyway, I appreciate your
>>> patience,
>>> > and I'll do my best to address all these issues.
>>> >
>>> > > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException:
>>> > UNIMPLEMENTED: Method not found!
>>> > > AttributeError: module 'apache_beam.coders.coders' has no
>>> > attribute ‘VarIntCoder'
>>> > This class of errors occurs when the SDK version does not match up
>>> with the
>>> > runner version -- unfortunately, we cannot guarantee compatibility
>>> between
>>> > the two, so manual syncing is required for now. We are looking for
>>> ways to
>>> > improve this.
>>> >
>>> > > ERROR:grpc._server:Exception calling application: u'2-1'
>>> > It looks like these errors are entirely spurious. Have a fix for it
>>> here:
>>> > https://github.com/apache/beam/pull/9600
>>> >
>>> > Note that there may still be other spurious errors like:
>>> >
>>> > 19/09/17 11:25:31 ERROR ManagedChannelOrphanWrapper: *~*~*~ Channel
>>> > ManagedChannelImpl{logId=84, target=localhost:36129} was not shutdown
>>> > properly!!! ~*~*~*
>>> >
>>> > and
>>> >
>>> > 19/09/17 11:25:32 ERROR SerializingExecutor: Exception while executing
>>> > runnable
>>> >
>>> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@47db89b1
>>> > java.lang.IllegalStateException: call already closed
>>> >
>>> > I will try to get rid of them also, but for now you can just ignore
>>> them.
>>> > They are annoying but harmless.
>>> >
>>> > Kyle Weaver | Software Engineer | github.com/ibzib |
>>> kcwea...@google.com
>>> >
>>> >
>>> > On Tue, Sep 17, 2019 at 8:05 AM Benjamin Tan <
>>> benjamintanwei...@gmail.com>
>>> > wrote:
>>> >
>>> > > :D. Still, I'm curious as to the error we both are getting. Maybe
>>> someone
>>> > > can shed some light on it.
>>> > >
>>> > > On Tue, Sep 17, 2019 at 10:54 PM Tom Barber 
>>> wrote:
>>> > >
>>> > >> I do see hello written to 1 file and world to another, I guess it
>>> works!
>>> > >> Thanks for the pointers Benjamin I was about to give up.
>>> > >>
>>> > >> Tom
>>> > >>
>>> > >>
>>> > >> On 17 September 2019 at 15:51:13, Benjamin Tan (
>>> > >> benjamintanwei...@gmail.com) wrote:
>>> > >>
>>> > >> Tell me if you see any output. Anyway, here's the link to the same
>>> issue
>>> > >> you're facing:
>>> > >>
>>> > >>
>>> > >>
>>> https://lists.apache.org/thread.html/4e8e1455916debe096de32551f9ab05853524cf282bc312cd4620d68@%3Cuser.beam.apache.org%3E
>>> > >>
>>> > >> The amount of issues I've encountered as a newbie is indeed
>>> troubling.
>>> > >>
>>> > >> On 2019/09/17 14:43:11, Tom Barber  wrote:
>>> > >> > 🤣 okay I’ll look again, I assumed it just crashed in a ball of
>>> flames!
>>> > >> >
>>> > >> >
>>> > >> > On 17 September 2019 at 15:39:33, Benjamin Tan (
>>> > >> benjamintanwei...@gmail.com)
>>> > >> > wrote:
>>> > >> >
>>> > >> > I got this too! Did you manage to get any output? (I did) I
>>> reported
>>> > >> this
>>> > >> > in another thread.
>>> > >> >
>>> > >> > This looks like a key error 

Re: Design patterns while using Beam

2019-09-23 Thread Chad Dombrova
There are also these two helpful articles on Dataflow patterns which are
largely applicable to Beam in general:

https://www.google.com/amp/s/cloudblog.withgoogle.com/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1/amp/

https://www.google.com/amp/s/cloudblog.withgoogle.com/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2/amp/

Perhaps these can be ported to the official Beam docs?

-chad


On Sun, Sep 22, 2019 at 10:57 PM Reza Rokni  wrote:

> Great idea!
>
> In terms of patterns we do have a section in the docs, would be great for
> more contributors to it!
>
> https://beam.apache.org/documentation/patterns/overview/
>
> Cheers
>
> R
>
>
> On Sun, 22 Sep 2019 at 13:43, dev wearebold 
> wrote:
>
>> Hey,
>>
>> That’s a very good idea, this could help people a lot
>>
>>
>> Regards,
>>
>> J
>>
>> > Le 22 sept. 2019 à 06:39, deepak kumar  a écrit :
>> >
>> > Hi All
>> > I guess we need to put some examples in the documentation around best
>> coding practises , concurrency , non blocking IO and design patterns while
>> writing Apache Beam pipelines.
>> > Is there any such guide available ?
>> > E.g. when there are lot of options to be used in the pipeline ,
>> BuilderPattern should be used.
>> > Another use case can be when anyone wants to run complex transformation
>> on incoming objects , visitor pattern should be used.
>> > This guide can come from people already running beam in production and
>> written it with all best practices in mind.
>> > It will help in greater and wider adaoption.
>> >
>> > Just a thought.
>> > Please let me know if anyone wants to contribute and i can lead this
>> initiative.
>> >
>> > Thanks
>> > Deepak
>>
>>
>
> --
>
> This email may be confidential and privileged. If you received this
> communication by mistake, please don't forward it to anyone else, please
> erase all copies and attachments, and please let me know that it has gone
> to the wrong person.
>
> The above terms reflect a potential business arrangement, are provided
> solely as a basis for further discussion, and are not intended to be and do
> not constitute a legally binding obligation. No legally binding obligations
> will be created, implied, or inferred until an agreement in final form is
> executed in writing by all parties involved.
>


Dataflow python2 support

2020-10-13 Thread Chad Dombrova
Hi all,
Those of you who have been following the python2/3 topics on this list
know that the industry that I work for is a bit behind the times;  we're
still waiting for all the libraries we need to be ported to python3.
Here's a handy website for those who are curious: https://vfxpy.com/

I'm hoping that we'll have most of what we need by EOY or shortly
thereafter, but on the Beam side that leaves us with a gap of 3 months or
more.  I was hoping that we'd be able to coast over that gap using Beam
2.24, but Dataflow dropped support for python2 a week ago [1].

So my question for the Dataflow team is this:  if we lock down our
pipelines to use 2.24 -- the last version of Beam to officially support
python2 -- and run our SDK workers inside docker containers, will new jobs
submitted to Dataflow continue to work on python2?   Or will the Dataflow
runner itself stop being able to execute python2 code regardless of the
version of Beam used?  As an example, I know that PubSubIO, which we use
extensively, is not part of Beam itself, but is sorta magically patched
into the pipeline by Dataflow.

I will say that when I agreed that I was satisfied that Beam 2.24 would be
the last Beam release to support python2, I did so under the assumption
that pipelines running 2.24 would be supported for longer than a few
weeks:  2.24 was released to PyPI on Sept 16, and python2 pipelines became
unsupported on Dataflow on October 7th.

thanks,
-chad

[1] https://cloud.google.com/python/docs/python2-sunset/#dataflow


Re: using beam with flink runner

2020-12-28 Thread Chad Dombrova
Sam Bourne’s repo has a lot of tricks for using Flink with docker
containers:
https://github.com/sambvfx/beam-flink-k8s

Feel free to make a PR if you find anything has changed.


-Chad


On Mon, Dec 28, 2020 at 9:38 AM Kyle Weaver  wrote:

> Using Docker workers along with the local filesystem I/O is not
> recommended because the Docker workers will use their own filesystems
> instead of the host filesystem. See
> https://issues.apache.org/jira/browse/BEAM-5440
>
> On Sun, Dec 27, 2020 at 5:01 AM Günter Hipler 
> wrote:
>
>> Hi,
>>
>> I just tried to start a beam pipeline on a flink cluster using
>>
>> - the latest published beam version 2.26.0
>> - the python SDK
>> - a standalone flink cluster version 1.10.1
>> - the simple pipeline I used  [1]
>>
>> When I start the pipeline in embedded mode it works correctly (even
>> pulling a jobmanager docker image)
>>
>> python mas_zb_demo_marc_author_count.py --input
>> /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/input/job8r1A069.format.xml
>>
>> --output
>> /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/output/out.txt
>>
>> --runner=FlinkRunner --flink_version=1.10
>>
>> 
>> WARNING:root:Make sure that locally built Python SDK docker image has
>> Python 3.7 interpreter.
>> INFO:root:Using Python SDK docker image:
>> apache/beam_python3.7_sdk:2.26.0. If the image is not available at
>> local, we will try to pull from hub.docker.com
>> 
>>
>> python mas_zb_demo_marc_author_count.py --input
>> /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/input/job8r1A069.format.xml
>>
>> --output
>> /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/output/out.txt
>>
>> --runner=FlinkRunner --flink_version=1.10
>>
>> I'm using python version
>> python --version
>> Python 3.7.9
>>
>> Trying to use the remote stanalone cluster the job fails when fetching
>> the jobmanager docker image
>> python mas_zb_demo_marc_author_count.py --input
>> /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/input/job8r1A069.format.xml
>>
>> --output
>> /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/output/out.txt
>>
>> --runner=FlinkRunner --flink_version=1.10 flink_master=localhost:8081
>>
>> 
>>
>> java.lang.Exception: The user defined 'open()' method caused an
>> exception: java.util.concurrent.TimeoutException: Timed out while
>> waiting for command 'docker run -d --network=host
>> --env=DOCKER_MAC_CONTAINER=null apache/beam_python3.8_sdk:2.26.0
>> --id=1-1 --provision_endpoint=localhost:41483'
>>  at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499)
>>  at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>>  at java.base/java.lang.Thread.run(Thread.java:834)
>>
>> 
>>
>> Then I pulled the apache/beam_python3.8_sdk:2.26.0 image locally to
>> avoid the timeout, which was successful, the remote job finished and the
>> they were shown in the Flink dashboard.
>>
>> But no result was written into the given --output dir although I
>> couldn't find any logs referencing this issue in the logs of Flink.
>> Additionally I'm getting quite a huge amount of logs in the python
>> process shell which sends the script to the cluster [2] - but I can't
>> see any reason for the behaviour
>>
>> Thanks for any explanations for the behaviour
>>
>> Günter
>>
>>
>> [1]
>>
>> https://gitlab.com/swissbib/lab/services/jupyter-beam-flink/-/blob/master/mas_zb_demo_marc_author_count.py
>> (grep author data from bibliographic library data)
>>
>> [2]
>>
>> https://gitlab.com/swissbib/lab/services/jupyter-beam-flink/-/blob/master/notes/logging.no.output.txt
>>
>>
>>


Dataflow and mounting large data sets

2023-01-26 Thread Chad Dombrova
Hi all,
We have large data sets which we would like to mount over NFS within
Dataflow.  As far as I know, this is not possible.  Has anything changed
there?

The data sets that our beam pipelines operate on are too large -- about a
petabyte -- to put into a custom container or to sync from one location to
another, which is why we need the ability to mount a volume, i.e. using
docker run --mount.

In the past we used Flink and a custom build of beam to add control over
docker run options; we made an MR for that here:
https://github.com/apache/beam/pull/8982/files

It was not merged into beam due to "a concern about exposing all docker run
options since it would likely be incompatible with cluster managers that
use docker".  The discussion was here
https://lists.apache.org/thread.html/fa3fede5d2967daab676d3869d2fc94f505517504c3a849db789ef54%40%3Cdev.beam.apache.org%3E

I'm happy to revisit this feature with a set of pruned-down options,
including --volume and --mount, but we would need Dataflow to support this
as well, because IIRC the docker run invocation is part of a black box in
Dataflow (i.e. not part of Beam), so as Dataflow users we would rely on
Google to make those changes.

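Just to illustrate the shape of what I'm asking for (the option name below
is made up -- nothing like it exists today):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--environment_type=DOCKER',
    # hypothetical: forwarded verbatim to the runner's `docker run` invocation
    '--docker_mount=type=volume,src=my-nfs,dst=/Volumes/my-nfs,'
    'volume-driver=local,volume-opt=type=nfs',
])
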
Is this a feature that already exists or that we can hope to get in
Dataflow?  Or do we need to go back to Flink to support this?

thanks,
-chad


Re: Dataflow and mounting large data sets

2023-01-30 Thread Chad Dombrova
Hi Israel,
Thanks for responding.

And could not the dataset be accessed from Cloud Storage? Does it need to
> be specifically NFS?
>

No, unfortunately it can't be accessed from Cloud Storage.  Our data
resides on high-performance Isilon [1] servers using a posix filesystem,
and NFS is the tried and true protocol for this.  This configuration cannot
be changed for a multitude of reasons, not least of which is the fact that
these servers outperform cloud storage at a fraction of the cost (which is
a *very* *big* difference for multiple petabytes of storage).  If you'd
like more details on why this is not possible I'm happy to explain, but for
now let's just say that it's been investigated and it's not practical.  The
use of fast posix filers over NFS is fairly ubiquitous in the media and
entertainment industry (if you want to know more about how we use Beam, I
gave a talk at the Beam Summit a few years ago [2]).

thanks!
-chad

[1] https://www.dell.com/en-hk/dt/solutions/media-entertainment.htm
[2] https://www.youtube.com/watch?v=gvbQI3I03a8&t=644s&ab_channel=ApacheBeam


Re: Dataflow and mounting large data sets

2023-01-30 Thread Chad Dombrova
Hi Robert,
I know very little about the FileSystem classes, but I don’t think it’s
possible for a process running in docker to create an NFS mount without
running in privileged [1] mode, which cannot be done with Dataflow. The
other ways of gaining access to a mount are:

A. the node running docker has the NFS mount itself and passes it along
using docker run --volume.
B. the mount is created within the container by using docker run --mount.

Neither of these is possible with Dataflow.

Here’s a full example of how an NFS mount can be created when running
docker:

docker run -it --network=host \
    --mount 'type=volume,src=pipe-nfs-test,dst=/Volumes/pipe-nfs-test,volume-driver=local,volume-opt=type=nfs,volume-opt=device=:/pipe,"volume-opt=o=addr=turbohal.luma.mel,vers=3"' \
    luma/pipe-shell -- bash

In my ideal world, I would make a PR to add support for the docker --mount
flag to Beam for the runners that I can control, and the Dataflow team
would add support on their end.
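
With a mount like that in place, the pipeline side would just be ordinary
file IO against the posix path.  A minimal sketch (the paths are ours, and
it assumes the mount is visible inside the SDK container):

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
    _ = (
        p
        # Match files under the NFS mount point inside the SDK container.
        | fileio.MatchFiles('/Volumes/pipe-nfs-test/*')
        | fileio.ReadMatches()
        # Each element is a ReadableFile; here we just record path and size.
        | beam.Map(lambda f: (f.metadata.path, f.metadata.size_in_bytes))
    )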

Let me know if I'm missing anything.

[1] https://docs.docker.com/engine/reference/run/#runtime-privilege-and-linux-capabilities

thanks,
-chad


Re: Dataflow and mounting large data sets

2023-01-30 Thread Chad Dombrova
Hi Valentyn,


> Beam SDK docker containers on Dataflow VMs are currently launched in
> privileged mode.
>

Does this only apply to stock sdk containers?  I'm asking because we use a
custom sdk container that we build.  We've tried various ways of running
mount from within our custom beam container in Dataflow and we could not
get it to work, while the same thing succeeds in local tests and in our CI
(gitlab).  The assessment at the time (this was maybe a year ago) was that
the container was not running in privileged mode, but if you think that's
incorrect we can revisit this and report back with some error logs.

-chad


Re: Dataflow and mounting large data sets

2023-01-31 Thread Chad Dombrova
Thanks for the info.  We are going to test this further and we'll let you
know how it goes.
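
For the record, the test we have in mind is roughly the following.  It is
only a sketch: the server and export names are ours (reusing the ones from
the docker example earlier in this thread), and it assumes the mount and
NFS client utilities are installed in our custom SDK container.

import subprocess
import apache_beam as beam


class MountNfs(beam.DoFn):
    """Attempt the NFS mount once per worker and surface the real error."""

    def setup(self):
        subprocess.run(['mkdir', '-p', '/mnt/pipe'], check=True)
        result = subprocess.run(
            ['mount', '-t', 'nfs', '-o', 'vers=3',
             'turbohal.luma.mel:/pipe', '/mnt/pipe'],
            capture_output=True, text=True)
        if result.returncode != 0:
            # Fail loudly so the message ends up in the Dataflow worker logs.
            raise RuntimeError('NFS mount failed: %s' % result.stderr)

    def process(self, element):
        yield element

Running a ParDo over a tiny Create([None]) with this DoFn should be enough
to tell whether the workers really are privileged.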

-chad


On Mon, Jan 30, 2023 at 2:14 PM Valentyn Tymofieiev 
wrote:

> It applies to custom containers as well. You can find the container
> manifest in the GCE VM metadata, and it should have an entry for privileged
> mode. The reason for this was to enable GPU accelerator support, but I
> agree with Robert that it is not part of any contract, so in theory this
> could change or perhaps be more strictly limited to accelerator support. In
> fact, this was originally only enabled for pipelines using accelerators,
> but due to purely internal implementation details I believe it is currently
> enabled for all pipelines.
>
> So for prototyping purposes I think you could try it, but I can't make any
> guarantees in this thread that privileged mode will continue to work.
>
> cc: @Aaron Li  FYI
>
>
> On Mon, Jan 30, 2023 at 12:16 PM Robert Bradshaw 
> wrote:
>
>> I'm also not sure it's part of the contract that the containerization
>> technology we use will always have these capabilities.
>>
>> On Mon, Jan 30, 2023 at 10:53 AM Chad Dombrova  wrote:
>> >
>> > Hi Valentyn,
>> >
>> >>
>> >> Beam SDK docker containers on Dataflow VMs are currently launched in
>> privileged mode.
>> >
>> >
>> > Does this only apply to stock sdk containers?  I'm asking because we
>> use a custom sdk container that we build.  We've tried various ways of
>> running mount from within our custom beam container in Dataflow and we
>> could not get it to work, while the same thing succeeds in local tests and
>> in our CI (gitlab).  The assessment at the time (this was maybe a year ago)
>> was that the container was not running in privileged mode, but if you think
>> that's incorrect we can revisit this and report back with some error logs.
>> >
>> > -chad
>> >
>>
>


Re: Best patterns for a polling transform

2023-06-22 Thread Chad Dombrova
I’m also interested in the answer to this.  This is essential for reading
from many types of data sources.


On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne  wrote:

> +dev to see if anyone has any suggestions.
>
> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne  wrote:
>
>> Hello beam community!
>>
>> I’m having trouble coming up with the best pattern to *eagerly* poll. By
>> eagerly, I mean that elements should be consumed and yielded as soon as
>> possible. There are a handful of experiments that I’ve tried and my latest
>> attempt using the timer API seems quite promising, but is operating in a
>> way that I find rather unintuitive. My solution was to create a sort of
>> recursive timer callback, an example of which I found within the beam
>> test code.
>>
>> I have a few questions:
>>
>> 1) The below code runs fine with a single worker, but with multiple
>> workers there are duplicate values. It seems that the callback and a
>> snapshot of the state are provided to multiple workers, and the number of
>> duplications increases with the number of workers. Is this due to the
>> values being provided to timer.set?
>>
>> 2) I’m using TimeDomain.WATERMARK here because it simply does not work
>> with REAL_TIME. The docs seem to suggest REAL_TIME would be the way to do
>> this, however there seems to be no guarantee that a REAL_TIME callback
>> will ever run. In this sample, setting the timer to REAL_TIME simply never
>> fires the callback. Interestingly, if you call timer.set with any value
>> less than the current time.time(), then the callback will run; however, it
>> seems to fire immediately regardless of the value (and in this sample will
>> actually raise an AssertionError).
>>
>> I’m happy for suggestions!
>> -Sam
>>
>> import random
>> import threading
>>
>> import apache_beam as beam
>> import apache_beam.coders as coders
>> import apache_beam.transforms.combiners as combiners
>> import apache_beam.transforms.userstate as userstate
>> import apache_beam.utils.timestamp as timestamp
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>>
>> class Log(beam.PTransform):
>>
>>     lock = threading.Lock()
>>
>>     @classmethod
>>     def _log(cls, element, label):
>>         with cls.lock:
>>             # This just colors the print in terminal
>>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>>         return element
>>
>>     def expand(self, pcoll):
>>         return pcoll | beam.Map(self._log, self.label)
>>
>>
>> class EagerProcess(beam.DoFn):
>>
>>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>>
>>     def process(
>>         self,
>>         element,
>>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         _, item = element
>>
>>         for i in range(item):
>>             buffer.add(i)
>>
>>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>     @userstate.on_timer(POLL_TIMER)
>>     def flush(
>>         self,
>>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         cache = buffer.read()
>>         buffer.clear()
>>
>>         requeue = False
>>         for item in cache:
>>             if random.random() < 0.1:
>>                 yield item
>>             else:
>>                 buffer.add(item)
>>                 requeue = True
>>
>>         if requeue:
>>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>
>> def main():
>>     options = PipelineOptions.from_dictionary({
>>         'direct_num_workers': 3,
>>         'direct_running_mode': 'multi_threading',
>>     })
>>
>>     pipe = beam.Pipeline(options=options)
>>     (
>>         pipe
>>         | beam.Create([10])
>>         | 'Init' >> Log()
>>         | beam.Reify.Timestamp()
>>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>>         | beam.ParDo(EagerProcess())
>>         | 'Complete' >> Log()
>>         | beam.transforms.combiners.Count.Globally()
>>         | 'Count' >> Log()
>>     )
>>     result = pipe.run()
>>     result.wait_until_finish()
>>
>>
>> if __name__ == '__main__':
>>     main()
>>
>>


Re: Can apache beam be used for control flow (ETL workflow)

2023-12-22 Thread Chad Dombrova
Hi,
I'm the guy who gave the Movie Magic talk.  Since it's possible to write
stateful transforms with Beam, it is capable of some very sophisticated
flow control (a small sketch of the kind of thing I mean is below).  I've
not seen a python framework that combines this with streaming data nearly
as well.  That said, there aren't a lot of great working examples out there
for transforms that do sophisticated flow control, and I feel like we're
always wrestling with differences in behavior between the direct runner and
Dataflow.  There was a thread about polling patterns [1] on this list that
never really got a satisfying resolution.  Likewise, there was a thread
about using an SDF with an unbounded source [2] that also didn't get fully
resolved.

[1] https://lists.apache.org/thread/nsxs49vjokcc5wkvdvbvsqwzq682s7qw
[2] https://lists.apache.org/thread/n3xgml0z8fok7101q79rsmdgp06lofnb
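
To give a flavour of what I mean by flow control, here is a minimal
stateful sketch of my own (not taken from either thread above; the names
and the batch-of-N policy are just illustrative): it buffers elements per
key in state and only releases them downstream once a batch has
accumulated.

import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms import userstate


class BatchGate(beam.DoFn):
    """Hold elements per key in state and emit them in lists of `batch_size`."""

    BUFFER = userstate.BagStateSpec('buffer', coders.PickleCoder())
    COUNT = userstate.CombiningValueStateSpec('count', sum)

    def __init__(self, batch_size=10):
        self.batch_size = batch_size

    def process(
        self,
        element,  # must be a (key, value) pair; state is tracked per key
        buffer=beam.DoFn.StateParam(BUFFER),
        count=beam.DoFn.StateParam(COUNT),
    ):
        _, value = element
        buffer.add(value)
        count.add(1)
        if count.read() >= self.batch_size:
            # Release the whole batch downstream and reset the state.
            yield list(buffer.read())
            buffer.clear()
            count.clear()
        # A real version would also set a timer to flush a partial batch.

A pipeline would key its elements first, e.g.
beam.Map(lambda x: (x % 3, x)) | beam.ParDo(BatchGate(batch_size=5)).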



On Sun, Dec 17, 2023 at 3:53 PM Austin Bennett  wrote:

> https://beamsummit.org/sessions/event-driven-movie-magic/
>
> ^^ the question made me think of that use case.  Though, unclear how close
> it is to what you're thinking about.
>
> Cheers -
>
> On Fri, Dec 15, 2023 at 7:01 AM Byron Ellis via user 
> wrote:
>
>> As Jan says, theoretically possible? Sure. That particular set of
>> operations? Overkill. If you don't have it already set up I'd say even
>> something like Airflow is overkill here. If all you need to do is "launch
>> job and wait" when a file arrives... that's a small script and not
>> something that particularly requires a distributed data processing system.
>>
>> On Fri, Dec 15, 2023 at 4:58 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> Apache Beam describes itself as "Apache Beam is an open-source, unified
>>> programming model for batch and streaming data processing pipelines, ...".
>>> As such, it is possible to use it to express essentially arbitrary logic
>>> and run it as a streaming pipeline. A streaming pipeline processes input
>>> data and produces output data and/or actions. Given these assumptions, it
>>> is technically feasible to use Apache Beam for orchestrating other
>>> workflows, the problem is that it will very much likely not be efficient.
>>> Apache Beam has a lot of heavy lifting related to the fact that it is
>>> designed to process large volumes of data in a scalable way, which is
>>> probably not what one would need for workflow orchestration. So, my two
>>> cents would be that although it _could_ be done, it probably _should not_
>>> be done.
>>>
>>> Best,
>>>
>>>  Jan
>>> On 12/15/23 13:39, Mikhail Khludnev wrote:
>>>
>>> Hello,
>>> I think this page
>>> https://beam.apache.org/documentation/ml/orchestration/ might answer
>>> your question.
>>> Frankly speaking: GCP Workflows and Apache Airflow.
>>> But Beam itself is a data-stream/flow or batch processor; not a workflow
>>> engine (IMHO).
>>>
>>> On Fri, Dec 15, 2023 at 3:13 PM data_nerd_666 
>>> wrote:
>>>
 I know it is technically possible, but my case may be a little special.
 Say I have 3 steps for my control flow (ETL workflow):
 Step 1. upstream file watching
 Step 2. call some external service to run one job, e.g. run a notebook,
 run a python script
 Step 3. notify downstream workflow
 Can I use apache beam to build a DAG with these 3 nodes and run it as
 either a flink or spark job?  It might be a little weird, but I just want to
 learn from the community whether this is the right way to use apache beam,
 and has anyone done this before? Thanks



 On Fri, Dec 15, 2023 at 10:28 AM Byron Ellis via user <
 user@beam.apache.org> wrote:

> It’s technically possible but the closest thing I can think of would
> be triggering things based on things like file watching.
>
> On Thu, Dec 14, 2023 at 2:46 PM data_nerd_666 
> wrote:
>
>> Not using beam as a time-based scheduler, but just using it to control the
>> execution order of an ETL workflow DAG, because beam's abstraction is also
>> a DAG.
>> I know it is a little weird; I just want to confirm with the community:
>> has anyone used beam like this before?
>>
>>
>>
>> On Thu, Dec 14, 2023 at 10:59 PM Jan Lukavský 
>> wrote:
>>
>>> Hi,
>>>
>>> can you give an example of what you mean for better understanding?
>>> Do
>>> you mean using Beam as a scheduler of other ETL workflows?
>>>
>>>   Jan
>>>
>>> On 12/14/23 13:17, data_nerd_666 wrote:
>>> > Hi all,
>>> >
>>> > I am new to apache beam, and am very excited to find beam in the
>>> > apache community. I see lots of use cases of using apache beam for
>>> > data flow (processing large amounts of batch/streaming data). I am
>>> > just wondering whether I can use apache beam for control flow (ETL
>>> > workflow). I don't mean the spark/flink jobs in the ETL workflow, I
>>> > mean the ETL workflow itself. Because an ETL workflow is also a DAG,
>>> > which is very similar to the