[ https://issues.apache.org/jira/browse/MAPREDUCE-326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12834398#action_12834398 ]

Chris Douglas commented on MAPREDUCE-326:
-----------------------------------------

bq. Tom> The proposal does not introduce extra buffer copies. The example in 
section 3.3 is really just a reorganization of the current code, since the 
serializers are constructed with DataOutputStream objects - just as they are 
now. The point is that the serialization is moved out of the kernel. There is 
no extra copying going on.

Let's be more explicit. This is the code in question:
{code}
public void write(K key, V value) throws IOException, InterruptedException {
  int partition = partitioner.getPartition(key, value, partitions);
  keySerializer.serialize(key);
  valSerializer.serialize(value);
  rawMapOutputCollector.collect(keyBuffer, valueBuffer, partition);
}
{code}
What is backing {{keySerializer}} and {{valSerializer}}? What are {{keyBuffer}} 
and {{valueBuffer}} if not intermediate structures separate from the collection 
buffer? If they are backed by the collection buffer, as in the current code, 
then the call to {{collect}} is redundant, because the bytes have already been 
written; {{collect}} would merely note that the range of bytes just written 
belongs to that partition, which is just confused. In the current code, each 
serializer is backed by the internal {{BlockingBuffer}} that manages the writes 
into the shared collection buffer; it subclasses {{DataOutputStream}} because 
the {{Writable}} contract used to require a {{DataOutput}}, not because there 
is an intermediate {{DataOutputStream}} backing the serializers.
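
For reference, this is roughly how the serializers are wired in the current 
{{MapTask.MapOutputBuffer}} (paraphrased from the 0.20-era code, with details 
elided); both serializers are opened directly on the buffer that writes into 
the shared collection buffer:
{code}
// paraphrased from MapTask.MapOutputBuffer; not a complete excerpt
BlockingBuffer bb = new BlockingBuffer();   // writes into the shared kvbuffer
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);   // key bytes go straight into the collection buffer
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);   // value bytes likewise
{code}
There is no per-record scratch buffer anywhere in this path.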

What the proposed section says is that {{keySerializer}} and {{valSerializer}} 
write into separate {{DataOutputBuffer}} instances ({{keyBuffer}} and 
{{valueBuffer}}), and that the serialized bytes are then copied from the byte 
arrays backing those structures into the collection buffer. There is absolutely 
an extra copy in this scheme; it would not be reasonable to read it otherwise.
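
To spell out the copy, here is a minimal sketch of what the quoted {{write()}} 
implies when {{keyBuffer}} and {{valueBuffer}} are separate 
{{DataOutputBuffer}} instances; the {{collect}} internals below are 
illustrative, not code from the proposal:
{code}
// in write(): first copy, object -> per-record scratch buffer
keyBuffer.reset();
keySerializer.serialize(key);      // serializer opened on keyBuffer
valueBuffer.reset();
valSerializer.serialize(value);    // serializer opened on valueBuffer

// in collect(): second copy, scratch buffer -> shared collection buffer
System.arraycopy(keyBuffer.getData(), 0, kvbuffer, bufindex,
                 keyBuffer.getLength());
System.arraycopy(valueBuffer.getData(), 0, kvbuffer,
                 bufindex + keyBuffer.getLength(), valueBuffer.getLength());
{code}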

bq. Tom> At least, this was true until MAPREDUCE-64 was committed (around the 
time I was writing the proposal). Now it uses a different buffer object 
(BlockingBuffer, which is a DataOutputStream), so really DataOutputBuffer 
should be replaced by DataOutputStream in the write() method signature.

It has worked this way since HADOOP-2919 in 0.17... and again: if both 
{{keySerializer}} and {{valSerializer}} are backed by the collection buffer, 
then what is {{collect}} doing? What are {{keyBuffer}} and {{valueBuffer}}?

bq. Tom> With a binary API it would be possible to bypass the BytesWritable and 
write directly into the MapOutputBuffer.

You're confusing buffering with unnecessary copying. The transport between 
streaming/pipes and the collection buffer could legitimately write directly 
from its source into the collection buffer, without an intermediate 
{{BytesWritable}}, if and only if no backtracking is required. One can only 
write directly into the buffer if the framework guarantees that it will not 
flush the stream, as Doug's {{Channel}} API acknowledges by requiring that the 
bytes be retained in memory while being broken into records.
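
To make the constraint concrete: any zero-copy transport implies a lend/commit 
protocol with roughly the following shape. Every name here is invented for 
illustration; this is not Doug's API:
{code}
// hypothetical sketch; all names invented for illustration
ByteBuffer region = collector.reserve(maxLen);    // framework lends collection-buffer space
int n = channel.read(region);                     // bytes land in the buffer directly
collector.commit(keyLen, n - keyLen, partition);  // boundaries known; now safe to sort/spill
{code}
Until something like {{commit}} is called, the framework must retain those 
bytes and can neither spill nor relocate them.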

bq. Tom> Keys and values sent to MapOutputBuffer.collect() ultimately find 
their way to IFile.append() (in MapOutputBuffer's sortAndSpill() method). I'm 
just remarking that these become binary level interfaces, so there are no 
generic types in these interfaces in this proposal.

There should be no type overhead in the intermediate data pipeline unless one 
is using a combiner or has not defined a {{RawComparator}}. Even there, since 
context-specific serialization has been accepted as a legitimate use case, I do 
not think one is forced to copy the backing storage to run the combiner. If the 
generic types are {{ByteBuffer}} or {{Object}}, I'm not sure what overhead is 
incurred; there is certainly no type safety.
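
For context, the sort path already compares serialized records in place via 
{{RawComparator}} (the actual interface, which extends 
{{java.util.Comparator}}), so no deserialization or object creation is needed 
per comparison:
{code}
// org.apache.hadoop.io.RawComparator: compares records as raw byte ranges
public interface RawComparator<T> extends Comparator<T> {
  int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
{code}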

----

bq. Doug> Perhaps this might instead look something 
[like|https://issues.apache.org/jira/browse/MAPREDUCE-326?focusedCommentId=12833907&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12833907]:

So the goal is to read directly into the collection buffer? Note that this 
could be constraining for future framework development: the current design is 
free to relocate the offset of its next record, but a design that lends backing 
storage to user code must track segments of unreleased data not yet committed 
as records and avoid spilling or relocating them. This may introduce internal 
fragmentation into the intermediate buffering and- if frameworks are not 
careful- may block more often than necessary by preventing the framework from 
spilling when it prefers. None of these are fatal, but it's worth noting that 
any proposal along these lines cedes a considerable amount of control to 
frameworks, complicates the implementation if it retains any intelligence, and 
denies MapReduce some opportunities to make optimizations that benefit all.
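
Purely for illustration, this is the kind of bookkeeping a lending design 
forces on the collector; nothing here is proposed code:
{code}
// illustrative only: a region lent to user code but not yet committed as
// a record pins the buffer; the spill may neither cover nor relocate it
static class LentRegion {
  final int start;     // offset into the collection buffer
  final int length;
  LentRegion(int start, int length) { this.start = start; this.length = length; }
}
// the spillable range ends at the first outstanding LentRegion, not at bufindex
{code}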

The API you suggest looks great for the identity map (many jobs do use it) or 
for a framework that does its own buffering of emitted records, keeping its own 
set of offsets as the output collector does. Is this how Pig and Hive work?

bq. Doug> The goal is to permit the kernel to identify record boundaries (so 
that it can compare, sort and transmit records) while at the same time 
minimizing 
per-record data copying. Getting this API right without benchmarking might 
prove difficult. We should benchmark this under various scenarios: A key/value 
pair of Writable instances, line-based data from a text file, and 
length-delimited, raw binary data.

+1 Winning a benchmark- by a significant margin- should be a prerequisite to 
committing these changes.

bq. Doug> Can you please elaborate? I don't see the words "pipes" or 
"streaming" mentioned in that issue. How does one load Python, Ruby, C++, etc. 
into Java? MAPREDUCE-1183 seems to me just to be a different way to encapsulate 
configuration data, grouping it per extension point rather than centralizing it 
in the job config.

The discussion in MAPREDUCE-1183 also includes writing job descriptions for 
TaskRunners in different languages. You were the first to raise it 
[here|https://issues.apache.org/jira/browse/MAPREDUCE-1183?focusedCommentId=12774027&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12774027].
Nothing in Java will be better than an implementation in the native language; 
someday such an implementation will be written, and MAPREDUCE-1183 makes 
progress toward it by defining a submission API that is as versatile as the 
serialization, rather than tied to {{Configuration}}. I think it makes more 
progress toward polyglotism than the issue at hand.

----

I proposed earlier that the byte-oriented utilities effecting the work in 
MapReduce- {{MapOutputBuffer}}, {{IFile}}, {{Merger}}, etc.- be cleaned up and 
made available to frameworks. I still think that is sufficient for performance 
improvements in the near, even foreseeable, future.

Again, even one use case for this would be hugely useful to the discussion.

> The lowest level map-reduce APIs should be byte oriented
> --------------------------------------------------------
>
>                 Key: MAPREDUCE-326
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-326
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>            Reporter: eric baldeschwieler
>         Attachments: MAPREDUCE-326-api.patch, MAPREDUCE-326.pdf
>
>
> As discussed here:
> https://issues.apache.org/jira/browse/HADOOP-1986#action_12551237
> The templates, serializers and other complexities that allow map-reduce to 
> use arbitrary types complicate the design and lead to lots of object creates 
> and other overhead that a byte oriented design would not suffer.  I believe 
> the lowest level implementation of hadoop map-reduce should have byte string 
> oriented APIs (for keys and values).  This API would be more performant, 
> simpler and more easily cross language.
> The existing API could be maintained as a thin layer on top of the leaner API.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
