Thanks for the pointer to 'CoGroupByKey', which makes perfect sense, as
does the use of Mean.
Still, I have to bother you again, as I probably still lack some basic
understanding of what ideal messages/data structures are supposed to
look like in a Beam pipeline.
This is what I want to happen for each element in my original
PCollection (read from PubSub):
original message -> "{ humidity=42.700001, temp=20.700001}"
final message -> "deviceID" : "{timestamp=1516549776,
humidity=42.700001, temp=20.700001, avg_hum=<xyz>, avg_temp=<abc>}"
Where I'm stuck is how to calculate the average (Mean) for each
datapoint in my original message.
Since you pointed out 'CoGroupByKey', I assume I have to do
something like the following visualization?
https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo/blob/master/pictures/Mean_flow.png
Am I on the right path?
Thanks
Patrick
Kenneth Knowles wrote:
It looks like what you want is to join your input stream with the
computed averages. It might look something like this:
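    PCollection<KV<DeviceId, RawEvent>> inputEvents =
        ...apply(Window.into(SlidingWindows....));
    // Sketch only: Mean.perKey() expects numeric values, so the reading
    // has to be extracted from RawEvent before this step.
    PCollection<KV<DeviceId, Double>> avgTemps =
        inputEvents.apply(Mean.perKey());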
I don't want to recreate all of the docs on this thread, so I will
just point to CoGroupByKey [1] that you would use to join these on
DeviceId; the pattern looks something like this, where I've left out
lots of boilerplate:
    PCollection<KV<DeviceId, CoGbkResult>> joined = ...
    PCollection<KV<DeviceId, EventWithAvg>> result =
        joined.apply(ParDo.of(<function to pull out the joined results>));
Kenn
[1] https://beam.apache.org/documentation/programming-guide/#cogroupbykey
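To make the join idea concrete, here is a minimal plain-Java sketch of what the two steps compute for the contents of a single window. It deliberately does not use the Beam API; the class JoinWithMean and its RawEvent and EventWithAvg types are hypothetical stand-ins for the names in the pseudocode above. In a real pipeline the per-key mean and the join would each run per window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these types come from Beam.
final class JoinWithMean {
    // Hypothetical stand-in for one keyed input record.
    static final class RawEvent {
        final String deviceId;
        final double temp;
        RawEvent(String deviceId, double temp) {
            this.deviceId = deviceId;
            this.temp = temp;
        }
    }

    // Hypothetical stand-in for the annotated output record.
    static final class EventWithAvg {
        final RawEvent event;
        final double avgTemp;
        EventWithAvg(RawEvent event, double avgTemp) {
            this.event = event;
            this.avgTemp = avgTemp;
        }
    }

    // Step 1: one mean per device id (the role Mean.perKey() plays above).
    static Map<String, Double> meanPerKey(List<RawEvent> events) {
        Map<String, double[]> sumAndCount = new HashMap<>();
        for (RawEvent e : events) {
            double[] acc = sumAndCount.computeIfAbsent(e.deviceId, k -> new double[2]);
            acc[0] += e.temp; // running sum
            acc[1] += 1;      // running count
        }
        Map<String, Double> means = new HashMap<>();
        for (Map.Entry<String, double[]> entry : sumAndCount.entrySet()) {
            means.put(entry.getKey(), entry.getValue()[0] / entry.getValue()[1]);
        }
        return means;
    }

    // Step 2: join on device id and attach the mean to every original event.
    static List<EventWithAvg> annotate(List<RawEvent> events) {
        Map<String, Double> means = meanPerKey(events);
        List<EventWithAvg> out = new ArrayList<>();
        for (RawEvent e : events) {
            out.add(new EventWithAvg(e, means.get(e.deviceId)));
        }
        return out;
    }
}
```

Every input event appears exactly once in the output, carrying the mean of all events that share its device id.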
On Fri, Jan 26, 2018 at 6:23 AM, Steiner Patrick
<patr...@steiner-buchholz.de> wrote:
Hi Kenn,
thanks again for responding.
Let me try to explain better what I'm looking for. For simplicity,
let's take a simpler example.
Message 1 "4711" : "{temp=10}" will become "4711" : "{temp=10,
avg_temp=10}"
Message 2 "4711" : "{temp=12}" will become "4711" : "{temp=12,
avg_temp=11}"
Message 3 "4711" : "{temp=14}" will become "4711" : "{temp=14,
avg_temp=12}"
Message 4 "4711" : "{temp=10}" will become "4711" : "{temp=10,
avg_temp=11.5}"
So for each incoming message I would like to append the current
window's "avg_temp", and all of this with a sliding window.
If the window is 2 seconds and we receive one message per second,
my sample would change to
Message 1 "4711" : "{temp=10}" will become "4711" : "{temp=10,
avg_temp=10}"
Message 2 "4711" : "{temp=12}" will become "4711" : "{temp=12,
avg_temp=11}"
Message 3 "4711" : "{temp=14}" will become "4711" : "{temp=14,
avg_temp=13}"
Message 4 "4711" : "{temp=10}" will become "4711" : "{temp=10,
avg_temp=12}"
Does this explain what I plan?
Thanks again for your help
Patrick
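The arithmetic in the two examples above can be checked with a small plain-Java sketch. It is not Beam code and does not reproduce Beam's sliding-window semantics (where each element belongs to several windows); it simply computes, for every message, the mean of all messages whose timestamp lies in the trailing interval (t - window, t]. The class name TrailingMean and the assumption that messages arrive in timestamp order are illustrative choices.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java illustration only; not part of the Beam SDK.
final class TrailingMean {
    // timestamps are in seconds and must be ascending; temps[i] belongs to timestamps[i].
    // Returns, for each message, the mean over the interval (t - windowSeconds, t].
    static List<Double> trailingMeans(long[] timestamps, double[] temps, long windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be positive");
        }
        Deque<Integer> inWindow = new ArrayDeque<>(); // indices currently inside the window
        double sum = 0.0;
        List<Double> means = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            inWindow.addLast(i);
            sum += temps[i];
            // Evict messages that are windowSeconds or more older than the current one.
            while (timestamps[inWindow.peekFirst()] <= timestamps[i] - windowSeconds) {
                sum -= temps[inWindow.pollFirst()];
            }
            means.add(sum / inWindow.size());
        }
        return means;
    }
}
```

With timestamps 0, 1, 2, 3 and temperatures 10, 12, 14, 10, a window of 2 seconds gives 10, 11, 13, 12, and a window long enough to hold all four messages gives 10, 11, 12, 11.5, matching the two lists above.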
Kenneth Knowles wrote:
On Thu, Jan 25, 2018 at 3:31 AM, Steiner Patrick
<patr...@steiner-buchholz.de> wrote:
Hi Kenn, all,
you are right, sliding windows seem to be exactly what I
need. Thanks for that pointer.
Where I'm still in need of expert advice is how to structure
the data within my PCollection. From PubSub I read all
data as a JSON string
Format 1: "{ humidity=42.700001, temp=20.700001}"
currently I'm just extending the JSON-String with the
deviceID of the sender and the timestamp
Format 2: "{timestamp=1516549776, deviceID=esp8266_D608CF,
humidity=42.700001, temp=20.700001}"
I guess it would make sense to use the deviceID as the key of
a key/value pair, so I can group by "deviceID"?
Format 3: "esp8266_D608CF" : "{timestamp=1516549776,
humidity=42.700001, temp=20.700001}"
Yup, this is the right basic set up for just about anything
you'll want to do.
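As an illustration of the step from Format 2 to Format 3, the following plain-Java sketch splits the message text into a key (the deviceID) and the remaining fields. It assumes the simple "{name=value, ...}" layout shown in the thread, with no commas or equals signs inside values; the class KeyByDevice is hypothetical, and in a Beam pipeline the resulting pair would be carried in a KV rather than a Map.Entry.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; assumes the "{name=value, ...}" layout from the thread.
final class KeyByDevice {
    // Returns (deviceID, remaining fields); the deviceID is removed from the value part.
    static Map.Entry<String, Map<String, String>> keyByDevice(String message) {
        String body = message.trim();
        if (body.startsWith("{")) {
            body = body.substring(1);
        }
        if (body.endsWith("}")) {
            body = body.substring(0, body.length() - 1);
        }
        Map<String, String> fields = new LinkedHashMap<>(); // keeps the original field order
        for (String pair : body.split(",")) {
            String[] nameAndValue = pair.trim().split("=", 2);
            if (nameAndValue.length == 2) {
                fields.put(nameAndValue[0].trim(), nameAndValue[1].trim());
            }
        }
        String deviceId = fields.remove("deviceID");
        if (deviceId == null) {
            throw new IllegalArgumentException("message has no deviceID: " + message);
        }
        return new SimpleImmutableEntry<>(deviceId, fields);
    }
}
```

Keeping the readings as separate fields rather than one string makes it straightforward to pull out "humidity" and "temp" as numbers for averaging later.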
What I'm still "missing" is an idea how to apply "Mean" to
each "humidity" and "temp", so that as a result I can create
something like
Format 4: "esp8266_D608CF" : "{timestamp=1516549776,
humidity=42.700001, temp=20.700001, avg_hum=<xyz>,
avg_temp=<abc>}"
Do you just want to calculate the averages and have one record
output summarizing the device/window? Or do you want to keep all
the original records and annotate them with the avg for their
window, in other words basically doing the first calculation and
then joining it with your original stream?
Kenn
Happy to take any advice or pointers in the right direction.
Thanks
Patrick
Kenneth Knowles wrote:
Hi Patrick,
It sounds like you want to read about event-time windowing:
https://beam.apache.org/documentation/programming-guide/#windowing
In particular, when you say "the last 5 minutes" I would ask
what the point of reference is. Your needs may be served by
fixed or sliding windows of five minutes. These are included
with the SDK, and documented here:
https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions
Hope that gets you started,
Kenn
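To illustrate the difference between the two window types, here is a plain-Java sketch of how a timestamp maps to window start times. It is not Beam code: it assumes windows aligned to the epoch with no offset, timestamps in seconds, and half-open windows [start, start + size). The five-minute size comes from the question; the one-minute period used in the example below is an assumption, and the class name WindowStarts is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; assumes epoch-aligned windows and second resolution.
final class WindowStarts {
    // Start of the single fixed window [start, start + size) that contains ts.
    static long fixedWindowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Starts of every sliding window [start, start + size) that contains ts,
    // where a new window begins every `period` seconds. Newest window first.
    static List<Long> slidingWindowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, period);
        for (long start = lastStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a size of 300 seconds and a period of 60 seconds, each timestamp falls into five overlapping sliding windows but only one fixed window, which is why the point of reference for "the last 5 minutes" matters.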
On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick
<patr...@steiner-buchholz.de> wrote:
Hi,
I'm in the process of porting work that I have done
based on JBoss Technology ( ActiveMQ, Drools, etc ) to
GCloud.
The scenario is a simple IoT example with devices
sending their values via MQTT to PubSub and getting
received by Dataflow for processing.
So far, while learning GCloud features from scratch, I
was able to receive the data and write it to a BigQuery
Table. It's all documented at
https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo
What's working is that I receive a JSON string ( e.g. {
humidity=42.700001, temp=20.700001} ) via PubSub and
extend it to {timestamp=1516549776,
deviceID=esp8266_D608CF, humidity=42.700001,
temp=20.700001} via DataFlow.
Where I have no clue is the following: I want to
calculate for every "deviceID" the average value for
"humidity" and "temp" for the last 5 minutes.
Currently my simple Pipeline is
    p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
        .apply(ParDo.of(new FormatMessageAsTableRowFn()))
        .apply(BigQueryIO.writeTableRows().to(tableSpec.toString())
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
Does anyone have advice or a pointer to documentation on how I should proceed?
Patrick