Github user f-sander commented on the pull request:
https://github.com/apache/flink/pull/1565#issuecomment-180798486
Sorry for the long delay. I still don't really have time for this, but I
want to describe it anyway. That's why the writing and formatting are pretty
sloppy. Sorry for that; I hope you bear with me:
We only consider isotonic regression on weighted, two-dimensional data.
Thus, data points are tuples of three doubles: `(y, x, w)`.
PAV assumes the data to be sorted by `x`. It starts on the left and moves to
the right. Whenever two (or more) consecutive points are found whose `y` values
decrease in `x` order, it "pools" them: each of their `y` values is replaced by
the weighted average of the pool (the sum of `y * w` divided by the sum of `w`).
Every point in the pool then looks like this: `(y_weighted_pool_avg, x, w)`.
Because the `y` values were changed, we have to look back in `x` order to check
whether the new pool average is lower than the value before the pool. If so, we
have to pool again until no higher `y` value remains before the pool.
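For reference, the sequential PAV described above can be sketched in Python. This is a minimal single-machine sketch, assuming the input is already sorted by `x` and all weights are positive; the function name `pav` and the pool representation are illustrative, not taken from Spark or Flink.

```python
from typing import List, Tuple

Point = Tuple[float, float, float]  # (y, x, w), sorted by x


def pav(points: List[Point]) -> List[Point]:
    """Weighted pool-adjacent-violators; returns one (pooled_y, x, w) per input point."""
    # Each pool holds [sum of y * w, sum of w, number of points in the pool].
    pools: List[List[float]] = []
    for y, _x, w in points:
        pools.append([y * w, w, 1])
        # Look back: while the previous pool's average is higher, merge the last two pools.
        while len(pools) > 1 and pools[-2][0] / pools[-2][1] > pools[-1][0] / pools[-1][1]:
            s, tw, c = pools.pop()
            pools[-1][0] += s
            pools[-1][1] += tw
            pools[-1][2] += c
    # Write the pooled weighted average back to every point of each pool.
    result: List[Point] = []
    i = 0
    for s, tw, c in pools:
        for _ in range(int(c)):
            _y, x, w = points[i]
            result.append((s / tw, x, w))
            i += 1
    return result
```

The inner `while` loop is the "look back" step: after a merge lowers a pool's average, it keeps pooling leftwards until the sequence is non-decreasing again.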
Any run of data points from `i` to `j` sharing the same `y` value is
compressed as follows: `(y, x_i, sum_of_weights), (y, x_j, 0)`. Spark's
implementation hopes that enough data gets compressed this way that all
remaining data fits onto one node in the last step. However, there are of
course cases where this simply doesn't work.
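The run compression can be illustrated with a small, hypothetical `compress` helper. It is a sketch of the rule as stated above, assuming input sorted by `x` and exact equality on `y`; it is not Spark's actual code.

```python
from typing import List, Tuple

Point = Tuple[float, float, float]  # (y, x, w), sorted by x


def compress(points: List[Point]) -> List[Point]:
    """Collapse each maximal run of equal y into (y, x_i, sum_w), (y, x_j, 0)."""
    out: List[Point] = []
    i = 0
    n = len(points)
    while i < n:
        j = i
        total_w = points[i][2]
        # Extend the run while the next point has the same y.
        while j + 1 < n and points[j + 1][0] == points[i][0]:
            j += 1
            total_w += points[j][2]
        y, x_i, _w = points[i]
        out.append((y, x_i, total_w))
        if j > i:
            # Keep the run's right boundary as a zero-weight point.
            out.append((y, points[j][1], 0.0))
        i = j + 1
    return out
```

A run of three points collapses to two, so the saving depends entirely on how long the equal-`y` runs are, which is why it cannot guarantee that the remaining data fits on one node.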
Our approach (not implemented in this PR) works like this:
```
compare two consecutive data points i and j:
  if y_i < y_j, leave them untouched
  if y_i > y_j, replace both with ((y_i * w_i + y_j * w_j) / (w_i + w_j), x_i, w_i + w_j); also remember x_j
  if y_i = y_j, replace both with (y_i, x_i, w_i + w_j); also remember x_j
repeat until no more pairs are combined
```
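The pairwise rule can be emulated sequentially in Python. This sketch assumes positive weights and picks pairs greedily from left to right, so no point takes part in two merges within one pass; the fourth tuple field stands in for the "remembered" `x_j`, and all names (`merge`, `pairwise_pool`) are hypothetical.

```python
from typing import List, Optional, Tuple

MPoint = Tuple[float, float, float, Optional[float]]  # (y, x, w, remembered_x)


def merge(a: MPoint, b: MPoint) -> MPoint:
    """Combine a violating or equal pair (i, j) according to the rule above."""
    ya, xa, wa, _ = a
    yb, xb, wb, rb = b
    right = rb if rb is not None else xb  # rightmost x absorbed into the merged point
    y = ya if ya == yb else (ya * wa + yb * wb) / (wa + wb)
    return (y, xa, wa + wb, right)


def pairwise_pool(points: List[Tuple[float, float, float]]) -> List[MPoint]:
    pts: List[MPoint] = [(y, x, w, None) for y, x, w in points]
    changed = True
    while changed:  # repeat until no more pairs are combined
        changed = False
        out: List[MPoint] = []
        i = 0
        while i < len(pts):
            if i + 1 < len(pts) and pts[i][0] >= pts[i + 1][0]:
                out.append(merge(pts[i], pts[i + 1]))
                changed = True
                i += 2  # both points are consumed by this pair
            else:
                out.append(pts[i])
                i += 1
        pts = out
    return pts
```

On `[(2.0, 1.0, 1.0), (3.0, 2.0, 1.0), (0.0, 3.0, 2.0)]` the passes collapse everything into a single point with `y = 1.25`, the same value sequential PAV produces for that input.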
After the iteration has terminated: for each point that has a "remembered"
`x_j`, add another `(y, x_j, 0)` directly after it.
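Expanding the remembered boundaries could look like this. The `(y, x, w, remembered_x)` layout is an assumption made for illustration, with `None` meaning nothing was remembered; `expand` is a hypothetical name.

```python
from typing import List, Optional, Tuple

MPoint = Tuple[float, float, float, Optional[float]]  # (y, x, w, remembered_x)


def expand(points: List[MPoint]) -> List[Tuple[float, float, float]]:
    out: List[Tuple[float, float, float]] = []
    for y, x, w, remembered in points:
        out.append((y, x, w))
        if remembered is not None:
            # Zero-weight point marking the right end of the pooled range.
            out.append((y, remembered, 0.0))
    return out
```

The result has the same `(y, x_i, sum_of_weights), (y, x_j, 0)` shape as the compressed runs described earlier.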
We can compare each point with its successor by attaching to each point an
index (`zipWithIndex`) and a "next pointer" (`index + 1`) and then doing a:
`set.join(set).where(next).equalTo(index)`
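As a single-machine stand-in for the Flink self-join, the sketch below attaches an index and a next pointer to each point and pairs every point with its successor. It only emulates the semantics of `set.join(set).where(next).equalTo(index)` with a dictionary lookup; the helper names are hypothetical and this is not Flink API code.

```python
from typing import Dict, List, Tuple

Point = Tuple[float, float, float]  # (y, x, w), sorted by x
Indexed = Tuple[int, int, Point]  # (index, next, point)


def with_index_and_next(points: List[Point]) -> List[Indexed]:
    """Like zipWithIndex on x-sorted data, plus a next pointer equal to index + 1."""
    return [(i, i + 1, p) for i, p in enumerate(points)]


def join_next(indexed: List[Indexed]) -> List[Tuple[Point, Point]]:
    """Emulates set.join(set).where(next).equalTo(index): pairs each point with its successor."""
    by_index: Dict[int, Point] = {i: p for i, _nxt, p in indexed}
    return [(p, by_index[nxt]) for _i, nxt, p in indexed if nxt in by_index]
```

For `n` points this yields `n - 1` overlapping pairs; every interior point appears in two of them.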
However, because of the weight summation, we must ensure that no point
appears in more than one join pair. Otherwise, a point's weight might be summed
into multiple combined points.
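A toy example makes the double counting concrete. With three strictly decreasing points, the plain successor join yields two overlapping violating pairs that both contain the middle point, so merging every pair independently inflates the total weight. The data and the `merge_pair` helper are made up for illustration.

```python
from typing import List, Tuple

Point = Tuple[float, float, float]  # (y, x, w)


def merge_pair(a: Point, b: Point) -> Point:
    """Weighted average of y, keeping the left x and summing the weights."""
    (ya, xa, wa), (yb, _xb, wb) = a, b
    return ((ya * wa + yb * wb) / (wa + wb), xa, wa + wb)


pts: List[Point] = [(3.0, 1.0, 1.0), (2.0, 2.0, 1.0), (1.0, 3.0, 1.0)]
# Every point paired with its successor, as the plain next/index self-join would yield.
pairs = list(zip(pts, pts[1:]))
# Both pairs violate the order and both contain the middle point (2.0, 2.0, 1.0).
merged = [merge_pair(a, b) for a, b in pairs if a[0] > b[0]]
total_before = sum(w for _y, _x, w in pts)
total_after = sum(w for _y, _x, w in merged)
print(total_before, total_after)  # prints "3.0 4.0": the middle weight is counted twice
```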
We worked around that by doing two joins in each iteration step:
```
step 1: left join side has only points with even indices, right side only odd
step 2: left join side has only points with odd indices, right side only even
if nothing happened during these two runs, we are done
```
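The two-step even/odd scheme can be emulated as below. This is a sequential sketch under the same assumptions as the pairwise rule (sorted input, positive weights, a fourth field for the remembered `x_j`); `parity_step` and `pool_even_odd` are hypothetical names. Because all left-side indices share one parity and all right-side indices the other, each point lands in at most one pair per step. In this sketch, list positions are re-read after every step, so re-indexing comes for free; on a partitioned data set it does not.

```python
from typing import List, Optional, Tuple

MPoint = Tuple[float, float, float, Optional[float]]  # (y, x, w, remembered_x)


def merge(a: MPoint, b: MPoint) -> MPoint:
    ya, xa, wa, _ = a
    yb, xb, wb, rb = b
    right = rb if rb is not None else xb  # rightmost x absorbed into the merged point
    y = ya if ya == yb else (ya * wa + yb * wb) / (wa + wb)
    return (y, xa, wa + wb, right)


def parity_step(pts: List[MPoint], left_parity: int) -> Tuple[List[MPoint], bool]:
    """One join run: only points whose index has left_parity may pair with their successor."""
    out: List[MPoint] = []
    changed = False
    i = 0
    while i < len(pts):
        if i % 2 == left_parity and i + 1 < len(pts) and pts[i][0] >= pts[i + 1][0]:
            out.append(merge(pts[i], pts[i + 1]))
            changed = True
            i += 2
        else:
            out.append(pts[i])
            i += 1
    return out, changed


def pool_even_odd(points: List[Tuple[float, float, float]]) -> List[MPoint]:
    pts: List[MPoint] = [(y, x, w, None) for y, x, w in points]
    while True:
        pts, c1 = parity_step(pts, 0)  # step 1: even indices left, odd right
        pts, c2 = parity_step(pts, 1)  # step 2: odd indices left, even right
        if not (c1 or c2):  # nothing happened during both runs: done
            return pts
```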
Unfortunately, because of the merging, the indices no longer increase by 1.
That's why we wanted to apply another `zipWithIndex` after the two joins, but
the join repartitioned the data, so we lose our range partitioning. However,
range partitioning is required to get indices that represent the total order of
the data.
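The indexing problem shows up on a tiny example: after a merge, survivors keep their old indices, so the `index + 1` join silently misses pairs across the gap. Re-indexing in `x` order restores consecutive indices, which on a cluster presupposes globally ordered (range-partitioned) data. The rows and the `successor_pairs` helper are hypothetical.

```python
from typing import List, Tuple

Row = Tuple[int, float, float]  # (index, y, x)

# Survivors after one merge step: old index 2 was merged into index 1, leaving a gap.
survivors: List[Row] = [(0, 1.0, 1.0), (1, 2.5, 2.0), (3, 4.0, 4.0)]


def successor_pairs(rows: List[Row]) -> List[Tuple[int, int]]:
    """Emulates the where(next).equalTo(index) join, returning (index, next) pairs."""
    present = {idx for idx, _y, _x in rows}
    return [(idx, idx + 1) for idx, _y, _x in rows if idx + 1 in present]


stale = successor_pairs(survivors)  # [(0, 1)]: point 1 never meets point 3
# Re-indexing in x order (a global zipWithIndex) closes the gap.
reindexed = [(i, y, x) for i, (_old, y, x) in enumerate(sorted(survivors, key=lambda r: r[2]))]
fresh = successor_pairs(reindexed)  # [(0, 1), (1, 2)]
```

The `sorted(..., key=x)` call is exactly the step that needs a total order; a hash-repartitioned data set only gives per-partition order.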
I hope you can understand the problem. Again, sorry for the sloppy writing.