Since the Go SDK recently gained process continuations for 2.40, the Go
perspective is that PeriodicImpulse seems like it's unbounded by definition.

Bounded PCollections generally imply: all configured data is available
*now* and could be consumed & processed in a single bundle (or multiple if
the Element can be split via SDF).

Unbounded means splitting across the processing-time dimension, which
requires different mechanisms.

It's unclear to me how a transform that can be configured to "emit something
once every 10 seconds, for 5 minutes" is useful if it's bounded. It's a DoFn
that waits 5 minutes before any aggregation work gets done.
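
To make that concern concrete, here is a toy model in plain Python. It is
not Beam code: the function names are made up for illustration, the
half-open [start, stop) range is an assumption, and the "bounded means
nothing fires until the input is done" rule is a simplification of the
batch-like behavior described in this thread.

```python
from typing import List, Tuple


def impulse_timestamps(start: int, stop: int, interval: int) -> List[int]:
    """Event times (in seconds) emitted in the half-open range [start, stop)."""
    return list(range(start, stop, interval))


def aggregation_ready_times(
    timestamps: List[int], window_size: int, bounded: bool, end_of_input: int
) -> List[Tuple[int, int]]:
    """Return (window_start, time_result_is_available) for each fixed window.

    Toy assumption: a bounded input is aggregated batch-style, so nothing
    downstream of a GroupByKey fires before the whole input is done; an
    unbounded input lets each window fire once its end has passed.
    """
    window_starts = sorted({(ts // window_size) * window_size for ts in timestamps})
    if bounded:
        return [(w, end_of_input) for w in window_starts]
    return [(w, w + window_size) for w in window_starts]


if __name__ == "__main__":
    # "Emit something once every 10 seconds, for 5 minutes."
    stamps = impulse_timestamps(start=0, stop=300, interval=10)
    print(len(stamps), "elements")
    print("bounded:  ", aggregation_ready_times(stamps, 60, True, 300))
    print("unbounded:", aggregation_ready_times(stamps, 60, False, 300))
```

In this model the first one-minute window has a result after 60 seconds in
the unbounded case, but only after 300 seconds in the bounded case.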

(But I should also read the design doc.)


On Thu, May 19, 2022, 4:31 PM Yi Hu <[email protected]> wrote:

> Dear Cham,
>
> Thanks for the feedback. Java PeriodicImpulse indeed returns an unbounded
> PColl. This can be verified by adding an assert
> `assertEquals(PCollection.IsBounded.UNBOUNDED, result.isBounded());`
> at the unit test here:
>
> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java#L71
>
> Then run:
>
> ./gradlew :runners:direct-java:needsRunnerTests --tests
> org.apache.beam.sdk.transforms.PeriodicImpulseTest
>
> The test passes. (If asserting BOUNDED, an assertion error happens:
> java.lang.AssertionError: expected:<BOUNDED> but was:<UNBOUNDED>)
>
> The unbounded PCollection is a result of the return type of DoFn.ProcessElement here:
>
> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java#L182
>
> The behavior is documented here:
>
> https://github.com/apache/beam/blob/6390bcd265512f077c92124a551419ee0349c84c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L877
> "If ProcessElement returns DoFn.ProcessContinuation, assume it is
> unbounded."
>
> Best,
> Yi
>
> On Thu, May 19, 2022 at 6:38 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Wed, May 18, 2022 at 12:32 PM Yi Hu <[email protected]> wrote:
>>
>>> Hi dev group,
>>>
>>> TL;DR: The PeriodicImpulse transform in the Java SDK generates an unbounded
>>> PCollection, while in the Python SDK it generates a bounded PCollection. The
>>> latter case may cause issues in streaming.
>>>
>>> Thank you for your attention. There are periodic impulse transforms in
>>> both the Java and Python SDKs, implemented in quite similar ways. However, one
>>> difference is that Java PeriodicImpulse generates an *unbounded*
>>> PCollection of time series, while the PCollection generated by
>>> Python PeriodicImpulse is *bounded*.
>>>
>>> This difference causes an issue when a downstream PTransform
>>> contains a Reshuffle op. The GBK holds incoming elements and fires
>>> them as in a batch pipeline, regardless of the window settings.
>>>
>>> Here is a working example. No values are printed to the console.
>>> But with Reshuffle() commented out, the timestamps get printed
>>> continuously. (Removing apply_windowing or the streaming flag does not
>>> change the behavior.)
>>>
>>> ```
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.transforms.periodicsequence import PeriodicImpulse
>>> from apache_beam.transforms.util import Reshuffle
>>>
>>>
>>> if __name__ == '__main__':
>>>   pipeline_options = PipelineOptions(flags=['--streaming'])
>>>
>>>   with beam.Pipeline(options=pipeline_options) as p:
>>>     result = (
>>>         p
>>>         | PeriodicImpulse(fire_interval=1, apply_windowing=True)
>>>         | Reshuffle()  # comment out this line and the timestamps get printed
>>>         | beam.Map(print)
>>>     )
>>>
>>> ```
>>>
>>> Having looked into the background, I found the design doc of BEAM-9650 (
>>> https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit#heading=h.iluf68hiykcs)
>>> and learned that the Python implementation was originally designed in the
>>> context of slowly changing sources, which are "bounded" over a short
>>> time (like days) but can change. In the Java SDK, on the other hand, the
>>> unbounded PCollection is a result of the ProcessContinuation return type of
>>> the DoFn's ProcessElement method. In Python this is not set, so the output
>>> PCollection is inferred to be bounded.
>>>
>>>
>> I believe that design doc was for both Java and Python, so there should not be
>> behavior differences between the Java and Python implementations. It seems like
>> the Java implementation is bounded as well?
>>
>> https://github.com/apache/beam/blob/e9cfd8e441017085c9e9064a4a8cdd3576e3da43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java#L69
>>
>>
>>> Now the question is,
>>> (1) Should we change the Python implementation to generate an unbounded
>>> PCollection, in alignment with Java?
>>> (2) If yes (or no), should we introduce a parameter to set the
>>> boundedness of PeriodicImpulse? This does not seem aligned with current
>>> Beam sources, where boundedness is a final property of a specific source.
>>>
>>
>> I think you are trying to use this to implement watchForNewFiles, right?
>> If so, "PeriodicImpulse" might not be the correct thing to base this on.
>> We might actually need to develop a Watch transform (SDF) for the Python
>> SDK.
>>
>> Thanks,
>> Cham
>>
>>
>>> Regards,
>>> Yi
>>>
>>> --
>>>
>>> Yi Hu, (he/him/his)
>>>
>>> Software Engineer
>>>
>>>
>>>
