Hi Kenn, all,

you are right, sliding windows seem to be exactly what I need. Thanks for that pointer.

Where I'm still in need of expert advice is how to structure the data within my PCollection. From PubSub I read all data as a JSON string:

Format 1: "{ humidity=42.700001, temp=20.700001}"

Currently I'm just extending the JSON string with the deviceID of the sender and the timestamp:

Format 2: "{timestamp=1516549776, deviceID=esp8266_D608CF, humidity=42.700001, temp=20.700001}"
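Roughly, that extension step amounts to the following (plain Java sketch; in the pipeline it sits inside a DoFn, and I'm assuming the body always arrives exactly as Format 1):

```java
// Sketch of the Format 1 -> Format 2 step: splice timestamp and deviceID
// into the incoming string. Assumes the body starts with "{" as in Format 1.
class ExtendMessage {
    static String extend(String body, String deviceId, long timestamp) {
        // Drop the leading "{" and prepend the two extra fields.
        String rest = body.trim().substring(1).trim();
        return "{timestamp=" + timestamp + ", deviceID=" + deviceId + ", " + rest;
    }
}
```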

I guess it would make sense to take the deviceID as the "Key" of a Key/Value pair, so I can group by "deviceID"?

Format 3: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001, temp=20.700001}"
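If I understand the KV idea right, a DoFn could peel the deviceID out of the Format 2 string and emit a KV<String, String>. The parsing part alone, outside of Beam, would be something like this (field names as in my formats above):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;

// Parse a Format 2 string into (deviceID, remaining fields).
// In Beam this would run inside a DoFn that emits KV.of(deviceId, rest).
class KeyByDevice {
    static SimpleEntry<String, Map<String, String>> split(String msg) {
        Map<String, String> fields = new LinkedHashMap<>();
        String inner = msg.trim().replaceAll("^\\{\\s*|\\s*\\}$", "");
        for (String part : inner.split(",")) {
            String[] kv = part.trim().split("=", 2);
            fields.put(kv[0].trim(), kv[1].trim());
        }
        // remove() runs before the pair is built, so deviceID becomes the key
        // and the remaining fields become the value.
        return new SimpleEntry<>(fields.remove("deviceID"), fields);
    }
}
```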

What I'm still "missing" is an idea of how to apply "Mean" to "humidity" and "temp" per device, so that as a result I can create something like

Format 4: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001, temp=20.700001, avg_hum=<xyz>, avg_temp=<abc>}"
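From the docs I would expect the Beam side to be Window.into(SlidingWindows.of(Duration.standardMinutes(5))) followed by Mean.<String, Double>perKey() on one KV<deviceID, value> stream per metric, and then re-combining the results into Format 4 — but that part is my guess, not something I have running. Stripped of Beam, the arithmetic per (deviceID, window) pair is just:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// The per-window arithmetic Mean.perKey() would do, without Beam:
// input is deviceID -> readings that fell into one window,
// output is deviceID -> mean of those readings.
class MeanPerKey {
    static Map<String, Double> mean(Map<String, List<Double>> windowed) {
        return windowed.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> e.getValue().stream()
                      .mapToDouble(Double::doubleValue)
                      .average().orElse(0.0)));
    }
}
```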

Happy to take advice or a pointer in the right direction.

Thanks

Patrick


Kenneth Knowles wrote:
Hi Patrick,

It sounds like you want to read about event-time windowing: https://beam.apache.org/documentation/programming-guide/#windowing

In particular, when you say "the last 5 minutes" I would ask what the point of reference is. Your needs may be served by fixed or sliding windows of five minutes. These are included with the SDK, and documented here: https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions

Hope that gets you started,

Kenn

On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick <patr...@steiner-buchholz.de> wrote:

    Hi,

    I'm in the process of porting work that I have done based on JBoss
    Technology ( ActiveMQ, Drools, etc ) to GCloud.

    The scenario is a simple IoT example with devices sending their
    values via MQTT to PubSub and getting received by Dataflow for
    processing.

    So far, while learning GCloud features from scratch, I was able to
    receive the data and write it to a BigQuery Table. It's all
    documented at
    https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo

    What's working is that I receive a JSON string ( e.g. {
    humidity=42.700001, temp=20.700001} ) via PubSub and extend it to
    {timestamp=1516549776, deviceID=esp8266_D608CF,
    humidity=42.700001, temp=20.700001} via Dataflow.

    Where I have no clue is the following: I want to calculate, for
    every "deviceID", the average value of "humidity" and "temp" over
    the last 5 minutes.

    Currently my simple Pipeline is

    p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
     .apply(ParDo.of(new FormatMessageAsTableRowFn()))
     .apply(BigQueryIO.writeTableRows().to(tableSpec.toString())
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    Anyone an advice or pointer to docu how I need to proceed?

    Patrick
