Hi Kenn, all,
you are right, sliding windows seem to be exactly what I need. Thanks
for that pointer.
Where I'm still in need of expert advice is how to structure the data
within my PCollection. From Pub/Sub I read all data as a JSON string:
Format 1: "{ humidity=42.700001, temp=20.700001}"
Currently I'm just extending the JSON string with the sender's deviceID
and a timestamp:
Format 2: "{timestamp=1516549776, deviceID=esp8266_D608CF,
humidity=42.700001, temp=20.700001}"
I guess it would make sense to use the deviceID as the key of a
key/value pair, so I can group by deviceID?
Format 3: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001,
temp=20.700001}"
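In Beam Java terms, that keying step might look roughly like the sketch below, assuming the Pub/Sub messages have already been mapped to JSON strings. `SensorReading` and `parseReading` are hypothetical placeholders for illustration, not code from the repo:

```java
// Sketch: turn each JSON string into a KV keyed by deviceID, so later
// transforms (GroupByKey, Combine.perKey, Mean.perKey) can work per device.
// "parseReading" is a hypothetical helper that extracts the fields from
// the JSON string into a hypothetical SensorReading value type.
PCollection<KV<String, SensorReading>> keyed = jsonStrings
    .apply("KeyByDevice", MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptor.of(SensorReading.class)))
        .via(json -> {
            SensorReading r = parseReading(json);  // hypothetical parser
            return KV.of(r.deviceID, r);
        }));
```

With the deviceID as key, the windowed aggregation then happens independently per device.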
What I'm still missing is an idea of how to apply a Mean to each
"humidity" and "temp", so that as a result I can create something like
Format 4: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001,
temp=20.700001, avg_hum=<xyz>, avg_temp=<abc>}"
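Since Beam's built-in `Mean.perKey()` averages a single numeric value per key, averaging two fields at once typically means a custom combiner. The logic is just "running sums plus a count, mergeable across partial results". A minimal sketch of that accumulator logic in plain Java (field names are hypothetical; in Beam this maps onto `Combine.CombineFn`'s `createAccumulator` / `addInput` / `mergeAccumulators` / `extractOutput`):

```java
// Sketch of the accumulator a Beam CombineFn would use to average both
// humidity and temp per device: keep running sums and a count, support
// merging partial accumulators (Beam combines them across bundles), and
// divide only at extraction time.
public class AverageAccum {
    private double humSum, tempSum;
    private long count;

    // addInput: fold one sensor reading into the accumulator
    public void add(double humidity, double temp) {
        humSum += humidity;
        tempSum += temp;
        count++;
    }

    // mergeAccumulators: absorb a partial accumulator from another bundle
    public void merge(AverageAccum other) {
        humSum += other.humSum;
        tempSum += other.tempSum;
        count += other.count;
    }

    // extractOutput: {avg_hum, avg_temp}
    public double[] extract() {
        return count == 0
            ? new double[] {0.0, 0.0}
            : new double[] {humSum / count, tempSum / count};
    }

    public static void main(String[] args) {
        AverageAccum a = new AverageAccum();
        a.add(42.700001, 20.700001);
        a.add(44.3, 21.3);
        double[] out = a.extract();
        System.out.printf("avg_hum=%.4f avg_temp=%.4f%n", out[0], out[1]);
    }
}
```

Applied per key (`Combine.perKey`), each device would get its own `{avg_hum, avg_temp}` pair per window, which you could then merge back into your JSON string for Format 4.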
Happy to take any advice or a pointer in the right direction.
Thanks
Patrick
Kenneth Knowles wrote:
Hi Patrick,
It sounds like you want to read about event-time windowing:
https://beam.apache.org/documentation/programming-guide/#windowing
In particular, when you say "the last 5 minutes" I would ask what the
point of reference is. Your needs may be served by fixed or sliding
windows of five minutes. These are included with the SDK, and
documented here:
https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions
Hope that gets you started,
Kenn
On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick
<patr...@steiner-buchholz.de> wrote:
Hi,
I'm in the process of porting work that I have done based on JBoss
technology (ActiveMQ, Drools, etc.) to GCloud.
The scenario is a simple IoT example with devices sending their
values via MQTT to PubSub and getting received by Dataflow for
processing.
So far, while learning GCloud features from scratch, I was able to
receive the data and write it to a BigQuery Table. It's all
documented at
https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo
What's working: I receive a JSON string (e.g. {
humidity=42.700001, temp=20.700001}) via Pub/Sub and extend it to
{timestamp=1516549776, deviceID=esp8266_D608CF,
humidity=42.700001, temp=20.700001} via Dataflow.
Where I have no clue is the following: I want to calculate, for
every "deviceID", the average value of "humidity" and "temp" over
the last 5 minutes.
Currently my simple pipeline is
p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
 .apply(ParDo.of(new FormatMessageAsTableRowFn()))
 .apply(BigQueryIO.writeTableRows().to(tableSpec.toString())
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
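The windowed per-device averaging could slot into that pipeline roughly as follows. This is only a sketch: `ParseAndKeyByDeviceFn`, `AverageReadingFn`, and `FormatAveragesAsTableRowFn` are hypothetical DoFn/CombineFn names (not from the repo), and the sliding-window parameters are just one reasonable reading of "last 5 minutes":

```java
// Sketch: read from Pub/Sub, key by deviceID, window into 5-minute
// sliding windows that advance every minute, compute per-device averages,
// and write the results to BigQuery.
p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
 .apply(ParDo.of(new ParseAndKeyByDeviceFn()))            // -> KV<deviceID, reading> (hypothetical)
 .apply(Window.into(
     SlidingWindows.of(Duration.standardMinutes(5))       // window length: last 5 minutes
                   .every(Duration.standardMinutes(1))))  // how often a new window starts
 .apply(Combine.perKey(new AverageReadingFn()))           // per-device avg_hum / avg_temp (hypothetical)
 .apply(ParDo.of(new FormatAveragesAsTableRowFn()))       // hypothetical formatter to TableRow
 .apply(BigQueryIO.writeTableRows().to(tableSpec.toString())
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
```

With sliding windows each element falls into several overlapping windows, so each device emits one averaged row per window end rather than per input message.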
Does anyone have advice or a pointer to documentation on how I should proceed?
Patrick