The doc (which wasn't linked in https://s.apache.org/beam-design-docs, but
is now) doesn't actually say definitively whether PeriodicImpulse is bounded
or not. It only says that, conceptually, it can trigger what would be a
bounded read pre-wrapped in an appropriate fixed window.

This, combined with ProcessContinuation implying unbounded, leads me to the
conclusion that PeriodicImpulse must produce unbounded PCollections.
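For readers comparing the two SDKs, the rule quoted from DoFnSignatures further down the thread can be modeled in a few lines. This is a toy Python model, not Beam code: `IsBounded`, `infer_dofn_boundedness`, and `output_boundedness` are hypothetical names. Two parts are assumptions about how the pieces fit together rather than facts from this thread: that an explicit per-element annotation (`@BoundedPerElement` / `@UnboundedPerElement` in Java) takes precedence over the return type, and that the output is bounded only if both the input and the DoFn are bounded.

```python
from enum import Enum
from typing import Optional


class IsBounded(Enum):
    BOUNDED = "BOUNDED"
    UNBOUNDED = "UNBOUNDED"

    def and_(self, other: "IsBounded") -> "IsBounded":
        # Assumption: the result is bounded only if both sides are bounded.
        if self is IsBounded.BOUNDED and other is IsBounded.BOUNDED:
            return IsBounded.BOUNDED
        return IsBounded.UNBOUNDED


def infer_dofn_boundedness(returns_process_continuation: bool,
                           annotation: Optional[IsBounded] = None) -> IsBounded:
    """Per-element boundedness of a DoFn (hypothetical model)."""
    if annotation is not None:
        # Assumption: an explicit per-element annotation takes precedence.
        return annotation
    # The documented rule: a ProcessContinuation return type means unbounded.
    if returns_process_continuation:
        return IsBounded.UNBOUNDED
    return IsBounded.BOUNDED


def output_boundedness(input_bounded: IsBounded,
                       dofn_bounded: IsBounded) -> IsBounded:
    """Boundedness of the ParDo's output PCollection (hypothetical model)."""
    return input_bounded.and_(dofn_bounded)


# Java: bounded input -> PeriodicSequence's DoFn returning ProcessContinuation.
java_out = output_boundedness(IsBounded.BOUNDED, infer_dofn_boundedness(True))
# Python PeriodicImpulse as described in this thread: nothing marks it unbounded.
python_out = output_boundedness(IsBounded.BOUNDED, infer_dofn_boundedness(False))
print(java_out.value, python_out.value)  # UNBOUNDED BOUNDED
```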

On Thu, May 19, 2022, 4:44 PM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Thu, May 19, 2022 at 4:30 PM Yi Hu <[email protected]> wrote:
>
>> Dear Cham,
>>
>> Thanks for the feedback. Java PeriodicImpulse indeed returns an
>> unbounded PColl. This can be verified by adding an assert
>> `assertEquals(PCollection.IsBounded.UNBOUNDED, result.isBounded());`
>> at the unit test here:
>>
>> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java#L71
>>
>> Then run:
>>
>> ./gradlew :runners:direct-java:needsRunnerTests --tests org.apache.beam.sdk.transforms.PeriodicImpulseTest
>>
>> The test passes. (If you assert BOUNDED instead, it fails with:
>> java.lang.AssertionError: expected:<BOUNDED> but was:<UNBOUNDED>)
>>
>> The unbounded PCollection results from the return type of the
>> DoFn.ProcessElement method here:
>>
>> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java#L182
>>
>> The behavior is documented here:
>>
>> https://github.com/apache/beam/blob/6390bcd265512f077c92124a551419ee0349c84c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L877
>> "If ProcessElement returns DoFn.ProcessContinuation, assume it is
>> unbounded."
>>
>
> Is the current Java behavior potentially incorrect? Reading the original
> document, it seems like the use case for PeriodicImpulse was to support
> reading a bounded side input that is consistent within a given window.
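Per-window consistency of that kind relies on each firing landing in its own fixed window. A rough plain-Python sketch of what PeriodicImpulse emits: `periodic_impulse` is a hypothetical stand-in, not the SDK function, and it assumes that firings occur at `start + i * fire_interval` strictly before `stop`, and that `apply_windowing` assigns epoch-aligned fixed windows of size `fire_interval` (as FixedWindows with zero offset does). Passing `stop=math.inf` shows why the output behaves like an endless stream.

```python
import itertools
import math
from typing import Iterator, Optional, Tuple

Window = Tuple[float, float]


def periodic_impulse(start: float, stop: float, fire_interval: float,
                     apply_windowing: bool = False
                     ) -> Iterator[Tuple[float, Optional[Window]]]:
    """Yield (timestamp, window) pairs; window is None without windowing."""
    i = 0
    while True:
        ts = start + i * fire_interval
        if ts >= stop:  # assumption: `stop` itself is exclusive
            return
        window = None
        if apply_windowing:
            # Epoch-aligned fixed window of size fire_interval containing ts.
            window_start = ts - ts % fire_interval
            window = (window_start, window_start + fire_interval)
        yield ts, window
        i += 1


print(list(periodic_impulse(1.0, 5.0, 2.0, apply_windowing=True)))
# [(1.0, (0.0, 2.0)), (3.0, (2.0, 4.0))]

# With an open-ended stop, the sequence never ends; only a prefix can be taken.
print(list(itertools.islice(periodic_impulse(0.0, math.inf, 1.0), 3)))
# [(0.0, None), (1.0, None), (2.0, None)]
```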
>
> Thanks,
> Cham
>
>
>>
>> Best,
>> Yi
>>
>> On Thu, May 19, 2022 at 6:38 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>>
>>>
>>> On Wed, May 18, 2022 at 12:32 PM Yi Hu <[email protected]> wrote:
>>>
>>>> Hi dev group,
>>>>
>>>> TL;DR: The PeriodicImpulse transform in the Java SDK generates an unbounded
>>>> PCollection, while in the Python SDK it generates a bounded PCollection. The
>>>> latter may cause issues in streaming pipelines.
>>>>
>>>> Thank you for your attention. There are periodic impulse transforms in
>>>> both the Java and Python SDKs, implemented in quite similar ways. However,
>>>> one difference is that Java PeriodicImpulse generates an *unbounded*
>>>> PCollection of time series, while the PCollection generated by
>>>> Python PeriodicImpulse is *bounded*.
>>>>
>>>> This difference causes an issue when a downstream PTransform contains a
>>>> Reshuffle. The GroupByKey (GBK) inside it holds incoming elements and fires
>>>> them as in a batch pipeline, regardless of the window settings.
>>>>
>>>> Here is a minimal working example. Nothing is printed to the console,
>>>> but with Reshuffle() commented out, the timestamps are printed
>>>> continuously. (The behavior is the same if apply_windowing or the
>>>> --streaming flag is removed.)
>>>>
>>>> ```
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.transforms.periodicsequence import PeriodicImpulse
>>>> from apache_beam.transforms.util import Reshuffle
>>>>
>>>>
>>>> if __name__ == '__main__':
>>>>   pipeline_options = PipelineOptions(flags=['--streaming'])
>>>>
>>>>   with beam.Pipeline(options=pipeline_options) as p:
>>>>     result = (
>>>>         p
>>>>         | PeriodicImpulse(fire_interval=1, apply_windowing=True)
>>>>         | Reshuffle()  # comment out this line and the timestamps get printed
>>>>         | beam.Map(print)
>>>>     )
>>>>
>>>> ```
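The symptom can be illustrated with a toy model in plain Python (not Beam's runner logic). If the input is treated as bounded, a grouping step may buffer everything until the input is exhausted, which never happens for an endless PeriodicImpulse; a watermark-driven grouping instead emits each fixed window once the watermark passes its end. `batch_style_group` and `watermark_group` are hypothetical names, and the model assumes in-order timestamps so the watermark is simply the latest timestamp seen.

```python
import itertools
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

Group = Tuple[float, List[float]]


def batch_style_group(timestamps: Iterable[float], size: float) -> Iterator[Group]:
    """Buffer all input; emit groups only after the input is exhausted."""
    groups: Dict[float, List[float]] = defaultdict(list)
    for ts in timestamps:
        groups[ts - ts % size].append(ts)
    # Only reached when the input ends; an endless source never gets here.
    for start in sorted(groups):
        yield start, groups[start]


def watermark_group(timestamps: Iterable[float], size: float) -> Iterator[Group]:
    """Emit each fixed window as soon as the watermark passes its end.

    Assumes in-order timestamps, so the watermark is the latest timestamp seen.
    """
    groups: Dict[float, List[float]] = defaultdict(list)
    for ts in timestamps:
        groups[ts - ts % size].append(ts)
        watermark = ts
        for start in sorted(groups):  # iterate over a snapshot of the keys
            if start + size <= watermark:
                yield start, groups.pop(start)
    for start in sorted(groups):  # flush whatever is left if the input ends
        yield start, groups[start]


# One timestamp per second, forever, like PeriodicImpulse(fire_interval=1).
endless = itertools.count(0.0, 1.0)
print(list(itertools.islice(watermark_group(endless, 2.0), 2)))
# [(0.0, [0.0, 1.0]), (2.0, [2.0, 3.0])]
# The same islice over batch_style_group(itertools.count(0.0, 1.0), 2.0)
# would never return, mirroring the empty console in the example above.
```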
>>>>
>>>> Looking into the background, I found the design doc of BEAM-9650 (
>>>> https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit#heading=h.iluf68hiykcs)
>>>> and learned that the Python implementation was originally designed in the
>>>> context of slowly changing sources, which are themselves "bounded" over
>>>> short periods (like days) but can change. In the Java SDK, on the other
>>>> hand, the unbounded PCollection results from the ProcessContinuation return
>>>> type of the DoFn's ProcessElement method. In Python this is not set, so the
>>>> output PCollection is inferred to be bounded.
>>>>
>>>>
>>> I believe that design doc was for both Java and Python, so there should
>>> not be behavior differences between the Java and Python implementations.
>>> It seems like the Java implementation is bounded as well?
>>>
>>> https://github.com/apache/beam/blob/e9cfd8e441017085c9e9064a4a8cdd3576e3da43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java#L69
>>>
>>>
>>>> Now the question is:
>>>> (1) Should we change the Python implementation to generate an unbounded
>>>> PCollection, in alignment with Java?
>>>> (2) If yes (or no), should we introduce a parameter to set the
>>>> boundedness of PeriodicImpulse? However, this does not seem aligned with
>>>> current Beam sources, where boundedness is a fixed property of a given source.
>>>>
>>>
>>> I think you are trying to use this to implement watchForNewFiles, right?
>>> If so, "PeriodicImpulse" might not be the correct thing to base this on.
>>> We might actually need to develop a Watch transform (SDF) for the Python
>>> SDK.
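For comparison, the core of a Watch-style transform (as in the Java SDK's `Watch`) is a loop that polls repeatedly and emits each result only the first time it is seen, until a termination condition holds. The sketch below models only that poll-and-deduplicate idea in plain Python; `watch`, `poll`, and `max_polls` are hypothetical, and it omits everything a real SDF-based Watch needs (checkpointing, watermark reporting, per-output timestamps).

```python
import time
from typing import Callable, Hashable, Iterable, Iterator, Optional, Set


def watch(poll: Callable[[], Iterable[Hashable]],
          poll_interval: float,
          max_polls: Optional[int] = None,
          sleep: Callable[[float], None] = time.sleep) -> Iterator[Hashable]:
    """Yield each polled item the first time it appears (hypothetical model)."""
    seen: Set[Hashable] = set()
    polls = 0
    while True:
        for item in poll():
            if item not in seen:  # deduplicate across polls
                seen.add(item)
                yield item
        polls += 1
        if max_polls is not None and polls >= max_polls:
            return  # termination condition reached
        sleep(poll_interval)


# Simulated directory listings returned by three successive polls.
listings = iter([["a.csv"], ["a.csv", "b.csv"], ["a.csv", "b.csv", "c.csv"]])
new_files = watch(lambda: next(listings), poll_interval=0.0, max_polls=3,
                  sleep=lambda seconds: None)
print(list(new_files))  # ['a.csv', 'b.csv', 'c.csv']
```

Unlike PeriodicImpulse, which only emits timestamps, this pattern carries the discovered items themselves, which may be one reason it fits watchForNewFiles more naturally.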
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>> Regards,
>>>> Yi
>>>>
>>>> --
>>>>
>>>> Yi Hu, (he/him/his)
>>>>
>>>> Software Engineer
>>>>
>>>>
>>>>
