Hi all,
I want to compute a cumulative sum over a time series in a bounded (batch)
pipeline using the Apache Beam Python SDK. One option is to implement the
cumulative sum with a stateful DoFn, but the problem remains that this does not
work when the data arrives out of order, and a PCollection makes no ordering
guarantees. Is there a way to compute the cumulative sum in timestamp order in a
batch pipeline? This is what I did (it ignores ordering):
import apache_beam as beam
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.transforms.window import FixedWindows, TimestampedValue


class TimestampedSumAccumulator(beam.DoFn):
    SUM_STATE = 'sum'

    def process(
            self, element,
            timestamp=beam.DoFn.TimestampParam,
            sum_state=beam.DoFn.StateParam(
                ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))):
        # Running total per key and window, updated in whatever order
        # the elements happen to arrive.
        sum_value = sum_state.read() or 0.0
        sum_value += element[1]  # element is ('sum_key', value)
        sum_state.write(sum_value)
        yield TimestampedValue(sum_value, timestamp)


with beam.Pipeline() as p:
    sums = (p
            | 'Create' >> beam.Create([
                (3.1, 3),  # (value, timestamp)
                (1.5, 1),
                (4.2, 4),
                (5.4, 5),
                (2.3, 2)
            ])
            | 'AddTimestamps' >> beam.Map(
                lambda x: TimestampedValue(x[0], x[1]))
            | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
            | 'Window' >> beam.WindowInto(FixedWindows(10))
            | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
            | 'Print' >> beam.Map(print))
How can I compute the cumulative sum in the “right” (timestamp) order?
Thank you very much in advance.