As the Go SDK has recently gained support for process continuations in 2.40, the Go perspective is that PeriodicImpulse seems like it's unbounded by definition.
Bounded PCollections generally imply that all configured data is available *now* and could be consumed and processed in a single bundle (or in multiple bundles, if the element can be split via SDF). Unbounded PCollections split the work across the processing-time dimension, which requires different mechanisms. It's unclear to me how a transform that can be configured to "emit something once every 10 seconds, for 5 minutes" is useful if it's bounded. It would be a DoFn that waits 5 minutes before any aggregation work gets done.

(But I also should read the design doc.)

On Thu, May 19, 2022, 4:31 PM Yi Hu <[email protected]> wrote:

> Dear Cham,
>
> Thanks for the feedback. Java PeriodicImpulse indeed returns an unbounded
> PColl. This can be verified by adding an assert
> `assertEquals(PCollection.IsBounded.UNBOUNDED, result.isBounded());`
> at the unit test here:
>
> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java#L71
>
> Then run:
>
> ./gradlew :runners:direct-java:needsRunnerTests --tests
> org.apache.beam.sdk.transforms.PeriodicImpulseTest
>
> The test passes. (If asserting BOUNDED, an assertion error happens:
> java.lang.AssertionError: expected:<BOUNDED> but was:<UNBOUNDED>)
>
> The unbounded PColl is a result of the return type of DoFn.ProcessElement here:
>
> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java#L182
>
> The behavior is documented here:
>
> https://github.com/apache/beam/blob/6390bcd265512f077c92124a551419ee0349c84c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L877
> "If ProcessElement returns DoFn.ProcessContinuation, assume it is
> unbounded."
>
> Best,
> Yi
>
> On Thu, May 19, 2022 at 6:38 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>> On Wed, May 18, 2022 at 12:32 PM Yi Hu <[email protected]> wrote:
>>
>>> Hi dev group,
>>>
>>> TL;DR: The PeriodicImpulse transform in the Java sdk generates an unbounded
>>> PCollection, while in the Python sdk it generates a bounded PCollection. The
>>> latter case may cause issues in streaming.
>>>
>>> Thank you for your attention. There are periodic impulse transforms in
>>> both the Java and Python sdk, implemented in a quite similar way. However, one
>>> difference is that Java PeriodicImpulse generates an *unbounded*
>>> PCollection of time series, while the PCollection generated by
>>> Python PeriodicImpulse is *bounded*.
>>>
>>> This difference has caused an issue when the downstream PTransform
>>> contains a Reshuffle op. The GBK will hold inflowing elements and fire
>>> them like a batch pipeline, regardless of the window settings.
>>>
>>> Here is a working example. There are no printed values in the console.
>>> But after commenting out Reshuffle(), the timestamps get printed
>>> continuously. (Removing apply_windowing or the streaming flag does not
>>> change the behavior.)
>>>
>>> ```
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.transforms.periodicsequence import PeriodicImpulse
>>> from apache_beam.transforms.util import Reshuffle
>>>
>>>
>>> if __name__ == '__main__':
>>>   pipeline_options = PipelineOptions(flags=['--streaming'])
>>>
>>>   with beam.Pipeline(options=pipeline_options) as p:
>>>     result = (
>>>         p
>>>         | PeriodicImpulse(fire_interval=1, apply_windowing=True)
>>>         | Reshuffle()  # comment this line out and the timestamps get printed
>>>         | beam.Map(print)
>>>     )
>>>
>>> ```
>>>
>>> Having looked into the background, I see the design doc of BEAM-9650 (
>>> https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit#heading=h.iluf68hiykcs)
>>> and learned that the Python implementation was originally designed in the
>>> context of slowly changing sources, which are themselves "bounded" over a
>>> short time (like days) but can change. On the other hand, in the Java sdk, the
>>> unbounded PCollection is a result of the ProcessContinuation return type of
>>> the DoFn's ProcessElement method. In Python this is not set, thus the output
>>> PCollection is inferred as bounded.
>>>
>>>
>> That design doc I believe was both for Java and Python so there should be
>> behavior differences between Java and Python implementations. Seems like
>> Java implementation is bounded as well?
>>
>> https://github.com/apache/beam/blob/e9cfd8e441017085c9e9064a4a8cdd3576e3da43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java#L69
>>
>>
>>> Now the question is,
>>> (1) should we change the Python implementation to generate an unbounded
>>> PCollection in alignment with Java
>>> (2) If yes (or no), should we introduce a parameter to set the
>>> boundedness of PeriodicImpulse, however this seems not aligned with current
>>> Beam sources where boundedness is a final property for a specific source.
>>>
>>
>> I think you are trying to use this to implement watchForNewFiles, right?
>> If so, "PeriodicImpulse" might not be the correct thing to base this on.
>> We might actually need to develop a Watch transform (SDF) for the Python
>> SDK.
>>
>> Thanks,
>> Cham
>>
>>
>>> Regards,
>>> Yi
>>>
>>> --
>>>
>>> Yi Hu, (he/him/his)
>>>
>>> Software Engineer
>>>
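
To make the "once every 10 seconds, for 5 minutes" case concrete, here is a rough sketch in plain Python with no Beam involved. The helper names `periodic_timestamps` and `available_at` are made up for illustration, and the half-open [start, stop) range is my assumption, not a statement about how either SDK computes its sequence.

```python
import math


def periodic_timestamps(start, stop, interval):
    """Return every timestamp the configuration will ever produce.

    Hypothetical helper for illustration only; not part of any Beam SDK.
    Assumes a half-open range [start, stop). The count is finite and known
    up front, which is what makes the configuration look "bounded".
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    count = max(0, math.ceil((stop - start) / interval))
    return [start + i * interval for i in range(count)]


def available_at(timestamps, now):
    """Return the timestamps whose firing time has already been reached."""
    return [t for t in timestamps if t <= now]


# "Emit something once every 10 seconds, for 5 minutes."
all_ts = periodic_timestamps(start=0, stop=300, interval=10)

# What is actually ready when the pipeline starts vs. five minutes later.
ready_at_start = available_at(all_ts, now=0)
ready_at_end = available_at(all_ts, now=300)
```

The full set of timestamps is known up front, but at pipeline start only the first one has come due. A bounded reading has to wait for the last one before any downstream aggregation can complete, which is the five-minute wait described above.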
