Okay, after a brief detour through "get this working in the Flink Portable
Runner" I think I have something pretty workable.

PInput and POutput can actually be structs rather than protocols, which
simplifies things quite a bit. It also allows us to use them with property
wrappers for a SwiftUI-like experience if we want when defining DoFns
(which is what I was originally intending to use them for). That also means
the function signature you use for closures would mostly match full-fledged
DoFn definitions, which is satisfying.
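For concreteness, here's a toy sketch of what struct-based PInput/POutput could look like. The field names, the String stand-in for a window, and the closure shape are illustrative guesses, not the actual branch code:

```swift
import Foundation

// Toy sketch: PInput wraps the (value, timestamp, window) triple with
// friendlier names. A String stands in for the real window type.
struct PInput<Of> {
    let value: Of
    let timestamp: Date
    let window: String
}

struct POutput<Of> {
    // Collects emitted elements; the real thing would push to a stream.
    var emitted: [(Of, Date, String)] = []
    mutating func emit(_ value: Of, timestamp: Date, window: String) {
        emitted.append((value, timestamp, window))
    }
}

// A closure then has the same shape a full DoFn's process method would:
let double: (PInput<Int>, inout POutput<Int>) -> Void = { input, output in
    output.emit(input.value * 2, timestamp: input.timestamp, window: input.window)
}

var out = POutput<Int>()
double(PInput(value: 21, timestamp: Date(timeIntervalSince1970: 0), window: "global"), &out)
```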



On Thu, Aug 24, 2023 at 5:55 PM Byron Ellis <byronel...@google.com> wrote:

> Okay, I tried a couple of different things.
>
> Implicitly passing the timestamp and window during iteration did not go
> well. While physically possible, it introduces an invisible side effect
> into loop iteration, one that confused me even though I was the person who
> implemented it. Also, I'm pretty sure there'd end up being some sort of
> race condition nightmare continuing down that path.
>
> What I decided to do instead was the following:
>
> 1. Rename the existing "pardo" functions to "pstream" and require that
> they always emit a window and timestamp along with their value. This
> eliminates the side effect but lets us keep iteration in a bundle where
> that might be convenient. For example, in my cheesy GCS implementation it
> means that I can keep an OAuth token around for the lifetime of the bundle
> as a local variable, which is convenient. It's a bit more typing for users
> of pstream, but the expectation here is that if you're using pstream
> functions You Know What You Are Doing and most people won't be using it
> directly.
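To make the pstream contract concrete, here's a toy version over an in-memory bundle. The function and type names are illustrative stand-ins, not the real API:

```swift
import Foundation

// Illustrative stand-in for a window type.
struct Window: Equatable { let name: String }

// A pstream-style function sees the whole bundle at once and must emit an
// explicit (value, timestamp, window) triple for every output. Bundle-scoped
// state (like the OAuth token in the GCS example) can live in a local.
func wordLengths(bundle: [(String, Date, Window)],
                 emit: (Int, Date, Window) -> Void) {
    let bundleState = "fetched-once-per-bundle"  // stand-in for e.g. an OAuth token
    _ = bundleState
    for (word, timestamp, window) in bundle {
        emit(word.count, timestamp, window)  // explicit metadata, no hidden side effect
    }
}

var results: [(Int, Date, Window)] = []
let w = Window(name: "global")
wordLengths(bundle: [("hello", Date(timeIntervalSince1970: 0), w),
                     ("hi", Date(timeIntervalSince1970: 1), w)]) {
    results.append(($0, $1, $2))
}
```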
>
> 2. Introduce a new set of pardo functions (I didn't do all of them yet,
> but enough to test the functionality and decide I liked it) which take a
> function signature of (any PInput<InputType>,any POutput<OutputType>).
> PInput takes the (InputType,Date,Window) tuple and converts it into a
> struct with friendlier names. Not strictly necessary, but makes the code
> nicer to read, I think. POutput introduces emit functions that optionally
> allow you to specify a timestamp and a window. If you don't specify one or
> the other, it will take the timestamp and/or window of the input.
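A toy version of that defaulting rule (the names and signatures here are approximations, not the branch API):

```swift
import Foundation

struct Window: Equatable { let name: String }

struct Input<Of> {
    let value: Of
    let timestamp: Date
    let window: Window
}

// Sketch of the defaulting rule: a nil timestamp/window means "inherit from
// the input element".
func emit<In, Out>(_ value: Out,
                   from input: Input<In>,
                   timestamp: Date? = nil,
                   window: Window? = nil) -> (Out, Date, Window) {
    (value, timestamp ?? input.timestamp, window ?? input.window)
}

let element = Input(value: "cat", timestamp: Date(timeIntervalSince1970: 10),
                    window: Window(name: "w1"))
let inherited = emit(3, from: element)  // takes timestamp and window from input
let overridden = emit(3, from: element, window: Window(name: "w2"))
```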
>
> Using that was pretty pleasant, so I think we should continue down that
> path. If you'd like to see it in use, I reimplemented map() and flatMap()
> in terms of this new pardo functionality.
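The flavor of that reimplementation, heavily simplified over an in-memory bundle (a sketch, not the actual code):

```swift
import Foundation

struct Window: Equatable { let name: String }
typealias Element<T> = (value: T, timestamp: Date, window: Window)

// Toy element-wise pardo: the body emits zero or more outputs, each
// inheriting the input element's metadata.
func pardo<In, Out>(_ input: [Element<In>],
                    _ body: (In, (Out) -> Void) -> Void) -> [Element<Out>] {
    var output: [Element<Out>] = []
    for element in input {
        body(element.value) {
            output.append((value: $0, timestamp: element.timestamp, window: element.window))
        }
    }
    return output
}

// map and flatMap then fall out as one-liners on top of pardo:
func map<In, Out>(_ input: [Element<In>], _ fn: (In) -> Out) -> [Element<Out>] {
    pardo(input) { value, emit in emit(fn(value)) }
}

func flatMap<In, Out>(_ input: [Element<In>], _ fn: (In) -> [Out]) -> [Element<Out>] {
    pardo(input) { value, emit in fn(value).forEach(emit) }
}
```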
>
> Code has been pushed to the branch/PR if you're interested in taking a
> look.
>
> On Thu, Aug 24, 2023 at 2:15 PM Byron Ellis <byronel...@google.com> wrote:
>
>> Gotcha, I think there's a fairly easy solution to link input and output
>> streams.... Let me try it out... might even be possible to have both
>> element and stream-wise closure pardos. Definitely possible to have that at
>> the DoFn level (called SerializableFn in the SDK because I want to
>> use @DoFn as a macro)
>>
>> On Thu, Aug 24, 2023 at 1:09 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> I would like to figure out a way to get the stream-y interface to
>>>>> work, as I think it's more natural overall.
>>>>>
>>>>> One hypothesis is that if any elements are carried over loop
>>>>> iterations, there will likely be some that are carried over beyond the
>>>>> loop (after all, the callee doesn't know when the loop is supposed to
>>>>> end). We could reject "plain" elements that are emitted after this
>>>>> point, requiring one to emit timestamp-windowed values.
>>>>>
>>>>
>>>> Are you assuming that the same stream (or overlapping sets of data) is
>>>> pushed to multiple workers? I thought that the set of data streamed here
>>>> is the data that belongs to the current bundle (hence already assigned
>>>> to the current worker), so any output from the current bundle invocation
>>>> would be a valid output of that bundle.
>>>>
>>>>>
>>> Yes, the content of the stream is exactly the contents of the bundle.
>>> The question is how to do the input_element:output_element correlation for
>>> automatically propagating metadata.
>>>
>>>
>>>>> Related to this, we could enforce that the only (user-accessible) way
>>>>> to get such a timestamped value is to start with one, e.g. a
>>>>> WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same
>>>>> metadata but a new value. Thus a user wanting to do anything "fancy"
>>>>> would have to explicitly request iteration over these windowed values
>>>>> rather than over the raw elements. (This is also forward compatible
>>>>> with expanding the metadata that can get attached, e.g. pane infos, and
>>>>> makes the right thing the easiest/most natural.)
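In code form, that "metadata is inherited, never invented" idea might look roughly like this (a hypothetical shape, not an actual Beam API):

```swift
import Foundation

struct Window: Equatable { let name: String }

// Sketch: the public way to get a new WindowedValue is withValue on an
// existing one, which copies the timestamp and window and only swaps the
// payload, so metadata propagation is correct by construction.
struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let window: Window

    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, window: window)
    }
}

let wv = WindowedValue(value: "hi", timestamp: Date(timeIntervalSince1970: 5),
                       window: Window(name: "w"))
let derived = wv.withValue(wv.value.count)  // same metadata, new value
```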
>>>>>
>>>>> On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Ah, that is a good point: being element-wise would make managing
>>>>>> windows and timestamps easier for the user. Fortunately it's a fairly
>>>>>> easy change to make, and maybe even less typing for the user. I was
>>>>>> originally thinking side inputs and metrics would happen outside the
>>>>>> loop, but I think you want a class and not a closure at that point for
>>>>>> sanity.
>>>>>>
>>>>>> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ah, I see.
>>>>>>>
>>>>>>> Yeah, I've thought about using an iterable for the whole bundle
>>>>>>> rather than start/finish bundle callbacks, but one of the questions
>>>>>>> is how that would impact implicit passing of the timestamp (and
>>>>>>> other) metadata from input elements to output elements. You can of
>>>>>>> course attach the metadata to any output that happens in the loop
>>>>>>> body, but it's very easy to implicitly break the 1:1 relationship
>>>>>>> here (e.g. by doing buffering or otherwise modifying local state) and
>>>>>>> this would be hard to detect. (I suppose trying to output after the
>>>>>>> loop finishes could require something more explicit.)
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Oh, I also forgot to mention that I included element-wise
>>>>>>>> collection operations like "map" that eliminate the need for pardo
>>>>>>>> in many cases. The groupBy command is actually a map + groupByKey
>>>>>>>> under the hood. That was to be more consistent with Swift's
>>>>>>>> collection protocol (and is also why PCollection and
>>>>>>>> PCollectionStream are different types... PCollection implements map
>>>>>>>> and friends as pipeline construction operations whereas
>>>>>>>> PCollectionStream is an actual stream).
>>>>>>>>
>>>>>>>> I just happened to push some "IO primitives" that use map rather
>>>>>>>> than pardo in a couple of places to do a true wordcount using good
>>>>>>>> ol' Shakespeare and very, very primitive GCS IO.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> B
>>>>>>>>
>>>>>>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a
>>>>>>>>> bit before settling on where I ended up. Ultimately I decided to go 
>>>>>>>>> with
>>>>>>>>> something that felt more Swift-y than anything else which means that 
>>>>>>>>> rather
>>>>>>>>> than dealing with a single element like you do in the other SDKs 
>>>>>>>>> you're
>>>>>>>>> dealing with a stream of elements (which of course will often be of 
>>>>>>>>> size
>>>>>>>>> 1). That's a really natural paradigm in the Swift world especially 
>>>>>>>>> with the
>>>>>>>>> async / await structures. So when you see something like:
>>>>>>>>>
>>>>>>>>> pardo(name: "Read Files") { filenames, output, errors in
>>>>>>>>>   for try await (filename, _, _) in filenames {
>>>>>>>>>     ...
>>>>>>>>>     output.emit(data)
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> filenames is the input stream, and output and errors are both
>>>>>>>>> output streams. In theory you can have as many output streams as
>>>>>>>>> you like, though at the moment there's a compiler bug in the new
>>>>>>>>> type pack feature that limits it to "as many as I felt like
>>>>>>>>> supporting." Presumably this will get fixed before the official 5.9
>>>>>>>>> release, which will probably be in the October timeframe if history
>>>>>>>>> is any guide.
>>>>>>>>>
>>>>>>>>> If you had parameterization you wanted to send that would look
>>>>>>>>> like pardo("Parameter") { param,filenames,output,error in ... } where
>>>>>>>>> "param" would take on the value of "Parameter." All of this is being
>>>>>>>>> typechecked at compile time BTW.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The (filename,_,_) is tuple destructuring, like you have in ES6
>>>>>>>>> and other languages, where "_" is Swift for "ignore." In this case
>>>>>>>>> PCollectionStreams have an element signature of (Of,Date,Window) so
>>>>>>>>> you can optionally extract the timestamp and the window if you want
>>>>>>>>> to manipulate them somehow.
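For readers unfamiliar with Swift, plain tuple destructuring looks like this (the element below mimics a (value, timestamp, window) triple, with a String standing in for the window):

```swift
import Foundation

let element: (String, Date, String) =
    ("kinglear.txt", Date(timeIntervalSince1970: 0), "global")

// Bind only the first component; "_" discards the timestamp and window slots.
let (filename, _, _) = element

// Or bind all three when you want to manipulate the metadata.
let (name, timestamp, window) = element
```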
>>>>>>>>>
>>>>>>>>> That said it would also be natural to provide elementwise
>>>>>>>>> pardos--- that would probably mean having explicit type signatures in 
>>>>>>>>> the
>>>>>>>>> closure. I had that at one point, but it felt less natural the more I 
>>>>>>>>> used
>>>>>>>>> it. I'm also slowly working towards adding a more "traditional" DoFn
>>>>>>>>> implementation approach where you implement the DoFn as an object 
>>>>>>>>> type. In
>>>>>>>>> that case it would be very very easy to support both by having a 
>>>>>>>>> default
>>>>>>>>> stream implementation call the equivalent of processElement. To make 
>>>>>>>>> that
>>>>>>>>> performant I need to implement an @DoFn macro and I just haven't 
>>>>>>>>> gotten to
>>>>>>>>> it yet.
>>>>>>>>>
>>>>>>>>> It's a bit more work and I've been prioritizing implementing
>>>>>>>>> composite and external transforms for the reasons you suggest. :-) 
>>>>>>>>> I've got
>>>>>>>>> the basics of a composite transform (there's an equivalent wordcount
>>>>>>>>> example) and am hooking it into the pipeline generation, which should 
>>>>>>>>> also
>>>>>>>>> give me everything I need to successfully hook in external transforms 
>>>>>>>>> as
>>>>>>>>> well. That will give me the jump on IOs as you say. I can also treat 
>>>>>>>>> the
>>>>>>>>> pipeline itself as a composite transform which lets me get rid of the
>>>>>>>>> Pipeline { pipeline in ... } and just instead have things attach 
>>>>>>>>> themselves
>>>>>>>>> to the pipeline implicitly.
>>>>>>>>>
>>>>>>>>> That said, there are some interesting IO possibilities that would
>>>>>>>>> be Swift native. In particularly, I've been looking at the native 
>>>>>>>>> Swift
>>>>>>>>> binding for DuckDB (which is C++ based). DuckDB is SQL based but not
>>>>>>>>> distributed in the same way as, say, Beam SQL... but it would allow
>>>>>>>>> for SQL
>>>>>>>>> statements on individual files with projection pushdown supported for
>>>>>>>>> things like Parquet which could have some cool and performant data 
>>>>>>>>> lake
>>>>>>>>> applications. I'll probably do a couple of the simpler IOs as
>>>>>>>>> well---there's a Swift AWS SDK binding that's pretty good that would 
>>>>>>>>> give
>>>>>>>>> me S3 and there's a Cloud auth library as well that makes it pretty 
>>>>>>>>> easy to
>>>>>>>>> work with GCS.
>>>>>>>>>
>>>>>>>>> In any case, I'm updating the branch as I find a minute here and
>>>>>>>>> there.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> B
>>>>>>>>>
>>>>>>>>> On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <
>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Neat.
>>>>>>>>>>
>>>>>>>>>> Nothing like writing an SDK to actually understand how the FnAPI
>>>>>>>>>> works :). I like the use of groupBy. I have to admit I'm a bit 
>>>>>>>>>> mystified by
>>>>>>>>>> the syntax for parDo (I don't know swift at all which is probably 
>>>>>>>>>> tripping
>>>>>>>>>> me up). The addition of external (cross-language) transforms could 
>>>>>>>>>> let you
>>>>>>>>>> steal everything (e.g. IOs) pretty quickly from other SDKs.
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <
>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> For everyone who is interested, here's the draft PR:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/pull/28062
>>>>>>>>>>>
>>>>>>>>>>> I haven't had a chance to test it on my M1 machine yet, though
>>>>>>>>>>> (there's a good chance there are a few places that need to
>>>>>>>>>>> properly address endianness; specifically, timestamps in windowed
>>>>>>>>>>> values and lengths in iterable coders both use big-endian
>>>>>>>>>>> representations).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <
>>>>>>>>>>> byronel...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Cham,
>>>>>>>>>>>>
>>>>>>>>>>>> Definitely happy to open a draft PR so folks can
>>>>>>>>>>>> comment---there's not as much code as it looks like since most of 
>>>>>>>>>>>> the LOC
>>>>>>>>>>>> is just generated protobuf. As for the support, I definitely want 
>>>>>>>>>>>> to add
>>>>>>>>>>>> external transforms and may actually add that support before 
>>>>>>>>>>>> adding the
>>>>>>>>>>>> ability to make composites in the language itself. With the way
>>>>>>>>>>>> the SDK is laid out, adding composites to the pipeline graph is
>>>>>>>>>>>> a separate operation from defining a composite.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <
>>>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Byron. This sounds great. I wonder if there is interest
>>>>>>>>>>>>> in Swift SDK from folks currently subscribed to the +user
>>>>>>>>>>>>> <u...@beam.apache.org> list.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <
>>>>>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A couple of months ago I decided that I wanted to really
>>>>>>>>>>>>>> understand how the Beam FnApi works and how it interacts with
>>>>>>>>>>>>>> the Portable Runner. For me, at least, that usually means I
>>>>>>>>>>>>>> need to write some code so I can see things happening in a
>>>>>>>>>>>>>> debugger. To really prove to myself I understood what was going
>>>>>>>>>>>>>> on, I decided I couldn't use an existing SDK language to do it,
>>>>>>>>>>>>>> since there would be the temptation to read some code and
>>>>>>>>>>>>>> convince myself that I actually understood what was going on.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One thing led to another and it turns out that to get a
>>>>>>>>>>>>>> minimal FnApi integration going you end up writing a fair bit of 
>>>>>>>>>>>>>> an SDK. So
>>>>>>>>>>>>>> I decided to take things to a point where I had an SDK that 
>>>>>>>>>>>>>> could execute a
>>>>>>>>>>>>>> word count example via a portable runner backend. I've now 
>>>>>>>>>>>>>> reached that
>>>>>>>>>>>>>> point and would like to submit my prototype SDK to the list for 
>>>>>>>>>>>>>> feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It's currently living in a branch on my fork here:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> At the moment it runs via the most recent XCode Beta using
>>>>>>>>>>>>>> Swift 5.9 on Intel Macs, but should also work using beta builds 
>>>>>>>>>>>>>> of 5.9 for
>>>>>>>>>>>>>> Linux running on Intel hardware. I haven't had a chance to try 
>>>>>>>>>>>>>> it on ARM
>>>>>>>>>>>>>> hardware and make sure all of the endian checks are complete. The
>>>>>>>>>>>>>> "IntegrationTests.swift" file contains a word count example
>>>>>>>>>>>>>> that reads some local files (as well as a missing file, to
>>>>>>>>>>>>>> exercise DLQ functionality) and outputs counts through two
>>>>>>>>>>>>>> separate group-by operations to get it past the "map reduce"
>>>>>>>>>>>>>> size of pipeline. I've tested it against the Python Portable
>>>>>>>>>>>>>> Runner. Since my goal was to learn the FnApi, there is no
>>>>>>>>>>>>>> Direct Runner at this time.
>>>>>>>>>>>>>> time.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've already shown it to a couple of folks and incorporated
>>>>>>>>>>>>>> some of that feedback (for example, pardo was originally called
>>>>>>>>>>>>>> dofn when defining pipelines). In general I've tried to make
>>>>>>>>>>>>>> the API as "Swift-y" as possible, hence the heavy reliance on
>>>>>>>>>>>>>> closures, and while there aren't yet composite PTransforms,
>>>>>>>>>>>>>> there are the beginnings of what would be needed for a
>>>>>>>>>>>>>> SwiftUI-like declarative API for creating them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There are of course a ton of missing bits still to be
>>>>>>>>>>>>>> implemented, like counters, metrics, windowing, state, timers, 
>>>>>>>>>>>>>> etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This should be fine and we can get the code documented without
>>>>>>>>>>>>> these features. I think support for composites and adding an 
>>>>>>>>>>>>> external
>>>>>>>>>>>>> transform (see, Java
>>>>>>>>>>>>> <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>,
>>>>>>>>>>>>> Python
>>>>>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>,
>>>>>>>>>>>>> Go
>>>>>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>,
>>>>>>>>>>>>> TypeScript
>>>>>>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>)
>>>>>>>>>>>>> to add support for multi-lang will bring in a lot of features 
>>>>>>>>>>>>> (for example,
>>>>>>>>>>>>> I/O connectors) for free.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any and all feedback welcome and happy to submit a PR if
>>>>>>>>>>>>>> folks are interested, though the "Swift Way" would be to have it 
>>>>>>>>>>>>>> in its own
>>>>>>>>>>>>>> repo so that it can easily be used from the Swift Package 
>>>>>>>>>>>>>> Manager.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for creating a PR (may be as a draft initially). Also it'll
>>>>>>>>>>>>> be easier to comment on a PR :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> B
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
