On 6/11/20 5:02 PM, Reuven Lax wrote:
On Thu, Jun 11, 2020 at 1:26 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I'd propose the following:
- delete all DoFnSignatures.{usesState,usesTimers,...} helpers
*except* for DoFnSignatures.isStateful
Why? Actually, it seems like the opposite might be better (remove
isStateful and keep the others). There are cases where it might be
useful to know whether only timers are used.
I had the impression there is consensus in this thread to keep only the
non-static DoFnSignature#usesState(), DoFnSignature#usesTimers(), etc.
isStateful is needed because (as mentioned several times here as
well) a DoFn might need to be run as stateful even though it contains no
user state or timers (but is annotated with @RequiresTimeSortedInput,
which implies statefulness).
- DoFnSignatures.isStateful would be equal to
'signature.usesState() || signature.usesTimers() ||
signature.processElement().requiresTimeSortedInput()'
requiresTimeSortedInput does not imply isStateful in general - that
seems like a runner-dependent thing.
- fix all _relevant_ places in all runners where there are currently
checks for statefulness like
'doFnSignature.stateDeclarations().size() > 0' or
'doFnSignature.usesState()' but with the semantics of
'DoFnSignatures.isStateful()'
WDYT?
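To make the proposal concrete, here is a minimal sketch assuming a hypothetical `SignatureFlags` class that stands in for `DoFnSignature` (it is not Beam's API). It encodes the proposed `isStateful` as `usesState() || usesTimers() || requiresTimeSortedInput()` and contrasts it with one of the ad-hoc runner checks listed above, `stateDeclarations().size() > 0`, which misses timer-only and time-sorted-only DoFns.

```java
import java.util.Map;

// Hypothetical stand-in for a DoFnSignature; NOT Beam's class or API.
final class SignatureFlags {
  private final Map<String, String> stateDeclarations; // state id -> state kind
  private final boolean usesTimers;
  private final boolean requiresTimeSortedInput;

  SignatureFlags(
      Map<String, String> stateDeclarations,
      boolean usesTimers,
      boolean requiresTimeSortedInput) {
    this.stateDeclarations = stateDeclarations;
    this.usesTimers = usesTimers;
    this.requiresTimeSortedInput = requiresTimeSortedInput;
  }

  boolean usesState() {
    return !stateDeclarations.isEmpty();
  }

  boolean usesTimers() {
    return usesTimers;
  }

  boolean requiresTimeSortedInput() {
    return requiresTimeSortedInput;
  }

  // The single predicate proposed in this thread.
  boolean isStateful() {
    return usesState() || usesTimers() || requiresTimeSortedInput();
  }

  // One of the ad-hoc checks runners use today; it only looks at state
  // declarations, so timer-only and time-sorted-only DoFns are missed.
  boolean legacyStateDeclarationCheck() {
    return stateDeclarations.size() > 0;
  }
}
```

This only encodes the proposal as written; as Reuven notes, whether time-sorted input implies statefulness in general may be runner-dependent.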
On 5/31/20 2:27 PM, Jan Lukavský wrote:
On 5/30/20 5:39 AM, Kenneth Knowles wrote:
I agree with deleting them, though for different reasons. I think this
code comes from a desire to have methods that can be called on a
DoFn directly. And from reviewing the code history, I think they
were copied in from another class. That's why they are the way
they are. Accepting a DoFnSignature would be more appropriate to
the "plural-class-name companion class" pattern. But I doubt the
perf impact of this is ever measurable, and certainly not
relative to a big data processing job. If we really wanted the
current API, a cache would be trivial to add, but it is also not
important, so we shouldn't add one.
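The two API styles mentioned here, and why a cache would be trivial, can be illustrated with a minimal plain-Java sketch. Everything in it is hypothetical: the `DeclaresState`/`DeclaresTimer` marker annotations stand in for real state and timer declarations, `analyze` stands in for the much more involved signature analysis, and none of the names are Beam APIs.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical marker annotations standing in for real state/timer declarations.
@Retention(RetentionPolicy.RUNTIME)
@interface DeclaresState {}

@Retention(RetentionPolicy.RUNTIME)
@interface DeclaresTimer {}

// Hypothetical example "DoFn" that declares only a timer.
@DeclaresTimer
final class TimerOnlyFn {}

// Hypothetical simplified signature (not Beam's DoFnSignature).
final class AnalyzedSignature {
  final boolean usesState;
  final boolean usesTimers;

  AnalyzedSignature(boolean usesState, boolean usesTimers) {
    this.usesState = usesState;
    this.usesTimers = usesTimers;
  }
}

// Hypothetical "plural-class-name companion class" for AnalyzedSignature.
final class AnalyzedSignatures {
  // Counts how many times the (stand-in) analysis has run.
  static final AtomicInteger analysisRuns = new AtomicInteger();
  private static final Map<Class<?>, AnalyzedSignature> CACHE = new ConcurrentHashMap<>();

  private AnalyzedSignatures() {}

  // Stand-in for the expensive reflective analysis of a DoFn class.
  static AnalyzedSignature analyze(Class<?> fnClass) {
    analysisRuns.incrementAndGet();
    return new AnalyzedSignature(
        fnClass.isAnnotationPresent(DeclaresState.class),
        fnClass.isAnnotationPresent(DeclaresTimer.class));
  }

  // Companion-class style: operate on an already-built signature.
  static boolean isStateful(AnalyzedSignature sig) {
    return sig.usesState || sig.usesTimers;
  }

  // Convenience style discussed in the thread: re-analyzes on every call.
  static boolean isStatefulUncached(Class<?> fnClass) {
    return isStateful(analyze(fnClass));
  }

  // Memoized lookup: ConcurrentHashMap.computeIfAbsent runs analyze at most once per class.
  static AnalyzedSignature getCached(Class<?> fnClass) {
    return CACHE.computeIfAbsent(fnClass, AnalyzedSignatures::analyze);
  }
}
```

`isStatefulUncached` repeats the analysis on every call, while `getCached` runs it at most once per class. In line with the point above, this only shows that a cache is easy to add, not that one is needed.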
Reasons I think they should be deleted:
1. They seem to exist as a shortcut so that people don't forget to
call both DoFnSignatures#usesState and DoFnSignatures#usesTimers
[1]. But if another relevant method is added, the shortcut
doesn't include it, so the problem of remembering to call all
relevant methods is not solved.
There are multiple ways runners test for "statefulness" of a
DoFn. Some use DoFnSignature#usesState(), some
DoFnSignatures#usesState(), some DoFnSignatures#isStateful(), and
some even DoFnSignature.stateDeclarations().size() > 0. Having so many
ways to perform a simple check that a DoFn needs to be executed as
stateful seems suboptimal.
I don't see anything weird about the definition of a "stateful DoFn",
which is any DoFn that has the following requirements:
a) is keyed
b) requires shuffling same keys to same workers
c) requires support for both state and timers
2. They seem incorrect [2]. Just because something requires time
sorted input *does not* mean it uses bag state.
Yes, this is unfortunate. What makes the DoFn use bag state is
"when the runner executes the DoFn using the default expansion". I
agree this is not the same, but the correct solution again seems
rooted in the discussion about pipeline requirements vs. runner
capabilities vs. default and overridden expansions. It would be
better to use the standard expansion mechanism, but AFAIK that is
currently not possible, because one cannot simply wrap one
stateful DoFn inside another (that would require dynamic
states).
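The default expansion referred to here can be pictured roughly as follows: buffer incoming elements (as a runner might in bag state) and release them in timestamp order once the watermark passes them (as an event-time timer would trigger). This is a minimal, in-memory sketch of that general technique under stated assumptions, not Beam's actual implementation: `TimeSortingBuffer` is a hypothetical name, timestamps are plain `long` values, and late data, persistence, and per-key scoping are omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, in-memory model of sorting input by timestamp; NOT Beam's
// implementation. Late data, persistence, and per-key scoping are omitted.
final class TimeSortingBuffer<T> {
  private static final class Entry<V> {
    final long timestamp;
    final long seq; // arrival order, used to break timestamp ties
    final V value;

    Entry(long timestamp, long seq, V value) {
      this.timestamp = timestamp;
      this.seq = seq;
      this.value = value;
    }
  }

  // Plays the role of the buffered (bag) state, kept ordered for convenience.
  private final PriorityQueue<Entry<T>> buffer =
      new PriorityQueue<>(
          Comparator.<Entry<T>>comparingLong(e -> e.timestamp)
              .thenComparingLong(e -> e.seq));
  private long nextSeq = 0;

  // Corresponds to appending an incoming element to state.
  void add(long timestamp, T value) {
    buffer.add(new Entry<>(timestamp, nextSeq++, value));
  }

  // Corresponds to an event-time timer firing at `watermark`: emit all
  // buffered elements with timestamp <= watermark, in timestamp order.
  List<T> onWatermark(long watermark) {
    List<T> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
      out.add(buffer.poll().value);
    }
    return out;
  }

  int bufferedCount() {
    return buffer.size();
  }
}
```

Because the buffering and flushing live in state and timers, a DoFn that declares neither still ends up needing both once it is executed this way, which is the mismatch discussed above.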
Jan
Kenn
[1] https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
[2] https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449
On Fri, May 29, 2020 at 8:46 AM Luke Cwik <lc...@google.com> wrote:
To go back to your original question:
I would remove the static convenience methods in
DoFnSignatures since they construct a DoFnSignature and then
throw it away. This construction is pretty involved; it is nothing
as expensive as an IO call, but it would become noticeable if it
were abused. We can already see it being called
multiple times in a row [1, 2].
Runners should create their own derived properties based
upon knowledge of how they are implemented, and we shouldn't
create derived properties that combine different concepts (e.g.,
merging isStateful and @RequiresTimeSortedInput). If there
is a common implementation that is shared across
multiple runners, it could "translate" a DoFnSignature based
upon how it is implemented and/or define its own thing.
[1] https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java#L61
[2] https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java#L73
On Wed, May 27, 2020 at 3:16 AM Jan Lukavský
<je...@seznam.cz> wrote:
Right, this might come down to the definition of what these
methods should really return. Currently, the most
visible issue is [1]: when a DoFn has no state or timers
but is annotated with @RequiresTimeSortedInput, the
annotation is silently ignored, because
DoFnSignature#usesState returns false and the ParDo is
executed as stateless.
I agree that there are two separate things: what the user declares
and what the runner effectively needs to execute a DoFn.
A further complication is that what the runner needs
might depend not only on the DoFn itself but also on other
conditions; e.g., @RequiresTimeSortedInput does not
require any state or timers in the bounded case, when the runner
can presort the data. There might be additional inputs
to this decision as well.
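The distinction between what the user declares and what the runner needs can be sketched as a runner-side predicate. All type and method names here (`DeclaredFeatures`, `ExecutionContext`, `RunnerPlanner.needsStateAndTimers`) are hypothetical, not Beam APIs, and the only rule encoded beyond declared state and timers is the bounded-presort case described above; a real runner may have additional inputs to this decision.

```java
// Hypothetical types for illustration only; none of these are Beam APIs.

// What the user declared on the DoFn (roughly what a signature reports).
final class DeclaredFeatures {
  final boolean usesState;
  final boolean usesTimers;
  final boolean requiresTimeSortedInput;

  DeclaredFeatures(boolean usesState, boolean usesTimers, boolean requiresTimeSortedInput) {
    this.usesState = usesState;
    this.usesTimers = usesTimers;
    this.requiresTimeSortedInput = requiresTimeSortedInput;
  }
}

// Conditions outside the DoFn that the runner knows about.
final class ExecutionContext {
  final boolean boundedInput;
  final boolean runnerCanPresort;

  ExecutionContext(boolean boundedInput, boolean runnerCanPresort) {
    this.boundedInput = boundedInput;
    this.runnerCanPresort = runnerCanPresort;
  }
}

final class RunnerPlanner {
  private RunnerPlanner() {}

  // Runner-derived property: does executing this DoFn need state/timer support?
  static boolean needsStateAndTimers(DeclaredFeatures fn, ExecutionContext ctx) {
    if (fn.usesState || fn.usesTimers) {
      return true; // user-declared state or timers always need support
    }
    if (fn.requiresTimeSortedInput) {
      // Presorted bounded input needs no buffering state or timers;
      // otherwise sorting would be done by buffering in state and
      // flushing on timers.
      return !(ctx.boundedInput && ctx.runnerCanPresort);
    }
    return false;
  }
}
```

Keeping this logic on the runner side leaves the signature describing only the DoFn itself, while each runner decides what that means for its own execution.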
I don't quite agree that DoFnSignature#isStateful is a
bad name: when a DoFn has only a timer and no state, it
is still stateful, although usesState should return
false. Otherwise we would have to declare a timer to be state, which
would be even more confusing (although it might be
technically correct).
[1] https://issues.apache.org/jira/browse/BEAM-10072
On 5/27/20 1:21 AM, Luke Cwik wrote:
I believe DoFnSignature#isStateful is a remnant of a bad
API name choice and was renamed to usesState. I would
remove DoFnSignature#isStateful as it does not seem to
be used anywhere.
Does DoFnSignatures#usesValueState return true if the
DoFn says it needs @RequiresTimeSortedInput, because of
how the DoFn is being "wrapped" with a stateful DoFn that
provides the time-sorting functionality?
That doesn't seem right, since I would always have
expected DoFnSignature(s) to be about the DoFn
passed in and not about the implementation details that
a runner might be using to
provide @RequiresTimeSortedInput.
(Similarly for
DoFnSignatures#usesBagState, DoFnSignatures#usesWatermarkHold,
DoFnSignatures#usesTimers, DoFnSignatures#usesState.)
On Mon, May 25, 2020 at 2:31 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi,
I have come across an issue with multiple ways of
getting meaningful flags
for DoFns. We have
a)
DoFnSignature#{usesState,usesTimers,isStateful,...},
and
b)
DoFnSignatures#{usesState,usesTimers,isStateful,...}
These two might not be (and actually are not) aligned
with each other. That
can be solved quite easily (by removing any logic from
DoFnSignatures and
putting it into DoFnSignature), but what I'm not sure about is why
DoFnSignature#isStateful is deprecated in favor of
DoFnSignature#usesState. In my understanding, it
should hold that
`isStateful() iff usesState() || usesTimers()`,
which means these two
should not be used interchangeably. I'd suggest
undeprecating
`DoFnSignature#isStateful` and aligning the various
(static and non-static)
versions of these calls.
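The alignment suggested here, where the static helpers carry no logic of their own and the invariant `isStateful() iff usesState() || usesTimers()` holds, can be sketched as follows. `FnSignature` and `FnSignatures` are hypothetical stand-ins for `DoFnSignature` and `DoFnSignatures`, reduced to simple declaration counts; they are not Beam's classes.

```java
// Hypothetical, simplified stand-ins for DoFnSignature / DoFnSignatures;
// NOT Beam's classes. Declarations are reduced to simple counts.
final class FnSignature {
  private final int stateDeclarationCount;
  private final int timerDeclarationCount;

  FnSignature(int stateDeclarationCount, int timerDeclarationCount) {
    this.stateDeclarationCount = stateDeclarationCount;
    this.timerDeclarationCount = timerDeclarationCount;
  }

  boolean usesState() {
    return stateDeclarationCount > 0;
  }

  boolean usesTimers() {
    return timerDeclarationCount > 0;
  }

  // Invariant from the thread: isStateful() iff usesState() || usesTimers().
  boolean isStateful() {
    return usesState() || usesTimers();
  }
}

// Companion class with no logic of its own: every helper delegates to the
// instance method, so the static and non-static versions cannot diverge.
final class FnSignatures {
  private FnSignatures() {}

  static boolean usesState(FnSignature signature) {
    return signature.usesState();
  }

  static boolean usesTimers(FnSignature signature) {
    return signature.usesTimers();
  }

  static boolean isStateful(FnSignature signature) {
    return signature.isStateful();
  }
}
```

Because the static versions only delegate, they cannot drift out of sync with the instance methods, and a timer-only signature reports `usesState() == false` but `isStateful() == true`, so the two are not interchangeable.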
Thoughts?
Jan