[ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Open  (was: Patch Available)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
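
To make the cross-product behaviour described above concrete, here is a minimal sketch in plain Java with no Hadoop dependency. The class CrossProductSketch and its method are hypothetical names used only for illustration and do not appear in the attached patch. Values are modelled as strings rather than Writable objects, and only inner-join semantics are assumed: a source with no values for the key yields no tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not part of the attached patch. */
final class CrossProductSketch {

    /**
     * For one key, returns every tuple formed by taking one value from each
     * source, in source order. If any source has no values for the key the
     * result is empty (inner-join semantics, assumed for this sketch).
     */
    static List<List<String>> crossProduct(List<List<String>> valuesPerSource) {
        List<List<String>> tuples = new ArrayList<>();
        tuples.add(new ArrayList<>()); // seed with a single empty tuple
        for (List<String> source : valuesPerSource) {
            List<List<String>> extended = new ArrayList<>();
            for (List<String> prefix : tuples) {
                for (String value : source) {
                    // Copy the prefix so tuples never share state.
                    List<String> tuple = new ArrayList<>(prefix);
                    tuple.add(value);
                    extended.add(tuple);
                }
            }
            tuples = extended;
        }
        return tuples;
    }

    public static void main(String[] args) {
        // Two sources holding values for the same key.
        List<List<String>> sources = Arrays.asList(
            Arrays.asList("a1", "a2"),
            Arrays.asList("b1", "b2", "b3"));
        for (List<String> tuple : crossProduct(sources)) {
            System.out.println(tuple);
        }
    }
}
```

With two values from one source and three from another, the sketch produces six tuples for the key, varying the rightmost source fastest.
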
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the
> preceding grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit or modify values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree.
> One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite asks its children to populate
> a {{JoinCollector}} with an iterator over their values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0 0 1 1 2 ...
> B: 1 1 1 1 2 ...
> C: 1 6 21 107 ...
> D: 6 28 496 8128 33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations.
> If the expression is of the form {{x(A, y(B,C,D))}}, then when the current
> key at the root is 1 the tree may look like this:
> {noformat}
>              x (1, [ I(A), [ I(y) ] ] )
>             / \
>            W   y (1, [ I(B), I(C), EMPTY ])
>            |  /  |  \
>            |  W  W  W
>            |  |  |  D (6, V~6~) => EMPTY
>            |  |  C (6, V~6~) => V~1,1~ @1,1
>            |  B (2, V~2~) => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>            A (2, V~2~) => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
> h4. Writables
> {{Writable}} objects, including {{InputSplit}} s and {{TupleWritable}} s,
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable, particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map, but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste.
> It should be noted that the framework does not actually write out a tuple
> (i.e. does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building an iterator that
> will inevitably be discarded.^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most users will find the three default operations sufficient.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that, even if reset, the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).

--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.