http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform
new file mode 100644
index 0000000..eef215d
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.mapred.nativetask.HadoopPlatform

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java
new file mode 100644
index 0000000..a76b1b2
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+
+public class TestTaskContext extends TestCase {
+  
+  public void testTaskContext() {
+    TaskContext context = new TaskContext(null, null, null, null, null, null, 
null);
+    
+    context.setInputKeyClass(IntWritable.class);
+    assertEquals(IntWritable.class.getName(), 
context.getInputKeyClass().getName()); 
+ 
+    context.setInputValueClass(Text.class);
+    assertEquals(Text.class.getName(), 
context.getInputValueClass().getName()); 
+   
+    context.setOutputKeyClass(LongWritable.class);
+    assertEquals(LongWritable.class.getName(), 
context.getOuputKeyClass().getName()); 
+
+    context.setOutputValueClass(FloatWritable.class);
+    assertEquals(FloatWritable.class.getName(), 
context.getOutputValueClass().getName()); 
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java
new file mode 100644
index 0000000..5dcac35
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.nativetask.DataReceiver;
+import org.apache.hadoop.mapred.nativetask.NativeDataSource;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPullee;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPuller;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPushee;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPusher;
+import org.apache.hadoop.mapred.nativetask.handlers.IDataLoader;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.util.Progress;
+import org.junit.Before;
+
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestBufferPushPull extends TestCase {
+
+  public static int BUFFER_LENGTH = 100; // 100 bytes
+  public static int INPUT_KV_COUNT = 1000;
+  private KV<BytesWritable, BytesWritable>[] dataInput;
+
+  @Override
+  @Before
+  public void setUp() {
+    this.dataInput = TestInput.getMapInputs(INPUT_KV_COUNT);
+  }
+
+  public void testPush() throws Exception {
+    final byte[] buff = new byte[BUFFER_LENGTH];
+
+    final InputBuffer input = new InputBuffer(buff);
+
+    final OutputBuffer out = new OutputBuffer(buff);
+
+    final Class<BytesWritable> iKClass = BytesWritable.class;
+    final Class<BytesWritable> iVClass = BytesWritable.class;
+
+    final RecordWriterForPush writer = new RecordWriterForPush() {
+      @Override
+      public void write(BytesWritable key, BytesWritable value) throws 
IOException {
+        final KV expect = dataInput[count++];
+        Assert.assertEquals(expect.key.toString(), key.toString());
+        Assert.assertEquals(expect.value.toString(), value.toString());
+      }
+    };
+
+    final BufferPushee pushee = new BufferPushee(iKClass, iVClass, writer);
+
+    final PushTarget handler = new PushTarget(out) {
+
+      @Override
+      public void sendData() throws IOException {
+        final int outputLength = out.length();
+        input.rewind(0, outputLength);
+        out.rewind();
+        pushee.collect(input);
+      }
+    };
+
+    final BufferPusher pusher = new BufferPusher(iKClass, iVClass, handler);
+
+    writer.reset();
+    for (int i = 0; i < INPUT_KV_COUNT; i++) {
+      pusher.collect(dataInput[i].key, dataInput[i].value);
+    }
+    pusher.close();
+    pushee.close();
+  }
+
+  public void testPull() throws Exception {
+    final byte[] buff = new byte[BUFFER_LENGTH];
+
+    final InputBuffer input = new InputBuffer(buff);
+
+    final OutputBuffer out = new OutputBuffer(buff);
+
+    final Class<BytesWritable> iKClass = BytesWritable.class;
+    final Class<BytesWritable> iVClass = BytesWritable.class;
+
+    final NativeHandlerForPull handler = new NativeHandlerForPull(input, out);
+
+    final KeyValueIterator iter = new KeyValueIterator();
+    final BufferPullee pullee = new BufferPullee(iKClass, iVClass, iter, 
handler);
+    handler.setDataLoader(pullee);
+
+    final BufferPuller puller = new BufferPuller(handler);
+    handler.setDataReceiver(puller);
+
+    int count = 0;
+
+    while (puller.next()) {
+      final DataInputBuffer key = puller.getKey();
+      final DataInputBuffer value = puller.getValue();
+
+      final BytesWritable keyBytes = new BytesWritable();
+      final BytesWritable valueBytes = new BytesWritable();
+
+      keyBytes.readFields(key);
+      valueBytes.readFields(value);
+
+      Assert.assertEquals(dataInput[count].key.toString(), 
keyBytes.toString());
+      Assert.assertEquals(dataInput[count].value.toString(), 
valueBytes.toString());
+
+      count++;
+    }
+
+    puller.close();
+    pullee.close();
+  }
+
+  public abstract class PushTarget implements NativeDataTarget {
+    OutputBuffer out;
+
+    PushTarget(OutputBuffer out) {
+      this.out = out;
+    }
+
+    @Override
+    public abstract void sendData() throws IOException;
+
+    @Override
+    public void finishSendData() throws IOException {
+      sendData();
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return out;
+    }
+  }
+
+  public abstract class RecordWriterForPush implements 
RecordWriter<BytesWritable, BytesWritable> {
+
+    protected int count = 0;
+
+    RecordWriterForPush() {
+    }
+
+    @Override
+    public abstract void write(BytesWritable key, BytesWritable value) throws 
IOException;
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+    }
+
+    public void reset() {
+      count = 0;
+    }
+  };
+
+  public static class NativeHandlerForPull implements NativeDataSource, 
NativeDataTarget {
+
+    InputBuffer in;
+    private final OutputBuffer out;
+
+    private IDataLoader dataLoader;
+    private DataReceiver dataReceiver;
+
+    public NativeHandlerForPull(InputBuffer input, OutputBuffer out) {
+      this.in = input;
+      this.out = out;
+    }
+
+    @Override
+    public InputBuffer getInputBuffer() {
+      return in;
+    }
+
+    @Override
+    public void setDataReceiver(DataReceiver handler) {
+      this.dataReceiver = handler;
+    }
+
+    @Override
+    public void loadData() throws IOException {
+      final int size = dataLoader.load();
+    }
+
+    public void setDataLoader(IDataLoader dataLoader) {
+      this.dataLoader = dataLoader;
+    }
+
+    @Override
+    public void sendData() throws IOException {
+      final int len = out.length();
+      out.rewind();
+      in.rewind(0, len);
+      dataReceiver.receiveData();
+    }
+
+    @Override
+    public void finishSendData() throws IOException {
+      dataReceiver.receiveData();
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return this.out;
+    }
+  }
+
+  public class KeyValueIterator implements RawKeyValueIterator {
+    int count = 0;
+    BytesWritable key;
+    BytesWritable value;
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      return convert(key);
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      return convert(value);
+    }
+
+    private DataInputBuffer convert(BytesWritable b) throws IOException {
+      final ByteArrayOutputStream out = new ByteArrayOutputStream();
+      b.write(new DataOutputStream(out));
+      final byte[] array = out.toByteArray();
+      final DataInputBuffer result = new DataInputBuffer();
+      result.reset(array, array.length);
+      return result;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      if (count < INPUT_KV_COUNT) {
+        key = dataInput[count].key;
+        value = dataInput[count].key;
+        count++;
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
new file mode 100644
index 0000000..424354b
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestByteBufferReadWrite extends TestCase{
+  
+  
+  public void testReadWrite() throws IOException {
+    byte[] buff = new byte[10000];
+    
+    InputBuffer input = new InputBuffer(buff);
+    MockDataTarget target = new MockDataTarget(buff);
+    ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+    
+    writer.write(1);
+    writer.write(new byte[] {2, 2}, 0, 2);
+    writer.writeBoolean(true);
+    writer.writeByte(4);
+    writer.writeShort(5);
+    writer.writeChar(6);
+    writer.writeInt(7);
+    writer.writeLong(8);
+    writer.writeFloat(9);
+    writer.writeDouble(10);
+    writer.writeBytes("goodboy");
+    writer.writeChars("hello");
+    writer.writeUTF("native task");
+    
+    int length = target.getOutputBuffer().length();
+    input.rewind(0, length);
+    ByteBufferDataReader reader = new ByteBufferDataReader(input);
+    
+    Assert.assertEquals(1, reader.read());
+    byte[] two = new byte[2];
+    reader.read(two);
+    Assert.assertTrue(two[0] == two[1] && two[0] == 2);
+    
+    
+    Assert.assertEquals(true, reader.readBoolean());
+    Assert.assertEquals(4, reader.readByte());
+    Assert.assertEquals(5, reader.readShort());
+    Assert.assertEquals(6, reader.readChar());
+    Assert.assertEquals(7, reader.readInt());
+    Assert.assertEquals(8, reader.readLong());
+    Assert.assertTrue(reader.readFloat() - 9 < 0.0001);
+    Assert.assertTrue(reader.readDouble() - 10 < 0.0001);
+    
+    byte[] goodboy = new byte["goodboy".length()];
+    reader.read(goodboy);
+    Assert.assertEquals("goodboy", toString(goodboy));
+    
+    char[] hello = new char["hello".length()];
+    for (int i = 0; i < hello.length; i++) {
+      hello[i] = reader.readChar();
+    }
+    
+    String helloString = new String(hello);
+    Assert.assertEquals("hello", helloString);
+    
+    Assert.assertEquals("native task", reader.readUTF());
+    
+    Assert.assertEquals(0, input.remaining());
+  }
+  
+  public void testShortOfSpace() throws IOException {
+    byte[] buff = new byte[10];
+    MockDataTarget target = new MockDataTarget(buff);
+    ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+    Assert.assertEquals(false, writer.hasUnFlushedData()); 
+    
+    writer.write(1);
+    writer.write(new byte[] {2, 2}, 0, 2);
+    Assert.assertEquals(true, writer.hasUnFlushedData()); 
+    
+    Assert.assertEquals(true, writer.shortOfSpace(100));
+  }
+
+  public void testFlush() throws IOException {
+    byte[] buff = new byte[10];
+    final Counter flushCount = new Counter();
+    final Flag finishFlag = new Flag();
+    MockDataTarget target = new MockDataTarget(buff) {
+      @Override
+      public void sendData() throws IOException {
+        flushCount.increase();
+      }
+      
+      @Override
+      public void finishSendData() throws IOException {
+        finishFlag.set(true);
+      }
+    };
+    
+    ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+    Assert.assertEquals(false, writer.hasUnFlushedData()); 
+    
+    writer.write(1);
+    writer.write(new byte[100]);
+
+    Assert.assertEquals(true, writer.hasUnFlushedData()); 
+    writer.close();    
+    Assert.assertEquals(11, flushCount.get());
+    Assert.assertEquals(true, finishFlag.get()); 
+
+  }
+  
+  private static String toString(byte[] str) throws 
UnsupportedEncodingException {
+    return new String(str, 0, str.length, "UTF-8");
+  }
+  
+  private static class MockDataTarget implements NativeDataTarget {
+
+    private OutputBuffer out;
+
+    MockDataTarget(byte[] buffer) {
+      this.out = new OutputBuffer(buffer);
+    }
+    
+    @Override
+    public void sendData() throws IOException {
+      
+    }
+
+    @Override
+    public void finishSendData() throws IOException {
+       
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return out;
+    }    
+  }
+  
+  private static class Counter {
+    private int count;
+    
+    public int get() {
+      return count;
+    }
+    
+    public void increase() {
+      count++;
+    }
+  }
+  
+  private static class Flag {
+    private boolean value;
+    
+    public void set(boolean status) {
+      this.value = status;
+    }
+    
+    public boolean get() {
+      return this.value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
new file mode 100644
index 0000000..09c1ef5
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+public class TestDirectBufferPool {
+
+  @Test
+  public void testGetInstance() throws Exception {
+    final int num = 100;
+    List<DirectBufferPool> pools = new ArrayList<DirectBufferPool>();
+    Thread[] list = new Thread[num];
+    for (int i = 0; i < num; i++)  {
+      Thread t = getPoolThread(pools);
+      t.start();
+      list[i] = t;
+    }
+    for (int i = 0; i < num; i++) {
+      try {
+        list[i].join(10000);
+      } catch (Exception e) {
+        e.printStackTrace(); 
+      }
+    }
+    DirectBufferPool p1 = pools.get(0);
+    assertNotNull(p1);
+    for (int i = 1; i < pools.size(); i++) {
+      DirectBufferPool p2 = pools.get(i);
+      assertNotNull(p2);
+      assertSame(p1, p2);
+    }
+  }
+
+  private Thread getPoolThread(final List<DirectBufferPool> pools) {
+    Thread t = new Thread() {
+      public void run() {
+        pools.add(DirectBufferPool.getInstance());
+      }
+    };
+    return t;
+  }
+
+
+  @Test
+  public void testBufBorrow() throws IOException {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    ByteBuffer b1 = bufferPool.borrowBuffer(100);
+    assertTrue(b1.isDirect());
+    assertEquals(0, b1.position());
+    assertEquals(100, b1.capacity());
+    bufferPool.returnBuffer(b1);
+    ByteBuffer b2 = bufferPool.borrowBuffer(100);
+    assertTrue(b2.isDirect());
+    assertEquals(0, b2.position());
+    assertEquals(100, b2.capacity());
+    assertSame(b1, b2);
+
+    ByteBuffer b3 =  bufferPool.borrowBuffer(100);
+    assertTrue(b3.isDirect());
+    assertEquals(0, b3.position());
+    assertEquals(100, b3.capacity());
+    assertNotSame(b2, b3);
+    bufferPool.returnBuffer(b2);
+    bufferPool.returnBuffer(b3);
+  }
+
+  @Test
+  public void testBufReset() throws IOException {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    ByteBuffer b1 = bufferPool.borrowBuffer(100);
+    assertTrue(b1.isDirect());
+    assertEquals(0, b1.position());
+    assertEquals(100, b1.capacity());
+    b1.putInt(1);
+    assertEquals(4, b1.position());
+    bufferPool.returnBuffer(b1);
+    ByteBuffer b2 = bufferPool.borrowBuffer(100);
+    assertSame(b1, b2);
+    assertTrue(b2.isDirect());
+    assertEquals(0, b2.position());
+    assertEquals(100, b2.capacity());
+  }
+
+  @Test
+  public void testBufReturn() throws IOException {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    int numOfBufs = 100;
+    int capacity = 100;
+    final ByteBuffer[] bufs = new ByteBuffer[numOfBufs];
+    for (int i = 0; i < numOfBufs; i++) {
+      bufs[i] = bufferPool.borrowBuffer(capacity);
+    }
+
+    assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
+
+
+    int numOfThreads = numOfBufs;
+    Thread[] list = new Thread[numOfThreads];
+    for (int i = 0; i < numOfThreads; i++) {
+      Thread t = retBufThread(bufferPool, bufs, i);
+      t.start();
+      list[i] = t;
+    }
+    for (int i = 0; i < numOfThreads; i++) {
+      try {
+        list[i].join(10000);
+      } catch (Exception e) {
+       e.printStackTrace();
+      }
+    }
+
+    assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
+  }
+
+  private Thread retBufThread(final DirectBufferPool bufferPool, final 
ByteBuffer[] bufs, final int i) {
+       Thread t = new Thread(new Runnable(){
+        @Override
+        public void run() {
+          try {
+          bufferPool.returnBuffer(bufs[i]);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      });
+    return t;
+  }
+
+  @Test
+  public void testBufException() {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    boolean thrown = false;
+    try {
+      bufferPool.returnBuffer(null);
+    } catch (IOException e) {
+      thrown = true;
+    }
+    assertEquals(true, thrown);
+
+    thrown = false;
+    ByteBuffer buf = ByteBuffer.allocate(100);
+    try {
+      bufferPool.returnBuffer(buf);
+    } catch (IOException e) {
+      thrown = true;
+    }
+    assertEquals(true, thrown);
+  }
+
+  @Test
+  public void testBufWeakRefClear() throws IOException {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    int numOfBufs = 100;
+    int capacity = 100;
+    ByteBuffer[] list = new ByteBuffer[capacity];
+    for (int i = 0; i < numOfBufs; i++) {
+      list[i] = bufferPool.borrowBuffer(capacity);
+    }
+    for (int i = 0; i < numOfBufs; i++) {
+      bufferPool.returnBuffer(list[i]);
+      list[i] = null;
+    }
+
+    assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
+
+    for (int i = 0; i < 3; i++) {
+      System.gc();
+    }
+
+    ByteBuffer b = bufferPool.borrowBuffer(capacity);
+    assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java
new file mode 100644
index 0000000..7eb6467
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.junit.Assert;
+
+public class TestInputBuffer extends TestCase {
+  public void testInputBuffer() throws IOException {
+    final int size = 100;
+    final InputBuffer input1 = new InputBuffer(BufferType.DIRECT_BUFFER, size);
+    Assert.assertEquals(input1.getType(), BufferType.DIRECT_BUFFER);
+
+    Assert.assertTrue(input1.position() == 0);
+    Assert.assertTrue(input1.length() == 0);
+    Assert.assertTrue(input1.remaining() == 0);
+    Assert.assertTrue(input1.capacity() == size);
+
+    final InputBuffer input2 = new InputBuffer(BufferType.HEAP_BUFFER, size);
+    Assert.assertEquals(input2.getType(), BufferType.HEAP_BUFFER);
+
+    Assert.assertTrue(input2.position() == 0);
+    Assert.assertTrue(input2.length() == 0);
+    Assert.assertTrue(input2.remaining() == 0);
+    Assert.assertTrue(input2.capacity() == size);
+
+    final InputBuffer input3 = new InputBuffer(new byte[size]);
+    Assert.assertEquals(input3.getType(), BufferType.HEAP_BUFFER);
+
+    Assert.assertTrue(input3.position() == 0);
+    Assert.assertTrue(input3.length() == 0);
+    Assert.assertTrue(input3.remaining() == 0);
+    Assert.assertEquals(input3.capacity(), size);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java
new file mode 100644
index 0000000..39c25a6
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import junit.framework.TestCase;
+
+import org.junit.Assert;
+
+public class TestOutputBuffer extends TestCase {
+  public void testOutputBuffer() {
+    final int size = 100;
+    final OutputBuffer output1 = new OutputBuffer(BufferType.DIRECT_BUFFER, 
size);
+    Assert.assertEquals(output1.getType(), BufferType.DIRECT_BUFFER);
+
+    Assert.assertTrue(output1.length() == 0);
+    Assert.assertEquals(output1.limit(), size);
+
+    final OutputBuffer output2 = new OutputBuffer(BufferType.HEAP_BUFFER, 
size);
+    Assert.assertEquals(output2.getType(), BufferType.HEAP_BUFFER);
+
+    Assert.assertTrue(output2.length() == 0);
+    Assert.assertEquals(output2.limit(), size);
+
+    final OutputBuffer output3 = new OutputBuffer(new byte[size]);
+    Assert.assertEquals(output3.getType(), BufferType.HEAP_BUFFER);
+
+    Assert.assertTrue(output3.length() == 0);
+    Assert.assertEquals(output3.limit(), size);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java
new file mode 100644
index 0000000..7b337b7
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.Task.CombinerRunner;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestCombineHandler extends TestCase {
+
+  private CombinerHandler handler;
+  private INativeHandler nativeHandler;
+  private BufferPusher pusher;
+  private BufferPuller puller;
+  private CombinerRunner combinerRunner;
+
+  @Override
+  public void setUp() throws IOException {
+    
+    this.nativeHandler = Mockito.mock(INativeHandler.class);
+    this.pusher = Mockito.mock(BufferPusher.class);
+    this.puller =  Mockito.mock(BufferPuller.class);
+    this.combinerRunner =  Mockito.mock(CombinerRunner.class);
+
+    Mockito.when(nativeHandler.getInputBuffer()).thenReturn(new 
InputBuffer(BufferType.HEAP_BUFFER, 100));
+  }
+
+  public void testCommandDispatcherSetting() throws IOException {
+    this.handler = new CombinerHandler(nativeHandler, combinerRunner, puller, 
pusher);
+    Mockito.verify(nativeHandler, 
Mockito.times(1)).setCommandDispatcher(Matchers.eq(handler));
+    Mockito.verify(nativeHandler, 
Mockito.times(1)).setDataReceiver(Matchers.eq(puller));
+  }
+
+  public void testCombine() throws IOException, InterruptedException, 
ClassNotFoundException {
+    this.handler = new CombinerHandler(nativeHandler, combinerRunner, puller, 
pusher);
+    Assert.assertEquals(null, handler.onCall(CombinerHandler.COMBINE, null));
+    handler.close();
+    handler.close();
+
+    Mockito.verify(combinerRunner, 
Mockito.times(1)).combine(Matchers.eq(puller), Matchers.eq(pusher));
+
+    Mockito.verify(pusher, Mockito.times(1)).close();
+    Mockito.verify(puller, Mockito.times(1)).close();
+    Mockito.verify(nativeHandler, Mockito.times(1)).close();
+  }
+
+  public void testOnCall() throws IOException {
+    this.handler = new CombinerHandler(nativeHandler, combinerRunner, puller, 
pusher);
+    Assert.assertEquals(null, handler.onCall(new Command(-1), null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java
new file mode 100644
index 0000000..05d87cf
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.OutputUtil;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestNativeCollectorOnlyHandler extends TestCase {
+
+  private NativeCollectorOnlyHandler handler;
+  private INativeHandler nativeHandler;
+  private BufferPusher pusher;
+  private ICombineHandler combiner;
+  private TaskContext taskContext;
+  private String localDir = "build/test/mapred/local";
+
+  @Override
+  public void setUp() throws IOException {
+    this.nativeHandler = Mockito.mock(INativeHandler.class);
+    this.pusher = Mockito.mock(BufferPusher.class);
+    this.combiner = Mockito.mock(ICombineHandler.class);
+    JobConf jobConf = new JobConf();
+    jobConf.set(OutputUtil.NATIVE_TASK_OUTPUT_MANAGER,
+        "org.apache.hadoop.mapred.nativetask.util.LocalJobOutputFiles");
+    jobConf.set("mapred.local.dir", localDir);
+    this.taskContext = new TaskContext(jobConf,
+        BytesWritable.class, BytesWritable.class,
+        BytesWritable.class,
+        BytesWritable.class,
+        null,
+        null);
+
+    Mockito.when(nativeHandler.getInputBuffer()).thenReturn(new 
InputBuffer(BufferType.HEAP_BUFFER, 100));
+  }
+
+  public void testCollect() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, 
pusher, combiner);
+    handler.collect(new BytesWritable(), new BytesWritable(), 100);
+    handler.close();
+    handler.close();
+
+    Mockito.verify(pusher, 
Mockito.times(1)).collect(Matchers.any(BytesWritable.class),
+        Matchers.any(BytesWritable.class), Matchers.anyInt());
+
+    Mockito.verify(pusher, Mockito.times(1)).close();
+    Mockito.verify(combiner, Mockito.times(1)).close();
+    Mockito.verify(nativeHandler, Mockito.times(1)).close();
+  }
+
+  public void testGetCombiner() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, 
pusher, combiner);
+    Mockito.when(combiner.getId()).thenReturn(100L);
+    final ReadWriteBuffer result = 
handler.onCall(NativeCollectorOnlyHandler.GET_COMBINE_HANDLER, null);
+    Assert.assertEquals(100L, result.readLong());
+  }
+
+  public void testOnCall() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, 
pusher, combiner);
+    boolean thrown = false;
+    try {
+      handler.onCall(new Command(-1), null);
+    } catch(final IOException e) {
+      thrown = true;
+    }
+    Assert.assertTrue("exception thrown", thrown);
+
+    final String expectedOutputPath = localDir + "/output/file.out";
+    final String expectedOutputIndexPath = localDir + "/output/file.out.index";
+    final String expectedSpillPath = localDir + "/output/spill0.out";
+
+    final String outputPath = 
handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_PATH, null).readString();
+    Assert.assertEquals(expectedOutputPath, outputPath);
+
+    final String outputIndexPath = 
handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_INDEX_PATH, 
null).readString();
+    Assert.assertEquals(expectedOutputIndexPath, outputIndexPath);
+
+    final String spillPath = 
handler.onCall(NativeCollectorOnlyHandler.GET_SPILL_PATH, null).readString();
+    Assert.assertEquals(expectedSpillPath, spillPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java
new file mode 100644
index 0000000..004e8b8
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream;
+import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TestKVSerializer extends TestCase {
+
+  int inputArraySize = 1000; // 1000 bytesWriable elements
+  int bufferSize = 100; // bytes
+  private KV<BytesWritable, BytesWritable>[] inputArray;
+
+  final ByteArrayOutputStream result = new ByteArrayOutputStream();
+  private SizedWritable key;
+  private SizedWritable value;
+  private KVSerializer serializer;
+
+  @Override
+  @Before
+  public void setUp() throws IOException {
+    this.inputArray = TestInput.getMapInputs(inputArraySize);
+    this.key = new SizedWritable(BytesWritable.class);
+    this.value = new SizedWritable(BytesWritable.class);
+
+    this.serializer = new KVSerializer(BytesWritable.class, 
BytesWritable.class);
+
+    key.reset(inputArray[4].key);
+    value.reset(inputArray[4].value);
+    serializer.updateLength(key, value);
+  }
+
+  public void testUpdateLength() throws IOException {
+    Mockito.mock(DataOutputStream.class);
+
+    int kvLength = 0;
+    for (int i = 0; i < inputArraySize; i++) {
+      key.reset(inputArray[i].key);
+      value.reset(inputArray[i].value);
+      serializer.updateLength(key, value);
+
+      // verify whether the size increase
+      Assert.assertTrue(key.length + value.length > kvLength);
+      kvLength = key.length + value.length;
+    }
+  }
+
+  public void testSerializeKV() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(dataOut.shortOfSpace(key.length + value.length + 
Constants.SIZEOF_KV_LENGTH)).thenReturn(true);
+    final int written = serializer.serializeKV(dataOut, key, value);
+
+    // flush once, write 4 int, and 2 byte array
+    Mockito.verify(dataOut, Mockito.times(1)).flush();
+    Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, 
Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), 
Matchers.anyInt());
+
+    Assert.assertEquals(written, key.length + value.length + 
Constants.SIZEOF_KV_LENGTH);
+  }
+
+  public void testSerializeNoFlush() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+
+    // suppose there are enough space
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(dataOut.shortOfSpace(Matchers.anyInt())).thenReturn(false);
+    final int written = serializer.serializeKV(dataOut, key, value);
+
+    // flush 0, write 4 int, and 2 byte array
+    Mockito.verify(dataOut, Mockito.times(0)).flush();
+    Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, 
Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), 
Matchers.anyInt());
+
+    Assert.assertEquals(written, key.length + value.length + 
Constants.SIZEOF_KV_LENGTH);
+  }
+
+  public void testSerializePartitionKV() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(
+        dataOut
+        .shortOfSpace(key.length + value.length + Constants.SIZEOF_KV_LENGTH + 
Constants.SIZEOF_PARTITION_LENGTH))
+        .thenReturn(true);
+    final int written = serializer.serializePartitionKV(dataOut, 100, key, 
value);
+
+    // flush once, write 4 int, and 2 byte array
+    Mockito.verify(dataOut, Mockito.times(1)).flush();
+    Mockito.verify(dataOut, Mockito.times(5)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, 
Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), 
Matchers.anyInt());
+
+    Assert.assertEquals(written, key.length + value.length + 
Constants.SIZEOF_KV_LENGTH
+        + Constants.SIZEOF_PARTITION_LENGTH);
+  }
+
+  public void testDeserializerNoData() throws IOException {
+    final DataInputStream in = Mockito.mock(DataInputStream.class);
+    Mockito.when(in.hasUnReadData()).thenReturn(false);
+    Assert.assertEquals(0, serializer.deserializeKV(in, key, value));
+  }
+
+  public void testDeserializer() throws IOException {
+    final DataInputStream in = Mockito.mock(DataInputStream.class);
+    Mockito.when(in.hasUnReadData()).thenReturn(true);
+    Assert.assertTrue(serializer.deserializeKV(in, key, value) > 0);
+
+    Mockito.verify(in, Mockito.times(4)).readInt();
+    Mockito.verify(in, Mockito.times(2)).readFully(Matchers.any(byte[].class), 
Matchers.anyInt(), Matchers.anyInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java
new file mode 100644
index 0000000..4b67454
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+@SuppressWarnings({ "rawtypes", "deprecation" })
+public class TestNativeSerialization extends TestCase {
+  public void testRegisterAndGet() throws IOException {
+    final NativeSerialization serialization = 
NativeSerialization.getInstance();
+    serialization.reset();
+
+    serialization.register(WritableKey.class.getName(), 
ComparableKeySerializer.class);
+
+    INativeSerializer serializer = 
serialization.getSerializer(WritableKey.class);
+    Assert.assertEquals(ComparableKeySerializer.class.getName(), 
serializer.getClass().getName());
+
+    serializer = serialization.getSerializer(WritableValue.class);
+    Assert.assertEquals(DefaultSerializer.class.getName(), 
serializer.getClass().getName());
+
+    boolean ioExceptionThrown = false;
+    try {
+      serializer = serialization.getSerializer(NonWritableValue.class);
+    } catch (final IOException e) {
+      ioExceptionThrown = true;
+    }
+    Assert.assertTrue(ioExceptionThrown);
+  }
+
+  public static class WritableKey implements Writable {
+    private int value;
+
+    public WritableKey(int a) {
+      this.value = a;
+    }
+
+    public int getLength() {
+      return 4;
+    }
+
+    public int getValue() {
+      return value;
+    }
+
+    public void setValue(int v) {
+      this.value = v;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
+  public static class WritableValue implements Writable {
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
+  public static class NonWritableValue {
+  }
+
+  public static class ComparableKeySerializer implements INativeComparable, 
INativeSerializer<WritableKey> {
+
+    @Override
+    public int getLength(WritableKey w) throws IOException {
+      return w.getLength();
+    }
+
+    @Override
+    public void serialize(WritableKey w, DataOutput out) throws IOException {
+      out.writeInt(w.getValue());
+    }
+
+    @Override
+    public void deserialize(DataInput in, int length, WritableKey w) throws 
IOException {
+      w.setValue(in.readInt());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java
new file mode 100644
index 0000000..6713a4a
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.testutil;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.io.BytesWritable;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TestInput {
+
+  public static class KV<K, V> {
+    public K key;
+    public V value;
+  }
+
+  public static char[] CHAR_SET = new char[] { 'A', 'B', 'C', 'D', 'E', 'F', 
'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N',
+    'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 
'd', 'e', 'f', 'g', 'h', 'i', 'j',
+    'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 
'z', '0', '1', '2', '3', '4', '5',
+    '6', '7', '8', '9', '*', '/' };
+
+  public static KV[] getMapInputs(int size) {
+
+    final KV[] dataInput = new KV[size];
+
+    for (int i = 0; i < size; i++) {
+      dataInput[i] = getSingleMapInput(i);
+    }
+    return dataInput;
+  }
+
+  private static KV getSingleMapInput(int i) {
+    final char character = CHAR_SET[i % CHAR_SET.length];
+    final byte b = (byte) character;
+
+    final byte[] bytes = new byte[i];
+    Arrays.fill(bytes, b);
+    final BytesWritable result = new BytesWritable(bytes);
+    final KV kv = new KV();
+    kv.key = result;
+    kv.value = result;
+    return kv;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java
new file mode 100644
index 0000000..8d74d63
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.utils;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+
+@SuppressWarnings({ "deprecation" })
+public class TestBytesUtil extends TestCase {
+
+  public void testBytesStringConversion() {
+
+    final String str = "I am good!";
+    final byte[] bytes = BytesUtil.toBytes(str);
+
+    Assert.assertEquals(str, BytesUtil.fromBytes(bytes));
+ }
+
+  public void testBytesIntConversion() {
+    final int a = 1000;
+    final byte[] intBytes = BytesUtil.toBytes(a);
+
+    Assert.assertEquals(a, BytesUtil.toInt(intBytes));
+  }
+
+  public void testBytesLongConversion() {
+    final long l = 1000000L;
+    final byte[] longBytes = BytesUtil.toBytes(l);
+
+    Assert.assertEquals(l, BytesUtil.toLong(longBytes));
+  }
+
+  public void testBytesFloatConversion() {
+    final float f = 3.14f;
+    final byte[] floatBytes = BytesUtil.toBytes(f);
+
+    Assert.assertEquals(f, BytesUtil.toFloat(floatBytes));
+  }
+
+  public void testBytesDoubleConversion() {
+    final double d = 3.14;
+    final byte[] doubleBytes = BytesUtil.toBytes(d);
+
+    Assert.assertEquals(d, BytesUtil.toDouble(doubleBytes));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java
new file mode 100644
index 0000000..6ea8092
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.utils;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.junit.Assert;
+
+public class TestReadWriteBuffer extends TestCase {
+
+  private static byte[] bytes = new byte[] { '0', 'a', 'b', 'c', 'd', '9' };
+
+  public void testReadWriteBuffer() {
+
+    final ReadWriteBuffer buffer = new ReadWriteBuffer();
+
+    Assert.assertFalse(buffer.getBuff() == null);
+
+    Assert.assertEquals(buffer.getWritePoint(), 0);
+    Assert.assertEquals(buffer.getReadPoint(), 0);
+
+    buffer.writeInt(3);
+
+    buffer.writeString("goodboy");
+
+    buffer.writeLong(10L);
+    buffer.writeBytes(bytes, 0, bytes.length);
+    buffer.writeLong(100L);
+
+    Assert.assertEquals(buffer.getWritePoint(), 41);
+    Assert.assertEquals(buffer.getReadPoint(), 0);
+    Assert.assertTrue(buffer.getBuff().length >= 41);
+
+    Assert.assertEquals(buffer.readInt(), 3);
+    Assert.assertEquals(buffer.readString(), "goodboy");
+    Assert.assertEquals(buffer.readLong(), 10L);
+
+    final byte[] read = buffer.readBytes();
+    for (int i = 0; i < bytes.length; i++) {
+      Assert.assertEquals(bytes[i], read[i]);
+    }
+
+    Assert.assertEquals(100L, buffer.readLong());
+    Assert.assertEquals(41, buffer.getReadPoint());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java
new file mode 100644
index 0000000..7b82eff
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.utils;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+import org.junit.Assert;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TestSizedWritable extends TestCase {
+
+  public void testSizedWritable() {
+    final SizedWritable w = new SizedWritable(BytesWritable.class);
+    Assert.assertTrue(w.length == SizedWritable.INVALID_LENGTH);
+    Assert.assertFalse(w.v == null);
+  }
+}

Reply via email to