Hi Kenn,

thanks again for responding.

Let me try to explain better what I'm looking for. For simplicity's sake, let's take a simpler example.

Message 1 "4711" : "{temp=10}" will become "4711" : "{temp=10, avg_temp=10}"
Message 2 "4711" : "{temp=12}" will become "4711" : "{temp=12, avg_temp=11}"
Message 3 "4711" : "{temp=14}" will become "4711" : "{temp=14, avg_temp=12}"
Message 4 "4711" : "{temp=10}" will become "4711" : "{temp=10, avg_temp=11.5}"
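The numbers in this first, unwindowed example are a running mean per key. Below is a minimal plain-Java sketch that reproduces them. It does not use Beam, and the class name `RunningMean` is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a Beam API): keeps a running mean per key with no
 * windowing, which is what the first example above computes.
 */
final class RunningMean {
    // Per key: {sum of readings so far, number of readings so far}.
    private final Map<String, double[]> sumAndCountByKey = new HashMap<>();

    /** Records a reading for the key and returns the mean of all its readings so far. */
    double addAndGetMean(String key, double value) {
        double[] sumAndCount = sumAndCountByKey.computeIfAbsent(key, k -> new double[2]);
        sumAndCount[0] += value;
        sumAndCount[1] += 1;
        return sumAndCount[0] / sumAndCount[1];
    }
}
```

Feeding 10, 12, 14 and 10 for key "4711" returns 10.0, 11.0, 12.0 and 11.5. In a streaming pipeline, this ever-growing per-key state would have to live somewhere, for example in a stateful `DoFn` using Beam's state API. The sketch ignores that.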

So for each incoming message I would like to append the current window's "avg_temp", all with a sliding window. So if the window is 2 seconds and we receive one message per second, my sample would change to


Message 1 "4711" : "{temp=10}" will become "4711" : "{temp=10, avg_temp=10}"
Message 2 "4711" : "{temp=12}" will become "4711" : "{temp=12, avg_temp=11}"
Message 3 "4711" : "{temp=14}" will become "4711" : "{temp=14, avg_temp=13}"
Message 4 "4711" : "{temp=10}" will become "4711" : "{temp=10, avg_temp=12}"
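The sliding-window variant amounts to a trailing-window mean: each reading is annotated with the average of the same key's readings from the last 2 seconds, including itself. Below is a minimal plain-Java sketch. The class `TrailingWindowMean` is hypothetical, not a Beam API, and it assumes millisecond timestamps and in-order arrival per key:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a Beam API): for each reading, returns the mean of
 * the same key's readings whose timestamps fall in the trailing interval
 * (timestamp - windowMillis, timestamp]. Assumes readings for a key arrive in
 * timestamp order; late or out-of-order data is not handled.
 */
final class TrailingWindowMean {

    private static final class Reading {
        final long timestampMillis;
        final double value;

        Reading(long timestampMillis, double value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    private final long windowMillis;
    // Per key: readings still inside the trailing window, oldest first.
    private final Map<String, Deque<Reading>> buffers = new HashMap<>();

    TrailingWindowMean(long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        this.windowMillis = windowMillis;
    }

    double addAndGetMean(String key, long timestampMillis, double value) {
        Deque<Reading> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(new Reading(timestampMillis, value));
        // Drop readings that are windowMillis or more older than the new one.
        while (buffer.peekFirst().timestampMillis <= timestampMillis - windowMillis) {
            buffer.pollFirst();
        }
        double sum = 0;
        for (Reading r : buffer) {
            sum += r.value;
        }
        return sum / buffer.size();
    }
}
```

With a 2000 ms window and readings at 1000, 2000, 3000 and 4000 ms, this returns 10, 11, 13 and 12. Beam's `SlidingWindows.of(Duration.standardSeconds(2)).every(Duration.standardSeconds(1))` assigns each element to two overlapping windows, so "the current window" of an element has to be chosen explicitly. The sketch simply uses the interval that ends at the element's own timestamp.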

Does this explain what I have in mind?

Thanks again for your help

Patrick

Kenneth Knowles wrote:


On Thu, Jan 25, 2018 at 3:31 AM, Steiner Patrick <patr...@steiner-buchholz.de> wrote:

    Hi Kenn, all,

    You are right, sliding windows seem to be exactly what I need.
    Thanks for that pointer.

    Where I still need expert advice is how to structure the
    data within my PCollection. From PubSub I read all data as a
    JSON string:

    Format 1: "{ humidity=42.700001, temp=20.700001}"

    Currently I'm just extending the JSON string with the sender's
    deviceID and the timestamp:

    Format 2: "{timestamp=1516549776, deviceID=esp8266_D608CF,
    humidity=42.700001, temp=20.700001}"

    I guess it would make sense to use the deviceID as the key of a
    key/value pair, so I can group by "deviceID"?

    Format 3: "esp8266_D608CF" : "{timestamp=1516549776,
    humidity=42.700001, temp=20.700001}"
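The Format 2 to Format 3 step can be sketched in plain Java. These strings are not strict JSON: the keys are unquoted and use `=` instead of `:`, so they look more like the output of Java's `Map.toString()`. The class `DeviceRecordParser` below is hypothetical. It assumes values never contain commas or `=` and keeps all values as strings:

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical parser for "{key=value, key=value}" strings like Format 1/2.
 * Not a JSON parser; values are kept as strings and must not contain ',' or '='.
 */
final class DeviceRecordParser {

    /** Parses "{a=1, b=2}" into an insertion-ordered map. */
    static Map<String, String> parse(String record) {
        String body = record.trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Not a {key=value} record: " + record);
        }
        body = body.substring(1, body.length() - 1);
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : body.split(",")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int eq = trimmed.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Missing '=' in: " + trimmed);
            }
            fields.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
        }
        return fields;
    }

    /** Splits off deviceID as the key and keeps the remaining fields as the value (Format 3). */
    static Map.Entry<String, Map<String, String>> keyByDevice(String record) {
        Map<String, String> fields = parse(record);
        String deviceId = fields.remove("deviceID");
        if (deviceId == null) {
            throw new IllegalArgumentException("No deviceID in: " + record);
        }
        return new AbstractMap.SimpleImmutableEntry<>(deviceId, fields);
    }
}
```

For the Format 2 sample, the key is `esp8266_D608CF` and the value prints as `{timestamp=1516549776, humidity=42.700001, temp=20.700001}`. Inside a Beam `DoFn`, such a pair would map naturally onto a `KV.of(deviceId, fields)` element.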


Yup, this is the right basic setup for just about anything you'll want to do.

    What I'm still "missing" is an idea of how to apply "Mean" to both
    "humidity" and "temp", so that as a result I can create something like

    Format 4: "esp8266_D608CF" : "{timestamp=1516549776,
    humidity=42.700001, temp=20.700001, avg_hum=<xyz>, avg_temp=<abc>}"
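Computing both averages at once comes down to keeping a sum and a count per field and dividing at the end. Beam's built-in `Mean.perKey()` averages one numeric value per key. So one option would be to apply it twice, once per field; another would be to write a custom `Combine.CombineFn`. The plain-Java class below (hypothetical name `HumTempMean`) only mirrors the four steps of such a combiner (`createAccumulator`, `addInput`, `mergeAccumulators`, `extractOutput`). It is not a Beam class:

```java
/**
 * Hypothetical accumulator for averaging humidity and temp in one pass.
 * The method names mirror Beam's Combine.CombineFn steps, but this is plain Java.
 */
final class HumTempMean {
    double humSum;
    double tempSum;
    long count;

    static HumTempMean createAccumulator() {
        return new HumTempMean();
    }

    /** Adds one reading; returns this accumulator so calls can be chained. */
    HumTempMean addInput(double humidity, double temp) {
        humSum += humidity;
        tempSum += temp;
        count++;
        return this;
    }

    /** Combines partial accumulators, e.g. ones built on different workers. */
    static HumTempMean mergeAccumulators(Iterable<HumTempMean> accumulators) {
        HumTempMean merged = new HumTempMean();
        for (HumTempMean acc : accumulators) {
            merged.humSum += acc.humSum;
            merged.tempSum += acc.tempSum;
            merged.count += acc.count;
        }
        return merged;
    }

    /** Returns {avg_hum, avg_temp}, or NaN for both if nothing was added. */
    double[] extractOutput() {
        if (count == 0) {
            return new double[] {Double.NaN, Double.NaN};
        }
        return new double[] {humSum / count, tempSum / count};
    }
}
```

Keeping sums and counts instead of running averages lets partial results be merged exactly, which a distributed combine step needs.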


Do you just want to calculate the averages and have one record output summarizing the device/window? Or do you want to keep all the original records and annotate them with the avg for their window, in other words basically doing the first calculation and then joining it with your original stream?

Kenn
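The difference between the two options can be sketched in plain Java over a finite list of readings. The sketch uses hypothetical names, assumes fixed 5-minute windows and timestamps in seconds, and uses no Beam APIs. Option (a) emits one mean per device and window. Option (b) computes the same means and then joins them back onto every original reading by (device, window):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration, not a streaming pipeline:
 * (a) summarize: one mean temp per (device, fixed window);
 * (b) annotate: each reading paired with the mean of its own window.
 */
final class WindowedMeanJoin {

    static final class Reading {
        final String deviceId;
        final long timestampSec;
        final double temp;

        Reading(String deviceId, long timestampSec, double temp) {
            this.deviceId = deviceId;
            this.timestampSec = timestampSec;
            this.temp = temp;
        }
    }

    /** Join key for a device's fixed window, e.g. "esp8266_D608CF@1516549500". */
    static String windowKey(Reading r, long windowSec) {
        long windowStart = r.timestampSec - Math.floorMod(r.timestampSec, windowSec);
        return r.deviceId + "@" + windowStart;
    }

    /** Option (a): mean temp per (device, window). */
    static Map<String, Double> summarize(List<Reading> readings, long windowSec) {
        Map<String, double[]> sums = new HashMap<>();
        for (Reading r : readings) {
            double[] sc = sums.computeIfAbsent(windowKey(r, windowSec), k -> new double[2]);
            sc[0] += r.temp;
            sc[1] += 1;
        }
        Map<String, Double> means = new HashMap<>();
        for (Map.Entry<String, double[]> e : sums.entrySet()) {
            means.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return means;
    }

    /** Option (b): the summaries from (a) joined back onto each reading. */
    static List<String> annotate(List<Reading> readings, long windowSec) {
        Map<String, Double> means = summarize(readings, windowSec);
        List<String> out = new ArrayList<>();
        for (Reading r : readings) {
            out.add(r.deviceId + " : {temp=" + r.temp + ", avg_temp="
                    + means.get(windowKey(r, windowSec)) + "}");
        }
        return out;
    }
}
```

In a Beam pipeline, the join in option (b) could be expressed, for example, with `CoGroupByKey` or a side input. The sketch only illustrates the data flow, not how Beam would evaluate it on an unbounded stream.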

    Happy to take advice or a pointer in the right direction.

    Thanks

    Patrick



    Kenneth Knowles wrote:
    Hi Patrick,

    It sounds like you want to read about event-time windowing:
    https://beam.apache.org/documentation/programming-guide/#windowing

    In particular, when you say "the last 5 minutes" I would ask what
    the point of reference is. Your needs may be served by fixed or
    sliding windows of five minutes. These are included with the SDK,
    and documented here:
    
    https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions

    Hope that gets you started,

    Kenn
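The fixed vs. sliding distinction can be made concrete. With fixed 5-minute windows, each reading belongs to exactly one window. With 5-minute windows that slide every minute (the one-minute period is an assumption for illustration), each reading belongs to five overlapping windows. Below is a plain-Java sketch of that assignment arithmetic. The class name is hypothetical, and it assumes epoch-aligned windows (no offset) and timestamps in seconds. It mirrors the idea behind Beam's `FixedWindows` and `SlidingWindows` but is not their code:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: which window start times an event timestamp is assigned
 * to, for windows [start, start + size) aligned to the epoch.
 */
final class WindowAssignment {

    /** Start of the single fixed window containing ts. */
    static long fixedWindowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Starts of all sliding windows (length size, new one every period) containing ts, latest first. */
    static List<Long> slidingWindowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, period);
        for (long start = lastStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }
}
```

For the timestamp 1516549776 from this thread, the fixed window starts at 1516549500. The sliding windows start at 1516549740, 1516549680, 1516549620, 1516549560 and 1516549500. In the Beam Java SDK, the corresponding choices would be `Window.into(FixedWindows.of(Duration.standardMinutes(5)))` or `Window.into(SlidingWindows.of(Duration.standardMinutes(5)).every(Duration.standardMinutes(1)))`.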

    On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick
    <patr...@steiner-buchholz.de> wrote:

        Hi,

        I'm in the process of porting work that I have done based on
        JBoss technology (ActiveMQ, Drools, etc.) to GCloud.

        The scenario is a simple IoT example with devices sending
        their values via MQTT to PubSub, where they are received by
        Dataflow for processing.

        So far, while learning GCloud features from scratch, I was
        able to receive the data and write it to a BigQuery Table.
        It's all documented at
        https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo

        What's working: I receive a JSON string (e.g. {
        humidity=42.700001, temp=20.700001}) via PubSub and extend
        it to {timestamp=1516549776, deviceID=esp8266_D608CF,
        humidity=42.700001, temp=20.700001} via Dataflow.

        What I have no clue about is the following: I want to calculate,
        for every "deviceID", the average "humidity" and "temp" over the
        last 5 minutes.

        Currently my simple pipeline is:

        p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
         .apply(ParDo.of(new FormatMessageAsTableRowFn()))
         .apply(BigQueryIO.writeTableRows()
             .to(tableSpec.toString())
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

        Does anyone have advice, or a pointer to documentation, on how I need to proceed?

        Patrick




