[
https://issues.apache.org/jira/browse/MAHOUT-1490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002587#comment-14002587
]
Anand Avati commented on MAHOUT-1490:
-------------------------------------
[~dlyubimov] I will try my best to explain what's happening. Disclaimer: I was
neither the designer nor the implementer, and my understanding comes only from
reading the source and asking questions.
It first helps to understand, as background, how data is laid out in an H2O
cloud. A huge matrix (100Ks of cols X billions of rows) of numbers (a Frame)
can be imagined to consist of columns (Vectors). A Vector (column) is sliced
into Chunks, and entire Chunks are hashed/distributed across the cluster.
Chunks which hold elements of a given row of the Frame (matrix) are guaranteed
to be on the same server (i.e. "similarly partitioned" in Spark lingo). A Chunk
is typically a few MB in size, i.e. expect it to store a few hundred thousand
to a few million adjacent elements of a given column. The reason for such a
"columnar" orientation is that elements in a column are expected to be
"similar" (compared to elements in a row), and therefore better compression can
be applied. A Chunk should be large enough to make compression meaningful (e.g.
a chunk of 8 or 16 elements is too small to compress) and small enough that
there is not "too much variance" within it (see next).
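To make the layout concrete, here is a minimal toy sketch (the ToyLayout,
ToyFrame, ToyVec and ToyChunk names are made up for illustration and are not
H2O's actual water.fvec classes) of a Frame decomposing into per-column
Vectors, each sliced into row-aligned chunks:

{code:java}
// Hypothetical illustration only -- not H2O's real water.fvec classes.
// A "frame" is a list of columns; each column is split into chunks of
// adjacent rows, and chunk k of every column lives on the same node.
import java.util.ArrayList;
import java.util.List;

public class ToyLayout {
  static final int CHUNK_ROWS = 1 << 20;     // ~1M rows per chunk (a few MB)

  static class ToyChunk { double[] vals; }   // one compressed slice of one column

  static class ToyVec {                      // one column = a list of chunks
    List<ToyChunk> chunks = new ArrayList<>();
  }

  static class ToyFrame {                    // matrix = a list of columns
    List<ToyVec> vecs = new ArrayList<>();
    // Chunk index for a global row: chunk k of *every* Vec covers the same
    // row range, which is what keeps a row's elements on one server.
    static int chunkIdx(long row) { return (int) (row / CHUNK_ROWS); }
  }

  public static void main(String[] args) {
    System.out.println("row 3,500,000 lives in chunk "
        + ToyFrame.chunkIdx(3_500_000L) + " of every column");
  }
}
{code}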
Given that background, it now makes sense to see how Chunk compression works.
Compression of each Chunk is an independent process, and a different algorithm
may be used to compress different Chunks, even if two Chunks belong to the same
Vector/Column. The choice of compression algorithm is determined by inspecting
the elements after they are all made available. For example, when a new large
matrix is being read from disk, the elements are first read into a datatype
called "NewChunk". You can only "append" elements during the NewChunk phase.
Once a NewChunk is filled with enough elements, it is then compressed into a
Chunk. Chunk itself is an abstract class. Based on the compression algorithm,
there are many concrete implementations of Chunk (there are 16 different
compression algorithms/implementations as of now), available as C<xy>Chunk.java
in https://github.com/0xdata/h2o/tree/master/src/main/java/water/fvec.
NewChunk.compress() (the link shared above) is the method which converts the
inflated NewChunk into the most appropriate compressed C<xy>Chunk, and the
selection is made by inspecting all the elements.
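A rough sketch of that append-then-compress lifecycle, with invented class and
method names (only the flow mirrors the description above; the real logic lives
in NewChunk.compress()):

{code:java}
// Hypothetical sketch of the append-then-compress lifecycle; class and method
// names here are invented, only the flow mirrors the description.
import java.util.Arrays;

public class ToyNewChunk {
  private long[] buf = new long[16];
  private int len = 0;

  void append(long v) {                      // append-only phase
    if (len == buf.length) buf = Arrays.copyOf(buf, len * 2);
    buf[len++] = v;
  }

  // "Compression": inspect all elements, then pick the cheapest representation.
  Object compress() {
    long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
    for (int i = 0; i < len; i++) {
      min = Math.min(min, buf[i]);
      max = Math.max(max, buf[i]);
    }
    long range = max - min;
    if (range == 0) return new long[]{min};     // constant column: one value
    if (range <= 0xFF) {                        // fits a byte once biased by min
      byte[] out = new byte[len];
      for (int i = 0; i < len; i++) out[i] = (byte) (buf[i] - min);
      return out;                               // plus 'min' kept as the bias
    }
    return Arrays.copyOf(buf, len);             // fall back to raw longs
  }

  public static void main(String[] args) {
    ToyNewChunk nc = new ToyNewChunk();
    for (long v = 1000; v < 1100; v++) nc.append(v);  // range 99: bytes suffice
    System.out.println("compressed to: " + nc.compress().getClass().getSimpleName());
  }
}
{code}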
The various strategies of compression include things like the following (a
small numeric sketch follows this list):
- Biasing: For example, if all elements in the Chunk are values between
76861433640456465 and 76861433640456480, they can actually be represented in a
narrow type (one just wide enough to hold Xmax - Xmin, here a single byte)
along with the bias base recorded in the Chunk header.
- Exponent scaling: For example, convert the set {1.23, 23, 0.34} by
multiplying by 100 into {123, 2300, 34}, which can now be represented as 2-byte
(short) ints instead of floats.
- Counting the "types" of elements:
-- if there are just two distinct values, no matter what they are (e.g. only 23
and 37), the chunk can still be represented as a boolean bitvector
-- if there is just one value throughout (e.g. only 129), then essentially no
memory is consumed.
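A tiny worked example of the biasing and exponent-scaling tricks above, using
the same numbers (hypothetical code, not H2O's; the helper class is made up):

{code:java}
// Hypothetical worked example of biasing and exponent scaling.
public class ToyCompressionTricks {
  public static void main(String[] args) {
    // Biasing: values in [76861433640456465, 76861433640456480] span only 15,
    // so each element fits in one byte once the minimum is recorded as the bias.
    long min = 76861433640456465L, max = 76861433640456480L;
    long v = 76861433640456470L;
    byte stored = (byte) (v - min);            // 5, fits in 1 byte
    long restored = min + (stored & 0xFF);     // exact round trip
    System.out.println("range=" + (max - min)
        + " stored=" + stored + " restored=" + restored);

    // Exponent scaling: {1.23, 23, 0.34} * 10^2 -> {123, 2300, 34}, shorts.
    double[] xs = {1.23, 23, 0.34};
    int scale = 100;
    for (double x : xs) {
      short s = (short) Math.round(x * scale); // 2 bytes instead of an 8-byte double
      System.out.println(x + " -> " + s + " (decoded back as " + (s / (double) scale) + ")");
    }
  }
}
{code}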
Combinations of techniques such as the above result in the 16 Chunk
implementations (and how the data is internally "viewed"):
- C0DChunk - All NAs/Strings/etc. (trivial case)
- CXDChunk - Sparse doubles (floating point)
- C0LChunk - Constant column of longs
- C0DChunk - Constant column of doubles
- CX0Chunk - Sparse boolean bitvector without NAs
- CXIChunk - Sparse 1-byte (with NAs)
- CBSChunk - Dense boolean bitvector
- C1SChunk - Scaled to 1-byte (with bias)
- C2SChunk - Scaled to 2-byte (with bias)
- C4SChunk - Scaled to 4-byte (with bias)
- C1NChunk - Scaled to 1-byte (data readily fits into unsigned bytes)
- C1Chunk - Scaled to 1-byte (with NAs)
- C8Chunk - Scaled to 8-byte (no bias - data readily fits)
- C2Chunk - Scaled to 2-byte (no bias - data readily fits)
- C4Chunk - Scaled to 4-byte (no bias - data readily fits)
Inflation involves, at worst, simple scaling, biasing and a binary lookup - and
typically only a subset of those. These operations happen with no extra
loads/stores, entirely out of registers. Reading compressed floats and doubles
is done through unsafe get()s. All this means that only compressed data travels
over the memory bus and it expands in the core. For operations like adding up
all the elements of a matrix, the job is typically memory-bandwidth bound.
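For example, the per-element decode of a scaled/biased chunk amounts to a load
of the small stored value plus register arithmetic. A hypothetical sketch of
what such an accessor boils down to (H2O uses unsafe get()s rather than a plain
byte[], but the arithmetic is the point):

{code:java}
// Hypothetical sketch of why inflation is cheap: decoding a scaled/biased
// chunk element is a load of the small stored value plus register arithmetic.
public class ToyScaledChunk {
  private final byte[] mem;        // compressed 1-byte payload
  private final double bias;       // recorded once in the chunk header
  private final double scale;      // recorded once in the chunk header

  ToyScaledChunk(byte[] mem, double bias, double scale) {
    this.mem = mem; this.bias = bias; this.scale = scale;
  }

  // The whole "decompression" is: widen, add the bias, multiply by the scale.
  double at(int i) { return ((mem[i] & 0xFF) + bias) * scale; }

  public static void main(String[] args) {
    ToyScaledChunk c = new ToyScaledChunk(new byte[]{0, 5, 15}, 10.0, 0.01);
    double sum = 0;
    for (int i = 0; i < 3; i++) sum += c.at(i);  // only compressed bytes cross the memory bus
    System.out.println("sum = " + sum);
  }
}
{code}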
As a side effect of this type of compression, set() can sometimes be expensive.
If the new value passed to set() is "compatible" with the existing compression
(i.e. it fits within the scale/bias, does not convert a sparse location into a
filled location, etc.), it is not very expensive. But if the new value does not
"fit", then the whole chunk is inflated back into a NewChunk, and compress()
later re-evaluates which algorithm can best be applied (re-compression is
deferred until a lot of set()s have happened, rather than re-compressing on
every incompatible set()).
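A rough sketch of that write path, with invented names; the point is just the
compatibility check and the fallback to an inflated, mutable buffer:

{code:java}
// Hypothetical sketch of set(): cheap when the new value fits the current
// encoding, expensive (inflate to a NewChunk-like buffer) when it does not.
public class ToyBiasedChunk {
  private byte[] mem;              // compressed payload
  private long bias;               // minimum recorded at compression time
  private long[] inflated;         // non-null once we fell back to the slow path

  ToyBiasedChunk(byte[] mem, long bias) { this.mem = mem; this.bias = bias; }

  void set(int i, long v) {
    long delta = v - bias;
    if (inflated == null && delta >= 0 && delta <= 0xFF) {
      mem[i] = (byte) delta;                 // compatible: write in place, stay compressed
      return;
    }
    if (inflated == null) {                  // incompatible: inflate the whole chunk once
      inflated = new long[mem.length];
      for (int j = 0; j < mem.length; j++) inflated[j] = bias + (mem[j] & 0xFF);
    }
    inflated[i] = v;                         // re-compression deferred until later
  }

  long at(int i) { return inflated != null ? inflated[i] : bias + (mem[i] & 0xFF); }

  public static void main(String[] args) {
    ToyBiasedChunk c = new ToyBiasedChunk(new byte[]{1, 2, 3}, 1000);
    c.set(0, 1100);                          // fits the bias/width: fast path
    c.set(1, 5);                             // does not fit: inflate
    System.out.println(c.at(0) + " " + c.at(1) + " " + c.at(2));
  }
}
{code}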
Is there a way in which the benefits of H2O compression can be leveraged by
just using it as a backend, rather than re-implementing all of this? I'm happy
to explore/contribute.
> Data frame R-like bindings
> --------------------------
>
> Key: MAHOUT-1490
> URL: https://issues.apache.org/jira/browse/MAHOUT-1490
> Project: Mahout
> Issue Type: New Feature
> Reporter: Saikat Kanjilal
> Assignee: Dmitriy Lyubimov
> Fix For: 1.0
>
> Original Estimate: 20h
> Remaining Estimate: 20h
>
> Create Data frame R-like bindings for spark