[
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14703347#comment-14703347
]
ASF GitHub Bot commented on FLINK-1901:
---------------------------------------
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/949#issuecomment-132692640
Sorry for the late review @ChengXiangLi. I finished it now and the PR looks
really good.
There is only one thing I would like to change before merging it, which is
to move the `sample` and the `sampleWithSize` methods from the `DataSet` to the
`DataSetUtils` class. This will effectively make the `sample` methods not part
of the core API. The reason is that the `sampleWithSize` method breaks one of
Flink's guarantees, namely the robustness of the core API functions against
`OutOfMemoryError`s. Let me elaborate for better understanding.
All of Flink's internal operations work on serialized data which is stored in
Flink's *managed* memory. The managed memory allows Flink to detect when to
spill elements from memory to disk to avoid out-of-memory failures and, thus,
to make the system robust. The managed memory is a pre-allocated area of memory
which is administered by the `MemoryManager`. The `MemoryManager` allows you to
allocate and deallocate `MemorySegments` in a C-style fashion. However, once a
data item enters a UDF, the item has to be deserialized, which places it on the
remaining heap. This is not a problem if your UDF does not accumulate these
elements. However, the `sampleWithSize` method materializes up to `numSamples`
elements on the heap. Depending on the number of samples and the data item
size, this might be enough to eat up the remaining heap space and crash the JVM.
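To make the risk concrete, here is a minimal sketch of plain reservoir sampling, roughly what a `sampleWithSize`-style operator has to do internally (the class and method names here are made up for illustration, not Flink's actual code). Note that the reservoir holds up to `numSamples` fully deserialized elements on the JVM heap, outside managed memory:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of Algorithm R reservoir sampling. The reservoir
 *  lives on the ordinary JVM heap, so large numSamples * elementSize can
 *  exhaust the heap left over after managed memory is allocated. */
public class ReservoirSketch {
    public static <T> List<T> sample(Iterable<T> input, int numSamples, long seed) {
        Random rnd = new Random(seed);
        List<T> reservoir = new ArrayList<>(numSamples); // heap-materialized sample
        long seen = 0;
        for (T element : input) {
            seen++;
            if (reservoir.size() < numSamples) {
                reservoir.add(element);
            } else {
                // Keep the new element with probability numSamples / seen.
                long j = (long) (rnd.nextDouble() * seen);
                if (j < numSamples) {
                    reservoir.set((int) j, element);
                }
            }
        }
        return reservoir;
    }
}
```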
I think that your current implementation will work for most use cases, but
in order to make it part of the core API, we also have to deal with the case
where our sample cannot be materialized on the remaining heap of a running
Flink program. In order to achieve this, I think it would be necessary to
implement a native `topK` operator. By *native* I mean an operator which
works on Flink's managed memory and, thus, can also spill records to disk.
Having such a `topK` operator, we could reimplement the reservoir sampling
algorithm as follows: for sampling without replacement, we first assign a
weight to each element in a map operation, then call `topK` with respect to
the weights and obtain the sample. For sampling with replacement, we could
simply use a flat map operation to assign `numSamples` independent weights to
each element, and then again call `topK` with respect to the weights.
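As a local, non-distributed illustration of this plan (plain collections stand in for `DataSet`s, and a descending sort with `limit` stands in for the native `topK`; all names here are hypothetical):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

/** Hypothetical local simulation of the map-then-topK sampling plan. */
public class WeightedSamplePlan {
    /** Without replacement: one random weight per element ("map"),
     *  then topK by weight. Each element can appear at most once. */
    static <T> List<T> withoutReplacement(List<T> data, int k, long seed) {
        Random rnd = new Random(seed);
        return data.stream()
            .map(e -> new AbstractMap.SimpleEntry<>(rnd.nextDouble(), e))
            .sorted((a, b) -> Double.compare(b.getKey(), a.getKey())) // stand-in for topK
            .limit(k)
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
    }

    /** With replacement: k independent weights per element ("flatMap"),
     *  then topK over all pairs; the same element may win several slots. */
    static <T> List<T> withReplacement(List<T> data, int k, long seed) {
        Random rnd = new Random(seed);
        List<Map.Entry<Double, T>> weighted = new ArrayList<>();
        for (T e : data) {
            for (int i = 0; i < k; i++) {
                weighted.add(new AbstractMap.SimpleEntry<>(rnd.nextDouble(), e));
            }
        }
        weighted.sort((a, b) -> Double.compare(b.getKey(), a.getKey()));
        return weighted.stream().limit(k).map(Map.Entry::getValue)
                       .collect(Collectors.toList());
    }
}
```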
For the `topK` implementation, we would need something like a `PriorityQueue`
which operates on managed memory (similar to the `CompactingHashTable`, which
is a hash table working on managed memory). Thus, we would have a priority
queue which stores the priority value of each record and a pointer to the
record, which is kept in managed memory. Whenever an element is removed from
the priority queue, we can also free the occupied managed memory. If we run
out of managed memory, we have to spill to disk some of the records which are
still in the race for the top k. As a first step, we can skip the spilling and
just throw a proper exception (other than `OutOfMemoryError`) when we run out
of memory. Afterwards, we can add the spilling functionality incrementally.
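The eviction logic of such a `topK` operator can be sketched with an ordinary heap-based bounded priority queue (no managed memory or spilling yet; class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical first step toward a native topK: a bounded min-heap that
 *  keeps the k records with the largest weights. Records are double[]
 *  pairs of {weight, value}; the real operator would instead hold pointers
 *  into managed memory and free (or spill) segments on eviction. */
public class BoundedTopK {
    static List<double[]> topK(Iterable<double[]> weightedRecords, int k) {
        // Min-heap ordered by weight (index 0): the root is the weakest
        // current top-k candidate and is evicted first.
        PriorityQueue<double[]> heap =
            new PriorityQueue<>((a, b) -> Double.compare(a[0], b[0]));
        for (double[] rec : weightedRecords) {
            if (heap.size() < k) {
                heap.offer(rec);
            } else if (rec[0] > heap.peek()[0]) {
                heap.poll();   // here the operator would free the evicted
                heap.offer(rec); // record's managed memory
            }
        }
        return new ArrayList<>(heap);
    }
}
```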
I know that you've already put a lot of effort into writing the sampling
operator and that this outcome might be a little demotivating. However, if
we want to make it right and robust, then I think this is the way to go.
Additionally, we would add a proper `topK` operator to Flink's API, which is
missing big time :-) If you want to, you could also take the lead here.
Further discussion should then happen in a separate issue. I'm more than
willing to assist you in implementing this operator. What do you think?
> Create sample operator for Dataset
> ----------------------------------
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Theodore Vasiloudis
> Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of
> other machine learning algorithms we need to have a way to take a random
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset,
> choose the relative or exact size of the sample, set a seed for
> reproducibility, and support sampling within iterations.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)