[
https://issues.apache.org/jira/browse/MAPREDUCE-326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12834398#action_12834398
]
Chris Douglas commented on MAPREDUCE-326:
-----------------------------------------
bq. Tom> The proposal does not introduce extra buffer copies. The example in
section 3.3 is really just a reorganization of the current code, since the
serializers are constructed with DataOutputStream objects - just as they are
now. The point is that the serialization is moved out of the kernel. There is
no extra copying going on.
Let's be more explicit. This is the code in question:
{code}
public void write(K key, V value) throws IOException, InterruptedException {
int partition = partitioner.getPartition(key, value, partitions);
keySerializer.serialize(key);
valSerializer.serialize(value);
rawMapOutputCollector.collect(keyBuffer, valueBuffer, partition);
}
{code}
What is backing {{keySerializer}} and {{valSerializer}}? What are {{keyBuffer}}
and {{valueBuffer}} if not intermediate structures separate from the collection
buffer? If backed by the collection buffer as in the current code, then the
call to collect is redundant, because the bytes have already been written;
{{collect}} would merely note that the range of bytes just written belongs to
that partition, which is just confused. And each serializer is backed by the
internal {{BlockingBuffer}} that manages the writes into the shared collection
buffer; that class subclasses {{DataOutputStream}} because the {{Writable}}
contract used to require a {{DataOutput}}, not because there is an
intermediate {{DataOutputStream}} for the serializers.
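For reference, a simplified sketch of the current wiring (paraphrased, not the verbatim {{MapTask}} source; the accounting step at the end is abbreviated):
{code}
// Current scheme, paraphrased: both serializers are opened on the
// BlockingBuffer, so serialize() writes straight into the shared collection
// buffer and no per-record intermediate buffer exists.
BlockingBuffer bb = new BlockingBuffer();          // extends DataOutputStream
SerializationFactory factory = new SerializationFactory(job);
Serializer<K> keySerializer = factory.getSerializer(keyClass);
Serializer<V> valSerializer = factory.getSerializer(valClass);
keySerializer.open(bb);                            // Serializer.open(OutputStream)
valSerializer.open(bb);

// collect() only records where the bytes landed; nothing is copied again:
final int keystart = bufindex;
keySerializer.serialize(key);                      // bytes land in the collection buffer
final int valstart = bufindex;
valSerializer.serialize(value);
// ...note (partition, keystart, valstart, bufindex) in the accounting arrays
{code}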
What the proposed section says is that {{keySerializer}} and {{valSerializer}}
write into separate {{DataOutputBuffer}} instances ({{keyBuffer}} and
{{valueBuffer}}) then the serialized bytes are copied from the byte arrays
backing those structures into the collection buffer. There is absolutely an
extra copy in this scheme. It would not be reasonable to read it otherwise.
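Spelled out, and assuming (since the proposal does not show their construction) that {{keyBuffer}} and {{valueBuffer}} are {{DataOutputBuffer}} instances separate from the collection buffer, the proposed path looks like this:
{code}
// Proposal as read, with the assumption stated above.
DataOutputBuffer keyBuffer = new DataOutputBuffer();
DataOutputBuffer valueBuffer = new DataOutputBuffer();
keySerializer.open(keyBuffer);                 // serialize into per-record buffers
valSerializer.open(valueBuffer);

keySerializer.serialize(key);                  // bytes land in keyBuffer
valSerializer.serialize(value);                // bytes land in valueBuffer
rawMapOutputCollector.collect(keyBuffer, valueBuffer, partition);
// collect() must now move keyBuffer.getData()[0..getLength()) and
// valueBuffer.getData()[0..getLength()) into the shared collection buffer:
// that transfer is the extra copy the current code does not make.
{code}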
bq. Tom> At least, this was true until MAPREDUCE-64 was committed (around the
time I was writing the proposal). Now it uses a different buffer object
(BlockingBuffer, which is a DataOutputStream), so really DataOutputBuffer
should be replaced by DataOutputStream in the write() method signature.
It has worked this way since HADOOP-2919 in 0.17... and again: if both
{{keySerializer}} and {{valSerializer}} are backed by the collection buffer,
then what is {{collect}} doing? What are {{keyBuffer}} and {{valueBuffer}}?
bq. Tom> With a binary API it would be possible to bypass the BytesWritable and
write directly into the MapOutputBuffer.
You're confusing buffering with unnecessary copying. The transport between
streaming/pipes and the collection buffer could admissibly write directly from
its source into the collection buffer without an intermediate {{BytesWritable}}
iff no backtracking is required. One can only write directly into the buffer if
the framework guarantees that it will not flush the stream, as Doug's
{{Channel}} API acknowledges by requiring that the bytes be retained in memory
while being broken into records.
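To make the constraint concrete, here is a purely hypothetical sketch (invented names, not Doug's actual {{Channel}} API): the transport may fill the collection buffer directly only if the collector agrees to keep those bytes resident, unspilled, until record boundaries have been declared.
{code}
// Hypothetical illustration only; names are invented for this sketch.
interface DirectCollector {
  /** Lease a writable region of the collection buffer to the transport. */
  java.nio.ByteBuffer lease(int minBytes);

  /**
   * Declare record boundaries within the leased region. Only bytes covered
   * by a prior markRecords() call may be sorted or spilled; everything past
   * the last declared boundary must stay resident in memory.
   */
  void markRecords(int[] recordOffsets, int partition);
}
{code}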
bq. Tom> Keys and values sent to MapOutputBuffer.collect() ultimately find
their way to IFile.append() (in MapOutputBuffer's sortAndSpill() method). I'm
just remarking that these become binary level interfaces, so there are no
generic types in these interfaces in this proposal.
There should be no type overhead in the intermediate data pipeline, unless one
is using a combiner or has not defined a {{RawComparator}}. Even there, since
context-specific serialization has been accepted as a legitimate use case, I
do not think one is forced to make a copy of the backing storage to run the
combiner. If the generic types are {{ByteBuffer}} or {{Object}}, I'm not sure
what overhead is incurred; certainly there is no safety.
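As an aside, the reason the sort path carries no type overhead is that the kernel compares serialized bytes in place. A minimal sketch of a raw comparator for a 4-byte, big-endian integer key (assuming that encoding) looks like:
{code}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;

// Compares keys without deserializing them; the Object-level compare is only
// needed when a combiner or a non-raw comparator forces deserialization.
public class IntRawComparator implements RawComparator<IntWritable> {
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    int i1 = ((b1[s1] & 0xff) << 24) | ((b1[s1 + 1] & 0xff) << 16)
           | ((b1[s1 + 2] & 0xff) << 8) |  (b1[s1 + 3] & 0xff);
    int i2 = ((b2[s2] & 0xff) << 24) | ((b2[s2 + 1] & 0xff) << 16)
           | ((b2[s2 + 2] & 0xff) << 8) |  (b2[s2 + 3] & 0xff);
    return i1 < i2 ? -1 : (i1 == i2 ? 0 : 1);
  }

  @Override
  public int compare(IntWritable a, IntWritable b) {
    return a.compareTo(b);
  }
}
{code}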
----
bq. Doug> Perhaps this might instead look something
[like|https://issues.apache.org/jira/browse/MAPREDUCE-326?focusedCommentId=12833907&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12833907]:
So the goal is to read directly into the collection buffer? Note that this
could be constraining for future framework development: the current design is
free to relocate the offset of its next record, but a design that lends backing
storage to user code may need to track, and work around, segments of unreleased
data that are not yet records. This may introduce internal fragmentation into
the intermediate buffering and, if frameworks are not careful, may block more
often than necessary by preventing MapReduce from spilling when it prefers.
None of these are fatal, but it's worth noting that any proposal along these
lines cedes a
considerable amount of control to frameworks, complicates the implementation if
it retains any intelligence, and denies MapReduce some opportunities to make
optimizations that benefit all.
The API you suggest looks great for the identity map (many jobs do use it) or
for a framework that does its own buffering of emitted records, keeping its own
set of offsets as the output collector does. Is this how Pig and Hive work?
bq. Doug> The goal is to permit the kernel to identify record boundaries (so
that it can compare, sort and transmit records) while at the same time minimize
per-record data copying. Getting this API right without benchmarking might
prove difficult. We should benchmark this under various scenarios: A key/value
pair of Writable instances, line-based data from a text file, and
length-delimited, raw binary data.
+1 Winning a benchmark, by a significant margin, should be a prerequisite to
committing these changes.
bq. Doug> Can you please elaborate? I don't see the words "pipes" or
"streaming" mentioned in that issue. How does one load Python, Ruby, C++, etc.
into Java? MAPREDUCE-1183 seems to me just to be a different way to encapsulate
configuration data, grouping it per extension point rather than centralizing it
in the job config.
The discussion in MAPREDUCE-1183 also includes writing job descriptions for
TaskRunners in different languages. You were the first to raise it
[here|https://issues.apache.org/jira/browse/MAPREDUCE-1183?focusedCommentId=12774027&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12774027].
Nothing in Java will be better than an implementation in the native language;
someday such an implementation will be written, and MAPREDUCE-1183 makes
progress toward it by defining a submission API that is as versatile as the
serialization rather than tied to {{Configuration}}. I think it makes more
progress toward polyglotism than the
issue at hand.
----
I proposed earlier that the byte-oriented utilities effecting the work in
MapReduce ({{MapOutputBuffer}}, {{IFile}}, {{Merger}}, etc.) be cleaned up and
made available to frameworks. I still think that is sufficient for performance
improvements in the near, and even foreseeable, future.
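As a strawman, and assuming {{IFile}} were cleaned up and made public as proposed (the writer's construction is elided; the raw {{append}} shown follows the existing {{IFile.Writer}} call, not a committed public API), a framework that already holds serialized records could spill them like this:
{code}
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapred.IFile;

// Strawman: a framework holding already-serialized key/value bytes feeds them
// straight to an IFile writer, with no Writable objects and no per-record
// object creation.
public class RawSpill {
  public static void spill(IFile.Writer<?, ?> writer,
                           Iterator<byte[][]> records) throws IOException {
    DataInputBuffer key = new DataInputBuffer();
    DataInputBuffer value = new DataInputBuffer();
    while (records.hasNext()) {
      byte[][] kv = records.next();            // kv[0] = key bytes, kv[1] = value bytes
      key.reset(kv[0], kv[0].length);
      value.reset(kv[1], kv[1].length);
      writer.append(key, value);               // raw append, no deserialization
    }
  }
}
{code}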
Again, even one use case for this would be hugely useful to the discussion.
> The lowest level map-reduce APIs should be byte oriented
> --------------------------------------------------------
>
> Key: MAPREDUCE-326
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-326
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Reporter: eric baldeschwieler
> Attachments: MAPREDUCE-326-api.patch, MAPREDUCE-326.pdf
>
>
> As discussed here:
> https://issues.apache.org/jira/browse/HADOOP-1986#action_12551237
> The templates, serializers and other complexities that allow map-reduce to
> use arbitrary types complicate the design and lead to lots of object creates
> and other overhead that a byte oriented design would not suffer. I believe
> the lowest level implementation of hadoop map-reduce should have byte string
> oriented APIs (for keys and values). This API would be more performant,
> simpler and more easily cross language.
> The existing API could be maintained as a thin layer on top of the leaner API.