[ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Attachment: 2085-5.patch

Addressed the NPE and included the interface info from this JIRA in
o/a/h/mapred/join/package.html

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 
> 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
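
To make the three default identifiers concrete, here is a minimal sketch of what each might emit for a single key. It is not code from the patch: the class `JoinSemanticsSketch` and its methods are hypothetical, plain `String` values stand in for `Writable` values, and the handling of a source that lacks the key (a null slot for `outer`) is an assumption based on the descriptions in the table above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration only; Strings stand in for Writable values. */
final class JoinSemanticsSketch {
    /** Cross product of per-source value lists; each tuple has one slot per source. */
    static List<List<String>> cross(List<List<String>> sources) {
        List<List<String>> out = new ArrayList<>();
        out.add(new ArrayList<>());
        for (List<String> src : sources) {
            List<List<String>> next = new ArrayList<>();
            for (List<String> partial : out) {
                for (String v : src) {
                    List<String> t = new ArrayList<>(partial);
                    t.add(v);
                    next.add(t);
                }
            }
            out = next;
        }
        return out;
    }

    /** inner: emit tuples only when every source has a value for the key. */
    static List<List<String>> inner(List<List<String>> sources) {
        for (List<String> src : sources) {
            if (src.isEmpty()) return Collections.emptyList();
        }
        return cross(sources);
    }

    /**
     * outer (assumed semantics): a source without the key contributes an
     * empty (null) slot. Meant to be called only for keys that occur in at
     * least one source.
     */
    static List<List<String>> outer(List<List<String>> sources) {
        List<List<String>> padded = new ArrayList<>();
        for (List<String> src : sources) {
            padded.add(src.isEmpty() ? Collections.singletonList((String) null) : src);
        }
        return cross(padded);
    }

    /** override: the values of the rightmost source that has the key. */
    static List<String> override(List<List<String>> sources) {
        for (int i = sources.size() - 1; i >= 0; i--) {
            if (!sources.get(i).isEmpty()) return sources.get(i);
        }
        return Collections.emptyList();
    }
}
```

Note that the tuples produced by `outer` keep one slot per source even when a source is absent, while `override` yields single values rather than tuples, matching the Join versus MultiFilter distinction drawn above.
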
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the
> preceding grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
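
Since the expressions are verbose, it may help to see how one could be assembled as a string. The following is a sketch under assumptions: `JoinExprSketch`, `tbl`, and `op` are hypothetical names invented for illustration, and they are not the convenience methods of {{CompositeInputFormat}}, whose signatures are not given in this description. The output follows the grammar and the examples quoted above.

```java
import java.util.StringJoiner;

/** Hypothetical helper for illustration; not part of the patch. */
final class JoinExprSketch {
    /** Leaf node of the grammar: tbl(<class>,"<path>"). */
    static String tbl(String inputFormatClass, String path) {
        return "tbl(" + inputFormatClass + ",\"" + path + "\")";
    }

    /** Composite node: <ident>(<func>,...,<func>); needs at least one child. */
    static String op(String ident, String... children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("at least one child is required");
        }
        StringJoiner sj = new StringJoiner(",", ident + "(", ")");
        for (String child : children) {
            sj.add(child);
        }
        return sj.toString();
    }

    public static void main(String[] args) {
        String sf = "org.apache.hadoop.mapred.SequenceFileInputFormat.class";
        // Nested expression equivalent to the second example (without line breaks).
        String expr = op("outer",
            op("override",
               tbl(sf, "hdfs://host:8020/foo/bar"),
               tbl(sf, "hdfs://host:8020/foo/baz")),
            tbl(sf, "hdfs://host:8020/foo/rab"));
        System.out.println(expr);
    }
}
```

The resulting string would then be the value of {{mapred.join.expr}}.
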
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit or modify values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
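
The head-tracking idea can be sketched with a flat (single-level) inner join over sorted key streams. This is an illustration under assumptions, not the patch's hierarchical design: `MergeJoinSketch` is a hypothetical class, `int` keys stand in for `WritableComparable` keys, and instead of building tuples it only counts how many tuples the cross product would contain for each matching key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical flat merge join; the patch itself uses a tree of readers. */
final class MergeJoinSketch {
    /**
     * Inner-joins sorted key streams. For every key present in all streams,
     * records the size of the cross product of the matching records.
     */
    static Map<Integer, Long> innerJoinCounts(int[]... streams) {
        Map<Integer, Long> out = new LinkedHashMap<>();
        if (streams.length == 0) return out;
        int[] head = new int[streams.length]; // index of the head record per source
        while (true) {
            // Candidate key: the largest head. Smaller heads cannot match it.
            int key = Integer.MIN_VALUE;
            for (int i = 0; i < streams.length; i++) {
                if (head[i] >= streams[i].length) return out; // a source is exhausted
                key = Math.max(key, streams[i][head[i]]);
            }
            boolean inAll = true;
            long tuples = 1;
            for (int i = 0; i < streams.length; i++) {
                // Skip records below the candidate key.
                while (head[i] < streams[i].length && streams[i][head[i]] < key) head[i]++;
                // Consume the run of records equal to the candidate key.
                int run = 0;
                while (head[i] < streams[i].length && streams[i][head[i]] == key) {
                    head[i]++;
                    run++;
                }
                if (run == 0) inAll = false;
                tuples *= run;
            }
            if (inAll) out.put(key, tuples);
        }
    }
}
```

With the streams _A_ and _B_ from the example above, key 1 matches two records in _A_ and four in _B_, so an inner join of just those two sources would emit eight tuples for that key.
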
> h4. Writables
> {{Writable}} objects, including {{InputSplit}} s and {{TupleWritable}} s,
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable, particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map, but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
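
The layout above can be sketched with plain `java.io` streams. This is an assumption-laden illustration, not the patch's serialization code: `TupleEncodingSketch` is a hypothetical class, only `Integer` and `String` elements are supported in place of arbitrary `Writable` types, and the exact byte representation (a 4-byte count, `writeUTF` class names) is chosen for simplicity rather than taken from the patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical encoder following <count><class1>...<classn><obj1>...<objn>. */
final class TupleEncodingSketch {
    static byte[] encode(Object... elems) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(elems.length);                        // <count>
            for (Object e : elems) {
                out.writeUTF(e.getClass().getName());          // <class1>...<classn>
            }
            for (Object e : elems) {                           // <obj1>...<objn>
                if (e instanceof Integer) out.writeInt((Integer) e);
                else if (e instanceof String) out.writeUTF((String) e);
                else throw new IllegalArgumentException("unsupported type: " + e.getClass());
            }
            out.flush();
            return buf.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    static Object[] decode(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int n = in.readInt();
            String[] classes = new String[n];
            for (int i = 0; i < n; i++) classes[i] = in.readUTF();
            Object[] elems = new Object[n];
            for (int i = 0; i < n; i++) {
                if (classes[i].equals(Integer.class.getName())) elems[i] = in.readInt();
                else if (classes[i].equals(String.class.getName())) elems[i] = in.readUTF();
                else throw new IllegalArgumentException("unsupported type: " + classes[i]);
            }
            return elems;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Even in this toy version, the class names take up far more space than small payloads do, which is the per-instance overhead the paragraph above regrets.
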
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> h3. Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that, even if reset, the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
