Hi All,

I have a BigQuery dataset that contains event logs from many thousands of devices.
It's basically time-series data that records a "CONNECTED" or
"DISCONNECTED" state for a device at infrequent points in time.
Except for error cases where an event might be missing, an event is
always logged for every actual state change; in between, there are
also some events that just republish the current state.

Example:

+----------+------------------+---------------------+--------------+
| deviceId | msg seq (device) | timestamp           | state        |
+----------+------------------+---------------------+--------------+
| d111     | 5                | 2023-01-20T20:30:00 | CONNECTED    |
+----------+------------------+---------------------+--------------+
| d111     | 6                | 2023-02-21T01:25:00 | CONNECTED    |
+----------+------------------+---------------------+--------------+
| d000     | 2                | 2023-01-20T10:00:00 | DISCONNECTED |
+----------+------------------+---------------------+--------------+
| d111     | 7                | 2023-02-21T10:15:00 | DISCONNECTED | <- state change event
+----------+------------------+---------------------+--------------+
| d000     | 3                | 2023-02-21T12:00:00 | DISCONNECTED |
+----------+------------------+---------------------+--------------+
| d000     | 4                | 2023-02-21T12:10:00 | CONNECTED    | <- state change event
+----------+------------------+---------------------+--------------+
| d000     | 5                | 2023-02-22T05:00:00 | DISCONNECTED | <- state change event
+----------+------------------+---------------------+--------------+

I would like to use Apache Beam (via Dataflow) to read this dataset
and convert it into a dataset which only lists intervals for the
connected state, similar to this:

+----------+---------------------+---------------------+-----------+
| deviceId | timestamp (start)   | timestamp (end)     | state     |
+----------+---------------------+---------------------+-----------+
| d111     | 2023-01-20T20:30:00 | 2023-02-21T10:15:00 | CONNECTED |
+----------+---------------------+---------------------+-----------+
| d000     | 2023-02-21T12:10:00 | 2023-02-22T05:00:00 | CONNECTED |
+----------+---------------------+---------------------+-----------+

Naively, I would just loop through all these events, remember
the current state of each device, and output an entry each time the
state changes.
But what would be the best way to implement this use case with Apache
Beam? In theory, I should be able to start tracking state changes for a
device at multiple points in time, so that the processing can be split
up and optimized.
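For reference, the naive single-pass approach might look like this in plain Python (no Beam), as a minimal sketch. The `Event`/`Interval` types, `connected_intervals`, and the open-ended interval (`end=None`) for devices still connected at the end are illustrative assumptions, not part of any library. The events are sorted first because the loop only works if each device's events are seen in order.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class Event:
    device_id: str
    seq: int             # per-device message sequence number
    timestamp: datetime
    state: str           # "CONNECTED" or "DISCONNECTED"


@dataclass(frozen=True)
class Interval:
    device_id: str
    start: datetime
    end: Optional[datetime]  # None if the device is still connected at the end
    state: str = "CONNECTED"


def connected_intervals(events: Iterable[Event]) -> List[Interval]:
    """Naive single pass: remember the current state per device, emit on change."""
    # Start time of the current CONNECTED run for each device that is connected.
    open_since: Dict[str, datetime] = {}
    result: List[Interval] = []
    # The loop relies on seeing each device's events in order.
    for ev in sorted(events, key=lambda e: (e.timestamp, e.seq)):
        if ev.state == "CONNECTED":
            # Only the first CONNECTED event of a run opens an interval;
            # republished CONNECTED events leave the start time unchanged.
            open_since.setdefault(ev.device_id, ev.timestamp)
        elif ev.device_id in open_since:
            # The first DISCONNECTED event after a CONNECTED run closes it;
            # republished DISCONNECTED events fall through and are ignored.
            start = open_since.pop(ev.device_id)
            result.append(Interval(ev.device_id, start, ev.timestamp))
    # Devices still connected when the data ends get an open-ended interval.
    for device_id, start in open_since.items():
        result.append(Interval(device_id, start, None))
    return result


def ts(s: str) -> datetime:
    return datetime.fromisoformat(s)


# The example events from the first table, in the same (unsorted) order.
SAMPLE = [
    Event("d111", 5, ts("2023-01-20T20:30:00"), "CONNECTED"),
    Event("d111", 6, ts("2023-02-21T01:25:00"), "CONNECTED"),
    Event("d000", 2, ts("2023-01-20T10:00:00"), "DISCONNECTED"),
    Event("d111", 7, ts("2023-02-21T10:15:00"), "DISCONNECTED"),
    Event("d000", 3, ts("2023-02-21T12:00:00"), "DISCONNECTED"),
    Event("d000", 4, ts("2023-02-21T12:10:00"), "CONNECTED"),
    Event("d000", 5, ts("2023-02-22T05:00:00"), "DISCONNECTED"),
]

if __name__ == "__main__":
    for interval in connected_intervals(SAMPLE):
        print(interval)
```

Because the only state the loop keeps is per device, it could in principle be split by `device_id`; what it cannot tolerate is seeing one device's events out of order.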

Given that I need to remember the current state, I assume I need
stateful processing using StateSpec.
However, the documentation and blog posts also mention that
stateful processing ignores time and doesn't process elements
in order.
So I am quite unsure whether there is a good way to do this at all,
although at first I assumed this would be a rather trivial task
with the available tools.
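One possible way around the ordering problem, sketched here under assumptions, is not to rely on arrival order at all: group all events of a device together (for example with `GroupByKey`) and sort them by the per-device sequence number before scanning. The function below is plain Python and runs on its own; `Event`, `Interval`, and `intervals_for_device` are hypothetical names, and the commented wiring assumes the Beam Python SDK with a PCollection of `(device_id, Event)` pairs. It also assumes `msg seq` increases monotonically per device and never resets.

```python
from datetime import datetime
from typing import Iterable, Iterator, NamedTuple, Optional


class Event(NamedTuple):
    seq: int             # per-device message sequence number (assumed monotonic)
    timestamp: datetime
    state: str           # "CONNECTED" or "DISCONNECTED"


class Interval(NamedTuple):
    device_id: str
    start: datetime
    end: Optional[datetime]  # None if still connected at the end of the data


def intervals_for_device(device_id: str, events: Iterable[Event]) -> Iterator[Interval]:
    """Derive CONNECTED intervals for one device from its events in any order.

    Events are sorted by sequence number before scanning, so the grouped
    values of a GroupByKey (whose order is not guaranteed) can be passed in.
    """
    start: Optional[datetime] = None
    for ev in sorted(events, key=lambda e: e.seq):
        if ev.state == "CONNECTED" and start is None:
            start = ev.timestamp          # a new connected run begins
        elif ev.state == "DISCONNECTED" and start is not None:
            yield Interval(device_id, start, ev.timestamp)
            start = None                  # the run has ended
        # Republished states (same as the current one) fall through unchanged.
    if start is not None:
        yield Interval(device_id, start, None)


# Possible Beam wiring (Python SDK), assuming `rows` is a PCollection of
# (device_id, Event) pairs read from BigQuery:
#
#   intervals = (
#       rows
#       | beam.GroupByKey()
#       | beam.FlatMapTuple(intervals_for_device)
#   )
```

The trade-off is that all events of one device have to be sorted on a single worker. With sparse connect/disconnect events that is likely fine; for very long per-device histories one would probably need to split by time range and stitch intervals together at the boundaries.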

I assumed this is a common use case when doing state analysis
on existing event data, in this case to query the total connection
time.
Is there a best-practice approach for this? Possibly even at the
logging stage, to make the data easier to process later.
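Once the intervals exist, the total connection time per device is just a sum of `end - start`. A minimal sketch, assuming a hypothetical `Interval` shape like the one in the second table and that open-ended intervals (`end` of `None`) should be counted up to a caller-chosen cut-off `as_of`:

```python
from datetime import datetime, timedelta
from typing import Dict, Iterable, NamedTuple, Optional


class Interval(NamedTuple):
    device_id: str
    start: datetime
    end: Optional[datetime]  # None if still connected at the end of the data


def total_connected_time(intervals: Iterable[Interval], as_of: datetime) -> Dict[str, timedelta]:
    """Sum connected durations per device; open intervals are cut off at `as_of`."""
    totals: Dict[str, timedelta] = {}
    for iv in intervals:
        end = iv.end if iv.end is not None else as_of
        totals[iv.device_id] = totals.get(iv.device_id, timedelta()) + (end - iv.start)
    return totals


if __name__ == "__main__":
    # The two intervals from the expected-output table.
    intervals = [
        Interval("d111", datetime(2023, 1, 20, 20, 30), datetime(2023, 2, 21, 10, 15)),
        Interval("d000", datetime(2023, 2, 21, 12, 10), datetime(2023, 2, 22, 5, 0)),
    ]
    for device, total in total_connected_time(intervals, as_of=datetime(2023, 3, 1)).items():
        print(device, total)
```

In a pipeline this would be a per-key aggregation over the interval output, or a simple `SUM` once the intervals are written back to BigQuery.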

Thanks!
