Hi,
I'm using the SqlTransform as an external transform from within a Python
pipeline. The SQL docs [1] mention that you can either (a) window the
input or (b) window in the SQL query.
Option (a):

    input
        | "Window" >> beam.WindowInto(window.FixedWindows(30))
        | "Aggregate" >>
            SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
                            WHERE ...
                            GROUP BY field
                            """)
This results in an exception:
Caused by: java.lang.ClassCastException:
org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
to org.apache.beam.sdk.transforms.windowing.GlobalWindow
=> Is this a bug?
Let's try Option (b):

    input
        | "Aggregate & Window" >>
            SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
                            WHERE ...
                            GROUP BY field,
                                TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                            """)
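(For context, my mental model of TUMBLE — my assumption, not something I found spelled out in the docs — is that each row is assigned to the fixed 30-minute bucket containing its timestamp, i.e. the timestamp floored to a window boundary. A quick pure-Python sketch of that bucketing:)

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumble_window_start(ts: datetime,
                        size: timedelta = timedelta(minutes=30)) -> datetime:
    """Start of the fixed (tumbling) window that contains ts."""
    # Floor the offset from the epoch down to a whole number of windows.
    return EPOCH + ((ts - EPOCH) // size) * size

# e.g. 10:47 falls into the [10:30, 11:00) window:
tumble_window_start(datetime(2024, 1, 1, 10, 47, tzinfo=timezone.utc))
# -> 2024-01-01 10:30:00+00:00
```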
The issue I'm facing here is that each element already carries an event
timestamp, but that timestamp is not exposed as a schema field. So I first
need a DoFn that copies the timestamp into a new field:
    class GetTimestamp(beam.DoFn):
        def process(self, event, timestamp=beam.DoFn.TimestampParam):
            yield TimestampedRow(..., timestamp)

    input
        | "Extract timestamp" >> beam.ParDo(GetTimestamp())
        | "Aggregate & Window" >>
            SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
                            WHERE ...
                            GROUP BY field,
                                TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                            """)
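(To sanity-check what I expect this pipeline to compute, here is the whole thing simulated in plain Python — no Beam involved; the field names and sample values are made up:)

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=30)

def window_start(ts):
    # Floor ts to the enclosing 30-minute tumbling-window boundary.
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW

# In Beam the event timestamp is implicit on the element; the DoFn above
# makes it an explicit field. Here it is explicit from the start.
events = [
    ("a", datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc)),
    ("a", datetime(2024, 1, 1, 10, 25, tzinfo=timezone.utc)),
    ("a", datetime(2024, 1, 1, 10, 40, tzinfo=timezone.utc)),
    ("b", datetime(2024, 1, 1, 10, 10, tzinfo=timezone.utc)),
]

# Rough equivalent of:
#   SELECT field, COUNT(field) FROM PCOLLECTION
#   GROUP BY field, TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
counts = Counter((field, window_start(ts)) for field, ts in events)
# ("a", 10:00) -> 2, ("a", 10:30) -> 1, ("b", 10:00) -> 1
```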
=> It would be very convenient if there were a reserved field name that
pointed to the timestamp of an element. Maybe there is?
-Max
[1]
https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/