Hello Aljoscha <aljos...@apache.org>,

My sincere apologies in advance if I seem to repeat the same question
almost interminably. If it is frustrating you, I seek your patience, but I
really want to nail this down in my mind. :-)

The point about parallelizing is well taken. I understand why the stream
should be broken into multiple partitions, and how. What is still evading
me is how the use-case of computing a (sliding) average temperature is
achieved if the stream is scattered across partitions.

I want the *running* average temperature for every 3 readings, sliding by 1
reading. I am monitoring the average temperature; if it goes beyond a
certain threshold for 3 consecutive readings, I raise an alarm.

Let's take the following set of data (fields are: probeID, timestamp,
temperature; the 'timestamp' field is used for the
assignAscendingTimestamps() function):

P1,T1,20
P1,T2,30
P2,T2,30
P1,T3,50
P2,T3,20
P3,T3,10

Assumption: T1 < T2 < T3

Now, if we partition on the probeID, we get three partitions, thus:

P1 -> (T1,20) | (T2,30) | (T3,50)
P2 -> (T2,30) | (T3,20)
P3 -> (T3,10)

Computing the average temperature will give me *three distinct averages*
here, one for each partition. I get an average per probe, not an average
over every 3 readings [assuming a slidingWindow(3,1)], irrespective of
which probe produced them.
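To make the difference concrete, here is a plain-Scala sketch (illustrative only, not the Flink API; all names are hypothetical) that applies a sliding(3,1) count window first to the single totally-ordered stream of readings above, and then to each probeID partition separately:

```scala
// Illustrative plain-Scala sketch (NOT Flink API): the six readings
// above, in event-time order. Names here are hypothetical.
case class Reading(probeId: String, ts: Int, temp: Double)

object SlidingAverageSketch {
  val readings = List(
    Reading("P1", 1, 20), Reading("P1", 2, 30), Reading("P2", 2, 30),
    Reading("P1", 3, 50), Reading("P2", 3, 20), Reading("P3", 3, 10)
  )

  // A sliding(3, 1) count window over the ONE totally-ordered stream:
  // windows (20,30,30), (30,30,50), (30,50,20), (50,20,10).
  def globalAverages: List[Double] =
    readings.map(_.temp).sliding(3, 1).map(w => w.sum / w.size).toList

  // The same window applied per probeId partition: each probe gets
  // its own (possibly shorter) window series.
  def perProbeAverages: Map[String, List[Double]] =
    readings.groupBy(_.probeId).map { case (p, rs) =>
      p -> rs.map(_.temp).sliding(3, 1).map(w => w.sum / w.size).toList
    }

  def main(args: Array[String]): Unit = {
    println(globalAverages)   // averages over every 3 consecutive readings
    println(perProbeAverages) // one separate series per probe
  }
}
```

The two results are genuinely different series, which is exactly the tension in my question.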

Is it even correct to expect a running average if we partition the input
stream?

Hope I am making my understanding (or the lack of it), quite clear here! :-)

-- Nirmalya



-------------------------------------------------------------------------------------------------------------------------------------------------------------
To: user@flink.apache.org
Date: Fri, 19 Feb 2016 10:41:52 +0100
Hi,
as I understand it, the “temp_reading_timestamp” field is not a key on
which you can partition your data. This is a field that would be used for
assigning timestamps to the elements. In your data you also have the
“probeID” field. This is a field that could be used to parallelize the
computation; for example, you could do the following:

val inputStream = <define some source>

val result = inputStream
  .assignAscendingTimestamps { e => e.temp_reading_timestamp }
  .keyBy { e => e.probeID }
  .timeWindow(Time.minutes(10))
  .apply(new SumFunction(), new ComputeAverageFunction())

result.print()

(Where SumFunction() would sum up the temperatures and keep a count, and
ComputeAverageFunction() would divide the sum by the count.)
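In plain Scala terms, the sum-and-count-then-divide idea could be sketched as follows (illustrative only; these are hypothetical names, not the actual Flink ReduceFunction/WindowFunction interfaces):

```scala
// Illustrative sketch of the incremental sum-then-divide idea
// (hypothetical names, NOT the real Flink interfaces).
object IncrementalAverage {
  // Running accumulator: (sum of temperatures, count of readings).
  type Acc = (Double, Long)

  // What a SumFunction would do incrementally, one element at a time:
  def add(acc: Acc, temp: Double): Acc = (acc._1 + temp, acc._2 + 1)

  // What a ComputeAverageFunction would do once the window fires:
  def average(acc: Acc): Double = acc._1 / acc._2

  def main(args: Array[String]): Unit = {
    val window = List(20.0, 30.0, 50.0)          // one window's readings
    val acc = window.foldLeft((0.0, 0L))(add)    // incremental aggregation
    println(average(acc))                        // final division
  }
}
```

The benefit of the incremental form is that Flink only has to keep the small (sum, count) pair per window, not every element in it.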

In this way, the computation is parallelized: it can be spread across
several machines, partitioned by the key. Without such a key, everything
has to be computed on one machine, because a global view of the data is
required.

Cheers,






-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
