Re: PCollection#applyWindowingStrategyInternal

2024-04-25 Thread Reuven Lax via dev
I think there is more to it than that. You'll probably want retractions
integrated into Beam's core triggering.

One example of where retractions are needed is with session windows. Early
triggers are fairly broken with session windows because the windows
themselves change as more data arrives. So an early trigger might generate
data for two separate windows S1 and S2, but after more data arrives those
two windows merge into a single S3. Retractions solve this by hooking into
the window merging logic, retracting the outputs for S1 and S2 before
outputting S3. I don't think this is possible today with a DSL.
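
For concreteness, here is a sketch of the kind of configuration where this
bites, using the standard Beam Java API (the input `events` and its element
type are assumptions; imports omitted):

    // Session windows with an early processing-time trigger. If early panes
    // are emitted for provisional sessions S1 and S2, and a later element
    // causes them to merge into S3, nothing today retracts the S1/S2
    // outputs - downstream consumers see all three.
    PCollection<KV<String, Long>> counts =
        events
            .apply(
                Window.<KV<String, Long>>into(
                        Sessions.withGapDuration(Duration.standardMinutes(10)))
                    .triggering(
                        AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardMinutes(1))))
                    .withAllowedLateness(Duration.ZERO)
                    .accumulatingFiredPanes())
            .apply(Count.perKey());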

On Thu, Apr 25, 2024 at 5:46 AM Jan Lukavský  wrote:

> > To implement retraction (at least to do so efficiently) I think you'll
> want it integrated in the model. e.g. for combiners one would want to add
> the option to subtract values, otherwise you would end up having to store
> every element defeating the performance that combiners provide.
>
> I think we use different words for the same. :) This is what I meant by
> "and define appropriate retraction functions to CombineFn and the like".
>
> On the other hand, you _could_ derive a CombineFn with retraction
> capabilities: if you have a DSL that provides a "retraction accumulation"
> function along with an "addition accumulation" function, these should be
> combinable into a CombineFn as we have it today. This should only require
> that the operation being combined is associative, commutative and has a
> valid unary operator '-' (which will be the result of the "retraction
> combine").
> On 4/23/24 18:08, Reuven Lax via dev wrote:
>
>
>
> On Tue, Apr 23, 2024 at 7:52 AM Jan Lukavský  wrote:
>
>> On 4/22/24 20:40, Kenneth Knowles wrote:
>>
>> I'll go ahead and advertise https://s.apache.org/beam-sink-triggers
>> again for this thread.
>>
>> +1
>>
>>
>> There are a couple of difficult technical problems in there. One of them
>> is backwards-propagating triggering to minimize extra latency. We can
>> probably solve this as well as we solve forward-propagating without too
>> much trouble. We also can/should leave it abstract so runners can implement
>> either through back-propagating local triggering rules or using run-time
>> communication to trigger upstream. These actually could interact well with
>> stateful ParDo by sending a "trigger now please" message or some such.
>>
>> Yes, this was what I was referring to as the more "functional" style for
>> stateful ParDo. At minimum, it requires adding a new callback independent of
>> @ProcessElement and @OnTimer - @OnTrigger?
>>
>>
>> But we also probably need retractions that automatically flow through the
>> pipeline and update aggregations. Why? Because currently triggers don't
>> just control update frequency but actually create new elements each
>> time, so they require user fix-up logic to do the right thing with the
>> output. When we go to higher levels of abstraction we need this to "just
>> work" without changing the pipeline. There have been two (nearly identical)
>> prototypes of adding retractions to the DirectRunner as proof of concept.
>> But there's also work in all the IOs since they are not retraction-aware.
>> Also lots of work in many library transforms where a retraction should be
>> computed by running the transform "like normal" but then negating the
>> result, but that cannot be the default for ParDo because it is deliberately
>> more flexible, we just have to annotate Map and the like.
>>
>> +1. I think retractions could be implemented as DSL on top of the current
>> model. Retractions can be viewed as regular data elements with additional
>> metadata (upsert, delete). For ParDo we could add something like
>> @RetractElement (and define appropriate retraction functions to CombineFn
>> and the like). We could introduce RetractedPCollection or similar for this
>> purpose.
>>
>
> To implement retraction (at least to do so efficiently) I think you'll
> want it integrated in the model. e.g. for combiners one would want to add
> the option to subtract values, otherwise you would end up having to store
> every element defeating the performance that combiners provide.
>
>>
>> Getting all this right is a lot of work but would result in a system that
>> is simpler to use out-of-the-box and a more robust SQL implementation
>> (because you can't use triggers with SQL unless you have retractions or
>> some other "just works" mode of computation). It would essentially change
>> Beam into a delta-processing engine, which it arguably should be, with
>> whole append-only elements being a simplest degenerate case of a delta
>> (which would be highly optimized in batch/archival processing).

Re: PCollection#applyWindowingStrategyInternal

2024-04-23 Thread Reuven Lax via dev
On Tue, Apr 23, 2024 at 7:52 AM Jan Lukavský  wrote:

> On 4/22/24 20:40, Kenneth Knowles wrote:
>
> I'll go ahead and advertise https://s.apache.org/beam-sink-triggers again
> for this thread.
>
> +1
>
>
> There are a couple of difficult technical problems in there. One of them
> is backwards-propagating triggering to minimize extra latency. We can
> probably solve this as well as we solve forward-propagating without too
> much trouble. We also can/should leave it abstract so runners can implement
> either through back-propagating local triggering rules or using run-time
> communication to trigger upstream. These actually could interact well with
> stateful ParDo by sending a "trigger now please" message or some such.
>
> Yes, this was what I was referring to as the more "functional" style for
> stateful ParDo. At minimum, it requires adding a new callback independent of
> @ProcessElement and @OnTimer - @OnTrigger?
>
>
> But we also probably need retractions that automatically flow through the
> pipeline and update aggregations. Why? Because currently triggers don't
> just control update frequency but actually create new elements each
> time, so they require user fix-up logic to do the right thing with the
> output. When we go to higher levels of abstraction we need this to "just
> work" without changing the pipeline. There have been two (nearly identical)
> prototypes of adding retractions to the DirectRunner as proof of concept.
> But there's also work in all the IOs since they are not retraction-aware.
> Also lots of work in many library transforms where a retraction should be
> computed by running the transform "like normal" but then negating the
> result, but that cannot be the default for ParDo because it is deliberately
> more flexible, we just have to annotate Map and the like.
>
> +1. I think retractions could be implemented as DSL on top of the current
> model. Retractions can be viewed as regular data elements with additional
> metadata (upsert, delete). For ParDo we could add something like
> @RetractElement (and define appropriate retraction functions to CombineFn
> and the like). We could introduce RetractedPCollection or similar for this
> purpose.
>

To implement retraction (at least to do so efficiently) I think you'll want
it integrated in the model. e.g. for combiners one would want to add the
option to subtract values, otherwise you would end up having to store every
element defeating the performance that combiners provide.
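
To make the shape of that concrete - a minimal sketch, where
RetractingCombineFn and retractInput are hypothetical names, not existing
Beam API (standard Beam imports omitted) - a subtraction-aware sum could
look like:

    // Hypothetical: a CombineFn extended with an inverse operation, so a
    // retraction updates the accumulator in O(1) instead of forcing every
    // element to be stored for re-aggregation.
    public abstract static class RetractingCombineFn<InputT, AccumT, OutputT>
        extends Combine.CombineFn<InputT, AccumT, OutputT> {
      // The unary '-' of the discussion: remove input's contribution.
      public abstract AccumT retractInput(AccumT accumulator, InputT input);
    }

    public static class RetractingSum extends RetractingCombineFn<Long, Long, Long> {
      @Override public Long createAccumulator() { return 0L; }
      @Override public Long addInput(Long acc, Long input) { return acc + input; }
      @Override public Long retractInput(Long acc, Long input) { return acc - input; }
      @Override public Long mergeAccumulators(Iterable<Long> accs) {
        long sum = 0;
        for (long a : accs) { sum += a; }
        return sum;
      }
      @Override public Long extractOutput(Long acc) { return acc; }
    }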

>
> Getting all this right is a lot of work but would result in a system that
> is simpler to use out-of-the-box and a more robust SQL implementation
> (because you can't use triggers with SQL unless you have retractions or
> some other "just works" mode of computation). It would essentially change
> Beam into a delta-processing engine, which it arguably should be, with
> whole append-only elements being a simplest degenerate case of a delta
> (which would be highly optimized in batch/archival processing).
>
> +1
>
>
> Kenn
>
> On Tue, Apr 16, 2024 at 2:36 AM Reuven Lax via dev 
> wrote:
>
>> Yes, but that's inevitable as stateful ParDo in a sense lives outside of
>> most of the window/trigger semantics. Basically a stateful ParDo is the
>> user executing low-level control over these semantics, and controlling
>> output frequency themselves with timers. One could however still propagate
>> the trigger upstream of the stateful ParDo, though I'm not sure if that's
>> the best approach.
>>
>> On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský  wrote:
>>
>>> On 4/11/24 18:20, Reuven Lax via dev wrote:
>>>
>>> I'm not sure it would require all that. A "basic" implementation could
>>> be done on top of our existing model. Essentially the user would specify
>>> triggers at the sink ParDos, then the runner would walk backwards up the
>>> graph, reverse-propagating these triggers (with some resolution rules aimed
>>> at keeping the minimum trigger latency). The runner could under the covers
>>> simply just apply the appropriate trigger into the Window, using the
>>> current mechanism. Of course building this all into the framework from
>>> scratch would be cleaner, but we could also build this on top of what we
>>> have.
>>>
>>> Any propagation from sink to source would be blocked by any stateful
>>> ParDo, because that does not adhere to the concept of trigger, no? Hence,
>>> we could get the required downstream 'cadence' of outputs, but these would
> change only when the upstream ParDo emits any data. Yes, one can argue that
> stateful ParDo is supposed to emit data as fast as possible, then this
> seems to work.

Re: PCollection#applyWindowingStrategyInternal

2024-04-15 Thread Reuven Lax via dev
Yes, but that's inevitable as stateful ParDo in a sense lives outside of
most of the window/trigger semantics. Basically a stateful ParDo is the
user executing low-level control over these semantics, and controlling
output frequency themselves with timers. One could however still propagate
the trigger upstream of the stateful ParDo, though I'm not sure if that's
the best approach.

On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský  wrote:

> On 4/11/24 18:20, Reuven Lax via dev wrote:
>
> I'm not sure it would require all that. A "basic" implementation could be
> done on top of our existing model. Essentially the user would specify
> triggers at the sink ParDos, then the runner would walk backwards up the
> graph, reverse-propagating these triggers (with some resolution rules aimed
> at keeping the minimum trigger latency). The runner could under the covers
> simply just apply the appropriate trigger into the Window, using the
> current mechanism. Of course building this all into the framework from
> scratch would be cleaner, but we could also build this on top of what we
> have.
>
> Any propagation from sink to source would be blocked by any stateful
> ParDo, because that does not adhere to the concept of trigger, no? Hence,
> we could get the required downstream 'cadence' of outputs, but these would
> change only when the upstream ParDo emits any data. Yes, one can argue that
> stateful ParDo is supposed to emit data as fast as possible, then this
> seems to work.
>
>
> On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský  wrote:
>
>> I've probably heard about it, but I never read the proposal. Sounds
>> great, but that would require changing our ParDos from the 'directive'
>> style to something more functional, so that processing of elements, state
>> updates and outputting results can be decoupled and managed by the runner
>> independently. This goes exactly in the direction of unifying GBK and
>> Combine with stateful ParDo. Sounds like something worth exploring for Beam
>> 3. :)
>>
>> Anyway, thanks for this discussion, it helped me clear up some more blind
>> spots.
>>
>>  Jan
>> On 4/10/24 19:24, Reuven Lax via dev wrote:
>>
>> Are you familiar with the "sink triggers" proposal?
>>
>> Essentially while windowing is usually a property of the data, and
>> therefore flows downwards through the graph, triggering is usually a
>> property of output (i.e. sink) latency - how much are you willing to wait
>> to see data, and what semantics do you want for this early data. Ideally
>> triggers should be specified separately at the ParDo level (Beam has no
>> real notion of Sinks as a special object, so to allow for output
>> specification it has to be on the ParDo), and the triggers should propagate
>> up the graph back to the source. This is in contrast to today where we
>> attach triggering to the windowing information.
>>
>> This was a proposal some years back and there was some effort made to
>> implement it, but the implementation never really got off the ground.
>>
>> On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský  wrote:
>>
>>> On 4/9/24 18:33, Kenneth Knowles wrote:
>>>
>>> At a top level `setWindowingStrategyInternal` exists to set up the
>>> metadata without actually assigning windows. If we were more clever we
>>> might have found a way for it to not be public... it is something that can
>>> easily lead to an invalid pipeline.
>>>
>>> Yes, that was what hit me about one minute after I started this thread.
>>> :)
>>>
>>>
>>> I think "compatible windows" today in Beam doesn't have very good uses
>>> anyhow. I do see how when you are flattening PCollections you might also
>>> want to explicitly have a function that says "and here is how to reconcile
>>> their different metadata". But is it not reasonable to use
>>> Window.into(global window)? It doesn't seem like boilerplate to me
>>> actually, but something you really want to know is happening.
>>>
>>> :)
>>>
>>> Of course this was the way out, but I was somewhat intuitively seeking
>>> something that could do this autonomously.
>>>
>>> Generally speaking, we might have some room for improvement in the way
>>> we handle windows and triggers - windows relate only to GBK and stateful
>>> ParDo, triggers relate to GBK only. They have no semantics if downstream
>>> processing does not use any of these. There could be a pipeline
>>> preprocessing stage that would discard (replace with meaningful defaults)
>>> any of these metadata that is unused, but can cause Pipeline to fail at
>>> construction time.

Re: [Feature proposal] Java Record Schema inference

2024-04-15 Thread Reuven Lax via dev
Some initial thoughts:

Making schema inference handle generic classes would be a nice improvement
- users occasionally bump into this restriction, and there's no reason not
to improve it.

I would recommend using the new Java reflection APIs (i.e.
getRecordComponents) to directly infer the schema. I think we'll end up
with less error-prone code that way.

We should still use the codegen path for generating efficient Row objects
here, otherwise Record classes will end up being significantly less
efficient than regular Java objects. Since I believe that Record classes
expand out into normal classes, we should be able to reuse the existing
code (i.e. JavaFieldSchema.java and POJOUtils.java) with maybe some small
modifications.
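
For reference, the user-facing shape proposed in the doc, plus the
reflection API mentioned above (RecordSchema is the provider the proposal
introduces, not an existing class; RecordComponent is standard Java since
16):

    public class RecordSchemaSketch {
      // Proposed usage: a Java 17 record as a PCollection element type.
      @DefaultSchema(RecordSchema.class)
      public record Transaction(String userId, double amount) {}

      public static void main(String[] args) {
        // getRecordComponents exposes each component's name and type, which
        // is all the schema inference needs.
        for (RecordComponent rc : Transaction.class.getRecordComponents()) {
          System.out.println(rc.getName() + " : " + rc.getType());
        }
      }
    }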

On Mon, Apr 15, 2024 at 8:03 AM Maciej Szwaja via dev 
wrote:

> Hi team,
>
> I'd like to propose a new java sdk extension feature, which is adding
> support for java record schema inference - see the design doc here:
> https://docs.google.com/document/d/1zSQ9cnqtVM8ttJEuHBDE6hw4qjUuJy1dpZWB6IBTuOs/edit?usp=sharing
>
> In short - adding this extension's jar to the classpath would enable users
> to use Java 17 record classes as elements of PCollections simply by
> annotating them with the DefaultSchema annotation (pointing to the new
> RecordSchema provider), similarly to how it's currently possible with
> JavaBean or AutoValue classes.
>
> Let me know what you think - there's already an open feature request
> created last year (https://github.com/apache/beam/issues/27802); I
> could simply take it and start working on it if the proposal gets approved.
>
> Thanks,
> Maciej
>


Re: PCollection#applyWindowingStrategyInternal

2024-04-11 Thread Reuven Lax via dev
I'm not sure it would require all that. A "basic" implementation could be
done on top of our existing model. Essentially the user would specify
triggers at the sink ParDos, then the runner would walk backwards up the
graph, reverse-propagating these triggers (with some resolution rules aimed
at keeping the minimum trigger latency). The runner could under the covers
simply just apply the appropriate trigger into the Window, using the
current mechanism. Of course building this all into the framework from
scratch would be cleaner, but we could also build this on top of what we
have.
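
For reference, the "current mechanism" is the trigger attached to the
windowing strategy, so the runner would in effect synthesize something like
this upstream of the aggregation (a sketch; `input` is an assumed
PCollection, imports omitted):

    // What a reverse-propagating runner could apply under the covers: a
    // processing-time trigger derived from the sink's latency requirement.
    input.apply(
        Window.<KV<String, Long>>configure()
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());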

On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský  wrote:

> I've probably heard about it, but I never read the proposal. Sounds great,
> but that would require changing our ParDos from the 'directive' style to
> something more functional, so that processing of elements, state updates
> and outputting results can be decoupled and managed by the runner
> independently. This goes exactly in the direction of unifying GBK and
> Combine with stateful ParDo. Sounds like something worth exploring for Beam
> 3. :)
>
> Anyway, thanks for this discussion, it helped me clear up some more blind
> spots.
>
>  Jan
> On 4/10/24 19:24, Reuven Lax via dev wrote:
>
> Are you familiar with the "sink triggers" proposal?
>
> Essentially while windowing is usually a property of the data, and
> therefore flows downwards through the graph, triggering is usually a
> property of output (i.e. sink) latency - how much are you willing to wait
> to see data, and what semantics do you want for this early data. Ideally
> triggers should be specified separately at the ParDo level (Beam has no
> real notion of Sinks as a special object, so to allow for output
> specification it has to be on the ParDo), and the triggers should propagate
> up the graph back to the source. This is in contrast to today where we
> attach triggering to the windowing information.
>
> This was a proposal some years back and there was some effort made to
> implement it, but the implementation never really got off the ground.
>
> On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský  wrote:
>
>> On 4/9/24 18:33, Kenneth Knowles wrote:
>>
>> At a top level `setWindowingStrategyInternal` exists to set up the
>> metadata without actually assigning windows. If we were more clever we
>> might have found a way for it to not be public... it is something that can
>> easily lead to an invalid pipeline.
>>
>> Yes, that was what hit me about one minute after I started this thread. :)
>>
>>
>> I think "compatible windows" today in Beam doesn't have very good uses
>> anyhow. I do see how when you are flattening PCollections you might also
>> want to explicitly have a function that says "and here is how to reconcile
>> their different metadata". But is it not reasonable to use
>> Window.into(global window)? It doesn't seem like boilerplate to me
>> actually, but something you really want to know is happening.
>>
>> :)
>>
>> Of course this was the way out, but I was somewhat intuitively seeking
>> something that could do this autonomously.
>>
>> Generally speaking, we might have some room for improvement in the way we
>> handle windows and triggers - windows relate only to GBK and stateful
>> ParDo, triggers relate to GBK only. They have no semantics if downstream
>> processing does not use any of these. There could be a pipeline
>> preprocessing stage that would discard (replace with meaningful defaults)
>> any of these metadata that is unused, but can cause Pipeline to fail at
>> construction time. It is also (to me) somewhat questionable if triggers are
>> really a property of a PCollection or a property of a specific transform
>> (GBK - ehm, actually (stateless) 'key by' + 'reduce by key', but that is
>> completely different story :)) because (non-default) triggers are likely
>> not preserved across multiple transforms. Maybe the correct subject of this
>> thread could be "are we sure our windowing and triggering semantics is 100%
>> correct"? Probably the - wrong - expectations at the beginning of this
>> thread were due to conflict in my mental model of how things 'could' work
>> as opposed to how they actually work. :)
>>
>>  Jan
>>
>>
>> Kenn
>>
>> On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský  wrote:
>>
>>> On 4/6/24 21:23, Reuven Lax via dev wrote:
>>>
>>> So the problem here is that windowFn is a property of the PCollection,
>>> not the element, and the result of Flatten is a single PCollection.

Re: PCollection#applyWindowingStrategyInternal

2024-04-10 Thread Reuven Lax via dev
Are you familiar with the "sink triggers" proposal?

Essentially while windowing is usually a property of the data, and
therefore flows downwards through the graph, triggering is usually a
property of output (i.e. sink) latency - how much are you willing to wait
to see data, and what semantics do you want for this early data. Ideally
triggers should be specified separately at the ParDo level (Beam has no
real notion of Sinks as a special object, so to allow for output
specification it has to be on the ParDo), and the triggers should propagate
up the graph back to the source. This is in contrast to today where we
attach triggering to the windowing information.

This was a proposal some years back and there was some effort made to
implement it, but the implementation never really got off the ground.

On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský  wrote:

> On 4/9/24 18:33, Kenneth Knowles wrote:
>
> At a top level `setWindowingStrategyInternal` exists to set up the
> metadata without actually assigning windows. If we were more clever we
> might have found a way for it to not be public... it is something that can
> easily lead to an invalid pipeline.
>
> Yes, that was what hit me about one minute after I started this thread. :)
>
>
> I think "compatible windows" today in Beam doesn't have very good uses
> anyhow. I do see how when you are flattening PCollections you might also
> want to explicitly have a function that says "and here is how to reconcile
> their different metadata". But is it not reasonable to use
> Window.into(global window)? It doesn't seem like boilerplate to me
> actually, but something you really want to know is happening.
>
> :)
>
> Of course this was the way out, but I was somewhat intuitively seeking
> something that could do this autonomously.
>
> Generally speaking, we might have some room for improvement in the way we
> handle windows and triggers - windows relate only to GBK and stateful
> ParDo, triggers relate to GBK only. They have no semantics if downstream
> processing does not use any of these. There could be a pipeline
> preprocessing stage that would discard (replace with meaningful defaults)
> any of these metadata that is unused, but can cause Pipeline to fail at
> construction time. It is also (to me) somewhat questionable if triggers are
> really a property of a PCollection or a property of a specific transform
> (GBK - ehm, actually (stateless) 'key by' + 'reduce by key', but that is
> completely different story :)) because (non-default) triggers are likely
> not preserved across multiple transforms. Maybe the correct subject of this
> thread could be "are we sure our windowing and triggering semantics is 100%
> correct"? Probably the - wrong - expectations at the beginning of this
> thread were due to conflict in my mental model of how things 'could' work
> as opposed to how they actually work. :)
>
>  Jan
>
>
> Kenn
>
> On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský  wrote:
>
>> On 4/6/24 21:23, Reuven Lax via dev wrote:
>>
>> So the problem here is that windowFn is a property of the PCollection,
>> not the element, and the result of Flatten is a single PCollection.
>>
>> Yes. That is the cause of why Flatten.pCollections() needs the same
>> windowFn.
>>
>>
>> In various cases, there is a notion of "compatible" windows. Basically
>> given window functions W1 and W2, provide a W3 that "works" with both.
>>
>> Exactly this would be a nice feature for Flatten, something like
>> 'windowFn resolve strategy', so that if the user does not know the windowFn of
>> upstream PCollections this can be somehow resolved at pipeline construction
>> time. Alternatively only as a small syntactic sugar, something like:
>>
>>  
>> Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))
>>
>> or anything similar. This can be done in user code, so it is not
>> something deeper, but might help in some cases. It would be cool if we
>> could reuse concepts from other cases where such mechanism is needed.
>>
>>
>> Note that Beam already has something similar with side inputs, since the
>> side input often is in a different window than the main input. However main
>> input elements are supposed to see side input elements in the same window
>> (and in fact main inputs are blocked until the side-input window is ready),
>> so we must do a mapping. If for example (and very commonly!) the side input
>> is in the global window and the main input is in a fixed window, by default
>> we will remap the global-window elements into the main-input's fixed window.

Re: PCollection#applyWindowingStrategyInternal

2024-04-06 Thread Reuven Lax via dev
So the problem here is that windowFn is a property of the PCollection, not
the element, and the result of Flatten is a single PCollection.

In various cases, there is a notion of "compatible" windows. Basically
given window functions W1 and W2, provide a W3 that "works" with both.

Note that Beam already has something similar with side inputs, since the
side input often is in a different window than the main input. However main
input elements are supposed to see side input elements in the same window
(and in fact main inputs are blocked until the side-input window is ready),
so we must do a mapping. If for example (and very commonly!) the side input
is in the global window and the main input is in a fixed window, by default
we will remap the global-window elements into the main-input's fixed window.

In Side input we also allow the user to control this mapping, so for
example side input elements could always map to the previous fixed window
(e.g. while processing window 12-1, you want to see summary data of all
records in the previous window 11-12). Users can do this by providing a
WindowMappingFunction to the View - essentially a function from window to
window. Unfortunately this is hard to use (one must create their own
PCollectionView class) and very poorly documented, so I doubt many users
know about this!
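
The default remapping described above arises in code like this (a sketch;
`stats` and `main` are assumed inputs, imports omitted):

    // A globally-windowed, periodically re-triggered side input consumed by
    // a fixed-window main input. By default Beam maps each main-input window
    // onto the latest pane of the global-window side input.
    final PCollectionView<Long> runningTotal =
        stats
            .apply(
                Window.<Long>into(new GlobalWindows())
                    .triggering(
                        Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(Sum.longsGlobally().asSingletonView());

    PCollection<String> out =
        main.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
            .apply(
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void process(ProcessContext c) {
                            c.output(c.element() + " total=" + c.sideInput(runningTotal));
                          }
                        })
                    .withSideInputs(runningTotal));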

Reuven

On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský  wrote:

> Immediate self-correction: although setting the strategy directly via
> setWindowingStrategyInternal() *seemed* to work at Pipeline
> construction time, at runtime it obviously does not, because
> the PCollection was still windowed using the old windowFn. Makes sense to
> me, but there remains the other question of whether we can make flattening
> PCollections with incompatible windowFns more user-friendly. The current
> approach, where we require the same windowFn for all input PCollections,
> creates some unnecessary boilerplate code on the user side.
>
>   Jan
>
> On 4/6/24 15:45, Jan Lukavský wrote:
> > Hi,
> >
> > I came across a case where using
> > PCollection#applyWindowingStrategyInternal seems legit in user code.
> > The case is roughly as follows:
> >
> >  a) compute some streaming statistics
> >
> >  b) apply the same transform (say ComputeWindowedAggregation) with
> > different parameters on these statistics yielding two windowed
> > PCollections - first is global with early trigger, the other is
> > sliding window, the specific parameters of the windowFns are
> > encapsulated in the ComputeWindowedAggregation transform
> >
> >  c) apply the same transform on both of the above PCollections,
> > yielding two PCollections with the same types, but different windowFns
> >
> >  d) flatten these PCollections into single one (e.g. for downstream
> > processing - joining - or flushing to sink)
> >
> > Now, the flatten will not work, because these PCollections have
> > different windowFns. It would be possible to restore the windowing for
> > either of them, but it requires somewhat breaking the encapsulation of
> > the transforms that produce the windowed outputs. A more natural
> > solution is to take the WindowingStrategy from the global aggregation
> > and set it via setWindowingStrategyInternal() to the other
> > PCollection. This works, but it uses API that is marked as @Internal
> > (and obviously, the name as well suggests it is not intended for
> > client-code usage).
> >
> > The question is, should we make a legitimate version of this call? Or
> > should we introduce a way for Flatten.pCollections() to re-window the
> > input PCollections appropriately? In the case of conflicting
> > WindowFns, where one of them is GlobalWindowing strategy, it seems to
> > me that the user's intention is quite well-defined (this might extend
> > to some 'flatten windowFn resolution strategy', maybe).
> >
> > WDYT?
> >
> >  Jan
> >
>


Re: Supporting Dynamic Destinations in a portable context

2024-04-02 Thread Reuven Lax via dev
I do suspect that over time we'll find more and more cases we can't
express, and will be asked to extend this little templating language in more
directions. To head that off - could we easily just reuse an existing
language (SQL, Lua, something of that form) instead of creating something
new?
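
To illustrate the templating in question: destination resolution is plain
string substitution against fields of the element, roughly as below. This is
a hypothetical sketch - resolveDestination and the {dest_info.field} syntax
follow the doc's examples, not shipped API:

    // Hypothetical: replace "{dest_info.user}" etc. with the corresponding
    // field of the element's dest_info sub-row.
    static String resolveDestination(String pattern, Row destInfo) {
      Matcher m = Pattern.compile("\\{dest_info\\.([A-Za-z_]+)\\}").matcher(pattern);
      StringBuffer sb = new StringBuffer();
      while (m.find()) {
        Object value = destInfo.getValue(m.group(1));
        m.appendReplacement(sb, Matcher.quoteReplacement(String.valueOf(value)));
      }
      m.appendTail(sb);
      return sb.toString();
    }

    // resolveDestination("long/common/path/to/destination/files/{dest_info.user}", row)
    //   -> "long/common/path/to/destination/files/alice"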

On Tue, Apr 2, 2024 at 8:55 AM Kenneth Knowles  wrote:

> I really like this proposal. I think it has narrowed down and solved the
> essential problem of not shuffling excess redundant data, and also provides
> the vast majority of the functionality that a lambda would, with
> significantly better debugability and usability too, since the dynamic
> destination pattern string can be in display data, etc.
>
> Kenn
>
> On Wed, Mar 27, 2024 at 1:58 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> On Wed, Mar 27, 2024 at 10:20 AM Reuven Lax  wrote:
>>
>>> Can the prefix still be generated programmatically at graph creation
>>> time?
>>>
>>
>> Yes. It's just a property of the transform passed by the user at
>> configuration time.
>>
>>
>>> On Wed, Mar 27, 2024 at 9:40 AM Robert Bradshaw 
>>> wrote:
>>>
 On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:

> This does seem like the best compromise, though I think there will
> still end up being performance issues. A common pattern I've seen is that
> there is a long common prefix to the dynamic destination followed by the
> dynamic component. e.g. the destination might be
> long/common/path/to/destination/files/. In this case, the
> prefix is often much larger than the messages themselves and is what gets
> effectively encoded in the lambda.
>

 The idea here is that the destination would be given as a format
 string, say, "long/common/path/to/destination/files/{dest_info.user}".
 Another way to put this is that we support (only) "lambdas" that are
 represented as string substitutions. (The fact that dest_info does not have
 to be part of the record, and can be the output of an arbitrary map if need
 be, makes this restriction not so bad.)

 As well as solving the performance issues, I think this is actually a
 pretty convenient and natural way for the user to name their destination
 (for the common usecase, even easier than providing a lambda), and has the
 benefit of being much more transparent than an arbitrary callable as well
 for introspection (for both machine and human that may look at the
 resulting pipeline).


> I'm not entirely sure how to address this in a portable context. We
> might simply have to accept the extra overhead when going cross language.
>
> Reuven
>
> On Wed, Mar 27, 2024 at 8:51 AM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Thanks for putting this together, it will be a really useful feature
>> to have.
>>
>> I am in favor of the string-pattern approaches. I think we need to
>> support both the {record=..., dest_info=...} and the elide-fields
>> approaches, as the former is nicer when one has a fixed representation 
>> for
>> the output record (e.g. a proto or avro schema) and the flattened form 
>> for
>> ease of use in more free-form contexts (e.g. when producing records from
>> YAML and SQL).
>>
>> Also left some comments on the doc.
>>
>>
>> On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hey all,
>>>
>>> There have been some conversations lately about how best to enable
>>> dynamic destinations in a portable context. Usually, this comes up for
>>> cross-language transforms and more recently for Beam YAML.
>>>
>>> I've started a short doc outlining some routes we could take. The
>>> purpose is to establish a good standard for supporting dynamic 
>>> destinations
>>> with portability, one that can be applied to most use cases and IOs. 
>>> Please
>>> take a look and add any thoughts!
>>>
>>> https://s.apache.org/portable-dynamic-destinations
>>>
>>> Best,
>>> Ahmed
>>>
>>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Reuven Lax via dev
Can the prefix still be generated programmatically at graph creation time?

On Wed, Mar 27, 2024 at 9:40 AM Robert Bradshaw  wrote:

> On Wed, Mar 27, 2024 at 9:12 AM Reuven Lax  wrote:
>
>> This does seem like the best compromise, though I think there will still
>> end up being performance issues. A common pattern I've seen is that there
>> is a long common prefix to the dynamic destination followed by the dynamic
>> component. e.g. the destination might be
>> long/common/path/to/destination/files/. In this case, the
>> prefix is often much larger than the messages themselves and is what gets
>> effectively encoded in the lambda.
>>
>
> The idea here is that the destination would be given as a format string,
> say, "long/common/path/to/destination/files/{dest_info.user}". Another way
> to put this is that we support (only) "lambdas" that are represented as
> string substitutions. (The fact that dest_info does not have to be part of
> the record, and can be the output of an arbitrary map if need be, makes
> this restriction not so bad.)
>
> As well as solving the performance issues, I think this is actually a
> pretty convenient and natural way for the user to name their destination
> (for the common usecase, even easier than providing a lambda), and has the
> benefit of being much more transparent than an arbitrary callable as well
> for introspection (for both machine and human that may look at the
> resulting pipeline).
>
>
>> I'm not entirely sure how to address this in a portable context. We might
>> simply have to accept the extra overhead when going cross language.
>>
>> Reuven
>>
>> On Wed, Mar 27, 2024 at 8:51 AM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Thanks for putting this together, it will be a really useful feature to
>>> have.
>>>
>>> I am in favor of the string-pattern approaches. I think we need to
>>> support both the {record=..., dest_info=...} and the elide-fields
>>> approaches, as the former is nicer when one has a fixed representation for
>>> the output record (e.g. a proto or avro schema) and the flattened form for
>>> ease of use in more free-form contexts (e.g. when producing records from
>>> YAML and SQL).
>>>
>>> Also left some comments on the doc.
>>>
>>>
>>> On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev <
>>> dev@beam.apache.org> wrote:
>>>
 Hey all,

 There have been some conversations lately about how best to enable
 dynamic destinations in a portable context. Usually, this comes up for
 cross-language transforms and more recently for Beam YAML.

 I've started a short doc outlining some routes we could take. The
 purpose is to establish a good standard for supporting dynamic destinations
 with portability, one that can be applied to most use cases and IOs. Please
 take a look and add any thoughts!

 https://s.apache.org/portable-dynamic-destinations

 Best,
 Ahmed

>>>


Re: Supporting Dynamic Destinations in a portable context

2024-03-27 Thread Reuven Lax via dev
This does seem like the best compromise, though I think there will still
end up being performance issues. A common pattern I've seen is that there
is a long common prefix to the dynamic destination followed by the dynamic
component. e.g. the destination might be
long/common/path/to/destination/files/. In this case, the
prefix is often much larger than the messages themselves and is what gets
effectively encoded in the lambda.

I'm not entirely sure how to address this in a portable context. We might
simply have to accept the extra overhead when going cross language.

Reuven

On Wed, Mar 27, 2024 at 8:51 AM Robert Bradshaw via dev 
wrote:

> Thanks for putting this together, it will be a really useful feature to
> have.
>
> I am in favor of the string-pattern approaches. I think we need to support
> both the {record=..., dest_info=...} and the elide-fields approaches, as
> the former is nicer when one has a fixed representation for the
> output record (e.g. a proto or avro schema) and the flattened form for ease
> of use in more free-form contexts (e.g. when producing records from YAML
> and SQL).
>
> Also left some comments on the doc.
>
>
> On Wed, Mar 27, 2024 at 6:51 AM Ahmed Abualsaud via dev <
> dev@beam.apache.org> wrote:
>
>> Hey all,
>>
>> There have been some conversations lately about how best to enable
>> dynamic destinations in a portable context. Usually, this comes up for
>> cross-language transforms and more recently for Beam YAML.
>>
>> I've started a short doc outlining some routes we could take. The purpose
>> is to establish a good standard for supporting dynamic destinations with
>> portability, one that can be applied to most use cases and IOs. Please take
>> a look and add any thoughts!
>>
>> https://s.apache.org/portable-dynamic-destinations
>>
>> Best,
>> Ahmed
>>
>


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Reuven Lax via dev
On Tue, Feb 27, 2024 at 10:22 AM Robert Bradshaw via dev <
dev@beam.apache.org> wrote:

> On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
> >
> > Pulling out focus points:
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > > I can't act on something yet [...] but I expect to be able to [...] at
> some time in the processing-time future.
> >
> > I like this as a clear and internally-consistent feature description. It
> describes ProcessContinuation and those timers which serve the same purpose
> as ProcessContinuation.
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > > I can't think of a batch or streaming scenario where it would be
> correct to not wait at least that long
> >
> > The main reason we created timers: to take action in the absence of
> data. The archetypal use case for processing time timers was/is "flush data
> from state if it has been sitting there too long". For this use case, the
> right behavior for batch is to skip the timer. It is actually basically
> incorrect to wait.
>
> Good point calling out the distinction between "I need to wait in case
> there's more data." and "I need to wait for something external." We
> can't currently distinguish between the two, but a batch runner can
> say something definitive about the first. Feels like we need a new
> primitive (or at least new signaling information on our existing
> primitive).
>
BTW the first is also relevant to drain. One reason drain often takes a
long time today is because it has to wait for processing-time timers to
fire (it has to wait because those timers have watermark holds), but
usually those timers are noops.
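
For reference, the archetypal "flush state that has been sitting there too
long" pattern looks like this with the existing Beam Java state/timer API (a
sketch; imports omitted):

    // Buffer elements in state and flush once no new element has arrived
    // for 5 minutes of processing time. Today a batch runner fires this
    // timer at the end of the key's work rather than waiting in real time.
    class FlushAfterQuietPeriod extends DoFn<KV<String, Long>, KV<String, Long>> {
      @StateId("buffer")
      private final StateSpec<BagState<KV<String, Long>>> bufferSpec = StateSpecs.bag();

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(
          @Element KV<String, Long> element,
          @StateId("buffer") BagState<KV<String, Long>> buffer,
          @TimerId("flush") Timer flush) {
        buffer.add(element);
        flush.offset(Duration.standardMinutes(5)).setRelative(); // restart quiet period
      }

      @OnTimer("flush")
      public void onFlush(
          @StateId("buffer") BagState<KV<String, Long>> buffer,
          OutputReceiver<KV<String, Long>> out) {
        for (KV<String, Long> kv : buffer.read()) {
          out.output(kv);
        }
        buffer.clear();
      }
    }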


> > On Fri, Feb 23, 2024 at 3:54 PM Robert Burke 
> wrote:
> > > It doesn't require a new primitive.
> >
> > IMO what's being proposed *is* a new primitive. I think it is a good
> primitive. It is the underlying primitive to ProcessContinuation. It would
> be user-friendly as a kind of timer. But if we made this the behavior of
> processing time timers retroactively, it would break everyone using them to
> flush data who is also reprocessing data.
> >
> > There's two very different use cases ("I need to wait, and block data"
> vs "I want to act without data, aka NOT wait for data") and I think we
> should serve both of them, but it doesn't have to be with the same
> low-level feature.
> >
> > Kenn
> >
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> >>
> >> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke 
> wrote:
> >> >
> >> > While I'm currently on the other side of the fence, I would not be
> against changing/requiring the semantics of ProcessingTime constructs to be
> "must wait and execute" as such a solution, and enables the Proposed
> "batch" process continuation throttling mechanism to work as hypothesized
> for both "batch" and "streaming" execution.
> >> >
> >> > There's a lot to like, as it leans Beam further into the unification
> of Batch and Stream, with one fewer exception (eg. unifies timer experience
> further). It doesn't require a new primitive. It probably matches more with
> user expectations anyway.
> >> >
> >> > It does cause looping timer execution with processing time to be a
> problem for Drains however.
> >>
> >> I think we have a problem with looping timers plus drain (a mostly
> >> streaming idea anyway) regardless.
> >>
> >> > I'd argue though that in the case of a drain, we could update the
> semantics as "move watermark to infinity" and "existing timers are executed,
> but new timers are ignored",
> >>
> >> I don't like the idea of dropping timers for drain. I think correct
> >> handling here requires user visibility into whether a pipeline is
> >> draining or not.
> >>
> >> > and ensure/and update the requirements around OnWindowExpiration
> callbacks to be a bit more insistent on being implemented for correct
> execution, which is currently the only "hard" signal to the SDK side that
> the window's work is guaranteed to be over, and remaining state needs to be
> addressed by the transform or be garbage collected. This remains critical
> for developing a good pattern for ProcessingTime timers within a Global
> Window too.
> >>
> >> +1
> >>
> >> >
> >> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> >> > > Thanks for bringing this up.
> >> > >
> >> > > My position is that both batch and streaming should wait for
> >> > > processing time timers, according to local time (with the exception
> of
> >> > > tests that can accelerate this via faked clocks).
> >> > >
> >> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
> >> > > isomorphic, and can be implemented in terms of each other (at least
> in
> >> > > one direction, and likely the other). Both are an indication that I
> >> > > can't act on something yet due to external constraints (e.g. not all
> >> > > the data has been published, or I lack sufficient 

Re: Throttle PTransform

2024-02-22 Thread Reuven Lax via dev
On Thu, Feb 22, 2024 at 9:26 AM Kenneth Knowles  wrote:

> Wow I love your input Reuven. Of course "the source" that you are applying
> backpressure to is often a runner's shuffle so it may be state anyhow, but
> it is good to give the runner the choice of how to figure that out and
> maybe chain backpressure further.
>

Sort of - however most (streaming) runners apply backpressure through
shuffle as well. This means that while some amount of data will accumulate
in shuffle, eventually the backpressure will push back to the source.
Caveat of course is that this is mostly true for streaming runners, not
batch runners.
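
As a minimal sketch of the bucketing approach described further down in this
thread - shard into N keys, pace each shard to rate/N by blocking the bundle
thread - assuming Guava's RateLimiter on the classpath and made-up names:

    // Pacing per shard; blocking the thread pushes back to the source
    // instead of accumulating a backlog in DoFn state.
    class ThrottleShardFn<T> extends DoFn<KV<Integer, T>, T> {
      private final double perShardRate;
      private transient RateLimiter limiter;

      ThrottleShardFn(double targetRate, int numShards) {
        this.perShardRate = targetRate / numShards;
      }

      @Setup
      public void setup() {
        limiter = RateLimiter.create(perShardRate);
      }

      @ProcessElement
      public void process(@Element KV<Integer, T> element, OutputReceiver<T> out) {
        limiter.acquire(); // blocks until this shard may emit another element
        out.output(element.getValue());
      }
    }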

>
> The goal is basically to make a sink that doesn't do its own throttling
> behave as well as one that does, so we don't DoS it, right? So that could
> be a key design goal and inspiration, which leads to what Reuven describes.
> Such sinks will throttle the DoFn by making the IO requests take longer or
> returning throttle error codes or both. So we might consider how to emulate
> that rather than buffer and timer.
>
> Tangentially, I will start a separate thread and doc about processing time
> timers in batch, which we should probably frame as "processing time timers
> when historically processing a very large amount of data as fast and
> efficiently as possible". I've had this chat with many people and even I
> constantly forget the status, conclusion, and rationale for why things are
> the way they are. It'll be good to record if not already somewhere.
>

> Kenn
>
> On Thu, Feb 22, 2024 at 2:43 AM Jan Lukavský  wrote:
>
>>
>> On 2/21/24 18:27, Reuven Lax via dev wrote:
>>
>> Agreed, that event-time throttling doesn't make sense here. In theory
>> processing-time timers have no SLA - i.e. their firing might be delayed -
>> so batch runners aren't violating the model by firing them all at the end;
>> however it does make processing time timers less useful in batch, as we see
>> here.
>>
>> Personally, I'm not sure I would use state and timers to implement this,
>> and I definitely wouldn't create this many keys. A couple of reasons for
>> this:
>>   1. If a pipeline is receiving input faster than the throttle rate, the
>> proposed technique would shift all those elements into the DoFn's state
>> which will keep growing indefinitely. Generally we would prefer to leave
>> that backlog in the source instead of copying it into DoFn state.
>>   2. In my experience with throttling, having too much parallelism is
>> problematic. The issue is that there is some error involved whenever you
>> throttle, and this error can accumulate across many shards (and when I've
>> done this sort of thing before, I found that the error was often biased in
>> one direction). If targeting 100,000 records/sec, this  approach (if I
>> understand it correctly) would create 100,000 shards and throttle them each
>> to one element/sec. I doubt this will actually result in anything close to
>> desired throttling.
>>   3. Very commonly, the request is to throttle based on bytes/sec, not
>> events/sec. Anything we build should be easily extensible to bytes/sec.
>>
>> What I would suggest (and what Beam users have often done in the past)
>> would be to bucket the PCollection into N buckets where N is generally
>> smallish (100 buckets, 1000 buckets, depending on the expected throughput);
>> runners that support autosharding (such as Dataflow) can automatically
>> choose N. Each shard then throttles its output to rate/N. Keeping N no
>> larger than necessary minimizes the error introduced into throttling.
>>
>> We also don't necessarily need state/timers here - each shard is
>> processed on a single thread, so those threads can simply throttle calls to
>> OutputReceiver.output. This way if the pipeline is exceeding the threshold,
>> backpressure will tend to simply leave excess data in the source. This also
>> is a simpler design than the proposed one.
>>
>> A more sophisticated design might combine elements of both - buffering a
>> bounded amount of data in state when the threshold is exceeded, but
>> severely limiting the state size. However I wouldn't start here - we would
>> want to build the simpler implementation first and see how it performs.
>>
>> +1
>>
>>
>> On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
>>> >
>>> > Hi,
>>> >
>>> > I have left a note regarding the proposed splitting of batch and

Re: Throttle PTransform

2024-02-21 Thread Reuven Lax via dev
Anyway, to clarify my concern above about cumulative errors. In the past
I've seen this cause errors that were close to an order of magnitude. e.g.
the user specifies a rate limit of 100K QPS, and the throttler ends up
throttling to 15K QPS. The throttlers ended up being close to useless, as
it was nigh impossible to figure out what value to set in order to get
the desired throttling, and the thresholds specified had very little to do
with the actual result.

On Wed, Feb 21, 2024 at 10:56 AM Reuven Lax  wrote:

> I'm wondering if we could build a global rate limiter in the transform,
> using side inputs to communicate tokens to all the throttling shards.
> However this might get more complicated, and we risk causing performance
> problems if this creates graph cycles.
>
> On Wed, Feb 21, 2024 at 10:51 AM Robert Burke  wrote:
>
>> I agree that a global rate limiter would be ideal, but either we make all
>> runners implement one as part of Beam (and the requisite SDK side hooks) or
>> we're forcing users to deploy their own solution, which they can already do.
>>
>> A good enough in-current-model solution is probably fine for many users.
>>
>> On Wed, Feb 21, 2024, 10:30 AM Reuven Lax via dev 
>> wrote:
>>
>>> Yes, that's true. The technique I proposed will work for simple
>>> pipelines in streaming (e.g. basic ETL), where the throttling threads are
>>> probably all scheduled. For more complicated pipelines (or batch
>>> pipelines), we might find that it overthrottles. Maybe a hybrid solution
>>> that uses state would work?
>>>
>>> Another option is to have a global token-based rate limiter, but this
>>> gets a bit more complicated.
>>>
>>> BTW - some sinks (especially databases) care more about maximum number
>>> of concurrent connections. It's interesting to think about this scenario as
>>> well, but I think it's a bit orthogonal to the current discussion.
>>>
>>> Reuven
>>>
>>> On Wed, Feb 21, 2024 at 9:45 AM Robert Bradshaw 
>>> wrote:
>>>
>>>> I like the idea of pushing back to the source much better than
>>>> unboundedly buffering things in state. I was trying to think of how to
>>>> just slow things down and one problem is that while we can easily
>>>> control the number of keys, it's much harder to control (or even
>>>> detect) the number of parallel threads at any given point in time (for
>>>> which keys is simply an upper bound, especially in batch).
>>>>
>>>> On Wed, Feb 21, 2024 at 9:28 AM Reuven Lax  wrote:
>>>> >
>>>> > Agreed, that event-time throttling doesn't make sense here. In theory
>>>> processing-time timers have no SLA - i.e. their firing might be delayed -
>>>> so batch runners aren't violating the model by firing them all at the end;
>>>> however it does make processing time timers less useful in batch, as we see
>>>> here.
>>>> >
>>>> > Personally, I'm not sure I would use state and timers to implement
>>>> this, and I definitely wouldn't create this many keys. A couple of reasons
>>>> for this:
>>>> >   1. If a pipeline is receiving input faster than the throttle rate,
>>>> the proposed technique would shift all those elements into the DoFn's state
>>>> which will keep growing indefinitely. Generally we would prefer to leave
>>>> that backlog in the source instead of copying it into DoFn state.
>>>> >   2. In my experience with throttling, having too much parallelism is
>>>> problematic. The issue is that there is some error involved whenever you
>>>> throttle, and this error can accumulate across many shards (and when I've
>>>> done this sort of thing before, I found that the error was often biased in
>>>> one direction). If targeting 100,000 records/sec, this  approach (if I
>>>> understand it correctly) would create 100,000 shards and throttle them each
>>>> to one element/sec. I doubt this will actually result in anything close to
>>>> desired throttling.
>>>> >   3. Very commonly, the request is to throttle based on bytes/sec,
>>>> not events/sec. Anything we build should be easily extensible to bytes/sec.
>>>> >
>>>> > What I would suggest (and what Beam users have often done in the
>>>> past) would be to bucket the PCollection into N buckets where N is
>>>> generally smallish (100 buckets, 1000 buckets, depending on the expected
>>>> throughput); runners that support autosharding (such as Dataflow) can
>>>> automatically choose N. Each shard then throttles its output to rate/N.
>>>> Keeping N no larger than necessary minimizes the error introduced into
>>>> throttling.

Re: Throttle PTransform

2024-02-21 Thread Reuven Lax via dev
I'm wondering if we could build a global rate limiter in the transform,
using side inputs to communicate tokens to all the throttling shards.
However this might get more complicated, and we risk causing performance
problems if this creates graph cycles.

On Wed, Feb 21, 2024 at 10:51 AM Robert Burke  wrote:

> I agree that a global rate limiter would be ideal, but either we make all
> runners implement one as part of Beam (and the requisite SDK side hooks) or
> we're forcing users to deploy their own solution, which they can already do.
>
> A good enough in-current-model solution is probably fine for many users.
>
> On Wed, Feb 21, 2024, 10:30 AM Reuven Lax via dev 
> wrote:
>
>> Yes, that's true. The technique I proposed will work for simple pipelines
>> in streaming (e.g. basic ETL), where the throttling threads are probably
>> all scheduled. For more complicated pipelines (or batch pipelines), we
>> might find that it overthrottles. Maybe a hybrid solution that uses state
>> would work?
>>
>> Another option is to have a global token-based rate limiter, but this
>> gets a bit more complicated.
>>
>> BTW - some sinks (especially databases) care more about maximum number of
>> concurrent connections. It's interesting to think about this scenario as
>> well, but I think it's a bit orthogonal to the current discussion.
>>
>> Reuven
>>
>> On Wed, Feb 21, 2024 at 9:45 AM Robert Bradshaw 
>> wrote:
>>
>>> I like the idea of pushing back to the source much better than
>>> unboundedly buffering things in state. I was trying to think of how to
>>> just slow things down and one problem is that while we can easily
>>> control the number of keys, it's much harder to control (or even
>>> detect) the number of parallel threads at any given point in time (for
>>> which keys is simply an upper bound, especially in batch).
>>>
>>> On Wed, Feb 21, 2024 at 9:28 AM Reuven Lax  wrote:
>>> >
>>> > Agreed, that event-time throttling doesn't make sense here. In theory
>>> processing-time timers have no SLA - i.e. their firing might be delayed -
>>> so batch runners aren't violating the model by firing them all at the end;
>>> however it does make processing time timers less useful in batch, as we see
>>> here.
>>> >
>>> > Personally, I'm not sure I would use state and timers to implement
>>> this, and I definitely wouldn't create this many keys. A couple of reasons
>>> for this:
>>> >   1. If a pipeline is receiving input faster than the throttle rate,
>>> the proposed technique would shift all those elements into the DoFn's state
>>> which will keep growing indefinitely. Generally we would prefer to leave
>>> that backlog in the source instead of copying it into DoFn state.
>>> >   2. In my experience with throttling, having too much parallelism is
>>> problematic. The issue is that there is some error involved whenever you
>>> throttle, and this error can accumulate across many shards (and when I've
>>> done this sort of thing before, I found that the error was often biased in
>>> one direction). If targeting 100,000 records/sec, this  approach (if I
>>> understand it correctly) would create 100,000 shards and throttle them each
>>> to one element/sec. I doubt this will actually result in anything close to
>>> desired throttling.
>>> >   3. Very commonly, the request is to throttle based on bytes/sec, not
>>> events/sec. Anything we build should be easily extensible to bytes/sec.
>>> >
>>> > What I would suggest (and what Beam users have often done in the past)
>>> would be to bucket the PCollection into N buckets where N is generally
>>> smallish (100 buckets, 1000 buckets, depending on the expected throughput);
>>> runners that support autosharding (such as Dataflow) can automatically
>>> choose N. Each shard then throttles its output to rate/N. Keeping N no
>>> larger than necessary minimizes the error introduced into throttling.
>>> >
>>> > We also don't necessarily need state/timers here - each shard is
>>> processed on a single thread, so those threads can simply throttle calls to
>>> OutputReceiver.output. This way if the pipeline is exceeding the threshold,
>>> backpressure will tend to simply leave excess data in the source. This also
>>> is a simpler design than the proposed one.
>>> >
>>> > A more sophisticated design might combine elements of both - buffering a
>>> bounded amount of data in state when the threshold is exceeded, but
>>> severely limiting the state size. However I wouldn't start here - we would
>>> want to build the simpler implementation first and see how it performs.

Re: Pipeline upgrade to 2.55.0-SNAPSHOT broken for FlinkRunner

2024-02-21 Thread Reuven Lax via dev
Is there a fundamental reason we serialize Java classes into Flink
savepoints?

On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev 
wrote:

> We could consider merging the gradle targets without renaming the
> classpaths as an intermediate step.
>
> Optimistically, perhaps there's a small number of classes that we need
> to preserve (e.g. SerializablePipelineOptions looks like it was
> something specifically intended to be serialized; maybe that an a
> handful of others (that implement Serializable) could be left in their
> original packages for backwards compatibility reasons?
>
> On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský  wrote:
> >
> > Hi,
> >
> > while implementing FlinkRunner for Flink 1.17 I tried to verify that a
> > running Pipeline is able to successfully upgrade from Flink 1.16 to
> > Flink 1.17. There is some change regarding serialization needed for
> > Flink 1.17, so this was a concern. Unfortunately recently we merged
> > core-construction-java into SDK, which resulted in some classes being
> > repackaged. Unfortunately, we serialize some classes into Flink's
> > check/savepoints. The renaming of the class therefore ends with the
> > following exception trying to restore from the savepoint:
> >
> > Caused by: java.lang.ClassNotFoundException:
> > org.apache.beam.runners.core.construction.SerializablePipelineOptions
> >  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
> >  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> >  at
> >
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
> >  at
> >
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> >  at
> >
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
> >  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> >  at
> >
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
> >  at java.base/java.lang.Class.forName0(Native Method)
> >  at java.base/java.lang.Class.forName(Class.java:398)
> >  at
> >
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
> >  at
> >
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
> >
> >
> > This means that no Pipeline will be able to successfully upgrade from
> > versions prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be
> > restarted from scratch). I wanted to know how the community would feel
> > about that, this consequence probably was not clear when we merged the
> > artifacts. The only option would be to revert the merge and then try to
> > figure out how to avoid Java serialization in Flink's savepoints. That
> > would definitely be costly in terms of implementation and even more to
> > provide ways to transfer old savepoints to the new format (can be
> > possible using state processor API). I'm aware that Beam provides no
> > general guarantees about the upgrade compatibility, so it might be fine
> > to just ignore this, I just wanted to shout this out loud so that we can
> > make a deliberate decision.
> >
> > Best,
> >
> >   Jan
> >
>
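
For reference, the standard Java-level technique for reading serialized
objects whose classes were repackaged is to remap class names in
ObjectInputStream.resolveClass. A minimal sketch follows; the new package
name below is an assumption for illustration, and Flink would still have to
be made to use such a stream when reading the savepoint:

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.Map;

class RenamingObjectInputStream extends ObjectInputStream {
  // Old class name -> new class name after the core-construction-java merge.
  // The target package here is assumed for illustration.
  private static final Map<String, String> RENAMED_CLASSES = Map.of(
      "org.apache.beam.runners.core.construction.SerializablePipelineOptions",
      "org.apache.beam.sdk.util.construction.SerializablePipelineOptions");

  RenamingObjectInputStream(InputStream in) throws IOException {
    super(in);
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc)
      throws IOException, ClassNotFoundException {
    // Look the class up under its new name if it was repackaged.
    String name = RENAMED_CLASSES.getOrDefault(desc.getName(), desc.getName());
    return Class.forName(name, false, getClass().getClassLoader());
  }
}

Field layout and serialVersionUID still have to match on both sides, so this
only helps with pure renames, not structural changes.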


Re: Throttle PTransform

2024-02-21 Thread Reuven Lax via dev
Yes, that's true. The technique I proposed will work for simple pipelines
in streaming (e.g. basic ETL), where the throttling threads are probably
all scheduled. For more complicated pipelines (or batch pipelines), we
might find that it overthrottles. Maybe a hybrid solution that uses state
would work?

Another option is to have a global token-based rate limiter, but this gets
a bit more complicated.
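
For concreteness, a token bucket is the usual primitive behind such a
limiter; a minimal single-process sketch follows (making it truly global
across workers would need a shared service, which is the complication
mentioned above):

class TokenBucket {
  private final double ratePerSec;
  private final double capacity;
  private double tokens;
  private long lastRefillNanos;

  TokenBucket(double ratePerSec, double capacity) {
    this.ratePerSec = ratePerSec;
    this.capacity = capacity;
    this.tokens = capacity;
    this.lastRefillNanos = System.nanoTime();
  }

  // Returns true and consumes one token if available; callers that receive
  // false must back off or block, which is where the extra complexity lives.
  synchronized boolean tryAcquire() {
    long now = System.nanoTime();
    tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * ratePerSec / 1e9);
    lastRefillNanos = now;
    if (tokens >= 1.0) {
      tokens -= 1.0;
      return true;
    }
    return false;
  }
}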

BTW - some sinks (especially databases) care more about maximum number of
concurrent connections. It's interesting to think about this scenario as
well, but I think it's a bit orthogonal to the current discussion.

Reuven

On Wed, Feb 21, 2024 at 9:45 AM Robert Bradshaw  wrote:

> I like the idea of pushing back to the source much better than
> unboundedly buffering things in state. I was trying to think of how to
> just slow things down and one problem is that while we can easily
> control the number of keys, it's much harder to control (or even
> detect) the number of parallel threads at any given point in time (for
> which keys is simply an upper bound, especially in batch).
>
> On Wed, Feb 21, 2024 at 9:28 AM Reuven Lax  wrote:
> >
> > Agreed, that event-time throttling doesn't make sense here. In theory
> processing-time timers have no SLA - i.e. their firing might be delayed -
> so batch runners aren't violating the model by firing them all at the end;
> however it does make processing time timers less useful in batch, as we see
> here.
> >
> > Personally, I'm not sure I would use state and timers to implement this,
> and I definitely wouldn't create this many keys. A couple of reasons for
> this:
> >   1. If a pipeline is receiving input faster than the throttle rate, the
> proposed technique would shift all those elements into the DoFn's state
> which will keep growing indefinitely. Generally we would prefer to leave
> that backlog in the source instead of copying it into DoFn state.
> >   2. In my experience with throttling, having too much parallelism is
> problematic. The issue is that there is some error involved whenever you
> throttle, and this error can accumulate across many shards (and when I've
> done this sort of thing before, I found that the error was often biased in
> one direction). If targeting 100,000 records/sec, this approach (if I
> understand it correctly) would create 100,000 shards and throttle them each
> to one element/sec. I doubt this will actually result in anything close to
> desired throttling.
> >   3. Very commonly, the request is to throttle based on bytes/sec, not
> events/sec. Anything we build should be easily extensible to bytes/sec.
> >
> > What I would suggest (and what Beam users have often done in the past)
> would be to bucket the PCollection into N buckets where N is generally
> smallish (100 buckets, 1000 buckets, depending on the expected throughput);
> runners that support autosharding (such as Dataflow) can automatically
> choose N. Each shard then throttles its output to rate/N. Keeping N no
> larger than necessary minimizes the error introduced into throttling.
> >
> > We also don't necessarily need state/timers here - each shard is
> processed on a single thread, so those threads can simply throttle calls to
> OutputReceiver.output. This way if the pipeline is exceeding the threshold,
> backpressure will tend to simply leave excess data in the source. This also
> is a simpler design than the proposed one.
> >
> > A more sophisticated design might combine elements of both - buffering a
> bounded amount of data in state when the threshold is exceeded, but
> severely limiting the state size. However I wouldn't start here - we would
> want to build the simpler implementation first and see how it performs.
> >
> > On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> >>
> >> On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have left a note regarding the proposed splitting of batch and
> >> > streaming expansion of this transform. In general, a need for such
> split
> >> > triggers doubts in me. This signals that either
> >> >
> >> >   a) the transform does something it should not, or
> >> >
> >> >   b) Beam model is not complete in terms of being "unified"
> >> >
> >> > The problem that is described in the document is that in the batch
> case
> >> > timers are not fired appropriately.
> >>
> >> +1. The underlying flaw is that processing time timers are not handled
> >> correctly in batch, but should be (even if it means keeping workers
> >> idle?). We should fix this.
> >>
> >> > This is actually one of the
> >> > motivations that led to introduction of @RequiresTimeSortedInput
> >> > annotation and, though mentioned years ago as a question, I do not
> >> > remember what arguments were used against enforcing sorting inputs by
> >> > timestamp in the batch stateful DoFn as a requirement in the model.
> That
> >> > would enable the appropriate firing of timers while preserving the
> batch invariant, which is that no late data are allowed.

Re: Throttle PTransform

2024-02-21 Thread Reuven Lax via dev
Agreed, that event-time throttling doesn't make sense here. In theory
processing-time timers have no SLA - i.e. their firing might be delayed -
so batch runners aren't violating the model by firing them all at the end;
however it does make processing time timers less useful in batch, as we see
here.

Personally, I'm not sure I would use state and timers to implement this,
and I definitely wouldn't create this many keys. A couple of reasons for
this:
  1. If a pipeline is receiving input faster than the throttle rate, the
proposed technique would shift all those elements into the DoFn's state
which will keep growing indefinitely. Generally we would prefer to leave
that backlog in the source instead of copying it into DoFn state.
  2. In my experience with throttling, having too much parallelism is
problematic. The issue is that there is some error involved whenever you
throttle, and this error can accumulate across many shards (and when I've
done this sort of thing before, I found that the error was often biased in
one direction). If targeting 100,000 records/sec, this approach (if I
understand it correctly) would create 100,000 shards and throttle them each
to one element/sec. I doubt this will actually result in anything close to
desired throttling.
  3. Very commonly, the request is to throttle based on bytes/sec, not
events/sec. Anything we build should be easily extensible to bytes/sec.

What I would suggest (and what Beam users have often done in the past)
would be to bucket the PCollection into N buckets where N is generally
smallish (100 buckets, 1000 buckets, depending on the expected throughput);
runners that support autosharding (such as Dataflow) can automatically
choose N. Each shard then throttles its output to rate/N. Keeping N no
larger than necessary minimizes the error introduced into throttling.

We also don't necessarily need state/timers here - each shard is processed
on a single thread, so those threads can simply throttle calls to
OutputReceiver.output. This way if the pipeline is exceeding the threshold,
backpressure will tend to simply leave excess data in the source. This also
is a simpler design than the proposed one.

A more sophisticated design might combine elements of both - buffering a
bounded amount of data in state when the threshold is exceeded, but
severely limiting the state size. However I wouldn't start here - we would
want to build the simpler implementation first and see how it performs.
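
A rough sketch of the per-shard scheme described above, using Guava's
RateLimiter and a fixed N; the class and parameter names are illustrative,
not an existing Beam transform:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class ThrottlePerShardFn extends DoFn<KV<Integer, String>, String> {
  private final double globalRatePerSec;
  private final int numShards;
  private transient RateLimiter limiter;

  ThrottlePerShardFn(double globalRatePerSec, int numShards) {
    this.globalRatePerSec = globalRatePerSec;
    this.numShards = numShards;
  }

  @Setup
  public void setup() {
    // Each shard is processed on a single thread, so a local limiter at
    // rate/N approximates the global target without state or timers.
    limiter = RateLimiter.create(globalRatePerSec / numShards);
  }

  @ProcessElement
  public void processElement(@Element KV<Integer, String> kv, OutputReceiver<String> out) {
    limiter.acquire(); // blocks the thread, leaving excess backlog in the source
    out.output(kv.getValue());
  }
}

Because acquire() blocks the processing thread, excess input naturally stays
in the source rather than accumulating in DoFn state.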

On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev 
wrote:

> On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský  wrote:
> >
> > Hi,
> >
> > I have left a note regarding the proposed splitting of batch and
> > streaming expansion of this transform. In general, a need for such split
> > triggers doubts in me. This signals that either
> >
> >   a) the transform does something it should not, or
> >
> >   b) Beam model is not complete in terms of being "unified"
> >
> > The problem that is described in the document is that in the batch case
> > timers are not fired appropriately.
>
> +1. The underlying flaw is that processing time timers are not handled
> correctly in batch, but should be (even if it means keeping workers
> idle?). We should fix this.
>
> > This is actually one of the
> > motivations that led to introduction of @RequiresTimeSortedInput
> > annotation and, though mentioned years ago as a question, I do not
> > remember what arguments were used against enforcing sorting inputs by
> > timestamp in the batch stateful DoFn as a requirement in the model. That
> > would enable the appropriate firing of timers while preserving the batch
> > invariant, which is that no late data are allowed. IIRC there are
> > runners that do this sorting by default (at least the sorting, not sure
> > about the timers, but once inputs are sorted, firing timers is simple).
> >
> > A different question is if this particular transform should maybe fire
> > not by event time, but rather processing time?
>
> Yeah, I was reading all of these as processing time. Throttling by
> event time doesn't make much sense.
>
> > On 2/21/24 03:00, Robert Burke wrote:
> > > Thanks for the design Damon! And thanks for collaborating with me on
> getting a high level textual description of the key implementation idea
> down in writing. I think the solution is pretty elegant.
> > >
> > > I do have concerns about how different Runners might handle
> ProcessContinuations for the Bounded Input case. I know Dataflow famously
> has two different execution modes under the hood, but I agree with the
> principle that ProcessContinuation.Resume should largely be in line with
> the expected delay, though it's by no means guaranteed AFAIK.
> > >
> > > We should also ensure this is linked from
> https://s.apache.org/beam-design-docs if not already.
> > >
> > > Robert Burke
> > > Beam Go Busybody
> > >
> > > On 2024/02/20 14:00:00 Damon Douglas wrote:
> > >> Hello Everyone,
> > >>
> >> The following describes a Throttle PTransform

Re: Row compatible generated coders for custom classes

2023-12-02 Thread Reuven Lax via dev
Out of curiosity, did you add a warmup time before benchmarking? Schema and
row coders do codegen, so the first usage is very slow, but subsequent
usages should be much faster. I recommend running any test for a warmup
period before starting to measure.
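
For example, JMH makes the warmup declarative; a sketch in which the
benchmark body is a stand-in for the coder under test:

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) // amortizes one-time codegen
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(1)
public class SchemaCoderBenchmark {
  @Benchmark
  public byte[] serialize() throws Exception {
    // Placeholder: encode a representative element with the coder under test,
    // e.g. via CoderUtils.encodeToByteArray(coder, element).
    return encodeWithSchemaCoder();
  }

  private byte[] encodeWithSchemaCoder() throws Exception {
    return new byte[0]; // stand-in for the real encoding call
  }
}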

On Fri, Dec 1, 2023, 9:13 AM Steven van Rossum via dev 
wrote:

> Hi all,
>
> I was benchmarking the fastjson2 serialization library a few weeks back
> for a Java pipeline I was working on and was asked by a colleague to
> benchmark binary JSON serialization against Rows for fun. We didn't do any
> extensive analysis across different shapes and sizes, but the finding on
> this workload was that serialization to binary JSON (tuple representation)
> outperformed the SchemaCoder on throughput by ~11x on serialization and ~5x
> on deserialization. Additionally, RowCoder outperformed SchemaCoder on
> throughput by ~1.3x on serialization and ~1.7x on deserialization. Note
> that all benchmarks measured throughput in the millions of ops/sec for this
> quick test, so this is already excellent performance.
>
> I'm sure there's stuff to learn from other serialization libraries, but
> I'd table that for now. The low hanging fruit improvement would be to skip
> that intermediate hop to/from Row and instead generate custom SchemaCoders
> to serialize directly into or deserialize from the Row format.
> I'd be happy to pick this up at some point in the new year, but would just
> like to get some thoughts from this group.
>
> Regards,
>
> Steve
>


Re: [YAML] Aggregations

2023-10-19 Thread Reuven Lax via dev
Is the schema Group transform (in Java) something along these lines?
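
For reference, the straw-man below maps fairly directly onto that transform;
roughly (a sketch assuming a schema'd PCollection of Rows with the named
fields):

import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

class AggregateCosts {
  // Groups by field1/field2 and computes sum(cost) and max(cost), mirroring
  // the YAML straw-man quoted below.
  static PCollection<Row> expand(PCollection<Row> input) {
    return input.apply(
        Group.<Row>byFieldNames("field1", "field2")
            .aggregateField("cost", Sum.ofDoubles(), "total_cost")
            .aggregateField("cost", Max.ofDoubles(), "max_cost"));
  }
}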

On Wed, Oct 18, 2023 at 1:11 PM Robert Bradshaw via dev 
wrote:

> Beam Yaml has good support for IOs and mappings, but one key missing
> feature for even writing a WordCount is the ability to do Aggregations
> [1]. While the traditional Beam primitive is GroupByKey (and
> CombineValues), we're eschewing KVs in favor of more schema'd
> data (which has some precedent in our other languages, see the links
> below). The key components the user needs to specify are (1) the key
> fields on which the grouping will take place, (2) the fields
> (expressions?) involved in the aggregation, and (3) what aggregating
> fn to use.
>
> A straw-man example could be something like
>
> type: Aggregating
> config:
>   key: [field1, field2]
>   aggregating:
> total_cost:
>   fn: sum
>   value: cost
> max_cost:
>   fn: max
>   value: cost
>
> This would basically correspond to the SQL expression
>
> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
> from table GROUP BY field1, field2"
>
> (though I'm not requiring that we use this as an implementation
> strategy). I do not think we need a separate (non aggregating)
> Grouping operation, this can be accomplished by having a concat-style
> combiner.
>
> There are still some open questions here, notably around how to
> specify the aggregation fns themselves. We could of course provide a
> number of built-ins (like SQL does). This gets into the question of
> how and where to document this complete set, but some basics should
> take us pretty far. Many aggregators, however, are parameterized (e.g.
> quantiles); where do we put the parameters? We could go with something
> like
>
> fn:
>   type: ApproximateQuantiles
>   config:
> n: 10
>
> but others are even configured by functions themselves (e.g. LargestN
> that wants a comparator Fn). Maybe we decide not to support these
> (yet?)
>
> One thing I think we should support, however, is referencing custom
> CombineFns. We have some precedent for this with our Fns from
> MapToFields, where we accept things like inline lambdas and external
> references. Again the topic of how to configure them comes up, as
> these custom Fns are more likely to be parameterized than Map Fns
> (though, to be clear, perhaps it'd be good to allow parameterization of
> MapFns as well). Maybe we allow
>
> language: python  # like MapToFields (and here it'd be harder to mix
> and match per Fn)
> fn:
>   type: ???
>   # should these be nested as config?
>   name: fully.qualified.name
>   path: /path/to/defining/file
>   args: [...]
>   kwargs: {...}
>
> which would invoke the constructor.
>
> I'm also open to other ways of naming/structuring these essential
> parameters if it makes things more clear.
>
> - Robert
>
>
> Java:
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
> Python:
> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
> Typescript:
> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>
> [1] One can of course use SqlTransform for this, but I'm leaning
> towards offering something more native.
>


Re: [YAML] Aggregations

2023-10-19 Thread Reuven Lax via dev
Or are you specifically referring to the declarative YAML pipelines?

On Thu, Oct 19, 2023 at 12:53 PM Reuven Lax  wrote:

> Is the schema Group transform (in Java) something along these lines?
>
> On Wed, Oct 18, 2023 at 1:11 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> Beam Yaml has good support for IOs and mappings, but one key missing
>> feature for even writing a WordCount is the ability to do Aggregations
>> [1]. While the traditional Beam primitive is GroupByKey (and
>> CombineValues), we're eschewing KVs in favor of more schema'd
>> data (which has some precedent in our other languages, see the links
>> below). The key components the user needs to specify are (1) the key
>> fields on which the grouping will take place, (2) the fields
>> (expressions?) involved in the aggregation, and (3) what aggregating
>> fn to use.
>>
>> A straw-man example could be something like
>>
>> type: Aggregating
>> config:
>>   key: [field1, field2]
>>   aggregating:
>> total_cost:
>>   fn: sum
>>   value: cost
>> max_cost:
>>   fn: max
>>   value: cost
>>
>> This would basically correspond to the SQL expression
>>
>> "SELECT field1, field2, sum(cost) as total_cost, max(cost) as max_cost
>> from table GROUP BY field1, field2"
>>
>> (though I'm not requiring that we use this as an implementation
>> strategy). I do not think we need a separate (non aggregating)
>> Grouping operation, this can be accomplished by having a concat-style
>> combiner.
>>
>> There are still some open questions here, notably around how to
>> specify the aggregation fns themselves. We could of course provide a
>> number of built-ins (like SQL does). This gets into the question of
>> how and where to document this complete set, but some basics should
>> take us pretty far. Many aggregators, however, are parameterized (e.g.
>> quantiles); where do we put the parameters? We could go with something
>> like
>>
>> fn:
>>   type: ApproximateQuantiles
>>   config:
>> n: 10
>>
>> but others are even configured by functions themselves (e.g. LargestN
>> that wants a comparator Fn). Maybe we decide not to support these
>> (yet?)
>>
>> One thing I think we should support, however, is referencing custom
>> CombineFns. We have some precedent for this with our Fns from
>> MapToFields, where we accept things like inline lambdas and external
>> references. Again the topic of how to configure them comes up, as
>> these custom Fns are more likely to be parameterized than Map Fns
>> (though, to be clear, perhaps it'd be good to allow parameterization of
>> MapFns as well). Maybe we allow
>>
>> language: python  # like MapToFields (and here it'd be harder to mix
>> and match per Fn)
>> fn:
>>   type: ???
>>   # should these be nested as config?
>>   name: fully.qualified.name
>>   path: /path/to/defining/file
>>   args: [...]
>>   kwargs: {...}
>>
>> which would invoke the constructor.
>>
>> I'm also open to other ways of naming/structuring these essential
>> parameters if it makes things more clear.
>>
>> - Robert
>>
>>
>> Java:
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/Group.html
>> Python:
>> https://beam.apache.org/documentation/transforms/python/aggregation/groupby
>> Typescript:
>> https://beam.apache.org/releases/typedoc/current/classes/transforms_group_and_combine.GroupBy.html
>>
>> [1] One can of course use SqlTransform for this, but I'm leaning
>> towards offering something more native.
>>
>


Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-10 Thread Reuven Lax via dev
I suspect some simple pattern templating would solve most use cases. We
probably would want to support timestamp formatting (e.g. $Y $M $D) as
well.
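
A sketch of what such expansion could look like; the $Y/$M/$D token names
are assumptions taken from this thread, not an existing Beam API:

import java.time.LocalDate;

class PathTemplates {
  // Hypothetical: expand $Y/$M/$D tokens against (say) the window's date.
  static String expand(String template, LocalDate date) {
    return template
        .replace("$Y", String.format("%04d", date.getYear()))
        .replace("$M", String.format("%02d", date.getMonthValue()))
        .replace("$D", String.format("%02d", date.getDayOfMonth()));
  }
}

// PathTemplates.expand("/beam/out/$Y/$M/$D/part", LocalDate.of(2023, 10, 10))
//   -> "/beam/out/2023/10/10/part"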

On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw  wrote:

> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath 
> wrote:
>
>> I would say:
>>
>> sink:
>>   type: WriteToParquet
>>   config:
>> path: /beam/filesystem/dest
>> prefix: 
>> suffix: 
>>
>> Underlying SDK will add the middle part of the file names to make sure
>> that files generated by various bundles/windows/shards do not conflict.
>>
>
> What's the relationship between path and prefix? Is path the
> directory part of the full path, or does prefix precede it?
>
>
>> This will satisfy the vast majority of use-cases I believe. Fully
>> customizing the file pattern sounds like a more advanced use case that can
>> be left for "real" SDKs.
>>
>
> Yea, we don't have to do everything.
>
>
>> For dynamic destinations, I think just making the "path" component
>> support  a lambda that is parameterized by the input should be adequate
>> since this allows customers to direct files written to different
>> destination directories.
>>
>> sink:
>>   type: WriteToParquet
>>   config:
>> path: 
>> prefix: 
>> suffix: 
>>
>> I'm not sure what would be the best way to specify a lambda here though.
>> Maybe a regex or the name of a Python callable?
>>
>
> I'd rather not require Python for a pure Java pipeline, but some kind of a
> pattern template may be sufficient here.
>
>
>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax  wrote:
>>>
 Just FYI - the reason why names (including prefixes) in
 DynamicDestinations were parameterized via a lambda instead of just having
 the user add it via MapElements is performance. We discussed something
 along the lines of what you are suggesting (essentially having the user
 create a KV where the key contained the dynamic information). The problem
 was that the size of the generated filepath was often much larger
 (sometimes by 2 OOM) than the information in the record, and there was a
 desire to avoid record blowup. e.g. the record might contain a single
 integer userid, and the filepath prefix would then be
 /long/path/to/output/users/. This was especially bad in cases where the
 data had to be shuffled, and the existing dynamic destinations method
 allowed extracting the filepath only _after_  the shuffle.

>>>
>>> That is a consideration I hadn't thought much of, thanks for
>>> bringing this up.
>>>
>>>
 Now there may not be any good way to keep this benefit in a
 declarative approach such as YAML (or at least a good easy way - we could
 always allow the user to pass in a SQL expression to extract the filename
 from the record!), but we should keep in mind that this might mean that
 YAML-generated pipelines will be less efficient for certain use cases.

>>>
>>> Yep, it's not as straightforward to do in a declarative way. I would
>>> like to avoid mixing UDFs (with their associated languages and execution
>>> environments) if possible. Though I'd like the performance of a
>>> "straightforward" YAML pipeline to be that which one can get writing
>>> straight-line Java (and possibly better, if we can leverage the structure
>>> of schemas everywhere) this is not an absolute requirement for all
>>> features.
>>>
>>> I wonder if separating out a constant prefix vs. the dynamic stuff could
>>> be sufficient to mitigate the blow-up of pre-computing this in most cases
>>> (especially in the context of a larger pipeline). Alternatively, rather
>>> than just a sharding pattern, one could have a full filepattern that
>>> includes format parameters for dynamically computed bits as well as the
>>> shard number, windowing info, etc. (There are pros and cons to this.)
>>>
>>>
 On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
 dev@beam.apache.org> wrote:

> Currently the various file writing configurations take a single
> parameter, path, which indicates where the (sharded) output should be
> placed. In other words, one can write something like
>
>   pipeline:
> ...
> sink:
>   type: WriteToParquet
>   config:
> path: /beam/filesystem/dest
>
> and one gets files like "/beam/filesystem/dest-X-of-N"
>
> Of course, in practice file writing is often much more complicated
> than this (especially when it comes to Streaming). For reference, I've
> included links to our existing offerings in the various SDKs below. I'd
> like to start a discussion about what else should go in the "config"
> parameter and how it should be expressed in YAML.
>
> The primary concern is around naming. This can generally be split into
> (1) the prefix, which must be provided by the users, (2) the sharding
> information, which includes both shard counts (e.g. the -X-of-N suffix) and
> windowing information (for streaming pipelines) that we may want to allow
> the user to customize the formatting of, and (3) a suffix like .json or
> .avro that is useful for both humans and tooling and can often be inferred
> but should allow customization as well.

Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)

2023-10-09 Thread Reuven Lax via dev
Just FYI - the reason why names (including prefixes) in DynamicDestinations
were parameterized via a lambda instead of just having the user add it via
MapElements is performance. We discussed something along the lines of what
you are suggesting (essentially having the user create a KV where the key
contained the dynamic information). The problem was that the size of
the generated filepath was often much larger (sometimes by 2 OOM) than the
information in the record, and there was a desire to avoid record blowup.
e.g. the record might contain a single integer userid, and the filepath
prefix would then be /long/path/to/output/users/. This was especially
bad in cases where the data had to be shuffled, and the existing dynamic
destinations method allowed extracting the filepath only _after_  the
shuffle.

Now there may not be any good way to keep this benefit in a
declarative approach such as YAML (or at least a good easy way - we could
always allow the user to pass in a SQL expression to extract the filename
from the record!), but we should keep in mind that this might mean that
YAML-generated pipelines will be less efficient for certain use cases.
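
For reference, this is the FileIO.writeDynamic shape being described; a
rough sketch in which the field and path names are illustrative:

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class PerUserWrite {
  // events: KV of (userid, line to write). Only the small userid key is
  // carried through the shuffle; the long path is built at write time.
  static void write(PCollection<KV<Long, String>> events) {
    events.apply(
        FileIO.<Long, KV<Long, String>>writeDynamic()
            .by(KV::getKey)
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .to("/long/path/to/output/users/")
            .withDestinationCoder(VarLongCoder.of())
            .withNaming(
                userId -> FileIO.Write.defaultNaming("user-" + userId, ".txt")));
  }
}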

On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev 
wrote:

> Currently the various file writing configurations take a single parameter,
> path, which indicates where the (sharded) output should be placed. In other
> words, one can write something like
>
>   pipeline:
> ...
> sink:
>   type: WriteToParquet
>   config:
> path: /beam/filesystem/dest
>
> and one gets files like "/beam/filesystem/dest-X-of-N"
>
> Of course, in practice file writing is often much more complicated than
> this (especially when it comes to Streaming). For reference, I've included
> links to our existing offerings in the various SDKs below. I'd like to
> start a discussion about what else should go in the "config" parameter and
> how it should be expressed in YAML.
>
> The primary concern is around naming. This can generally be split into (1)
> the prefix, which must be provided by the users, (2) the sharding
> information, which includes both shard counts (e.g. the -X-of-N suffix) and
> windowing information (for streaming pipelines) that we may want to allow
> the user to customize the formatting of, and (3) a suffix like .json or
> .avro that is useful for both humans and tooling and can often be inferred
> but should allow customization as well.
>
> An interesting case is that of dynamic destinations, where the prefix (or
> other parameters) may themselves be functions of the records themselves. (I
> am excluding the case where the format itself is variable--such cases are
> probably better handled by explicitly partitioning the data and doing
> multiple writes, as this introduces significant complexities and the set of
> possible formats is generally finite and known ahead of time.) I propose
> that we leverage the fact that we have structured data to be able to pull
> out these dynamic parameters. For example, if we have an input data set
> with a string column my_col we could allow something like
>
>   config:
> path: {dynamic: my_col}
>
> which would pull this information out at runtime. (With the MapToFields
> transform, it is very easy to compute/append additional fields to existing
> records.) Generally this field would then be stripped from the written
> data, which would only see the subset of non-dynamically referenced columns
> (though this could be configurable: we could add an attribute like
> {dynamic: my_col, Keep: true} or require the set of columns to be actually
> written (or elided) to be enumerated in the config or allow/require the
> actual data to be written to be in a designated field of the "full" input
> records as arranged by a preceding transform). It'd be great to get
> input/impressions from a wide range of people here on what would be the
> most natural. Often just writing out snippets of various alternatives can
> be quite informative (though I'm avoiding putting them here for the moment
> to avoid biasing ideas right off the bat).
>
> For streaming pipelines it is often essential to write data out in a
> time-partitioned manner. The typical way to do this is to add the windowing
> information into the shard specification itself, and a (set of) file(s) is
> written on each window closing. Beam YAML already supports any transform
> being given a "windowing" configuration which will cause a WindowInto
> transform to be applied to its input(s) before application which can sit
> naturally on a sink. We may want to consider if non-windowed writes make
> sense as well (though how this interacts with the watermark and underlying
> implementations are a large open question, so this is a larger change that
> might make sense to defer).
>
> Note that I am explicitly excluding "coders" here. All data in YAML should
> be schema'd, and writers should know how to write this structured data. We
> may want to allow a "schema" field to

Re: Runner Bundling Strategies

2023-09-27 Thread Reuven Lax via dev
Using Setup would cause data loss in this case. A runner can always retry a
bundle, and I don't believe Setup is called again in this case. If the user
initiated the hashmap in setup, this would cause records to be completely
lost whenever bundles retry.

On Wed, Sep 27, 2023 at 11:20 AM Jan Lukavský  wrote:

> What is the reason to rely on StartBundle and not Setup in this case? If
> the life-cycle of bundle is not "closed" (i.e. start - finish), then it
> seems to be ill defined and Setup should do?
> I'm trying to think of non-caching use-cases of StartBundle-FinishBundle,
> are there such cases? I'd say yes, but I'm struggling a little to find a
> specific example that cannot be solved using Setup or lazy init.
> On 9/27/23 19:58, Reuven Lax via dev wrote:
>
> DoFns are allowed to be non-deterministic, so they don't have to yield the
> "same" output.
>
> The example I'm thinking of is where users perform some "best-effort"
> deduplication by creating a hashmap in StartBundle and removing duplicates.
> This is usually done purely for performance to reduce shuffle size, as
> opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
> FinishBundle, though it does require a StartBundle.
>
> On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev 
>> wrote:
>>
>>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>>> Though absence of FinishBundle doesn't mean that one can assume that
>>> elements in a bundle don't affect subsequent bundle elements (i.e. there
>>> might still be caching!)
>>>
>>
>> Well for a DoFn to be correct, it has to yield the same (or "the same as
>> much as the user expects it to be the same") output regardless of order of
>> processing or bundling so a runner or SDK harness can definitely take a
>> bunch of elements and process them however it wants if there's
>> no @FinishBundle. I think that's what Jan is getting at - adding
>> a @FinishBundle is the user placing a new restriction on the runner.
>> Technically probably have to include @StartBundle in that consideration.
>>
>> Kenn
>>
>>
>>>
>>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:
>>>
>>>>
>>>>
>>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>>>>
>>>>> Hi Kenn and Reuven,
>>>>>
>>>>> I agree with all these points. The only issue here seems to be that
>>>>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>>>>> fixed, though we need to change some defaults, as 1000 ms default bundle
>>>>> "duration" for lower traffic Pipelines can be too much. We are also
>>>>> probably missing some @ValidatesRunner tests for this. I created [1] and
>>>>> [2] to track this.
>>>>>
>>>>> One question still remains: the bundle vs. element life-cycle is
>>>>> relevant only for cases where processing of element X can affect
>>>>> processing of element Y later in the same bundle. Once this influence is
>>>>> ruled out (i.e. no caching), this information can result in runner
>>>>> optimizations that yield better performance. Should we consider
>>>>> propagating this information from user code to the runner?
>>>>>
>>>> Yes!
>>>>
>>>> This was the explicit goal of the move to annotation-driven DoFn in
>>>> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
>>>> can get good information about what the DoFn requirements are.
>>>>
>>>> When there is no @FinishBundle method, the runner can make additional
>>>> optimizations. This should have been included in the ParDoPayload in the
>>>> proto when we moved to portable pipelines. I cannot remember if there was a
>>>> good reason that we did not do so. Maybe we (incorrectly) thought that this
>>>> was an issue that only the Java SDK harness needed to know about.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>> [1] https://github.com/apache/beam/issues/28649
>>>>>
>>>>> [2] https://github.com/apache/beam/issues/28650
>>>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  

Re: Runner Bundling Strategies

2023-09-27 Thread Reuven Lax via dev
DoFns are allowed to be non-deterministic, so they don't have to yield the
"same" output.

The example I'm thinking of is where users perform some "best-effort"
deduplication by creating a hashmap in StartBundle and removing duplicates.
This is usually done purely for performance to reduce shuffle size, as
opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
FinishBundle, though it does require a StartBundle.
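
A sketch of that pattern (illustrative, not a built-in transform); the set
is rebuilt per bundle, which is what makes it safe under bundle retries:

import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;

class BestEffortDedupFn extends DoFn<String, String> {
  private transient Set<String> seenInBundle;

  @StartBundle
  public void startBundle() {
    // Re-created per bundle: a retried bundle starts with a fresh set,
    // unlike @Setup-initialized state, which would silently drop records.
    seenInBundle = new HashSet<>();
  }

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    if (seenInBundle.add(element)) {
      out.output(element); // emit only the first occurrence within this bundle
    }
  }
}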

On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:

>
>
> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev 
> wrote:
>
>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>> Though absence of FinishBundle doesn't mean that one can assume that
>> elements in a bundle don't affect subsequent bundle elements (i.e. there
>> might still be caching!)
>>
>
> Well for a DoFn to be correct, it has to yield the same (or "the same as
> much as the user expects it to be the same") output regardless of order of
> processing or bundling so a runner or SDK harness can definitely take a
> bunch of elements and process them however it wants if there's
> no @FinishBundle. I think that's what Jan is getting at - adding
> a @FinishBundle is the user placing a new restriction on the runner.
> Technically probably have to include @StartBundle in that consideration.
>
> Kenn
>
>
>>
>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>>>
>>>> Hi Kenn and Reuven,
>>>>
>>>> I agree with all these points. The only issue here seems to be that
>>>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>>>> fixed, though we need to change some defaults, as 1000 ms default bundle
>>>> "duration" for lower traffic Pipelines can be too much. We are also
>>>> probably missing some @ValidatesRunner tests for this. I created [1] and
>>>> [2] to track this.
>>>>
>>>> One question still remains: the bundle vs. element life-cycle is
>>>> relevant only for cases where processing of element X can affect
>>>> processing of element Y later in the same bundle. Once this influence is
>>>> ruled out (i.e. no caching), this information can result in runner
>>>> optimizations that yield better performance. Should we consider
>>>> propagating this information from user code to the runner?
>>>>
>>> Yes!
>>>
>>> This was the explicit goal of the move to annotation-driven DoFn in
>>> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
>>> can get good information about what the DoFn requirements are.
>>>
>>> When there is no @FinishBundle method, the runner can make additional
>>> optimizations. This should have been included in the ParDoPayload in the
>>> proto when we moved to portable pipelines. I cannot remember if there was a
>>> good reason that we did not do so. Maybe we (incorrectly) thought that this
>>> was an issue that only the Java SDK harness needed to know about.
>>>
>>> Kenn
>>>
>>>
>>>> [1] https://github.com/apache/beam/issues/28649
>>>>
>>>> [2] https://github.com/apache/beam/issues/28650
>>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>>
>>>>
>>>>
>>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>>>>
>>>>>
>>>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>>>
>>>>> Two separate things here:
>>>>>
>>>>> 1. Yes, a watermark can update in the middle of a bundle.
>>>>> 2. The records in the bundle themselves will prevent the watermark
>>>>> from updating as they are still in flight until after finish bundle.
>>>>> Therefore simply caching the records should always be watermark safe,
>>>>> regardless of the runner. You will only run into problems if you try and
>>>>> move timestamps "backwards" - which is why Beam strongly discourages this.
>>>>>
>>>>> This is not aligned with FlinkRunner's implementation. And I actually
>>>>> think it is not aligned conceptually.  As mentioned, Flink does not have
>>>>> the concept of bundles at all. It achieves fault tolerance via
>>>>> checkpointing, essentially checkpoint barrier flowing from sources to
>>>>> sinks, safely snapshotting state of each operator on the way.

Re: Runner Bundling Strategies

2023-09-26 Thread Reuven Lax via dev
Yes, not including FinishBundle in ParDoPayload seems like a mistake.
Though absence of FinishBundle doesn't mean that one can assume that
elements in a bundle don't affect subsequent bundle elements (i.e. there
might still be caching!)

On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:

>
>
> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>
>> Hi Kenn and Reuven,
>>
>> I agree with all these points. The only issue here seems to be that
>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>> fixed, though we need to change some defaults, as 1000 ms default bundle
>> "duration" for lower traffic Pipelines can be too much. We are also
>> probably missing some @ValidatesRunner tests for this. I created [1] and
>> [2] to track this.
>>
>> One question still remains: the bundle vs. element life-cycle is relevant
>> only for cases where processing of element X can affect processing of
>> element Y later in the same bundle. Once this influence is ruled out (i.e.
>> no caching), this information can result in runner optimizations that yield
>> better performance. Should we consider propagating this information from
>> user code to the runner?
>>
> Yes!
>
> This was the explicit goal of the move to annotation-driven DoFn in
> https://s.apache.org/a-new-dofn to make it so that the SDK and runner can
> get good information about what the DoFn requirements are.
>
> When there is no @FinishBundle method, the runner can make additional
> optimizations. This should have been included in the ParDoPayload in the
> proto when we moved to portable pipelines. I cannot remember if there was a
> good reason that we did not do so. Maybe we (incorrectly) thought that this
> was an issue that only the Java SDK harness needed to know about.
>
> Kenn
>
>
>> [1] https://github.com/apache/beam/issues/28649
>>
>> [2] https://github.com/apache/beam/issues/28650
>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>
>>
>>
>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>>
>>>
>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>
>>> Two separate things here:
>>>
>>> 1. Yes, a watermark can update in the middle of a bundle.
>>> 2. The records in the bundle themselves will prevent the watermark from
>>> updating as they are still in flight until after finish bundle. Therefore
>>> simply caching the records should always be watermark safe, regardless of
>>> the runner. You will only run into problems if you try and move timestamps
>>> "backwards" - which is why Beam strongly discourages this.
>>>
>>> This is not aligned with FlinkRunner's implementation. And I actually
>>> think it is not aligned conceptually.  As mentioned, Flink does not have
>>> the concept of bundles at all. It achieves fault tolerance via
>>> checkpointing, essentially checkpoint barrier flowing from sources to
>>> sinks, safely snapshotting state of each operator on the way. Bundles are
>>> implemented as a somewhat arbitrary set of elements between two consecutive
>>> checkpoints (there can be multiple bundles between checkpoints). A bundle
>>> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
>>> after the checkpoint barrier passes over the elements in the bundle (every
>>> bundle is finished at the very latest exactly before a checkpoint). But
>>> watermark propagation and bundle finalization is completely unrelated. This
>>> might be a bug in the runner, but requiring checkpoint for watermark
>>> propagation will introduce insane delays between processing time and
>>> watermarks, every executable stage will delay watermark propagation until a
>>> checkpoint (which is typically the order of seconds). This delay would add
>>> up after each stage.
>>>
>>
>> It's not bundles that hold up processing, rather it is elements, and
>> elements are not considered "processed" until FinishBundle.
>>
>> You are right about Flink. In many cases this is fine - if Flink rolls
>> back to the last checkpoint, the watermark will also roll back, and
>> everything stays consistent. So in general, one does not need to wait for
>> checkpoints for watermark propagation.
>>
>> Where things get a bit weirder with Flink is whenever one has external
>> side effects. In theory, one should wait for checkpoints before letting a
>> Sink flush, otherwise one could end up with incorrect outputs (especially
>> with a sink like TextIO).

Re: Runner Bundling Strategies

2023-09-25 Thread Reuven Lax via dev
On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still in flight until after finish bundle. Therefore
> simply caching the records should always be watermark safe, regardless of
> the runner. You will only run into problems if you try and move timestamps
> "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with FlinkRunner's implementation. And I actually
> think it is not aligned conceptually.  As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially checkpoint barrier flowing from sources to
> sinks, safely snapshotting state of each operator on the way. Bundles are
> implemented as a somewhat arbitrary set of elements between two consecutive
> checkpoints (there can be multiple bundles between checkpoints). A bundle
> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
> after the checkpoint barrier passes over the elements in the bundle (every
> bundle is finished at the very latest exactly before a checkpoint). But
> watermark propagation and bundle finalization is completely unrelated. This
> might be a bug in the runner, but requiring checkpoint for watermark
> propagation will introduce insane delays between processing time and
> watermarks, every executable stage will delay watermark propagation until a
> checkpoint (which is typically the order of seconds). This delay would add
> up after each stage.
>

It's not bundles that hold up processing, rather it is elements, and
elements are not considered "processed" until FinishBundle.

You are right about Flink. In many cases this is fine - if Flink rolls back
to the last checkpoint, the watermark will also roll back, and everything
stays consistent. So in general, one does not need to wait for checkpoints
for watermark propagation.

Where things get a bit weirder with Flink is whenever one has external side
effects. In theory, one should wait for checkpoints before letting a Sink
flush, otherwise one could end up with incorrect outputs (especially with a
sink like TextIO). Flink itself recognizes this, and that's why they
provide TwoPhaseCommitSinkFunction
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
which
waits for a checkpoint. In Beam, this is the reason we introduced
RequiresStableInput. Of course in practice many Flink users don't do this -
in which case they are prioritizing latency over data correctness.
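
In Beam that looks like annotating the side-effecting method; a minimal
sketch (the sink call is a placeholder):

import org.apache.beam.sdk.transforms.DoFn;

class StableSinkFn extends DoFn<String, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element String element) {
    // The runner must checkpoint the input before this runs, so a retry sees
    // the identical element and the external write stays consistent.
    writeToExternalSystem(element);
  }

  private void writeToExternalSystem(String element) {
    // Placeholder for the real sink call.
  }
}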

>
> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>
>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> watermark is possible even in the middle of a bundle. Actually, handling
>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>> checkpoints, they are always recomputed from scratch on restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
>>> wrote:
>>>
>>>> I've actually wondered about this specifically for streaming... if
>>>> you're writing a pipeline there it seems like you're often going to want to
>>>> put high fixed cost things like database connections even outside of the
>>>> bundle setup. You really only want to do that once in the lifetime of the
>>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>>> other than an arbitrarily (and probably small in streaming to avoid
>>>> latency) group of elements might be more useful? I suppose this depends
>>>> heavily on the object lifecycle in the sdk worker though.
>>>>
>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure recovery
>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>>

Re: Runner Bundling Strategies

2023-09-23 Thread Reuven Lax via dev
Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark from
updating as they are still in flight until after finish bundle. Therefore
simply caching the records should always be watermark safe, regardless of
the runner. You will only run into problems if you try and move timestamps
"backwards" - which is why Beam strongly discourages this.

Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:

> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
> committed, as there's no guarantee that this work won't be discarded.
>
> There was a thread [1], where the conclusion seemed to be that updating
> watermark is possible even in the middle of a bundle. Actually, handling
> watermarks is runner-dependent (e.g. Flink does not store watermarks in
> checkpoints, they are always recomputed from scratch on restore).
>
> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>
> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>
>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
>> wrote:
>>
>>> I've actually wondered about this specifically for streaming... if
>>> you're writing a pipeline there it seems like you're often going to want to
>>> put high fixed cost things like database connections even outside of the
>>> bundle setup. You really only want to do that once in the lifetime of the
>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>> other than an arbitrarily (and probably small in streaming to avoid
>>> latency) group of elements might be more useful? I suppose this depends
>>> heavily on the object lifecycle in the sdk worker though.
>>>
>>
>> +1. This is the difference between @Setup and @StartBundle. The
>> start/finish bundle operations should be used for bracketing element
>> processing that must be committed as a unit for correct failure recovery
>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>> in FinishBundle). On the other hand, things like open database connections
>> can and likely should be shared across bundles.
>>
>> This is correct, but the caching between @StartBundle and @FinishBundle
>> has some problems. First, users need to manually set watermark hold for
>> min(timestamp in bundle), otherwise watermark might overtake the buffered
>> elements.
>>
>
> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
> committed, as there's no guarantee that this work won't be discarded.
>
>
>> Users have no option other than using timer.withOutputTimestamp for
>> that, as we don't have a user-facing API to set watermark hold otherwise,
>> thus the in-bundle caching implies stateful DoFn. The question might then
>> by, why not use "classical" stateful caching involving state, as there is
>> full control over the caching in user code. This triggered an idea: would it
>> be useful to add information about caching to the API (e.g. in
>> Java @StartBundle(caching=true))? That could perhaps solve the above issues
>> (the runner would know to set the hold, and it could work with "stateless" DoFns).
>>
>
> Really, this is one of the areas that the streaming/batch abstraction
> leaks. In batch it was a common pattern to have local DoFn instance state
> that persisted from start to finish bundle, and these were also used as
> convenient entry points for other operations (like opening
> database connections) 'cause bundles were often "as large as possible."
> With the advent of streaming it makes sense to put this in
> explicitly managed runner state to allow for cross-bundle amortization and
> there's more value in distinguishing between @Setup and @StartBundle.
>
> (Were I do to things over I'd probably encourage an API that discouraged
> non-configuration instance state on DoFns altogether, e.g. in the notion of
> Python context managers (and an equivalent API could probably be put
> together with AutoClosables in Java) one would have something like
>
> ParDo(X)
>
> which would logically (though not necessarily physically) lead to an
> execution like
>
> with X.bundle_processor() as bundle_processor:
>   for bundle in bundles:
> with bundle_processor.element_processor() as process:
>   for element in bundle:
> process(element)
>
> where the traditional setup/start_bundle/finish_bundle/teardown logic
> would live in the __enter__ and __exit__ methods (made even easier with
> coroutines.) For convenience one could of course provide a raw bundle
> processor or element processor to ParDo if the enter/exit contexts are
> trivial. But this is getting somewhat off-topic...
>
>
>>
>>>
>>> Best,
>>> B
>>>
>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles  wrote:
>>>
 (I notice that you replied only to yourself, but there has been a whole
 thread of discussion on this 

Re: Contribution of Asgarde: Error Handling for Beam?

2023-09-04 Thread Reuven Lax via dev
Let's be careful about whether these tests are included in our presubmits.
Contrib code with flaky tests has been a major pain point in the past.

On Sat, Sep 2, 2023 at 12:02 PM Austin Bennett  wrote:

> Wanting us to not miss this. @Mazlum TOSUN  is
> happy to donate Asgarde to our project.
>
> It looks like he'd need a SGA and CCLA [ 1 ] on file; anything else?
>
> I recalled the donation of Euphoria [ 2 ], so I looked at those threads [
> 3 ] for insights into the process. It didn't look like a VOTE was needed,
> so it's mostly a matter of ensuring the necessary signatures, and ideally
> some sort of consensus [ or non-opposition ] to the donation.
>
>
> [ 1 ] https://www.apache.org/licenses/contributor-agreements.html
> [ 2 ] https://beam.apache.org/documentation/sdks/java/euphoria/
> [ 3 ] https://lists.apache.org/thread/xzlx4rm2tvc36mmwvhyvtdvsw7bnjscp
>
>
>
> On Thu, Jun 15, 2023 at 7:05 AM Kerry Donny-Clark via dev <
> dev@beam.apache.org> wrote:
>
>> This looks like an excellent contribution. I can easily understand the
>> motivation, and I think Beam would benefit from a higher level abstraction
>> for error handling.
>> Kerry
>>
>> On Wed, Jun 14, 2023, 6:31 PM Austin Bennett  wrote:
>>
>>> Hi Beam Devs,
>>>
>>> @Mazlum was suggested to consider donating Asgarde to Beam for
>>> Java/Kotlin error handling [ see:
>>> https://2022.beamsummit.org/sessions/error-handling-asgarde/ for last
>>> year's Beam Summit talk ]; he is also the author of Pasgarde [ for
>>> Python ] and Milgard [ for a simplified Kotlin API ].
>>>
>>> Would Asgarde be a good contribution, something the Beam community would
>>> be willing to accept?  I imagine we might want it to live at
>>> github.com/apache/beam-asgarde ?  Or perhaps there is a good place in
>>> github.com/apache/beam ??
>>>
>>> Especially once/if officially part of Beam, I imagine we'd add follow-up
>>> items like getting onto the website/docs, and related.
>>>
>>> Cheers,
>>> Austin
>>>
>>>
>>> P.S.  This might warrant separate/additional conversations for his other
>>> libraries, but let's focus any discussion on Asgarde for now?
>>>
>>


Re: ByteBuddy ClassLoadingStrategy.Default.INJECTION vs getClassLoadingStrategy

2023-07-21 Thread Reuven Lax via dev
Curious why these failing tests didn't block submission.

For now, rollback seems to be the simplest option. However, is there a path
forward on Java 11, or is our model irretrievably broken on Java 11?
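
For context, the version-conditional pattern at issue looks roughly like
this; a sketch compiled against Java 9+, not Beam's exact code (which must
still compile on Java 8):

import java.lang.invoke.MethodHandles;
import net.bytebuddy.dynamic.loading.ClassInjector;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;

class LoadingStrategies {
  static ClassLoadingStrategy<ClassLoader> getClassLoadingStrategy(Class<?> targetClass)
      throws Exception {
    if (ClassInjector.UsingLookup.isAvailable()) {
      // Java 9+ path: lookup-based injection. This path requires the
      // generated class to live in the same package as targetClass, which
      // is the source of the reported IllegalArgumentException.
      MethodHandles.Lookup lookup =
          MethodHandles.privateLookupIn(targetClass, MethodHandles.lookup());
      return ClassLoadingStrategy.UsingLookup.of(lookup);
    }
    // Java 8 fallback: legacy reflective injection into the class loader.
    return ClassLoadingStrategy.Default.INJECTION;
  }
}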

On Fri, Jul 21, 2023 at 8:57 AM Kenneth Knowles  wrote:

> This is a tricky situation that I don't know how to resolve best. Here are
> some pieces of information I know:
>
> 1. The reason we put certain generated classes in the same package is
> because of Java's package-private access restriction. If they are in
> another package the generated wrapper won't be able to invoke the needed
> functions. I know this applies to a generated DoFnInvoker. I don't know if
> it applies here.
>
> 2. The current status for Beam is that Beam itself is only
> expected/required to be able to build with Java 8 and/or produce Java 8
> compatible bytecode, but users should be able to use it with their own Java
> 11 or Java 17 code. This makes the testing scenario a bit tricky. We do
> have tests that model this scenario but they did not catch this I guess.
>
> On Mon, Jul 17, 2023 at 1:19 AM Damon Douglas 
> wrote:
>
>> Good day, everyone,
>>
>> For clarity, I organize the following into situation, background,
>> assessment, and proposal.
>>
>> Best,
>>
>> Damon
>>
>> -
>>
>> Situation
>>
>> Issue #26981 reports an IllegalArgumentException associated with the
>> ByteBuddy dependency throwing the message "<generated class> must be
>> defined in the same package as <lookup class>" [1]. I personally
>> discovered this error blocking my own Schema-related tests.
>>
>> Background
>>
>> *1. PR #25578 introduced the error*
>>
>> As Issue #26981 reports[1], the error seems to be introduced with 2.48.
>> Comparing v2.47.0 and v2.48.0[2] reveals that PR #25578 may have introduced
>> this breaking change[3]. Said PR replaced ByteBuddy's
>> ClassLoadingStrategy.Default.INJECTION[4] with getClassLoadingStrategy[5].
>>
>> *2. Reverting PR #25578 resolves the error*
>>
>> To test this hypothesis, I cloned 41e6628 and ran:
>>
>> ./gradlew :sdks:java:core:check
>>
>> revealing several failing tests (see *Failing :sdks:java:core:check at
>> 41e6628* below), some of which contained the
>> familiar IllegalArgumentException " must be defined in the same
>> package as " message.
>>
>> After reverting changes found in #25578, the failing tests and the
>> IllegalArgumentException were resolved.
>>
>> *3. Code related to PR #25578 has a back and forth history*
>>
>> There seems to be a back and forth removal and replacement history[6]
>> between ByteBuddy's ClassLoadingStrategy.Default.INJECTION
>> and getClassLoadingStrategy, most recently in PR #25578. Said PR's motivation
>> is to prepare Beam for Java 17 compatibility, which explains the
>> re-introduction of the breaking changes.
>>
>> *4. PR #25578 GitHub Actions checks pass*
>>
>> Examining the GitHub actions run reveals that PR #25578 checks passed[7].
>> However, examining the setup[8] more closely reveals that Java tests are
>> executed using Java Version 8. The same is true in the latest 41e6628
>> commit[9].  To test whether the version of Java drives Issue #26981's
>> error, I submitted a draft PR[10] with the version of Java set to 11 and
>> found that the same errors resulted[11] as I found on my machine using the
>> same Java version.
>>
>> Assessment
>>
>> My main impression is that:
>>
>>1. checks did not reveal PR #25578's breaking changes[7] because the
>>environment[8] used Java 8 instead of 11
>>2. the back and forth removal and addition of PR #25578's changes
>>    does not solve current and future Java version compatibility issues
>>
>> Proposal
>>
>> May we consider:
>>
>>    1. if not already planned, setting
>>    .github/actions/setup-self-hosted-action/action.yml's Java version[12] to
>>    11.
>>2. arriving at a consensus regarding PR #25578's breaking changes and
>>what we need to do today and in the future; I don't have anything 
>> practical
>>to propose or recommend
>>
>> References
>>
>>1. https://github.com/apache/beam/issues/26981
>>2. https://github.com/apache/beam/compare/v2.47.0...v2.48.0
>>3. https://github.com/apache/beam/pull/25578
>>4.
>>
>> https://javadoc.io/static/net.bytebuddy/byte-buddy/1.12.23/net/bytebuddy/dynamic/loading/ClassLoadingStrategy.Default.html#INJECTION
>>5.
>>
>> https://github.com/apache/beam/blob/68e19a596a5d0136ba4592be01888f487463c2f3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteBuddyUtils.java#L32
>>6.
>>
>> https://github.com/apache/beam/commits/68e19a596a5d0136ba4592be01888f487463c2f3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteBuddyUtils.java
>>7.
>>https://github.com/apache/beam/actions/runs/4759753495/jobs/8459342173
>>8.
>>
>> https://github.com/apache/beam/actions/runs/4759753495/jobs/8459342173#step:3:28
>>9.
>>
>> https://github.com/apache/beam/actions/runs/5558349809/jobs/10153336615#step:3:16
>>10.
>>
>> https
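For readers skimming this thread: the getClassLoadingStrategy pattern at issue
commonly looks like the sketch below. This is a reconstruction under stated
assumptions, not Beam's exact code - it assumes a Java 9+ source level (code
that must also compile on Java 8, as Beam's does, has to reach
MethodHandles.privateLookupIn reflectively), and it uses only public ByteBuddy
API (ClassInjector.UsingLookup, ClassLoadingStrategy.UsingLookup,
ClassLoadingStrategy.Default.INJECTION).

    import java.lang.invoke.MethodHandles;
    import net.bytebuddy.dynamic.loading.ClassInjector;
    import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;

    final class ClassLoadingStrategies {
      // On Java 9+, inject through a private Lookup. UsingLookup requires the
      // generated class and the lookup class to live in the same package --
      // the source of the "... must be defined in the same package as ..."
      // IllegalArgumentException reported in #26981.
      // On Java 8, MethodHandles.privateLookupIn does not exist, so the
      // (Unsafe-based, since deprecated) INJECTION strategy is the fallback.
      static ClassLoadingStrategy<ClassLoader> getClassLoadingStrategy(
          Class<?> target) throws IllegalAccessException {
        if (ClassInjector.UsingLookup.isAvailable()) {
          MethodHandles.Lookup lookup =
              MethodHandles.privateLookupIn(target, MethodHandles.lookup());
          return ClassLoadingStrategy.UsingLookup.of(lookup);
        }
        return ClassLoadingStrategy.Default.INJECTION;
      }
    }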

Re: Proposal to reduce the steps to make a Java transform portable

2023-06-22 Thread Reuven Lax via dev
The goal was to make schema transforms as efficient as hand-written coders.
We found the avro encoding/decoding to often be quite inefficient, which is
one reason we didn't use it for schemas.

Our schema encoding is internal to Beam though, and not suggested for use
external to a pipeline. For sources or sinks we still recommend using Avro
(or proto).

On Thu, Jun 22, 2023 at 4:14 PM Robert Bradshaw  wrote:

> On Thu, Jun 22, 2023 at 2:19 PM Ahmed Abualsaud 
> wrote:
>
>> Thank you all for your input. I have a PR for the changes I mentioned in
>> my initial email: https://github.com/apache/beam/pull/27202. Please
>> review when you get a chance!
>>
>> > perhaps we should consider just going to something Avro for portable
>> coding rather than something custom
>>
>> Did you mean using some Avro object (GenericRecord?) besides Beam Row
>> elements? We would still run into the problem Cham mentioned earlier (of
>> making sure existing PTransform inputs/outputs are compatible with
>> cross-language-valid types).
>>
>
> I don't remember why Avro was rejected in favor of our own encoding
> format, but it probably doesn't make sense to revisit that without
> understanding the full history. I do know that avro versioning and diamond
> dependencies cause a lot of pain for users and there's a concerted effort
> to remove Avro from Beam core altogether.
>
> In any case, this is quite orthogonal to the proposal here which we should
> move forward on.
>
>
>> On Tue, May 30, 2023 at 10:53 PM Byron Ellis 
>> wrote:
>>
>>> Sure, I get that… though perhaps we should consider just going to
>>> something Avro for portable coding rather than something custom.
>>>
>>> On Tue, May 30, 2023 at 2:22 PM Chamikara Jayalath 
>>> wrote:
>>>
 Input/output PCollection types at least have to be portable Beam types
 [1] for cross-language to work.

 I think we restricted schema-aware transforms to PCollection<Row> since
 Row was expected to be an efficient replacement for arbitrary portable Beam
 types (not sure how true that is in practice currently).

 Thanks,
 Cham

 [1]
 https://github.com/apache/beam/blob/b9730952a7abf60437ee85ba2df6dd30556d6560/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L829

 On Tue, May 30, 2023 at 1:54 PM Byron Ellis 
 wrote:

> Is it actually necessary for a PTransform that is configured via the
> Schema mechanism to also be one that uses RowCoder? Those strike me as two
> separate concerns and unnecessarily limiting.
>
> On Tue, May 30, 2023 at 1:29 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> +1 for the simplification.
>>
>> On Tue, May 30, 2023 at 12:33 PM Robert Bradshaw 
>> wrote:
>>
>>> Yeah. Essentially one needs to do (1) name the arguments and (2)
>>> implement the transform. Hopefully (1) could be done in a concise way 
>>> that
>>> allows for easy consumption from both Java and cross-language.
>>>
>>
>> +1 but I think the hard part today is to convert existing PTransforms
>> to be schema-aware transform compatible (for example, change input/output
>> types and make sure parameters take Beam Schema compatible types). But 
>> this
>> makes sense for new transforms.
>>
>>
>>
>>> On Tue, May 30, 2023 at 12:25 PM Byron Ellis 
>>> wrote:
>>>
 Or perhaps the other way around? If you have a Schema we can
 auto-generate the associated builder on the PTransform? Either way, 
 more
 DRY.

 On Tue, May 30, 2023 at 10:59 AM Robert Bradshaw via dev <
 dev@beam.apache.org> wrote:

> +1 to this simplification, it's a historical artifact that
> provides no value.
>
> I would love it if we also looked into ways to auto-generate the
> SchemaTransformProvider (e.g. via introspection if a transform takes a
> small number of arguments, or uses the standard builder pattern...),
> ideally with something as simple as adding a decorator to the 
> PTransform
> itself.
>
>
> On Tue, May 30, 2023 at 7:42 AM Ahmed Abualsaud via dev <
> dev@beam.apache.org> wrote:
>
>> Hey everyone,
>>
>> I was looking at how we use SchemaTransforms in our expansion
>> service. From what I see, there may be a redundant step in developing
>> SchemaTransforms. Currently, we have 3 pieces:
>> - SchemaTransformProvider [1]
>> - A configuration object
>> - SchemaTransform [2]
>>
>> The API is generally used like this:
>> 1. The SchemaTransformProvider takes a configuration object and
>> returns a SchemaTransform
>> 2. The SchemaTransform is used to build a PTransform according to
>> the configuration
>>
>> In these steps, the S
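For concreteness, a minimal sketch of the three pieces as they looked before
PR #27202 follows. MyProvider and MyPTransform are illustrative placeholders
(MyPTransform is assumed to extend PTransform<PCollectionRowTuple,
PCollectionRowTuple>), and the interface shapes follow the 2.48-era Java SDK
as described in this thread rather than any exact Beam source.

    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
    import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
    import org.apache.beam.sdk.values.Row;

    // Piece 1: the provider turns a configuration Row into a SchemaTransform.
    public class MyProvider implements SchemaTransformProvider {
      @Override
      public String identifier() {
        return "beam:schematransform:org.example:my_transform:v1";
      }

      // Piece 2: the configuration object, described by its Schema.
      @Override
      public Schema configurationSchema() {
        return Schema.builder().addStringField("topic").build();
      }

      // Piece 3: the SchemaTransform, whose only job here is to wrap
      // construction of the actual PTransform (the hop the proposal removes).
      @Override
      public SchemaTransform from(Row configuration) {
        String topic = configuration.getString("topic");
        return () -> new MyPTransform(topic);
      }

      @Override
      public List<String> inputCollectionNames() {
        return Collections.singletonList("input");
      }

      @Override
      public List<String> outputCollectionNames() {
        return Collections.singletonList("output");
      }
    }

Folding piece 3 into the provider (or into the PTransform itself, as the PR
does) removes one class per transform without losing any information.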

Re: [VOTE] Release 2.48.0 release candidate #2

2023-05-30 Thread Reuven Lax via dev
+1 (binding)

On Tue, May 30, 2023 at 2:43 PM Ahmet Altay via dev 
wrote:

> +1 (binding)
>
> On Tue, May 30, 2023 at 2:01 PM Ritesh Ghorse via dev 
> wrote:
>
>> Thanks Danny and Jack! Dataflow containers are up!
>>
>> Only PMC votes count but feel free to test your use cases and vote on
>> this thread!
>>
>> On Tue, May 30, 2023 at 11:26 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> +1 (binding)
>>>
>>> Tested with  https://github.com/Talend/beam-samples/
>>> (Java SDK v8/v11/v17, Spark 3.x runner).
>>>
>>> On 27 May 2023, at 19:38, Bruno Volpato via dev 
>>> wrote:
>>>
>>> I was able to check that containers are all there and complete
>>> my validation.
>>>
>>> +1 (non-binding).
>>>
>>> Tested with https://github.com/GoogleCloudPlatform/DataflowTemplates (Java
>>> SDK 11, Dataflow runner).
>>>
>>>
>>> Thanks Ritesh and Danny!
>>>
>>> On Fri, May 26, 2023 at 10:09 AM Danny McCormick via dev <
>>> dev@beam.apache.org> wrote:
>>>
 It looks like some Dataflow containers didn't get published, so some
 jobs using the legacy runner (runner v2 disabled) will fail. I kicked off
 the container release, so that should hopefully be available later today.

 Thanks,
 Danny

 On Thu, May 25, 2023 at 11:19 PM Ritesh Ghorse via dev <
 dev@beam.apache.org> wrote:

> Hi everyone,
> Please review and vote on the release candidate #2 for the version
> 2.48.0, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> Reviewers are encouraged to test their own use cases with the release
> candidate, and vote +1 if no issues are found. Only PMC member votes will
> count towards the final vote, but votes from all community members are
> encouraged and helpful for finding regressions; you can either test your
> own use cases or use cases from the validation sheet [10].
>
> The complete staging area is available for your review, which includes:
> * GitHub Release notes [1],
> * the official Apache source release to be deployed to dist.apache.org 
> [2],
> which is signed with the key with fingerprint
> E4C74BEC861570F5A3E44E46280A0AC32DBAE62B [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v2.48.0-RC2" [5],
> * website pull request listing the release [6], the blog post [6], and
> publishing the API reference manual [7] (to be generated).
> * Java artifacts were built with Gradle 7.5.1 and OpenJDK/Oracle JDK
> 8.0.322.
> * Python artifacts are deployed along with the source release to the
> dist.apache.org [2] and PyPI[8].
> * Go artifacts and documentation are available at pkg.go.dev [9]
> * Validation sheet with a tab for 2.48.0 release to help with
> validation [10].
> * Docker images published to Docker Hub [11].
> * PR to run tests against release branch [12].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> For guidelines on how to try the release in your projects, check out
> our blog post at /blog/validate-beam-release/.
>
> *NOTE: Dataflow containers for Python are not finalized yet (likely to
> happen on tuesday). I will follow up on this thread once that is done. 
> Feel
> free to test it on other runners until then. *
>
> Thanks,
> Ritesh Ghorse
>
> [1] https://github.com/apache/beam/milestone/12
> [2] https://dist.apache.org/repos/dist/dev/beam/2.48.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapachebeam-1346/
> [5] https://github.com/apache/beam/tree/v2.48.0-RC2
> [6] https://github.com/apache/beam/pull/26903
> [7] https://github.com/apache/beam-site/pull/645
> [8] https://pypi.org/project/apache-beam/2.48.0rc2/
> [9]
> https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.48.0-RC2/go/pkg/beam
> 
> [10]
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=458120434
> [11] https://hub.docker.com/search?q=apache%2Fbeam&type=image
> [12] https://github.com/apache/beam/pull/26811
>
>
>>>


Re: [VOTE] Release 2.46.0, release candidate #1

2023-04-28 Thread Reuven Lax via dev
Those particular errors are often expected in the sink due to the protocol
used. If a work item retries before committing (which could happen for many
reasons including worker crashes), it will experience those errors.

On Fri, Apr 28, 2023 at 12:55 PM Ahmed Abualsaud 
wrote:

> @Danny McCormick  @Reuven Lax
>  sorry it's been a while since you looked into this,
> but do you remember if the fix in #25642
> is related to the
> recent "ALREADY_EXISTS: The offset is within stream, expected offset..."
>  errors?
>
> On Fri, Mar 10, 2023 at 7:47 PM Ahmet Altay via dev 
> wrote:
>
>> Thank you!
>>
>> Is there a tracking issue for this known issue? And would the known
>> issues section of the release notes link to that?
>>
>>
>> On Fri, Mar 10, 2023 at 11:38 AM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>>
>>> We determined that the same issue exists in the 2.45 release, so we are
>>> going to continue finalizing the release candidate. Thank you for your
>>> patience.
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Wed, Mar 8, 2023 at 3:15 PM Reuven Lax  wrote:
>>>
 We are trying to reproduce and debug the issue we saw to validate
 whether it was a real regression or not. Will update when we know more.

 On Wed, Mar 8, 2023 at 11:31 AM Danny McCormick <
 dannymccorm...@google.com> wrote:

>
> @Reuven Lax  found a new potential regression in
> BigQuery I/O, so I have paused the release rollout. I had already pushed
> the Python artifacts and Go tags, but not the Java ones. We have since
> temporarily yanked  the Python release
> and deleted the Go tags, they were live for around an hour. The possible
> regression is in Java, so neither of those releases should be affected, 
> but
> x-lang may not work properly because it depends on versioning. I will
> update this thread with next steps when we know more.
>
> Thanks,
> Danny
> On Wed, Mar 8, 2023 at 5:59 AM Jan Lukavský  wrote:
>
>> +1 (binding)
>>
>> Tested Java SDK with Flink and Spark 3 runner.
>>
>> Thanks,
>>  Jan
>>
>> On 3/8/23 01:53, Valentyn Tymofieiev via dev wrote:
>>
>> +1. Verified the composition of Python containers and ran Python
>> pipelines on Dataflow runner v1 and runner v2.
>>
>> On Tue, Mar 7, 2023 at 4:11 PM Ritesh Ghorse via dev <
>> dev@beam.apache.org> wrote:
>>
>>> +1 (non-binding)
>>> Validated Go SDK quickstart on direct and dataflow runner
>>>
>>> On Tue, Mar 7, 2023 at 10:54 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 +1 (binding)

 Tested with  https://github.com/Talend/beam-samples/
 (Java SDK v8/v11/v17, Spark 3.x runner).

 ---
 Alexey

 On 7 Mar 2023, at 07:38, Ahmet Altay via dev 
 wrote:

 +1 (binding) - I validated python quickstarts on direct & dataflow
 runners.

 Thank you for doing the release!

 On Sat, Mar 4, 2023 at 8:01 AM Chamikara Jayalath via dev <
 dev@beam.apache.org> wrote:

> +1 (binding)
>
> Validated multi-language Java and Python pipelines.
>
> On Fri, Mar 3, 2023 at 1:59 PM Danny McCormick via dev <
> dev@beam.apache.org> wrote:
>
>> > I have encountered a failure in a Python pipeline running with
>> Runner v1:
>>
>> > RuntimeError: Beam SDK base version 2.46.0 does not match
>> Dataflow Python worker version 2.45.0. Please check Dataflow worker 
>> startup
>> logs and make sure that correct version of Beam SDK is installed.
>>
>> > We should understand why Python ValidatesRunner tests (which
>> have passed)  didn't catch this error.
>>
>> > This can be remediated in Dataflow containers without  changes
>> to the release candidate.
>>
>> Good catch! I've kicked off a release to fix this, it should be
>> done later this evening - I won't be available when it completes, 
>> but I
>> would expect it to be around 5:00 PST.
>>
>> On Fri, Mar 3, 2023 at 3:49 PM Danny McCormick <
>> dannymccorm...@google.com> wrote:
>>
>>> Hey Reuven, could you provide some more context on the bug/why
>>> it is important? Does it meet the standard in
>>> https://beam.apache.org/contribute/release-guide/#7-triage-release-blocking-issues-in-github?
>>>
>>>
>>> The release branch was cut last Wednesday, so that is why it is
>>> not included.
>>>
>>
> Seems like this was a revert of a previous commit that was also
> not included in the 2.46.0 release branch (
> https://github.com/apache/beam/pull/

Re: [DESIGN] Beam Triggered side input specification

2023-03-28 Thread Reuven Lax via dev
On Tue, Mar 28, 2023 at 12:39 AM Jan Lukavský  wrote:

>
> On 3/27/23 19:44, Reuven Lax via dev wrote:
>
>
>
> On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> I'd like to clarify my understanding. Side inputs generally perform a
>> left (outer) join, LHS side is the main input, RHS is the side input.
>>
>
> Not completely - it's more of what I would call a nested-loop join. I.e.
> if the side input changes _nothing happens_ until a new element arrives on
> the LHS. This isn't quite the same as a left-outer join.
>
> +1. This makes sense, my description was a slight simplification.
>
>
> Doing streaming left join requires watermark synchronization, thus
>> elements from the main input are buffered until main_input_timestamp >
>> side_input_watermark. When side input watermark reaches max watermark, main
>> inputs do not have to be buffered because the side input will not change
>> anymore. This works well for cases when the side input is bounded or in the
>> case of "slowly changing" patterns (ideally with perfect watermarks, so no
>> late data present).
>>
>
> This is true for non-triggered side inputs. Triggered side inputs have
> always been different - the main-input elements are buffered until the
> first triggered value of the side input is available.
>
> I walked again through the code in SimplePushBackSideInputDoFnRunner and
> it looks like this is correct: the runner actually does not wait for the
> watermark, but for "ready windows", which implies what you say. With a
> suitable trigger (AfterWatermark.pastEndOfWindow()) this coincides with the
> watermark at the end of the window.
>
>
>
>> Allowing arbitrary changes in the side input (with arbitrary triggers)
>> might introduce additional questions - how to handle late data in the side
>> input? Full implementation would require retractions. Dropping late data
>> does not feel like a solution, because then the pipeline would not converge
>> to the "correct" solution, as the side input might hold incorrect value
>> forever. Applying late data from the processing time the DoFn receives them
>> could make the downstream processing unstable, restarting the pipeline on
>> errors might change what is "on time" and what is late thus generate
>> inconsistent different results.
>>
> BTW, triggered side inputs have always been available. The problem Kenn is
> addressing is that nobody has ever written down the spec! There was a spec
> in mind when they were implemented, but the fact that this was not written
> has always been problematic (and especially so when creating the portable
> runner).
>
> Triggered side inputs have always had some non-deterministic behavior, not
> just for late data. Side inputs are cached locally on the reader, so
> different reading workers might have different views on what the latest
> trigger was.
>
> Makes sense, is this a design decision? I can imagine that waiting for
> side input watermark unconditionally adds latency, on the other hand an
> "unexpected" non-deterministic behavior can confuse users. This type of
> non-determinism after pipeline failure and recovery is even the most hard
> to debug. If we would document (and possibly slightly reimplement) the
> triggered side-input spec, could we add an (optional) way to make the
> processing deterministic via watermark sync?
>

Well yes it was (though as mentioned before, the fact that none of these
designs were even written into the spec is a problem), though in some ways
not a great one. The only global synchronization method we had was the
watermark/end of window, so if the source PCollection was triggered by
something else we lost that. This creates some unfortunate situations (in
particular I would not recommend using distributed Map-valued side inputs
with an early trigger - the behavior is probably not what one expects).
Part of the problem is that triggers themselves are non-deterministic.
Something like retractions would make this better but not completely.
Something better here would be great, but I'm still not sure what it would
be or if any of our runners could implement it.

IMO the least-confusing use of triggered side inputs is as a singleton
broadcast (though in that case, we might be better off just introducing a
broadcast variable concept into Beam, which would save users writing all of
the boilerplate code around side inputs).
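To make that broadcast-style usage concrete: the well-known slowly-updating
side input pattern looks roughly like the sketch below (following the Beam
patterns documentation; Config, LookupConfigFn, and mainInput are
placeholders, the usual org.apache.beam.sdk.transforms.windowing imports are
assumed, and the per-worker caching caveat above still applies).

    PCollectionView<Config> configView =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
            .apply("ReloadConfig", ParDo.of(new LookupConfigFn()))
            .apply(Window.<Config>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
            .apply(View.asSingleton());

    mainInput.apply("UseConfig", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // Each worker reads the latest triggered pane from its local cache,
        // so different workers may briefly observe different values.
        Config latest = c.sideInput(configView);
        c.output(c.element() + " @ " + latest);
      }
    }).withSideInputs(configView));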

>
>
>> It seems safe to process multiple triggers as long as the trigger does
>> not produce late data, though (i.e. early emitting). Processing possibly
>> late data might require buffering the main input while main_input_timestamp
>> > side_input_waterm

Re: [DESIGN] Beam Triggered side input specification

2023-03-27 Thread Reuven Lax via dev
On Mon, Mar 27, 2023 at 5:43 AM Jan Lukavský  wrote:

> Hi,
>
> I'd like to clarify my understanding. Side inputs generally perform a left
> (outer) join, LHS side is the main input, RHS is the side input.
>

Not completely - it's more of what I would call a nested-loop join. I.e. if
the side input changes _nothing happens_ until a new element arrives on the
LHS. This isn't quite the same as a left-outer join.

Doing streaming left join requires watermark synchronization, thus elements
> from the main input are buffered until main_input_timestamp >
> side_input_watermark. When side input watermark reaches max watermark, main
> inputs do not have to be buffered because the side input will not change
> anymore. This works well for cases when the side input is bounded or in the
> case of "slowly changing" patterns (ideally with perfect watermarks, so no
> late data present).
>

This is true for non-triggered side inputs. Triggered side inputs have
always been different - the main-input elements are buffered until the
first triggered value of the side input is available.


> Allowing arbitrary changes in the side input (with arbitrary triggers)
> might introduce additional questions - how to handle late data in the side
> input? Full implementation would require retractions. Dropping late data
> does not feel like a solution, because then the pipeline would not converge
> to the "correct" solution, as the side input might hold incorrect value
> forever. Applying late data from the processing time the DoFn receives them
> could make the downstream processing unstable, restarting the pipeline on
> errors might change what is "on time" and what is late thus generate
> inconsistent different results.
>
BTW, triggered side inputs have always been available. The problem Kenn is
addressing is that nobody has ever written down the spec! There was a spec
in mind when they were implemented, but the fact that this was not written
has always been problematic (and especially so when creating the portable
runner).

Triggered side inputs have always had some non-deterministic behavior, not
just for late data. Side inputs are cached locally on the reader, so
different reading workers might have different views on what the latest
trigger was.


> It seems safe to process multiple triggers as long as the trigger does not
> produce late data, though (i.e. early emitting). Processing possibly late
> data might require buffering the main input while main_input_timestamp >
> side_input_watermark - allowed_lateness.
>
> Is my line of thinking correct?
>
>  Jan
> On 3/23/23 20:19, Kenneth Knowles wrote:
>
> Hi all,
>
> I had a great chat with +Reza Rokni  and +Reuven Lax
>  yesterday about some inconsistencies in side input
> behavior, both before and after portability was introduced.
>
> I wrote up my thoughts about how we should specify the semantics and
> implement them:
>
> https://s.apache.org/beam-triggered-side-inputs
>
> I think I found some issues that I think may require changes in the
> portability protocols to get consistent behavior.
>
> Please take a look and find my errors and oversights!
>
> Kenn
>
>


Re: [VOTE] Release 2.46.0, release candidate #1

2023-03-08 Thread Reuven Lax via dev
We are trying to reproduce and debug the issue we saw to validate whether
it was a real regression or not. Will update when we know more.

On Wed, Mar 8, 2023 at 11:31 AM Danny McCormick 
wrote:

>
> @Reuven Lax  found a new potential regression in
> BigQuery I/O, so I have paused the release rollout. I had already pushed
> the Python artifacts and Go tags, but not the Java ones. We have since
> temporarily yanked  the Python release and
> deleted the Go tags, they were live for around an hour. The possible
> regression is in Java, so neither of those releases should be affected, but
> x-lang may not work properly because it depends on versioning. I will
> update this thread with next steps when we know more.
>
> Thanks,
> Danny
> On Wed, Mar 8, 2023 at 5:59 AM Jan Lukavský  wrote:
>
>> +1 (binding)
>>
>> Tested Java SDK with Flink and Spark 3 runner.
>>
>> Thanks,
>>  Jan
>>
>> On 3/8/23 01:53, Valentyn Tymofieiev via dev wrote:
>>
>> +1. Verified the composition of Python containers and ran Python
>> pipelines on Dataflow runner v1 and runner v2.
>>
>> On Tue, Mar 7, 2023 at 4:11 PM Ritesh Ghorse via dev 
>> wrote:
>>
>>> +1 (non-binding)
>>> Validated Go SDK quickstart on direct and dataflow runner
>>>
>>> On Tue, Mar 7, 2023 at 10:54 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 +1 (binding)

 Tested with  https://github.com/Talend/beam-samples/
 (Java SDK v8/v11/v17, Spark 3.x runner).

 ---
 Alexey

 On 7 Mar 2023, at 07:38, Ahmet Altay via dev 
 wrote:

 +1 (binding) - I validated python quickstarts on direct & dataflow
 runners.

 Thank you for doing the release!

 On Sat, Mar 4, 2023 at 8:01 AM Chamikara Jayalath via dev <
 dev@beam.apache.org> wrote:

> +1 (binding)
>
> Validated multi-language Java and Python pipelines.
>
> On Fri, Mar 3, 2023 at 1:59 PM Danny McCormick via dev <
> dev@beam.apache.org> wrote:
>
>> > I have encountered a failure in a Python pipeline running with
>> Runner v1:
>>
>> > RuntimeError: Beam SDK base version 2.46.0 does not match Dataflow
>> Python worker version 2.45.0. Please check Dataflow worker startup logs 
>> and
>> make sure that correct version of Beam SDK is installed.
>>
>> > We should understand why Python ValidatesRunner tests (which have
>> passed)  didn't catch this error.
>>
>> > This can be remediated in Dataflow containers without  changes to
>> the release candidate.
>>
>> Good catch! I've kicked off a release to fix this, it should be done
>> later this evening - I won't be available when it completes, but I would
>> expect it to be around 5:00 PST.
>>
>> On Fri, Mar 3, 2023 at 3:49 PM Danny McCormick <
>> dannymccorm...@google.com> wrote:
>>
>>> Hey Reuven, could you provide some more context on the bug/why it is
>>> important? Does it meet the standard in
>>> https://beam.apache.org/contribute/release-guide/#7-triage-release-blocking-issues-in-github?
>>>
>>>
>>> The release branch was cut last Wednesday, so that is why it is not
>>> included.
>>>
>>
> Seems like this was a revert of a previous commit that was also not
> included in the 2.46.0 release branch (
> https://github.com/apache/beam/pull/25627) ?
>
> If so we might not need a new RC but good to confirm.
>
> Thanks,
> Cham
>
>
>>> On Fri, Mar 3, 2023 at 3:24 PM Reuven Lax  wrote:
>>>
 If possible, I would like to see if we could include
 https://github.com/apache/beam/pull/25642 as we believe this bug
 has been impacting multiple users. This was merged 4 days ago, but 
 this RC
 cut does not seem to include it.

 On Fri, Mar 3, 2023 at 12:18 PM Valentyn Tymofieiev via dev <
 dev@beam.apache.org> wrote:

> I have encountered a failure in a Python pipeline running with
> Runner v1:
>
> RuntimeError: Beam SDK base version 2.46.0 does not match Dataflow
> Python worker version 2.45.0. Please check Dataflow worker startup 
> logs and
> make sure that correct version of Beam SDK is installed.
>
> We should understand why Python ValidatesRunner tests (which have
> passed)  didn't catch this error.
>
> This can be remediated in Dataflow containers without  changes to
> the release candidate.
>
> On Fri, Mar 3, 2023 at 11:22 AM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> +1 (binding).
>>
>> I verified that the artifacts and signatures all look good, all
>> the
>> containers are pushed, and tested some pipelines with a fresh
>> install
>> from one of the Python wheels.
>>
>> On Fri, Mar 3, 20

Re: [VOTE] Release 2.46.0, release candidate #1

2023-03-03 Thread Reuven Lax via dev
If possible, I would like to see if we could include
https://github.com/apache/beam/pull/25642 as we believe this bug has been
impacting multiple users. This was merged 4 days ago, but this RC cut does
not seem to include it.

On Fri, Mar 3, 2023 at 12:18 PM Valentyn Tymofieiev via dev <
dev@beam.apache.org> wrote:

> I have encountered a failure in a Python pipeline running with Runner v1:
>
> RuntimeError: Beam SDK base version 2.46.0 does not match Dataflow Python
> worker version 2.45.0. Please check Dataflow worker startup logs and make
> sure that correct version of Beam SDK is installed.
>
> We should understand why Python ValidatesRunner tests (which have passed)
> didn't catch this error.
>
> This can be remediated in Dataflow containers without  changes to the
> release candidate.
>
> On Fri, Mar 3, 2023 at 11:22 AM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> +1 (binding).
>>
>> I verified that the artifacts and signatures all look good, all the
>> containers are pushed, and tested some pipelines with a fresh install
>> from one of the Python wheels.
>>
>> On Fri, Mar 3, 2023 at 11:13 AM Danny McCormick
>>  wrote:
>> >
>> > > The released artifacts seem to be missing the last commit at
>> > >
>> https://github.com/apache/beam/commit/c528eab18b32342daed53b750fe330d30c7e5224
>> > > . Is this essential to the release, or just useful for validating it?
>> >
>> > It's strictly a test infrastructure change, it has no functional
>> impact. For context, the changes included were from
>> https://github.com/apache/beam/pull/25661 and
>> https://github.com/apache/beam/pull/25654, both were keeping integration
>> tests from running correctly.
>>
>> Thanks.
>>
>> > On Fri, Mar 3, 2023 at 2:09 PM Robert Bradshaw 
>> wrote:
>> >>
>> >> The released artifacts seem to be missing the last commit at
>> >>
>> https://github.com/apache/beam/commit/c528eab18b32342daed53b750fe330d30c7e5224
>> >> . Is this essential to the release, or just useful for validating it?
>> >>
>> >> On Fri, Mar 3, 2023 at 11:02 AM Danny McCormick
>> >>  wrote:
>> >> >
>> >> > Thanks for calling that out, and thanks for helping me fix it! We
>> should be all set now
>> >> >
>> >> > On Fri, Mar 3, 2023 at 1:38 PM Robert Bradshaw 
>> wrote:
>> >> >>
>> >> >> It appears your public key is not published in
>> >> >> https://dist.apache.org/repos/dist/release/beam/KEYS .
>> >> >>
>> >> >> On Fri, Mar 3, 2023 at 8:33 AM Anand Inguva via dev <
>> dev@beam.apache.org> wrote:
>> >> >> >
>> >> >> > +1 (non-binding)
>> >> >> > Tested python wordcount quick start
>> https://beam.apache.org/get-started/quickstart-py/ on Direct Runner and
>> Dataflow Runner.
>> >> >> >
>> >> >> > Thanks!
>> >> >> >
>> >> >> > On Fri, Mar 3, 2023 at 11:21 AM Bruno Volpato via dev <
>> dev@beam.apache.org> wrote:
>> >> >> >>
>> >> >> >> +1 (non-binding)
>> >> >> >>
>> >> >> >> Tested with
>> https://github.com/GoogleCloudPlatform/DataflowTemplates (Java SDK 11,
>> Dataflow runner).
>> >> >> >>
>> >> >> >>
>> >> >> >> Thanks Danny!
>> >> >> >>
>> >> >> >> On Thu, Mar 2, 2023 at 5:16 PM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>> >> >> >>>
>> >> >> >>> Hi everyone,
>> >> >> >>> Please review and vote on release candidate #1 for the version
>> 2.46.0, as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>> Reviewers are encouraged to test their own use cases with the release
>> candidate, and vote +1 if no issues are found.
>> The complete staging area is available for your review, which includes:
>> * GitHub Release notes [1],
>> * the official Apache source release to be deployed to dist.apache.org [2],
>> which is signed with the key with fingerprint
>> FC383FCDE7D7E86699954EF2509872C8031C4DFB [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag "v2.46.0-RC1" [5],
>> * website pull request listing the release [6], the blog post [6], and
>> publishing the API reference manual [7].
>> * Java artifacts were built with Gradle GRADLE_VERSION and OpenJDK/Oracle
>> JDK JDK_VERSION.
>> * Python artifacts are deployed along with the source release to the
>> dist.apache.org [2] and PyPI[8].
>> * Go artifacts and documentation are available at pkg.go.dev [9]
>> * Validation sheet with a tab for 2.46.0 release to help with validation [10].
>> * Docker images published to Docker Hub [11].
>> >> >> >>> * PR to run tests against release branch [12].
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>> For guidelines on how to try the release in your projects, check out our
>> blog post at /blog/validate-beam-release/.
>> Thanks,
>> Danny
>> [1] https://github.com/apache/beam/milestone/9
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.46.0/
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4] https://repository.apache.org/content/repositories/orgapachebeam-1306/
>> [5] https://github.c

Re: Consuming one PCollection before consuming another with Beam

2023-03-01 Thread Reuven Lax via dev
I'm not sure I understand this use case well. What are you planning on
doing with the BQ dataset if it were processed first? Were you planning on
caching information in memory? Storing data in Beam state? Something else?

On Wed, Mar 1, 2023 at 10:43 AM Kenneth Knowles  wrote:

>
>
> On Tue, Feb 28, 2023 at 5:14 PM Sahil Modak 
> wrote:
>
>> The number of keys/data in BQ would not be constant and grow with time.
>>
>> A rough estimate would be around 300k keys with an average size of 5kb
>> per key. Both the count of the keys and the size of the key would be
>> feature dependent (based on the upstream pipelines) and we won't have
>> control over this in the future.
>>
>> Using big query client would mean we would have to run individual queries
>> for each of these 300k keys from the BusinessLogic() dofn which operates in
>> a global window KV
>>
>> Also, the order of the data from BQ would not matter to us since the only
>> thing we are trying to solve here is regaining the state spec information
>> before starting to consume pub/sub.
>>
>
> I was referring to order in general, across your whole data set as an
> abstract concept. If order _really_ doesn't matter, then you wouldn't need
> to read the BQ data first. You could just flatten them together and run the
> pipeline like that. So I think there is some order-dependence that you want
> to represent at the data level.
>
> Kenn
>
>
>> I will explore using Wait.on(bigquery) before pub/sub read since I am not
>> sure if side input would be the best option here.
>>
>>
>> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:
>>
>>> I'm also curious how much you depend on order to get the state contents
>>> right. The ordering of the side input will be arbitrary, and even the
>>> streaming input can have plenty of out of order messages. So I want to
>>> think about what are the data dependencies that result in the requirement
>>> of order. Or if there are none and you just want to know that all the past
>>> data has been processed, Niel's idea is one solution. It isn't parallel,
>>> though.
>>>
>>> Kenn
>>>
>>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:
>>>
>>>> How large is this state spec stored in BQ? If the size isn't too large,
>>>> you can read it from BQ and make it a side input into the DoFn.
>>>>
>>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>>> smo...@paloaltonetworks.com> wrote:
>>>>
>>>>> We are trying to re-initialize our state specs in the BusinessLogic()
>>>>> DoFn from BQ.
>>>>> BQ has data about the state spec, and we would like to make sure that
>>>>> the state specs in our BusinessLogic() dofn are initialized before it
>>>>> starts consuming the pub/sub.
>>>>>
>>>>> This is for handling the case of redeployment of the dataflow jobs so
>>>>> that the states are preserved and the BusinessLogic() can work seamlessly
>>>>> as it was previously. All our dofns are operating in a global window and 
>>>>> do
>>>>> not perform any aggregation.
>>>>>
>>>>> We are currently using Redis to preserve the state spec information
>>>>> but would like to explore using BQ as an alternative to Redis.
>>>>>
>>>>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>> My suggestion is to try to solve the problem in terms of what you
>>>>>> want to compute. Instead of trying to control the operational aspects 
>>>>>> like
>>>>>> "read all the BQ before reading Pubsub" there is presumably some reason
>>>>>> that the BQ data naturally "comes first", for example if its timestamps 
>>>>>> are
>>>>>> earlier or if there is a join or an aggregation that must include it.
>>>>>> Whenever you think you want to set up an operational dependency between 
>>>>>> two
>>>>>> things that "happen" in a pipeline, it is often best to pivot your 
>>>>>> thinking
>>>>>> to the data and what you are trying to compute, and the built-in
>>>>>> dependencies will solve the ordering problems.
>>>>>>
>>>>>> So - is there a way to describe your problem in terms of the data and
>>>>&g
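On the Wait.on option mentioned in this thread, a minimal sketch
(org.apache.beam.sdk.transforms.Wait; element types follow the earlier
snippets, whose generics were stripped by the archive). Wait.on holds each
window of the main input until the signal PCollection is done; because the
BigQuery read is bounded, its watermark reaches the end of the global window
once the read completes, releasing the buffered Pub/Sub elements.

    // Gate the unbounded Pub/Sub stream on completion of the bounded BQ read.
    PCollection<PubsubMessage> gated =
        pubsubStream.apply("WaitForBQ", Wait.on(bqStream));

    gated.apply("Business Logic", ParDo.of(new BusinessLogic()));

Note that if BusinessLogic() is stateful, the BQ elements still need to be
flattened into the same applied transform as the gated stream: two separate
applications of the same DoFn do not share state.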

Re: Consuming one PCollection before consuming another with Beam

2023-02-27 Thread Reuven Lax via dev
How large is this state spec stored in BQ? If the size isn't too large, you
can read it from BQ and make it a side input into the DoFn.
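A rough sketch of that suggestion: materialize the BQ data as a map-valued
side input and read it from BusinessLogic(). The key/value types, field
names, and the BusinessLogic(stateView) constructor are assumptions about the
state-spec shape, not from the thread.

    PCollectionView<Map<String, String>> stateView =
        input.apply("Read from BQ",
                BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
            .apply("ToStateKV", MapElements
                .into(TypeDescriptors.kvs(
                    TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((TableRow row) ->
                    KV.of((String) row.get("key"), (String) row.get("value"))))
            .apply(View.asMap());

    pubsubStream.apply("Business Logic",
        ParDo.of(new BusinessLogic(stateView)).withSideInputs(stateView));
    // Inside BusinessLogic's @ProcessElement: c.sideInput(stateView).get(key)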

On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak 
wrote:

> We are trying to re-initialize our state specs in the BusinessLogic() DoFn
> from BQ.
> BQ has data about the state spec, and we would like to make sure that the
> state specs in our BusinessLogic() dofn are initialized before it starts
> consuming the pub/sub.
>
> This is for handling the case of redeployment of the dataflow jobs so that
> the states are preserved and the BusinessLogic() can work seamlessly as it
> was previously. All our dofns are operating in a global window and do not
> perform any aggregation.
>
> We are currently using Redis to preserve the state spec information but
> would like to explore using BQ as an alternative to Redis.
>
> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles  wrote:
>
>> My suggestion is to try to solve the problem in terms of what you want to
>> compute. Instead of trying to control the operational aspects like "read
>> all the BQ before reading Pubsub" there is presumably some reason that the
>> BQ data naturally "comes first", for example if its timestamps are earlier
>> or if there is a join or an aggregation that must include it. Whenever you
>> think you want to set up an operational dependency between two things that
>> "happen" in a pipeline, it is often best to pivot your thinking to the data
>> and what you are trying to compute, and the built-in dependencies will
>> solve the ordering problems.
>>
>> So - is there a way to describe your problem in terms of the data and
>> what you are trying to compute?
>>
>> Kenn
>>
>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev 
>> wrote:
>>
>>> First, PCollections are completely unordered, so there is no guarantee on
>>> what order you'll see events in the flattened PCollection.
>>>
>>> There may be ways to process the BigQuery data in a separate transform
>>> first, but it depends on the structure of the data. How large is the
>>> BigQuery table? Are you doing any windowed aggregations here?
>>>
>>> Reuven
>>>
>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
>>>> Yes, this is a streaming pipeline.
>>>>
>>>> Some more details about existing implementation v/s what we want to
>>>> achieve.
>>>>
>>>> Current implementation:
>>>> Reading from pub-sub:
>>>>
>>>> Pipeline input = Pipeline.create(options);
>>>>
>>>> PCollection pubsubStream = input.apply("Read From Pubsub", 
>>>> PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>
>>>> .fromSubscription(inputSubscriptionId))
>>>>
>>>>
>>>> Reading from bigquery:
>>>>
>>>> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
>>>> .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>>
>>>> .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>
>>>>
>>>> Merge the inputs:
>>>>
>>>> PCollection mergedInput = 
>>>> PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input", 
>>>> Flatten.pCollections());
>>>>
>>>>
>>>>
>>>> Business Logic:
>>>>
>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()))
>>>>
>>>>
>>>>
>>>> Above logic is what we use currently in our pipeline.
>>>>
>>>> We want to make sure that we read from BigQuery first & pass the bqStream 
>>>> through our BusinessLogic() before we start consuming pubsubStream.
>>>>
>>>> Is there a way to achieve this?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Sahil
>>>>
>>>>
>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>>>>
>>>>> Can you explain this use case some more? Is this a streaming pipeline?
>>>>> If so, how are you reading from BigQuery?
>>>>>
>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a requirement wherein we are consuming input from pub/sub
>>>>>> (PubSubIO) as well as BQ (BQIO)
>>>>>>
>>>>>> We want to make sure that we consume the BQ stream first before we
>>>>>> start consuming the data from pub-sub. Is there a way to achieve this? 
>>>>>> Can
>>>>>> you please help with some code samples?
>>>>>>
>>>>>> Currently, we read data from big query using BigQueryIO into a
>>>>>> PCollection & also read data from pubsub using PubsubIO. We then use the
>>>>>> flatten transform in this manner.
>>>>>>
>>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>>
>>>>>> kvPairs = 
>>>>>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge 
>>>>>> Input", Flatten.pCollections());
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Sahil
>>>>>>
>>>>>>


Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Reuven Lax via dev
First, PCollections are completely unordered, so there is no guarantee on
what order you'll see events in the flattened PCollection.

There may be ways to process the BigQuery data in a separate transform
first, but it depends on the structure of the data. How large is the
BigQuery table? Are you doing any windowed aggregations here?

Reuven

On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak 
wrote:

> Yes, this is a streaming pipeline.
>
> Some more details about existing implementation v/s what we want to
> achieve.
>
> Current implementation:
> Reading from pub-sub:
>
> Pipeline input = Pipeline.create(options);
>
> PCollection pubsubStream = input.apply("Read From Pubsub", 
> PubsubIO.readMessagesWithAttributesAndMessageId()
>
> .fromSubscription(inputSubscriptionId))
>
>
> Reading from bigquery:
>
> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
> .readTableRows().fromQuery(bqQuery).usingStandardSql())
>
> .apply("JSon Transform", AsJsons.of(TableRow.class));
>
>
> Merge the inputs:
>
> PCollection mergedInput = 
> PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input", 
> Flatten.pCollections());
>
>
>
> Business Logic:
>
> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()))
>
>
>
> Above logic is what we use currently in our pipeline.
>
> We want to make sure that we read from BigQuery first & pass the bqStream 
> through our BusinessLogic() before we start consuming pubsubStream.
>
> Is there a way to achieve this?
>
>
> Thanks,
>
> Sahil
>
>
> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>
>> Can you explain this use case some more? Is this a streaming pipeline? If
>> so, how are you reading from BigQuery?
>>
>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev 
>> wrote:
>>
>>> Hi,
>>>
>>> We have a requirement wherein we are consuming input from pub/sub
>>> (PubSubIO) as well as BQ (BQIO)
>>>
>>> We want to make sure that we consume the BQ stream first before we start
>>> consuming the data from pub-sub. Is there a way to achieve this? Can you
>>> please help with some code samples?
>>>
>>> Currently, we read data from big query using BigQueryIO into a
>>> PCollection & also read data from pubsub using PubsubIO. We then use the
>>> flatten transform in this manner.
>>>
>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>
>>> kvPairs = 
>>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input", 
>>> Flatten.pCollections());
>>>
>>>
>>> Thanks,
>>> Sahil
>>>
>>>


Re: Consuming one PCollection before consuming another with Beam

2023-02-23 Thread Reuven Lax via dev
Can you explain this use case some more? Is this a streaming pipeline? If
so, how are you reading from BigQuery?

On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev 
wrote:

> Hi,
>
> We have a requirement wherein we are consuming input from pub/sub
> (PubSubIO) as well as BQ (BQIO)
>
> We want to make sure that we consume the BQ stream first before we start
> consuming the data from pub-sub. Is there a way to achieve this? Can you
> please help with some code samples?
>
> Currently, we read data from big query using BigQueryIO into a PCollection
> & also read data from pubsub using PubsubIO. We then use the flatten
> transform in this manner.
>
> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>
> kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge 
> Input", Flatten.pCollections());
>
>
> Thanks,
> Sahil
>
>


Re: Thoughts on extensions/datasketches vs adding to the existing sketching library?

2023-01-18 Thread Reuven Lax via dev
I believe that when zetasketch was added, it was also noticeably more
efficient than other sketch implementations. However this was a number of
years ago, and I don't know whether it still has an advantage or not.

On Wed, Jan 18, 2023 at 10:41 AM Byron Ellis via dev 
wrote:

> Hi everyone,
>
> I was looking at adding at least a couple of the sketches from the Apache
> Datasketches library to the Beam Java SDK and I was wondering if folks had
> a preference for adding to the existing "sketching" extension vs splitting
> it out into its own extension?
>
> The reason I ask is that there's some overlap (which already exists in
> zetasketch) between the sketches available in Datasketches vs Beam today,
> particularly HyperLogLog which would have 3 implementations if we were to
> add all of them.
>
> I don't really have a strong opinion, though personally I'd probably lean
> towards a single sketching extension (zetasketch being something of a
> special case as it exists for format compatibility as far as I can tell).
> But I could see how that could be confusing if you had the Apache
> Datasketch implementation and the existing implementation derived from the
> clearspring implementations.
>
> Any thoughts?
>
> Best,
> B
>


Re: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?

2023-01-03 Thread Reuven Lax via dev
Ah, that is fair. However right now that doesn't happen either.

On Tue, Jan 3, 2023 at 12:59 PM Luke Cwik  wrote:

> I think in general ReadableState.read() should not be @Nullable, but we
> should allow for overrides like ValueState to specify that T can
> be @Nullable, while for others like ListState we should have List<@Nullable T>.
>
> On Tue, Jan 3, 2023 at 12:37 PM Reuven Lax via dev 
> wrote:
>
>> It should be @Nullable - I'm not sure why that was removed.
>>
>> On Tue, Jan 3, 2023 at 12:18 PM Ahmet Altay via dev 
>> wrote:
>>
>>> Forwarding, because this message got lost in the list moderation.
>>>
>>> -- Forwarded message --
>>> From: Jeeno Lentin 
>>> To: dev@beam.apache.org
>>> Cc:
>>> Bcc:
>>> Date: Fri, 23 Dec 2022 00:36:55 -0500
>>> Subject: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?
>>> Hi,
>>>
>>> We use the Beam Java SDK and are trying to upgrade version of Beam from
>>> version 2.31.0 to 2.43.0
>>>
>>> While upgrading, we noticed that @Nullable annotation has been removed
>>> from org.apache.beam.sdk.state.ReadableState.read()
>>>
>>> I traced it back to this PR: https://github.com/apache/beam/pull/16721
>>>
>>> We have the following concerns
>>> - If ReadableState.read() is really not nullable, shouldn’t there be a
>>> way to specify a default value when creating a state? Such a feature
>>> doesn’t seem to exist.
>>> - And what would it return initially when nothing is written to the
>>> state yet initially?
>>>
>>> Thank you,
>>> Jeeno
>>>
>>


Re: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?

2023-01-03 Thread Reuven Lax via dev
It should be @Nullable - I'm not sure why that was removed.

On Tue, Jan 3, 2023 at 12:18 PM Ahmet Altay via dev 
wrote:

> Forwarding, because this message got lost in the list moderation.
>
> -- Forwarded message --
> From: Jeeno Lentin 
> To: dev@beam.apache.org
> Cc:
> Bcc:
> Date: Fri, 23 Dec 2022 00:36:55 -0500
> Subject: Beam Java SDK - ReadableState.read() shouldn't it be Nullable?
> Hi,
>
> We use the Beam Java SDK and are trying to upgrade version of Beam from
> version 2.31.0 to 2.43.0
>
> While upgrading, we noticed that @Nullable annotation has been removed
> from org.apache.beam.sdk.state.ReadableState.read()
>
> I traced it back to this PR: https://github.com/apache/beam/pull/16721
>
> We have the following concerns
> - If ReadableState.read() is really not nullable, shouldn’t there be a
> way to specify a default value when creating a state? Such a feature
> doesn’t seem to exist.
> - And what would it return initially when nothing is written to the state
> yet initially?
>
> Thank you,
> Jeeno
>
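For context, the behavior in question shows up in any stateful DoFn along
these lines (a sketch, not from the thread): before the first write for a
key, ValueState.read() returns null, which is exactly what the removed
@Nullable annotation documented.

    private static class CountFn extends DoFn<KV<String, Integer>, Integer> {
      @StateId("count")
      private final StateSpec<ValueState<Integer>> countSpec =
          StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void process(
          @Element KV<String, Integer> element,
          @StateId("count") ValueState<Integer> count,
          OutputReceiver<Integer> out) {
        // First element for a key: nothing has been written yet, so read()
        // returns null. A default-value API would remove this null check.
        Integer current = count.read();
        int next = (current == null ? 0 : current) + element.getValue();
        count.write(next);
        out.output(next);
      }
    }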


Re: [DISCUSS] Avro dependency update, design doc

2023-01-02 Thread Reuven Lax via dev
Be very careful with the auto schema stuff around Avro. These classes
dynamically inspect Avro-generated classes (to codegen schema accessors) so
it will be easy to break this in a way that is not seen at compile time.
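As an illustration of that run-time-only visibility (a sketch: MyAvroRecord
stands for any Avro-generated SpecificRecord class, and AvroRecordSchema is
the schema provider that moved out of core as part of this work):

    // Schema accessors for the Avro class are code-generated by inspecting it
    // when the pipeline is constructed and run, so a broken core/extension
    // combination can compile cleanly and fail only here at run time.
    pipeline.getSchemaRegistry()
        .registerSchemaProvider(MyAvroRecord.class, new AvroRecordSchema());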

On Mon, Jan 2, 2023 at 12:17 PM Alexey Romanenko 
wrote:

> Here is the recent update on the progress for this topic.
>
> After receiving a feedback on the design document [1] presented to
> community before and having the several discussions after (many thanks for
> this!), it was decided to take an “*option 4*” (*“Move Avro from “core”
> to generic Avro extensions using multiple Avro version specific adapters to
> handle breaking changes”*) as a way to move forward.
>
> We created an umbrella issue to track the progress [2] and the* first
> step* (“*Create Avro extension for Java SDK*”) of this [3] is already
> finished and merged. This newly created extension (“
> *sdks/java/extensions/avro/*") replicates the same Avro support behaviour
> as it's currently implemented in Java SDK “*core*”. It required almost no
> changes for the current user API (only relaxation of access modifiers for
> several class members and methods to provide an access from other packages
> to them), so it should *not* introduce any potential breaking changes for
> users, especially if they still use the current Beam Avro's version
> (1.8.2).
>
> The *next step* will be to switch all Beam Java modules to use the new
> Avro extension instead of using the “core” Avro classes. Again, we don’t
> expect any user API breaking changes for this step.
>
> *Note*: As a price for smooth and not breakable transition, we have to
> support two identical versions of Beam Avro's code (in “*core*" and in “
> *extensions/avro*”) until the old code is deprecated (expected
> to be the *third step*). So, until then, please apply your Java SDK
> Avro-related changes (if any) in both places to keep them in sync.
>
>
> Also, please, share any of your feedback, questions, ideas or concerns on
> this topic.
>
>
> [1]
> https://docs.google.com/document/d/1tKIyTk_-HhkmVuJsxvWP5eTELESpCBe_Vmb1nJ3Ia34/edit?usp=sharing
> [2] https://github.com/apache/beam/issues/24292
> [3] https://github.com/apache/beam/issues/24293
>
> —
> Alexey
>
>
>
> On 18 Nov 2022, at 15:56, Alexey Romanenko 
> wrote:
>
> Since there are no objections in principle against the proposed option 2
> (extract Avro-related code from “core” to an Avro extension but keep it in
> “core” for some time because of the transition period), we will try to
> move forward and take this path.
>
> I’m pretty sure that we will face some hidden issues while working on
> this, so I’ll keep you posted =)
>
> —
> Alexey
>
> On 11 Nov 2022, at 18:05, Austin Bennett  wrote:
>
> @Moritz: I *think* it should be fine, and don't have anything specific to
> offer for what might go wrong throughout the process.  :-) :shrug:
>
>
>
> On Fri, Nov 11, 2022 at 2:07 AM Moritz Mack  wrote:
>
>> Thanks a lot for the feedback so far! I can only second Alexey. It was
>> painful to come to realize that the only feasible option seems to be
>> copying a lot of code during the transition phase.
>>
>> For that reason, it will be critical to be disciplined about the removal
>> of the to-be deprecated code in core and, ahead of time, agree on when to
>> remove it again. Any thought on how long the transition phase should be?
>>
>>
>>
>>  *I am concerned of what could go wrong for users in the
>> in-between/transition state while more slowly transitioning avro to
>> extension.*
>>
>>
>>
>> @Austin Do you have any specific concern in mind here?
>>
>> To minimize this risk, we propose that all APIs should be kept as is to
>> make the migration as easy as possible and kick off with the Avro version
>> used in core. The only thing that changes will be package names.
>>
>>
>>
>> / Moritz
>>
>>
>>
>> On 10.11.22, 22:46, "Kenneth Knowles"  wrote:
>>
>>
>>
>> Thank you for writing this document. It really helps to understand the
>> options. I agree that option 2 (make a new extension and deprecate from
>> core) seems best. I think +Reuven Lax might have the most context on any
>> technical issue we will
>>
>> Thank you for writing this document. It really helps to understand the
>> options. I agree that option 2 (make a new extension and deprecate from
>> core) seems best. I think +Reuven Lax  might have the
>> most context on any technical issue we will encounter around schema codegen.
>>
>>
>>
>> Kenn
>>
>>
>>
>> On Thu, Nov 10, 2022 at 7:24 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>> Personally, I think that keeping two mostly identical versions of
>> Avro-related code in two different places (“core" and "extension") is rather
>> bad practice, especially in case of a need to fix some issues there -
>> though, it’s a very low risk there since this code is quite mature and it’s
>> not touched often. On the other hand, it should give time for users
>> (several Beam releases) to update their code and use Avro from ext

Re: [DISCUSSION][JAVA] Current state of Java 17 support

2022-12-01 Thread Reuven Lax via dev
We have run into some JDK-specific issues with our use of ByteBuddy, though.

On Thu, Dec 1, 2022 at 3:43 PM Luke Cwik via dev 
wrote:

> We do support JDK8, JDK11 and JDK17. Our story around newer features
> within JDKs 9+ like modules is mostly non-existent though.
>
> We rarely run into JDK specific issues, the latest were the TLS1 and
> TLS1.1 deprecation in newer patch versions of the JDK and also the docker
> cpu share issues with different JDK versions. Even though it would be nice
> to cover more, we currently have too many flaky tests and an already busy
> Jenkins cluster. I believe we would get a lot more value out of deflaking
> our existing tests and re-enabling disabled tests.
>
> I got to give credit to the JDK folks for how well they have maintained
> compatibility over the years.
>
> On Thu, Dec 1, 2022 at 9:05 AM Sachin Agarwal via dev 
> wrote:
>
>> This is a good heads up, thank you Cristian.
>>
>> On Thu, Dec 1, 2022 at 8:13 AM Cristian Constantinescu 
>> wrote:
>>
>>> Hi,
>>>
>>> I came across some Kafka info and would like to share for those
>>> unaware. Kafka is planning to drop support for Java 8 in Kafka 4 (Java
>>> 8 is deprecated in Kafka 3), see KIP-750 [1].
>>>
>>> I'm not sure when Kafka 4 is scheduled to be released (probably a few
>>> years down the road), but when it happens, KafkaIO may not be able to
>>> support it if we maintain Java 8 compatibility unless it remains on
>>> Kafka 3.
>>>
>>> Anyways, if not already done, I think it's a good idea to start
>>> putting up serious warning flags around Beam used with Java 8, even
>>> for Google cloud customers ;)
>>>
>>> Cheers,
>>> Cristian
>>>
>>> [1] https://issues.apache.org/jira/browse/KAFKA-12894
>>>
>>> On Wed, Nov 30, 2022 at 12:59 PM Kenneth Knowles 
>>> wrote:
>>> >
>>> > An important thing is to ensure that we do not accidentally depend on
>>> something that would break Java 8 support.
>>> >
>>> > Currently our Java 11 and 17 tests build the code with Java 8 (just
>>> like our released artifacts) and then compile and run the test code with
>>> the newer JDK. This roughly matches the user scenario, I think. So it is a
>>> little more complex than just having separate test runs for different JDK
>>> versions. But it would be good to make this more symmetrical between JDK
>>> versions to develop the mindset that JDK is always explicit.
>>> >
>>> > Kenn
>>> >
>>> > On Wed, Nov 30, 2022 at 9:48 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>> >>
>>> >>
>>> >> On 30 Nov 2022, at 03:56, Tomo Suzuki via dev 
>>> wrote:
>>> >>
>>> >> > Do we still need to support Java 8 SDK?
>>> >>
>>> >> Yes, for Google Cloud customers who still use Java 8, I want Apache
>>> Beam to support Java 8. Do you observe any special burden maintaining Java
>>> 8?
>>> >>
>>> >>
>>> >> I can only think of the additional resource costs if we test
>>> all supported JDKs, as Austin mentioned above. Imho, we should do that for
>>> all JDKs that are officially supported.
>>> >> Another less costly way is to run the Java tests for all JDKs only
>>> during the release preparation stage.
>>> >>
>>> >> I agree that it would make sense to continue to support Java 8 as long
>>> as a significant number of users are still using it.
>>> >>
>>> >> —
>>> >> Alexey
>>> >>
>>> >>
>>> >>
>>> >> Regards,
>>> >> Tomo
>>> >>
>>> >> On Tue, Nov 29, 2022 at 21:48 Austin Bennett 
>>> wrote:
>>> >>>
>>> >>> -1 for ongoing Java8 support [ or, said another way, +1 for dropping
>>> support of Java8 ]
>>> >>>
>>> >>> +1 for having tests that run for ANY JDK that we say we support.  Is
>>> there any reason the resources to support are too costly [ or outweigh the
>>> benefits of additional confidence in ensuring we support what we say we do
>>> ]?  I am not certain whether this would only be critical for releases,
>>> or should be done as part of regular CI.
>>> >>>
>>> >>> On Tue, Nov 29, 2022 at 8:51 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>> 
>>>  Hello,
>>> 
>>>  I’m sorry if it’s already discussed somewhere but I find myself a
>>> little bit lost in the subject.
>>>  So, I’d like to clarify this - what is the current official state of
>>> Java 17 support in Beam?
>>> 
>>>  I recall that a great job was done to make Beam compatible with
>>> Java 17 [1] and Beam already provides a “beam_java17_sdk” Docker image [2]
>>> but, iiuc, Java 8 is still the default JVM for running all Java tests on
>>> Jenkins ("Java PreCommit" first of all), and only a limited number of tests
>>> run with JDK 11 and 17 on Jenkins via dedicated jobs.
>>> 
>>>  So, my question is: if Beam officially supports Java
>>> 17 (and 11), do we need to run all Beam Java SDK-related tests (VR and IT
>>> tests included) against all supported Java SDKs?
>>> 
>>>  Do we still need to support Java 8 SDK?
>>> 
>>>  At the same time, as we are heading to move everything from Jenkins
>>> to GitHub Actions, what 

Re: SchemaTransformProvider | Java class naming convention

2022-11-15 Thread Reuven Lax via dev
Out of curiosity, several IOs (including PubSub) already do support
schemas. Are you planning on modifying those?

On Tue, Nov 15, 2022 at 11:50 AM Damon Douglas via dev 
wrote:

> Hello Everyone,
>
> Do we like the following Java class naming convention for
> SchemaTransformProviders [1]?  The proposal is:
>
> <IO>(Read|Write)SchemaTransformProvider
>
>
> *For those new to Beam, even if this is your first day, consider
> yourselves a welcome contributor to this conversation.  Below are
> definitions/references and a suggested learning guide to understand this
> email.*
>
> Explanation
>
> The <IO> prefix identifies the Beam I/O [2] and Read or Write identifies a
> read or write PTransform, respectively.
>
> For example, a SchemaTransformProvider [1] for
> BigQueryIO.Write [7] would look like:
>
> BigQueryWriteSchemaTransformProvider
>
>
> And a SchemaTransformProvider for PubsubIO.Read [8] would look like
> (a fuller sketch follows at the end of this message):
>
> PubsubReadSchemaTransformProvider
>
>
> Definitions/References
>
> [1] *SchemaTransformProvider*: A way for us to instantiate Beam IO
> transforms using a language agnostic configuration.
> SchemaTransformProvider builds a SchemaTransform[3] from a Beam Row[4] that
> functions as the configuration of that SchemaTransform.
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.html
>
> [2] *Beam I/O*: PTransform for reading from or writing to sources and
> sinks.
> https://beam.apache.org/documentation/programming-guide/#pipeline-io
>
> [3] *SchemaTransform*: An interface containing a buildTransform method
> that returns a PCollectionRowTuple[6] to PCollectionRowTuple PTransform.
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/SchemaTransform.html
>
> [4] *Row*: A Beam Row is a generic element of data whose properties are
> defined by a Schema[5].
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/Row.html
>
> [5] *Schema*: A description of expected field names and their data types.
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/Schema.html
>
> [6] *PCollectionRowTuple*: A grouping of Beam Rows[4] into a single
> PInput or POutput tagged by a String name.
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/PCollectionRowTuple.html
>
> [7] *BigQueryIO.Write*: A PTransform for writing Beam elements to a
> BigQuery table.
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html
>
> [8] *PubsubIO.Read*: A PTransform for reading from Pub/Sub and emitting
> message payloads into a PCollection.
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html
>
> Suggested Learning/Reading to understand this email
>
> 1. https://beam.apache.org/documentation/programming-guide/#overview
> 2. https://beam.apache.org/documentation/programming-guide/#transforms
> (Up to 4.1)
> 3. https://beam.apache.org/documentation/programming-guide/#pipeline-io
> 4. https://beam.apache.org/documentation/programming-guide/#schemas
>


Re: Questions on primitive transforms hierarchy

2022-10-26 Thread Reuven Lax via dev
On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:

> > Not quite IMO. It is a subtle difference. Perhaps these transforms can
> be *implemented* using stateful DoFn, but defining their semantics directly
> at a high level is more powerful. The higher level we can make transforms,
> the more flexibility we have in the runners. You *could* suggest that we
> take the same approach as we do with Combine: not a primitive, but a
> special transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
> Yes, semantics > optimizations. For optimizations Beam already has a
> facility - PTransformOverride. There is no fundamental difference about how
> we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart
> runners will not use that expansion". This is essentially the root of this
> discussion.
>
> If I rephrase it:
>
>  a) why do we distinguish between "some" actually composite transforms
> treating them as primitive, while others have expansions, although the
> fundamental reasoning seems the same for both (performance)?
>
>  b) is there a fundamental reason why we do not support stateful DoFn for
> merging windows?
>
Mostly because we would need the API to include a merge capability, and
that has never been implemented.


> I feel that these are related and have historical reasons, but I'd like to
> know that for sure. :)
>
>  Jan
> On 10/24/22 19:59, Kenneth Knowles wrote:
>
>
>
> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:
>
>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>
>> I think we stated that CoGroupbyKey was also a primitive, though in
>> practice it's implemented in terms of GroupByKey today.
>>
>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>>>
>>>> Hi,
>>>>
>>>> I have some missing pieces in my understanding of the set of Beam's
>>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>>> think is the current state. We have (basically) the following primitive
>>>> transforms:
>>>>
>>>>  - DoFn (stateless, stateful, splittable)
>>>>
>>>>  - Window
>>>>
>>>>  - Impulse
>>>>
>>>>  - GroupByKey
>>>>
>>>>  - Combine
>>>>
>>>
>>> Not a primitive, just a well-defined transform that runners can execute
>>> in special ways.
>>>
>> Yep, OK, agree. Performance is orthogonal to semantics.
>>
>>
>>>
>>>>
>>>>
>>>>  - Flatten (pCollections)
>>>>
>>>
>>> The rest, yes.
>>>
>>>
>>>
>>>> Inside runners, we most often transform GBK into ReduceFn
>>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>>> DoFn.
>>>>
>>>
>>> ReduceFnRunner is for windowing / triggers and has special feature to
>>> use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>>
>> My bad, wrong wording. The point was that *all* of the semantics of GBK
>> and Combine can be defined in terms of stateful DoFn. There are some
>> changes needed to stateful DoFn to support the Combine functionality. But
>> as mentioned above - optimization is orthogonal to semantics.
>>
>
> Not quite IMO. It is a subtle difference. Perhaps these transforms can be
> *implemented* using stateful DoFn, but defining their semantics directly at
> a high level is more powerful. The higher level we can make transforms, the
> more flexibility we have in the runners. You *could* suggest that we take
> the same approach as we do with Combine: not a primitive, but a special
> transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
>>
>>>
>>>
>>>> I'll compare this to the se

Re: Questions on primitive transforms hierarchy

2022-10-24 Thread Reuven Lax via dev
On Mon, Oct 24, 2022 at 5:50 AM Jan Lukavský  wrote:

> On 10/22/22 21:47, Reuven Lax via dev wrote:
>
> I think we stated that CoGroupbyKey was also a primitive, though in
> practice it's implemented in terms of GroupByKey today.
>
> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> I have some missing pieces in my understanding of the set of Beam's
>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>> think is the current state. We have (basically) the following primitive
>>> transforms:
>>>
>>>  - DoFn (stateless, stateful, splittable)
>>>
>>>  - Window
>>>
>>>  - Impulse
>>>
>>>  - GroupByKey
>>>
>>>  - Combine
>>>
>>
>> Not a primitive, just a well-defined transform that runners can execute
>> in special ways.
>>
> Yep, OK, agree. Performance is orthogonal to semantics.
>
>
>>
>>>
>>>
>>>  - Flatten (pCollections)
>>>
>>
>> The rest, yes.
>>
>>
>>
>>> Inside runners, we most often transform GBK into ReduceFn
>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>> DoFn.
>>>
>>
>> ReduceFnRunner is for windowing / triggers and has special feature to use
>> a CombineFn while doing it. Nothing to do with stateful DoFn.
>>
> My bad, wrong wording. The point was that *all* of the semantics of GBK
> and Combine can be defined in terms of stateful DoFn. There are some
> changes needed to stateful DoFn to support the Combine functionality. But
> as mentioned above - optimization is orthogonal to semantics.
>

Yes, though we would need Multimap state to do it properly, which isn't yet
available on all runners. (You could model it _very_ inefficiently with
BagState, but that would be quite bad)
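
To make that inefficient modeling concrete - a minimal sketch of GBK-like
semantics on a stateful DoFn, buffering every element in BagState and
flushing at the end of the window. The key is kept in a ValueState because
it is not otherwise available in the timer callback, and merging windows are
not handled, which is exactly the gap discussed in this thread:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// GBK semantics modeled on a stateful DoFn: every element is buffered, so
// nothing is combined pre-shuffle - hence the inefficiency.
class GbkViaBagState extends DoFn<KV<String, Integer>, KV<String, Iterable<Integer>>> {

  @StateId("buffer")
  private final StateSpec<BagState<Integer>> bufferSpec = StateSpecs.bag();

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();

  @TimerId("endOfWindow")
  private final TimerSpec endOfWindowSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Integer> element,
      @StateId("buffer") BagState<Integer> buffer,
      @StateId("key") ValueState<String> key,
      @TimerId("endOfWindow") Timer endOfWindow,
      BoundedWindow window) {
    buffer.add(element.getValue());
    key.write(element.getKey());
    endOfWindow.set(window.maxTimestamp()); // fire once the window is complete
  }

  @OnTimer("endOfWindow")
  public void onEndOfWindow(
      OnTimerContext context,
      @StateId("buffer") BagState<Integer> buffer,
      @StateId("key") ValueState<String> key) {
    // Materialize before clearing so the output does not depend on a lazy read.
    List<Integer> buffered = new ArrayList<>();
    buffer.read().forEach(buffered::add);
    buffer.clear();
    context.output(KV.of(key.read(), (Iterable<Integer>) buffered));
  }
}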


>
>>
>>
>>> I'll compare this to the set of transforms we used to use in Euphoria
>>> (currently java SDK extension):
>>>
>>>  - FlatMap ~~ stateless DoFn
>>>
>>>  - Union ~~ Flatten
>>>
>>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>
>>
>> Stateful DoFn does not require associative or commutative operation,
>> while reduce/combine does. Windowing is really just a secondary key for
>> GBK/Combine that allows completion of unbounded aggregations but has no
>> computation associated with it.
>>
> A merging WindowFn contains some computation. The fact that a stateful DoFn
> does not require a specific form of reduce function is precisely what makes
> it the actual primitive, no?
>
>
>>
>>
>>>  - (missing Impulse)
>>>
>>
>> Then you must have some primitive sources with splitting?
>>
>>
>>>  - (missing splittable DoFn)
>>>
>>
>> Kind of the same question - SDF is the one and only primitive that
>> creates parallelism.
>>
> Original Euphoria had an analogy to (Un)boundedReader. The SDK extension
> in Beam works on top of PCollections and therefore does not deal with IOs.
>
>
>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn"
>>> - i.e. the state might be created pre-shuffle, on trigger the state is
>>> shuffled and then merged. In Beam we already have CombiningState and
>>> MergingState facility (sort of), which is what is needed, we just do not
>>> have the ability to shuffle the partial states and then combine them. This
>>> also relates to the inability to run stateful DoFn for merging windowFns,
>>> because that is needed there as well. Is this something that is
>>> fundamentally impossible to define for all runners? What is worth noting is
>>> that building, shuffling and merging the state before shuffle requires
>>> compatible trigger (purely based on watermark), otherwise the transform
>>> falls back to "classical DoFn".
>>>
>>
>> Stateful DoFn for merging windows can be defined. You could require all
>> state to be mergeable and then it is automatic. Or you could have an
>> "onMerge" callback. These should both be fine. The automatic version is
>> less likely to have nonsensical semantics, but allowing the callback to do
>> "whatever it wants" whether the result is good or not is more consistent
>> with the design of stateful DoFn.
>>
> Yes, but this is the same for CombineFn, right? The merge (or combi

Re: Questions on primitive transforms hierarchy

2022-10-22 Thread Reuven Lax via dev
I think we stated that CoGroupbyKey was also a primitive, though in
practice it's implemented in terms of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:

>
>
> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> I have some missing pieces in my understanding of the set of Beam's
>> primitive transforms, which I'd like to fill. First a quick recap of what I
>> think is the current state. We have (basically) the following primitive
>> transforms:
>>
>>  - DoFn (stateless, stateful, splittable)
>>
>>  - Window
>>
>>  - Impulse
>>
>>  - GroupByKey
>>
>>  - Combine
>>
>
> Not a primitive, just a well-defined transform that runners can execute in
> special ways.
>
>
>>
>>
>>  - Flatten (pCollections)
>>
>
> The rest, yes.
>
>
>
>> Inside runners, we most often transform GBK into ReduceFn
>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>> DoFn.
>>
>
> ReduceFnRunner is for windowing / triggers and has special feature to use
> a CombineFn while doing it. Nothing to do with stateful DoFn.
>
>
>
>> I'll compare this to the set of transforms we used to use in Euphoria
>> (currently java SDK extension):
>>
>>  - FlatMap ~~ stateless DoFn
>>
>>  - Union ~~ Flatten
>>
>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>
>
> Stateful DoFn does not require associative or commutative operation, while
> reduce/combine does. Windowing is really just a secondary key for
> GBK/Combine that allows completion of unbounded aggregations but has no
> computation associated with it.
>
>
>
>>  - (missing Impulse)
>>
>
> Then you must have some primitive sources with splitting?
>
>
>>  - (missing splittable DoFn)
>>
>
> Kind of the same question - SDF is the one and only primitive that creates
> parallelism.
>
> The ReduceStateByKey is a transform that is a "combinable stateful DoFn" -
>> i.e. the state might be created pre-shuffle, on trigger the state is
>> shuffled and then merged. In Beam we already have CombiningState and
>> MergingState facility (sort of), which is what is needed, we just do not
>> have the ability to shuffle the partial states and then combine them. This
>> also relates to the inability to run stateful DoFn for merging windowFns,
>> because that is needed there as well. Is this something that is
>> fundamentally impossible to define for all runners? What is worth noting is
>> that building, shuffling and merging the state before shuffle requires
>> compatible trigger (purely based on watermark), otherwise the transform
>> falls back to "classical DoFn".
>>
>
> Stateful DoFn for merging windows can be defined. You could require all
> state to be mergeable and then it is automatic. Or you could have an
> "onMerge" callback. These should both be fine. The automatic version is
> less likely to have nonsensical semantics, but allowing the callback to do
> "whatever it wants" whether the result is good or not is more consistent
> with the design of stateful DoFn.
>
> Whether and where a shuffle takes place may vary. Start with the maths.
>
> Kenn
>
>
>> Bottom line: I'm thinking of proposing to drop the Euphoria extension,
>> because it has essentially no users and actually no maintainers, but I have
>> a feeling there is value in the set of operators that could be
>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>> users to have access to a "combining stateful DoFn" primitive (even better
>> would be "combining splittable DoFn").
>>
>> Looking forward to any comments on this.
>>
>>  Jan
>>
>>
>>
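
For what it's worth, Beam's existing CombiningState already captures the
per-key, per-window "combining" half of ReduceStateByKey - a minimal sketch
of a running per-key sum is below. The part Beam has no facility for today,
as discussed above, is shuffling and merging these partial accumulators:

import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

// A "combining stateful DoFn": the only state is a CombiningState backed by
// an associative and commutative CombineFn, which is the property that would
// make pre-shuffle partial aggregation possible in principle.
class RunningSumFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("sum")
  private final StateSpec<CombiningState<Long, long[], Long>> sumSpec =
      StateSpecs.combining(Sum.ofLongs());

  @ProcessElement
  public void process(
      @Element KV<String, Long> element,
      @StateId("sum") CombiningState<Long, long[], Long> sum,
      OutputReceiver<KV<String, Long>> out) {
    sum.add(element.getValue());
    out.output(KV.of(element.getKey(), sum.read())); // emit the running total
  }
}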


Re: [PROPOSAL] Re-enable checkerframework by default

2022-10-21 Thread Reuven Lax via dev
+1

This happens to me regularly. It fails on Jenkins but succeeds on my
machine, and it's hard to figure out why (since all you see on Jenkins is a
compile error). Then I'm always trying to remember how to enable it
locally. IMO development would be faster if this was enabled locally.
Anyone who doesn't like it can always disable it for their local compiles.

Reuven

On Fri, Oct 21, 2022 at 8:38 AM Alexey Romanenko 
wrote:

> +1 to make it “on" by default, with a mention of that in the Contribution Guide.
>
> I recall that for one PR it took me some time to realise why it was
> failing on Jenkins and not locally, because of this different behaviour.
>
> —
> Alexey
>
> > On 20 Oct 2022, at 00:51, Kenneth Knowles  wrote:
> >
> > Hi all,
> >
> > Some time ago we turned off checker framework locally by default, and
> only turn it on with `-PenableCheckerFramework` and also on Jenkins.
> >
> > My opinion is that this causes more headache than it solves, by delaying
> finding out about errors. The increased compilation time of
> checkerframework is real. But during iteration almost every step of a
> compile is cached so it only matters specifically for :sdks:java:core. My
> take is that anyone editing that is probably experienced enough with Beam
> to know they can turn it off. So I propose we turn it on by default, with
> the option to disable it.
> >
> > Kenn
>
>


Re: Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Reuven Lax via dev
PCollections usually are persistent within a pipeline, so you can reuse
them in other parts of a pipeline with no problem.

There is no notion of state across pipelines - every pipeline is
independent. If you want state across pipelines, you can write the
PCollection out to a set of files which are then read back in the new
pipeline.
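
A minimal sketch of that pattern with TextIO (the bucket paths are
hypothetical, and `results` stands for the output of the expensive
computation in the first pipeline):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

// Pipeline 1: run the expensive computation once and materialize the result.
results.apply(TextIO.write().to("gs://my-bucket/staged/result").withSuffix(".txt"));
p1.run().waitUntilFinish();

// Pipeline 2, an independent run later: read the staged data back instead of
// recomputing it.
Pipeline p2 = Pipeline.create();
PCollection<String> staged =
    p2.apply(TextIO.read().from("gs://my-bucket/staged/result*.txt"));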

On Tue, Oct 18, 2022 at 11:45 PM Ravi Kapoor  wrote:

> Hi Team,
> Can we stage a PCollection or  PCollection data? Lets say
> to save  the expensive operations between two complex BQ tables time and
> again and materialize it in some temp view which will be deleted after the
> session.
>
> Is it possible to do that in the Beam Pipeline?
> We can later use the temp view in another pipeline to read the data from
> and do processing.
>
> Or, in general, I would like to know: do we ever stage the PCollection?
> Let's say I want to create another instance of the same job which has
> complex processing.
> Does the pipeline re-perform the computation, or would it pick up the
> already-processed data from the previous instance, which must be staged somewhere?
>
> Like in Spark, we have the notion of createOrReplaceTempView, which is used
> to create a temp table from a Spark DataFrame or Dataset.
>
> Please advise.
>
> --
> Thanks,
> Ravi Kapoor
> +91-9818764564
> kapoorrav...@gmail.com
>


Re: Inclusive terminology: "Sickbay" ==> "Disabled test"

2022-10-17 Thread Reuven Lax via dev
To add to this, I think one reason originally for using "sickbay" was to
emphasize that this should be temporary. Removing tests from pre/post
commits permanently is a bad state to be in - at that point why even have
the test? Ideally if a test is extremely flaky, fixing that is highly
prioritized.

That being said, I'm not sure this worked out very well.

On Mon, Oct 17, 2022 at 12:47 PM Yi Hu via dev  wrote:

> (From someone who received education not in English and from another
> field). I first assumed this is some terminology just like "flaky" or "smoke
> test", because I came from another world and did not know most of the terms
> in this field. That said, I just assumed it was another term used in the
> field and just acknowledged that.
>
> Just note that these "sickbayed" tests are not disabled from running.
> There are disabled test suites on https://ci-beam.apache.org/ with gray
> "🚫 " signs. The sickbayed tests are still running on schedule, but we
> "expect" them to fail / they are known to fail. If we really want to change,
> maybe we could use "broken test", "unstable test" or something like that to
> distinguish them from the disabled test suites we have.
>
> Best,
> Yi
>
> On Mon, Oct 17, 2022 at 3:26 PM Danny McCormick via dev <
> dev@beam.apache.org> wrote:
>
>> I'm +1 on this, sickbay was a new term for me when I joined the project.
>> One thing I will note: we still have plenty of sickbay references in our
>> code itself - https://github.com/apache/beam/search?p=1&q=sickbay - if
>> we decide to take this forward we should create an issue to remove those
>> (with the "good first issue" label).
>>
>> > By the way, is there any known reason not to have spaces in GitHub
>> Issues tags?
>>
>> Generally, no. GitHub gives you a label with spaces by default when you
>> create a new repo ("good first issue") and we already have multiple labels
>> with spaces ("awaiting triage", "good first issue", there are probably
>> more).
>>
>> I personally slightly prefer dashes because spaces make queries
>> a little less clean since you need to quote the label (e.g. `is:open
>> is:issue *label:"disabled test"*` instead of is:open is:issue
>> *label:disabled-test*), but that is not a widely accepted standard.
>>
>> On Mon, Oct 17, 2022 at 3:10 PM Kenneth Knowles  wrote:
>>
>>> Hi all,
>>>
>>> I have gotten a lot of questions from people like "what is sickbay?"
>>>
>>> Because I am a Star Trek enthusiast, I easily understood that to
>>> "sickbay the test" means to disable it temporarily. And people on my team
>>> are used to this terminology. But this is not all people :-) and there are
>>> many name conflicts with products too.
>>>
>>> So I have edited the GitHub Issues tag "sickbay" to be "disabled test"
>>> and I suggest we use this term everywhere.
>>>
>>> By the way, is there any known reason not to have spaces in GitHub
>>> Issues tags?
>>>
>>> Kenn
>>>
>>


Re: Using unbounded source as a side input for a DoFn

2022-07-20 Thread Reuven Lax via dev
How do you want to use the side input?

On Wed, Jul 20, 2022 at 10:45 PM Sahil Modak 
wrote:

> Hi,
>
> We are looking to use the side input feature for one of our DoFns. The
> side input has to be a PCollection which is being constructed from a
> subscription using PubsubIO.read
>
> We want our primary DoFn, which operates on KV pairs in a global window, to
> access this side input.
> The goal is to have all the messages of this unbounded source (side input)
> available across all the KV pairs in our input DoFn, which will use
> this side input.
>
> Is it possible to have an unbounded source (like pubsub) as a side input?
>
> Thanks,
> Sahil
>
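
For reference, the commonly documented pattern for side inputs from an
unbounded source is to re-window it into the global window with a trigger
before forming the view - roughly the sketch below. `MainFn` and the
subscription path are hypothetical; note that discardingFiredPanes means
each pane carries only the newly arrived messages, while keeping "all
messages so far" would require accumulating panes and therefore unbounded
growth of the view:

import java.util.List;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;

// `p` is the Pipeline and `mainInput` the main-input PCollection (assumed).
PCollectionView<List<String>> sideView =
    p.apply(PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/side-input"))
        .apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(View.asList());

// Each element of the main input can then read the latest fired contents of
// the view via context.sideInput(sideView) inside MainFn.
mainInput.apply(ParDo.of(new MainFn(sideView)).withSideInputs(sideView));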


Re: [ANNOUNCE] New committer: Steven Niemitz

2022-07-19 Thread Reuven Lax via dev
Welcome Steve!

On Tue, Jul 19, 2022 at 1:05 PM Connell O'Callaghan via dev <
dev@beam.apache.org> wrote:

>
> +++1 Woohoo! Congratulations Steven (and to the BEAM community) on this
> announcement!!!
>
> Thank you Luke for this update
>
>
> On Tue, Jul 19, 2022 at 12:34 PM Robert Burke  wrote:
>
>> Woohoo! Welcome and congratulations Steven!
>>
>> On Tue, Jul 19, 2022, 12:40 PM Luke Cwik via dev 
>> wrote:
>>
>>> Hi all,
>>>
>>> Please join me and the rest of the Beam PMC in welcoming a new committer:
>>> Steven Niemitz (sniemitz@)
>>>
>>> Steven started contributing to Beam in 2017 fixing bugs and improving
>>> logging and usability. Steven's most recent focus has been on performance
>>> optimizations within the Java SDK.
>>>
>>> Considering the time span and number of contributions, the Beam PMC
>>> trusts Steven with the responsibilities of a Beam committer. [1]
>>>
>>> Thank you Steven! And we are looking to see more of your contributions!
>>>
>>> Luke, on behalf of the Apache Beam PMC
>>>
>>> [1] https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>>>
>>


Re: ClassNotFoundException when using Java external transforms in a Java job

2022-07-09 Thread Reuven Lax via dev
This generally means you have a CLASSPATH problem - either the JAR
containing that class isn't ending up being linked in, or the wrong version
of the JAR is.
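
A quick way to check which JAR a class is actually loaded from (a diagnostic
sketch; the class name is taken from the stack trace above):

// Prints the JAR or directory the class was loaded from; throws
// ClassNotFoundException if it is missing from the classpath entirely.
// (getCodeSource() can be null for bootstrap classes.)
Class<?> clazz =
    Class.forName("com.spotify.ingestion.recentactivity.RecentActivityFeature");
System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());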

On Fri, Jul 8, 2022 at 2:12 PM Sahith Nallapareddy via dev <
dev@beam.apache.org> wrote:

> Hello,
>
> I sent an email sometime ago about java external transforms in a java job.
> Seems like things are for the most part working, but we have run into this
> error that I have attached. At the base cause it says
> [debug] Caused by: java.lang.ClassNotFoundException:
> com.spotify.ingestion.recentactivity.RecentActivityFeature
> [debug] at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
> [debug] at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
> [debug] at
> java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> [debug] at java.base/java.lang.Class.forName0(Native Method)
>
> for a class used in the external transform. We find that it seems to be
> related to the isJavaSDKCompatible method, and
> maybe this PR. We
> monkey-patched the file and made the method return false, which seemed to
> get us further. That was just us messing around to see what happens, and it
> may run into more issues down the line. Any guidance here on what is going on
> and how we might be able to fix this?
>
> Thanks,
>
> Sahith
>
>