http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
new file mode 100644
index 0000000..e8132bd
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+public enum DataChannel {
+  /**
+   * Data is only read from this channel.
+   */
+  IN,
+  /**
+   * Data is only written to this channel.
+   */
+  OUT,
+  /**
+   * Data is both read from and written to this channel.
+   */
+  INOUT,
+  /**
+   * No data is exchanged over this channel.
+   */
+  NONE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
new file mode 100644
index 0000000..c47cdac
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * A DataReceiver pulls in arriving data. An example implementation
+ * is {@link org.apache.hadoop.mapred.nativetask.handlers.BufferPuller}.
+ */
+public interface DataReceiver {
+
+  /**
+   * Signals the receiver that data has arrived.
+   * The data itself is transferred out of band, not through this call.
+   *
+   * @return status flag (TODO confirm its meaning with the implementations)
+   * @throws IOException if the implementation fails to handle the data
+   */
+  public boolean receiveData() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
new file mode 100644
index 0000000..1c4ede5
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.*;
+import org.apache.log4j.Logger;
+
+public class HadoopPlatform extends Platform {
+  private static final Logger LOG = Logger.getLogger(HadoopPlatform.class);
+
+  public HadoopPlatform() throws IOException {
+  }
+
+  /** Registers the serializers for the built-in Writable key types. */
+  @Override
+  public void init() throws IOException {
+    registerKey(NullWritable.class.getName(), NullWritableSerializer.class);
+    registerKey(Text.class.getName(), TextSerializer.class);
+    registerKey(LongWritable.class.getName(), LongWritableSerializer.class);
+    registerKey(IntWritable.class.getName(), IntWritableSerializer.class);
+    registerKey(Writable.class.getName(), DefaultSerializer.class);
+    registerKey(BytesWritable.class.getName(), BytesWritableSerializer.class);
+    registerKey(BooleanWritable.class.getName(), BoolWritableSerializer.class);
+    registerKey(ByteWritable.class.getName(), ByteWritableSerializer.class);
+    registerKey(FloatWritable.class.getName(), FloatWritableSerializer.class);
+    registerKey(DoubleWritable.class.getName(), DoubleWritableSerializer.class);
+    registerKey(VIntWritable.class.getName(), VIntWritableSerializer.class);
+    registerKey(VLongWritable.class.getName(), VLongWritableSerializer.class);
+    LOG.info("Hadoop platform inited");
+  }
+
+  /** Requires a key in keyClassNames and an INativeComparable serializer. */
+  @Override
+  public boolean support(String keyClassName, INativeSerializer serializer,
+      JobConf job) {
+    final boolean knownKey = keyClassNames.contains(keyClassName);
+    return knownKey && serializer instanceof INativeComparable;
+  }
+
+  /** Always returns false, whatever comparator class is given. */
+  @Override
+  public boolean define(Class comparatorClass) {
+    return false;
+  }
+
+  /** Returns the platform name, "Hadoop". */
+  @Override
+  public String name() {
+    return "Hadoop";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
new file mode 100644
index 0000000..8f50863
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * Interacts with the native side to support a Java combiner.
+ */
+public interface ICombineHandler {
+
+  /**
+   * Runs the combiner.
+   * @throws IOException if combining fails
+   */
+  public void combine() throws IOException;
+
+  /**
+   * @return the id of this handler
+   */
+  public long getId();
+
+  /**
+   * Closes the handlers, buffer pullers and pushers.
+   * @throws IOException if closing fails
+   */
+  public void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
new file mode 100644
index 0000000..ae07f3b
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Marker interface: any key type that is comparable on the native side
+ * must implement it.
+ *
+ * A native comparator function should have the ComparatorPtr type:
+ *
+ *   typedef int (*ComparatorPtr)(const char * src, uint32_t srcLength,
+ *   const char * dest,  uint32_t destLength);
+ *
+ * Keys are in serialized format on the native side. The function is passed
+ * the keys' locations and lengths so that it can compare them with the same
+ * logic as their Java comparator.
+ *
+ * For example, a HiveKey (see {@code HiveKey#write}) is serialized as an
+ * int field (containing the length of the raw bytes) followed by the raw
+ * bytes. To compare two HiveKeys, we first read the length fields and
+ * then compare the raw bytes by invoking the BytesComparator provided by
+ * our library. We pass the location and length of the raw bytes to
+ * BytesComparator:
+ *
+ *   int HivePlatform::HiveKeyComparator(const char * src, uint32_t srcLength,
+ *   const char * dest, uint32_t destLength) {
+ *     uint32_t sl = bswap(*(uint32_t*)src);
+ *     uint32_t dl = bswap(*(uint32_t*)dest);
+ *     return NativeObjectFactory::BytesComparator(src + 4, sl, dest + 4, dl);
+ *   }
+ */
+public interface INativeComparable {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
new file mode 100644
index 0000000..3e6e3ac
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * A handler accepts input and gives output; it transfers commands and data.
+ */
+public interface INativeHandler extends NativeDataTarget, NativeDataSource {
+
+  public String name();
+
+  public long getNativeHandler();
+
+  /**
+   * Initializes the native handler.
+   */
+  public void init(Configuration conf) throws IOException;
+
+  /**
+   * Closes the native handler.
+   */
+  public void close() throws IOException;
+
+  /**
+   * Sends a command downstream.
+   *
+   * @param command the command to send
+   * @param parameter the command's parameter data
+   * @return the result data of the command
+   * @throws IOException if the command fails
+   */
+  public ReadWriteBuffer call(Command command, ReadWriteBuffer parameter) 
throws IOException;
+
+  /**
+   * @param handler the command dispatcher to attach to this handler
+   */
+  void setCommandDispatcher(CommandDispatcher handler);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
new file mode 100644
index 0000000..fd68ea6
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.DirectBufferPool;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+
+/**
+ * Creates the channel and transfers data and commands between Java and native.
+ */
+public class NativeBatchProcessor implements INativeHandler {
+  private static final Log LOG = LogFactory.getLog(NativeBatchProcessor.class);
+
+  private final String nativeHandlerName;
+  private long nativeHandlerAddr;
+
+  private boolean isInputFinished = false;
+
+  // << Fields used directly in native code, the names must NOT be changed
+  private ByteBuffer rawOutputBuffer;
+  private ByteBuffer rawInputBuffer;
+  // >>
+
+  private InputBuffer in;
+  private OutputBuffer out;
+
+  private CommandDispatcher commandDispatcher;
+  private DataReceiver dataReceiver;
+
+  static {
+    if (NativeRuntime.isNativeLibraryLoaded()) {
+      InitIDs();
+    }
+  }
+
+  /**
+   * Creates a processor with direct buffers for the channel and initializes it.
+   */
+  public static INativeHandler create(String nativeHandlerName,
+      Configuration conf, DataChannel channel) throws IOException {
+    final int bufferSize = conf.getInt(Constants.NATIVE_PROCESSOR_BUFFER_KB,
+        1024) * 1024;
+    LOG.info("NativeHandler: direct buffer size: " + bufferSize);
+
+    OutputBuffer output = null;
+    InputBuffer input = null;
+    switch (channel) {
+    case IN:
+      input = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case OUT:
+      output = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case INOUT:
+      input = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      output = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case NONE: // no data is exchanged, so both buffers stay null
+    }
+
+    final INativeHandler handler = new NativeBatchProcessor(nativeHandlerName,
+        input, output);
+    handler.init(conf);
+    return handler;
+  }
+
+  protected NativeBatchProcessor(String nativeHandlerName, InputBuffer input,
+      OutputBuffer output) throws IOException {
+    this.nativeHandlerName = nativeHandlerName;
+    // Either buffer may be null; create() passes only those the channel needs.
+    if (null != input) {
+      this.in = input;
+      this.rawInputBuffer = input.getByteBuffer();
+    }
+    if (null != output) {
+      this.out = output;
+      this.rawOutputBuffer = output.getByteBuffer();
+    }
+  }
+
+  @Override
+  public void setCommandDispatcher(CommandDispatcher handler) {
+    this.commandDispatcher = handler;
+  }
+
+  @Override
+  public void init(Configuration conf) throws IOException {
+    this.nativeHandlerAddr = NativeRuntime
+        .createNativeObject(nativeHandlerName);
+    if (this.nativeHandlerAddr == 0) {
+      throw new RuntimeException("Native object create failed, class: "
+          + nativeHandlerName);
+    }
+    setupHandler(nativeHandlerAddr, ConfigUtil.toBytes(conf));
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (nativeHandlerAddr != 0) {
+      NativeRuntime.releaseNativeObject(nativeHandlerAddr);
+      nativeHandlerAddr = 0;
+    }
+    final ByteBuffer inputBytes = (null == in) ? null : in.getByteBuffer();
+    if (null != inputBytes && inputBytes.isDirect()) {
+      DirectBufferPool.getInstance().returnBuffer(inputBytes);
+      // Drop the reference so that a second close() cannot return it again.
+      in = null;
+    }
+  }
+
+  @Override
+  public long getNativeHandler() {
+    return nativeHandlerAddr;
+  }
+
+  @Override
+  public ReadWriteBuffer call(Command command, ReadWriteBuffer parameter)
+      throws IOException {
+    final byte[] bytes = nativeCommand(nativeHandlerAddr, command.id(),
+        null == parameter ? null : parameter.getBuff());
+
+    final ReadWriteBuffer result = new ReadWriteBuffer(bytes);
+    result.setWritePoint(bytes.length);
+    return result;
+  }
+
+  @Override
+  public void sendData() throws IOException {
+    // Pass the bytes written so far to native, then rewind for the next batch.
+    nativeProcessInput(nativeHandlerAddr, rawOutputBuffer.position());
+    rawOutputBuffer.position(0);
+  }
+
+  @Override
+  public void finishSendData() throws IOException {
+    if (null == rawOutputBuffer || isInputFinished) {
+      return;
+    }
+
+    sendData();
+    nativeFinish(nativeHandlerAddr);
+    isInputFinished = true;
+  }
+
+  /**
+   * Hands a command to the Java command dispatcher. Nothing in this class
+   * calls it; presumably the native side invokes it through JNI, so the name
+   * and signature must not change.
+   *
+   * @return the result bytes, or null without a dispatcher or a result
+   */
+  private byte[] sendCommandToJava(int command, byte[] data)
+      throws IOException {
+    try {
+      final Command cmd = new Command(command);
+      ReadWriteBuffer param = null;
+      if (null != data) {
+        param = new ReadWriteBuffer();
+        param.reset(data);
+        param.setWritePoint(data.length);
+      }
+
+      if (null == commandDispatcher) {
+        return null;
+      }
+      final ReadWriteBuffer result = commandDispatcher.onCall(cmd, param);
+      return (null == result) ? null : result.getBuff();
+    } catch (Exception e) {
+      LOG.error("Failed to dispatch native command " + command, e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Called by native side: exposes the first {@code length} bytes of the input
+   * buffer to the data receiver so that native side can continue processing.
+   */
+  private void flushOutput(int length) throws IOException {
+    if (null == rawInputBuffer) {
+      return;
+    }
+    rawInputBuffer.position(0);
+    rawInputBuffer.limit(length);
+
+    if (null != dataReceiver) {
+      try {
+        dataReceiver.receiveData();
+      } catch (IOException e) {
+        LOG.error("Data receiver failed to consume native output", e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Cache JNI field & method ids
+   */
+  private static native void InitIDs();
+
+  /**
+   * Setup native side BatchHandler
+   */
+  private native void setupHandler(long nativeHandlerAddr, byte[][] configs);
+
+  /**
+   * Lets the native side process the data written to the output buffer.
+   *
+   * @param handler address of the native handler
+   * @param length number of bytes to process
+   */
+  private native void nativeProcessInput(long handler, int length);
+
+  /**
+   * Notifies the native side that the input is finished.
+   *
+   * @param handler address of the native handler
+   */
+  private native void nativeFinish(long handler);
+
+  /**
+   * Sends a control message to the native side.
+   * @param handler address of the native handler
+   * @param cmd id of the command
+   * @param parameter command data, may be null
+   * @return return value
+   */
+  private native byte[] nativeCommand(long handler, int cmd, byte[] parameter);
+
+  /**
+   * Loads data from the native side.
+   *
+   * @param handler address of the native handler
+   */
+  private native void nativeLoadData(long handler);
+
+  protected void finishOutput() {
+  }
+
+  @Override
+  public InputBuffer getInputBuffer() {
+    return this.in;
+  }
+
+  @Override
+  public OutputBuffer getOutputBuffer() {
+    return this.out;
+  }
+
+  @Override
+  public void loadData() throws IOException {
+    nativeLoadData(nativeHandlerAddr);
+  }
+
+  @Override
+  public void setDataReceiver(DataReceiver handler) {
+    this.dataReceiver = handler;
+  }
+
+  @Override
+  public String name() {
+    return nativeHandlerName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
new file mode 100644
index 0000000..d0bfa93
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * NativeDataSource loads data from upstream.
+ */
+public interface NativeDataSource {
+
+  /**
+   * Gets the input buffer.
+   *
+   * @return the input buffer
+   */
+  public InputBuffer getInputBuffer();
+
+  /**
+   * Sets the listener that is activated when data from upstream arrives.
+   *
+   * @param handler the data receiver to notify
+   */
+  void setDataReceiver(DataReceiver handler);
+
+  /**
+   * Loads data from upstream. Arriving data is reported to the receiver
+   * registered with {@link #setDataReceiver}.
+   *
+   * @throws IOException if loading fails
+   */
+  public void loadData() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
new file mode 100644
index 0000000..7a780eb
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+
+/**
+ * NativeDataTarget sends data to downstream.
+ */
+public interface NativeDataTarget {
+
+  /**
+   * Sends a signal to indicate that the data has been stored in output buffer.
+   *
+   * @throws IOException if signalling fails
+   */
+  public void sendData() throws IOException;
+
+  /**
+   * Sends a signal that there is no more data.
+   *
+   * @throws IOException if signalling fails
+   */
+  public void finishSendData() throws IOException;
+
+  /**
+   * Gets the output buffer.
+   *
+   * @return the output buffer
+   */
+  public OutputBuffer getOutputBuffer();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
new file mode 100644
index 0000000..a48b5c3
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputCollector;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.nativetask.handlers.NativeCollectorOnlyHandler;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * native map output collector wrapped in Java interface
+ */
+public class NativeMapOutputCollectorDelegator<K, V> implements 
MapOutputCollector<K, V> {
+
+  private static Log LOG = 
LogFactory.getLog(NativeMapOutputCollectorDelegator.class);
+  private JobConf job;
+  private NativeCollectorOnlyHandler<K, V> handler;
+
+  private StatusReportChecker updater;
+
+  @Override
+  public void collect(K key, V value, int partition) throws IOException, 
InterruptedException {
+    handler.collect(key, value, partition);
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException {
+    handler.close();
+    if (null != updater) {
+      updater.stop();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException, InterruptedException, 
ClassNotFoundException {
+    handler.flush();
+  }
+
+  @Override
+  public void init(Context context) throws IOException, ClassNotFoundException 
{
+    this.job = context.getJobConf();
+
+    Platforms.init(job);
+
+    if (job.getNumReduceTasks() == 0) {
+      String message = "There is no reducer, no need to use native output 
collector";
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    Class comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, 
RawComparator.class);
+    if (comparatorClass != null && !Platforms.define(comparatorClass)) {
+      String message = "Native output collector don't support customized java 
comparator "
+        + job.get(MRJobConfig.KEY_COMPARATOR);
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false) == true) {
+      if (!isCodecSupported(job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC))) {
+        String message = "Native output collector don't support compression 
codec "
+          + job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC) + ", We support 
Gzip, Lz4, snappy";
+        LOG.error(message);
+        throw new InvalidJobConfException(message);
+      }
+    }
+
+    if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) {
+      String message = "Native-Task don't support sort class " + 
job.get(Constants.MAP_SORT_CLASS);
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    if (job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false) == true) {
+      String message = "Native-Task don't support secure shuffle";
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    final Class<?> keyCls = job.getMapOutputKeyClass();
+    try {
+      @SuppressWarnings("rawtypes")
+      final INativeSerializer serializer = 
NativeSerialization.getInstance().getSerializer(keyCls);
+      if (null == serializer) {
+        String message = "Key type not supported. Cannot find serializer for " 
+ keyCls.getName();
+        LOG.error(message);
+        throw new InvalidJobConfException(message);
+      } else if (!Platforms.support(keyCls.getName(), serializer, job)) {
+        String message = "Native output collector don't support this key, this 
key is not comparable in native "
+          + keyCls.getName();
+        LOG.error(message);
+        throw new InvalidJobConfException(message);
+      }
+    } catch (final IOException e) {
+      String message = "Cannot find serializer for " + keyCls.getName();
+      LOG.error(message);
+      throw new IOException(message);
+    }
+
+    final boolean ret = NativeRuntime.isNativeLibraryLoaded();
+    if (ret) {
+      NativeRuntime.configure(job);
+
+      final long updateInterval = 
job.getLong(Constants.NATIVE_STATUS_UPDATE_INTERVAL,
+          Constants.NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL);
+      updater = new StatusReportChecker(context.getReporter(), updateInterval);
+      updater.start();
+
+    } else {
+      String message = "Nativeruntime cannot be loaded, please check the 
libnativetask.so is in hadoop library dir";
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    this.handler = null;
+    try {
+      final Class<K> oKClass = (Class<K>) job.getMapOutputKeyClass();
+      final Class<K> oVClass = (Class<K>) job.getMapOutputValueClass();
+      final TaskAttemptID id = context.getMapTask().getTaskID();
+      final TaskContext taskContext = new TaskContext(job, null, null, 
oKClass, oVClass,
+          context.getReporter(), id);
+      handler = NativeCollectorOnlyHandler.create(taskContext);
+    } catch (final IOException e) {
+      String message = "Native output collector cannot be loaded;";
+      LOG.error(message);
+      throw new IOException(message, e);
+    }
+
+    LOG.info("Native output collector can be successfully enabled!");
+  }
+
+  private boolean isCodecSupported(String string) {
+    if ("org.apache.hadoop.io.compress.SnappyCodec".equals(string)
+        || "org.apache.hadoop.io.compress.GzipCodec".equals(string)
+        || "org.apache.hadoop.io.compress.Lz4Codec".equals(string)) {
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java
new file mode 100644
index 0000000..53b1acd
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+import org.apache.hadoop.mapred.nativetask.util.SnappyUtil;
+import org.apache.hadoop.util.VersionInfo;
+
+/**
+ * This class stands for the native runtime. It has three functions:
+ * 1. Create native handlers for map, reduce, outputcollector, and etc
+ * 2. Configure native task with provided MR configs
+ * 3. Provide file system api to native space, so that it can use File
+ *    system like HDFS.
+ */
+public class NativeRuntime {
+  private static final Log LOG = LogFactory.getLog(NativeRuntime.class);
+  private static boolean nativeLibraryLoaded = false;
+
+  private static Configuration conf = new Configuration();
+
+  static {
+    try {
+      if (!SnappyUtil.isNativeSnappyLoaded(conf)) {
+        throw new IOException("Snappy library cannot be loaded");
+      } else {
+        LOG.info("Snappy native library is available");
+      }
+      System.loadLibrary("nativetask");
+      LOG.info("Nativetask JNI library loaded.");
+      nativeLibraryLoaded = true;
+    } catch (final Throwable t) {
+      // Loading is best effort: the failure is recorded in the log and in
+      // nativeLibraryLoaded, and callers check isNativeLibraryLoaded().
+      LOG.error("Failed to load nativetask JNI library with error: " + t, t);
+      LOG.info("java.library.path=" + System.getProperty("java.library.path"));
+      LOG.info("LD_LIBRARY_PATH=" + System.getenv("LD_LIBRARY_PATH"));
+    }
+  }
+
+  private static void assertNativeLibraryLoaded() {
+    if (!nativeLibraryLoaded) {
+      throw new RuntimeException("Native runtime library not loaded");
+    }
+  }
+
+  /**
+   * @return true if the nativetask JNI library was loaded by the static
+   *         initializer of this class
+   */
+  public static boolean isNativeLibraryLoaded() {
+    return nativeLibraryLoaded;
+  }
+
+  /**
+   * Copy the job configuration, add the Hadoop version to the copy and
+   * hand it to the native side.
+   *
+   * @param jobConf job configuration to pass to native space
+   */
+  public static void configure(Configuration jobConf) {
+    assertNativeLibraryLoaded();
+    conf = new Configuration(jobConf);
+    conf.set(Constants.NATIVE_HADOOP_VERSION, VersionInfo.getVersion());
+    JNIConfigure(ConfigUtil.toBytes(conf));
+  }
+
+  /**
+   * Create a native object. We use it to create native handlers.
+   *
+   * @param clazz name of the native class
+   * @return value returned by the native side; 0 means the object could
+   *         not be created (a warning is logged)
+   */
+  public synchronized static long createNativeObject(String clazz) {
+    assertNativeLibraryLoaded();
+    final long ret = JNICreateNativeObject(BytesUtil.toBytes(clazz));
+    if (ret == 0) {
+      LOG.warn("Can't create NativeObject for class " + clazz
+          + ", probably not exist.");
+    }
+    return ret;
+  }
+
+  /**
+   * Register a customized library
+   *
+   * @param libraryName library to register
+   * @param clazz name passed to the native side together with the library
+   * @return value returned by the native side; a non-zero value is logged
+   *         as a failure
+   */
+  public synchronized static long registerLibrary(String libraryName,
+      String clazz) {
+    assertNativeLibraryLoaded();
+    final long ret = JNIRegisterModule(BytesUtil.toBytes(libraryName),
+        BytesUtil.toBytes(clazz));
+    if (ret != 0) {
+      LOG.warn("Can't register library " + libraryName + " for class "
+          + clazz + ", native side returned " + ret);
+    }
+    return ret;
+  }
+
+  /**
+   * Destroy a native object. We use it to destroy native handlers.
+   *
+   * @param addr value previously returned by createNativeObject
+   */
+  public synchronized static void releaseNativeObject(long addr) {
+    assertNativeLibraryLoaded();
+    JNIReleaseNativeObject(addr);
+  }
+
+  /**
+   * Get the status report from native space and apply it to the reporter:
+   * progress, status text (when not empty) and counter increments.
+   *
+   * @param reporter reporter to update; also used as the lock for the update
+   * @throws IOException if the status bytes cannot be decoded
+   */
+  public static void reportStatus(TaskReporter reporter) throws IOException {
+    assertNativeLibraryLoaded();
+    synchronized (reporter) {
+      final byte[] statusBytes = JNIUpdateStatus();
+      final DataInputBuffer ib = new DataInputBuffer();
+      ib.reset(statusBytes, statusBytes.length);
+
+      final FloatWritable progress = new FloatWritable();
+      progress.readFields(ib);
+      reporter.setProgress(progress.get());
+
+      final Text status = new Text();
+      status.readFields(ib);
+      if (status.getLength() > 0) {
+        reporter.setStatus(status.toString());
+      }
+
+      final IntWritable numCounters = new IntWritable();
+      numCounters.readFields(ib);
+      final int counterCount = numCounters.get();
+
+      // Each counter is encoded as group, name and increment, in that order.
+      final Text group = new Text();
+      final Text name = new Text();
+      final LongWritable amount = new LongWritable();
+      for (int i = 0; i < counterCount; i++) {
+        group.readFields(ib);
+        name.readFields(ib);
+        amount.readFields(ib);
+        reporter.incrCounter(group.toString(), name.toString(), amount.get());
+      }
+    }
+  }
+
+
+  /*******************************************************
+   *** The following are JNI Apis
+   ********************************************************/
+
+  /**
+   * Config the native runtime with mapreduce job configurations.
+   *
+   * @param configs configuration as produced by ConfigUtil.toBytes
+   */
+  private native static void JNIConfigure(byte[][] configs);
+
+  /**
+   * create a native object in native space
+   *
+   * @param clazz native class name as bytes
+   * @return 0 is treated as failure by createNativeObject
+   */
+  private native static long JNICreateNativeObject(byte[] clazz);
+
+  /**
+   * create the default native object for certain type
+   *
+   * @param type
+   * @return
+   */
+  @Deprecated
+  private native static long JNICreateDefaultNativeObject(byte[] type);
+
+  /**
+   * destroy native object in native space
+   *
+   * @param addr
+   */
+  private native static void JNIReleaseNativeObject(long addr);
+
+  /**
+   * get status update from native side.
+   * Encoding:
+   *   progress: float
+   *   status: Text
+   *   Counter number: int, the count of the counters
+   *   Counters: array [group:Text, name:Text, incrCount:Long]
+   *
+   * @return the encoded status
+   */
+  private native static byte[] JNIUpdateStatus();
+
+  /**
+   * Not used.
+   */
+  private native static void JNIRelease();
+
+  /**
+   * Register a module in native space; called by registerLibrary, which
+   * treats a non-zero return value as failure.
+   */
+  private native static int JNIRegisterModule(byte[] path, byte[] name);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java
new file mode 100644
index 0000000..aef53ce
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+
+/**
+ * Base class for platforms. A platform is a framework running on top of
+ * MapReduce, like Hadoop, Hive, Pig, Mahout. Each framework defines its
+ * own key type and value type across a MapReduce job. For each platform,
+ * we should implement serializers such that we could communicate data with
+ * native side and native comparators so our native output collectors could
+ * sort them and write out. We've already provided the {@link HadoopPlatform}
+ * that supports all key types of Hadoop and users could implement their custom
+ * platform.
+ */
+public abstract class Platform {
+  // shared serialization registry that registerKey writes into
+  private final NativeSerialization serialization;
+  // names of the key classes registered through registerKey
+  protected Set<String> keyClassNames = new HashSet<String>();
+
+  public Platform() {
+    this.serialization = NativeSerialization.getInstance();
+  }
+
+  /**
+   * initialize a platform, where we should call registerKey
+   *
+   * @throws IOException
+   */
+  public abstract void init() throws IOException;
+
+  /**
+   * @return name of a Platform, useful for logs and debug
+   */
+  public abstract String name();
+
+
+  /**
+   * associate a key class with its serializer and platform
+   *
+   * The key class name is recorded in keyClassNames only after the
+   * serializer registration has returned without throwing.
+   *
+   * @param keyClassName map out key class name
+   * @param key          key serializer class
+   * @throws IOException
+   */
+  protected void registerKey(String keyClassName, Class key) throws 
+IOException {
+    serialization.register(keyClassName, key);
+    keyClassNames.add(keyClassName);
+  }
+
+  /**
+   * whether a platform supports a specific key should at least satisfy two
+   * conditions
+   *
+   * 1. the key belongs to the platform
+   * 2. the associated serializer must implement {@link INativeComparable}
+   *    interface
+   *
+   *
+   * @param keyClassName map out put key class name
+   * @param serializer   serializer associated with key via registerKey
+   * @param job          job configuration
+   * @return             true if the platform has implemented native
+   *                     comparators of the key and false otherwise
+   */
+  protected abstract boolean support(String keyClassName, INativeSerializer 
+serializer, JobConf job);
+
+
+  /**
+   * whether it's the platform that has defined a custom Java comparator
+   *
+   * NativeTask doesn't support custom Java comparator (set with
+   * mapreduce.job.output.key.comparator.class) but a platform (e.g Pig)
+   * could also set that conf and implement native comparators so
+   * we shouldn't bail out.
+   *
+   * @param keyComparator comparator set with
+   *                      mapreduce.job.output.key.comparator.class
+   * @return true if this platform is the one that defined the comparator
+   */
+  protected abstract boolean define(Class keyComparator);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java
new file mode 100644
index 0000000..154bbc8
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.util.ServiceLoader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+import org.apache.log4j.Logger;
+
+
+/**
+ * this class will load in and init all platforms on classpath
+ * it is also the facade to check for key type support and other
+ * platform methods
+ */
+public class Platforms {
+
+  private static final Logger LOG = Logger.getLogger(Platforms.class);
+  // Iteration over the loader is guarded by synchronizing on it below.
+  private static final ServiceLoader<Platform> platforms =
+      ServiceLoader.load(Platform.class);
+
+  /**
+   * Reset the serializer registry and initialize every platform found by
+   * the service loader.
+   *
+   * @param conf configuration; not read by this method
+   * @throws IOException if a platform fails to initialize
+   */
+  public static void init(Configuration conf) throws IOException {
+
+    NativeSerialization.getInstance().reset();
+    synchronized (platforms) {
+      for (Platform platform : platforms) {
+        platform.init();
+      }
+    }
+  }
+
+  /**
+   * @param keyClassName map output key class name
+   * @param serializer   serializer associated with the key class
+   * @param job          job configuration
+   * @return true if at least one loaded platform supports the key class
+   */
+  public static boolean support(String keyClassName,
+      INativeSerializer serializer, JobConf job) {
+    synchronized (platforms) {
+      for (Platform platform : platforms) {
+        if (platform.support(keyClassName, serializer, job)) {
+          LOG.debug("platform " + platform.name() + " support key class "
+            + keyClassName);
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param keyComparator comparator class configured for the job
+   * @return true if at least one loaded platform defines this comparator
+   */
+  public static boolean define(Class keyComparator) {
+    synchronized (platforms) {
+      for (Platform platform : platforms) {
+        if (platform.define(keyComparator)) {
+          LOG.debug("platform " + platform.name() + " define comparator "
+            + keyComparator.getName());
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
new file mode 100644
index 0000000..1fa59dc
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Task.Counter;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+/**
+ * Background poller: on a fixed interval it asks the native runtime for its
+ * status and forwards it to the MR framework through the task reporter.
+ */
+public class StatusReportChecker implements Runnable {
+
+  private static Log LOG = LogFactory.getLog(StatusReportChecker.class);
+  public static int INTERVAL = 1000; // milli-seconds
+
+  private Thread checker;
+  private final TaskReporter reporter;
+  private final long interval;
+
+  /** Creates a checker that polls every {@link #INTERVAL} milliseconds. */
+  public StatusReportChecker(TaskReporter reporter) {
+    this(reporter, INTERVAL);
+  }
+
+  /**
+   * @param reporter reporter that receives the native status
+   * @param interval time to sleep between two polls, in milliseconds
+   */
+  public StatusReportChecker(TaskReporter reporter, long interval) {
+    this.reporter = reporter;
+    this.interval = interval;
+  }
+
+  /**
+   * Polling loop. Ends when the thread is interrupted while sleeping or
+   * when reporting the native status fails with an IOException.
+   */
+  @Override
+  public void run() {
+    for (;;) {
+      if (!sleepOneInterval()) {
+        return;
+      }
+      try {
+        NativeRuntime.reportStatus(reporter);
+      } catch (final IOException ioe) {
+        LOG.warn("Update native status got exception", ioe);
+        reporter.setStatus(ioe.toString());
+        return;
+      }
+    }
+  }
+
+  /**
+   * Sleeps for one polling interval.
+   *
+   * @return false if the sleep was interrupted, true otherwise
+   */
+  private boolean sleepOneInterval() {
+    try {
+      Thread.sleep(interval);
+      return true;
+    } catch (final InterruptedException ie) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("StatusUpdater thread exiting since it got interrupted");
+      }
+      return false;
+    }
+  }
+
+  /** Looks up, in a fixed order, every counter listed below. */
+  protected void initUsedCounters() {
+    final Counter[] lookups = {
+      Counter.MAP_INPUT_RECORDS,
+      Counter.MAP_OUTPUT_RECORDS,
+      Counter.MAP_INPUT_BYTES,
+      Counter.MAP_OUTPUT_BYTES,
+      Counter.MAP_OUTPUT_MATERIALIZED_BYTES,
+      Counter.COMBINE_INPUT_RECORDS,
+      Counter.COMBINE_OUTPUT_RECORDS,
+      Counter.REDUCE_INPUT_RECORDS,
+      Counter.REDUCE_OUTPUT_RECORDS,
+      Counter.REDUCE_INPUT_GROUPS,
+      Counter.SPILLED_RECORDS,
+      Counter.MAP_OUTPUT_BYTES,
+      Counter.MAP_OUTPUT_RECORDS,
+    };
+    for (final Counter counter : lookups) {
+      reporter.getCounter(counter);
+    }
+  }
+
+  /** Starts the daemon polling thread; does nothing if already started. */
+  public synchronized void start() {
+    if (checker != null) {
+      return;
+    }
+    // Touch the counters the native side uses before polling begins,
+    // so they will have correct display name.
+    initUsedCounters();
+    checker = new Thread(this);
+    checker.setDaemon(true);
+    checker.start();
+  }
+
+  /**
+   * Interrupts the polling thread and waits for it to finish; does nothing
+   * if the checker was never started.
+   */
+  public synchronized void stop() throws InterruptedException {
+    if (checker == null) {
+      return;
+    }
+    checker.interrupt();
+    checker.join();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java
new file mode 100644
index 0000000..a4430ed
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+/**
+ * Holder for what a native task needs from the Java side: the job
+ * configuration, the input/output key and value classes, the task reporter
+ * and the task attempt id. The four classes can be replaced after
+ * construction; the other fields are fixed.
+ */
+public class TaskContext {
+  private final JobConf conf;
+  private Class iKClass;
+  private Class iVClass;
+  private Class oKClass;
+  private Class oVClass;
+  private final TaskReporter reporter;
+  private final TaskAttemptID taskAttemptID;
+
+  /**
+   * @param conf     job configuration
+   * @param iKClass  input key class
+   * @param iVClass  input value class
+   * @param oKClass  output key class
+   * @param oVClass  output value class
+   * @param reporter task reporter
+   * @param id       task attempt id
+   */
+  public TaskContext(JobConf conf, Class iKClass, Class iVClass,
+      Class oKClass, Class oVClass, TaskReporter reporter,
+      TaskAttemptID id) {
+    this.conf = conf;
+    this.iKClass = iKClass;
+    this.iVClass = iVClass;
+    this.oKClass = oKClass;
+    this.oVClass = oVClass;
+    this.reporter = reporter;
+    this.taskAttemptID = id;
+  }
+
+  public Class getInputKeyClass() {
+    return iKClass;
+  }
+
+  public void setInputKeyClass(Class klass) {
+    this.iKClass = klass;
+  }
+
+  public Class getInputValueClass() {
+    return iVClass;
+  }
+
+  public void setInputValueClass(Class klass) {
+    this.iVClass = klass;
+  }
+
+  /**
+   * @return the output key class
+   */
+  public Class getOutputKeyClass() {
+    return this.oKClass;
+  }
+
+  /**
+   * Misspelled variant of {@link #getOutputKeyClass()}, kept so existing
+   * callers continue to compile.
+   *
+   * @return the output key class
+   */
+  public Class getOuputKeyClass() {
+    return getOutputKeyClass();
+  }
+
+  public void setOutputKeyClass(Class klass) {
+    this.oKClass = klass;
+  }
+
+  public Class getOutputValueClass() {
+    return this.oVClass;
+  }
+
+  public void setOutputValueClass(Class klass) {
+    this.oVClass = klass;
+  }
+
+  public TaskReporter getTaskReporter() {
+    return this.reporter;
+  }
+
+  public TaskAttemptID getTaskAttemptId() {
+    return this.taskAttemptID;
+  }
+
+  public JobConf getConf() {
+    return this.conf;
+  }
+
+  /**
+   * @return a new context holding the same references as this one
+   */
+  public TaskContext copyOf() {
+    return new TaskContext(conf, iKClass, iVClass, oKClass, oVClass,
+        reporter, taskAttemptID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java
new file mode 100644
index 0000000..a52be1f
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+/**
+ * The kinds of buffer distinguished by the native task code.
+ */
+public enum BufferType {
+
+  /**
+   * NOTE(review): presumably a buffer allocated outside the Java heap
+   * (e.g. via {@code ByteBuffer.allocateDirect}) -- confirm against users.
+   */
+  DIRECT_BUFFER,
+
+  /**
+   * NOTE(review): presumably a buffer backed by an on-heap byte array --
+   * confirm against users.
+   */
+  HEAP_BUFFER
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
new file mode 100644
index 0000000..5af7180
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link DataInputStream} that reads from the {@link ByteBuffer} backing an
+ * {@link InputBuffer}.
+ * <p>
+ * The stream-style methods ({@link #read()}, {@link #read(byte[], int, int)},
+ * {@link #readLine()}) signal the end of the buffer through their return
+ * value. Unless noted otherwise, the typed {@code readXxx} methods delegate to
+ * the matching relative getter of the buffer and therefore throw
+ * {@link java.nio.BufferUnderflowException} when the buffer does not hold
+ * enough bytes. Multi-byte values are decoded in the buffer's byte order.
+ * <p>
+ * If the reader was constructed with a {@code null} buffer, every method
+ * except {@link #hasUnReadData()} fails with a {@link NullPointerException}
+ * until {@link #reset(InputBuffer)} has been called.
+ */
+public class ByteBufferDataReader extends DataInputStream {
+  /** Number of chars the line cache grows by when a line does not fit. */
+  private static final int LINE_CACHE_INCREMENT = 128;
+
+  private ByteBuffer byteBuffer;
+  /** Scratch space reused between {@link #readLine()} calls. */
+  private char[] lineCache;
+
+  /**
+   * @param buffer the buffer to read from; may be {@code null}, in which case
+   *          {@link #reset(InputBuffer)} must be called before reading
+   */
+  public ByteBufferDataReader(InputBuffer buffer) {
+    if (buffer != null) {
+      this.byteBuffer = buffer.getByteBuffer();
+    }
+  }
+
+  /**
+   * Points this reader at the byte buffer of another input buffer.
+   *
+   * @param buffer the new source; must not be {@code null}
+   */
+  public void reset(InputBuffer buffer) {
+    this.byteBuffer = buffer.getByteBuffer();
+  }
+
+  /**
+   * Reads the next byte.
+   *
+   * @return the byte as a value in {@code 0..255}, or {@code -1} when the
+   *         buffer has no bytes left
+   */
+  @Override
+  public int read() throws IOException {
+    if (!byteBuffer.hasRemaining()) {
+      return -1;
+    }
+    // Mask so that bytes >= 0x80 are not sign-extended into negative values,
+    // which callers would mistake for end-of-stream.
+    return byteBuffer.get() & 0xFF;
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b}.
+   *
+   * @return the number of bytes copied (possibly fewer than {@code len}),
+   *         {@code 0} if {@code len} is zero, or {@code -1} when the buffer
+   *         has no bytes left
+   * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
+   *           describe a valid range of {@code b}
+   */
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    final int available = byteBuffer.remaining();
+    if (available == 0) {
+      return -1;
+    }
+    final int count = Math.min(len, available);
+    byteBuffer.get(b, off, count);
+    return count;
+  }
+
+  /** Fills {@code b} completely from the buffer. */
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    byteBuffer.get(b, 0, b.length);
+  }
+
+  /** Copies exactly {@code len} bytes from the buffer into {@code b}. */
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    byteBuffer.get(b, off, len);
+  }
+
+  /**
+   * Skips over up to {@code n} bytes.
+   *
+   * @return the number of bytes actually skipped; {@code 0} when {@code n}
+   *         is not positive
+   */
+  @Override
+  public int skipBytes(int n) throws IOException {
+    if (n <= 0) {
+      // A negative count must never move the position backwards.
+      return 0;
+    }
+    final int skip = Math.min(n, byteBuffer.remaining());
+    byteBuffer.position(byteBuffer.position() + skip);
+    return skip;
+  }
+
+  /** @return {@code true} if the next byte is non-zero */
+  @Override
+  public boolean readBoolean() throws IOException {
+    return byteBuffer.get() != 0;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return byteBuffer.get();
+  }
+
+  /**
+   * @return the next byte as a value in {@code 0..255}
+   * @throws EOFException if the buffer has no bytes left
+   */
+  @Override
+  public int readUnsignedByte() throws IOException {
+    if (!byteBuffer.hasRemaining()) {
+      throw new EOFException();
+    }
+    return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return byteBuffer.getShort();
+  }
+
+  /** @return the next two bytes as a value in {@code 0..65535} */
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return byteBuffer.getShort() & 0xFFFF;
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    return byteBuffer.getChar();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return byteBuffer.getInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return byteBuffer.getLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return byteBuffer.getFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return byteBuffer.getDouble();
+  }
+
+  /**
+   * Reads bytes up to the next line terminator ({@code \n}, {@code \r} or
+   * {@code \r\n}) or the end of the buffer, widening each byte to a char.
+   * The terminator is consumed but not included in the result.
+   *
+   * @return the line, or {@code null} if the buffer was already exhausted
+   */
+  @Override
+  public String readLine() throws IOException {
+    if (!byteBuffer.hasRemaining()) {
+      return null;
+    }
+
+    char[] buf = lineCache;
+    if (buf == null) {
+      buf = lineCache = new char[LINE_CACHE_INCREMENT];
+    }
+
+    int offset = 0;
+    while (byteBuffer.hasRemaining()) {
+      final int c = byteBuffer.get() & 0xFF;
+      if (c == '\n') {
+        break;
+      }
+      if (c == '\r') {
+        // Peek with an absolute get so that a byte that is not '\n' stays
+        // in the buffer for the next read instead of being lost.
+        final int next = byteBuffer.position();
+        if (byteBuffer.hasRemaining() && byteBuffer.get(next) == '\n') {
+          byteBuffer.position(next + 1);
+        }
+        break;
+      }
+      if (offset == buf.length) {
+        final char[] grown = new char[offset + LINE_CACHE_INCREMENT];
+        System.arraycopy(buf, 0, grown, 0, offset);
+        buf = lineCache = grown;
+      }
+      buf[offset++] = (char) c;
+    }
+    return String.copyValueOf(buf, 0, offset);
+  }
+
+  /**
+   * Reads a string in the modified UTF-8 format described by
+   * {@link DataInput#readUTF()}: an unsigned 16-bit byte length followed by
+   * the encoded characters.
+   */
+  @Override
+  public final String readUTF() throws IOException {
+    return readUTF(this);
+  }
+
+  /** Decodes one modified UTF-8 string from {@code in}. */
+  private static String readUTF(DataInput in) throws IOException {
+    final int utflen = in.readUnsignedShort();
+    final byte[] bytearr = new byte[utflen];
+    final char[] chararr = new char[utflen];
+
+    int c, char2, char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    in.readFully(bytearr, 0, utflen);
+
+    // Fast path: leading run of single-byte (7-bit) characters.
+    while (count < utflen) {
+      c = bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    while (count < utflen) {
+      c = bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+        /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+        /* 110x xxxx 10xx xxxx */
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException("malformed input: partial character at end");
+        }
+        char2 = bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException("malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+        break;
+      case 14:
+        /* 1110 xxxx 10xx xxxx 10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException("malformed input: partial character at end");
+        }
+        char2 = bytearr[count - 2];
+        char3 = bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] =
+            (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+        /* 10xx xxxx, 1111 xxxx */
+        throw new UTFDataFormatException("malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  /** @return {@code true} if a buffer is set and it still has bytes left */
+  @Override
+  public boolean hasUnReadData() {
+    return null != byteBuffer && byteBuffer.hasRemaining();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
new file mode 100644
index 0000000..36b2fcf
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+
+/**
+ * A {@link DataOutputStream} that writes into the output {@link ByteBuffer}
+ * of a {@link NativeDataTarget}. Whenever the buffer cannot take the next
+ * piece of data, {@link #flush()} hands the buffered bytes to the target and
+ * rewinds the buffer.
+ * <p>
+ * If the writer was constructed with a {@code null} target, it has no buffer
+ * and every operation fails with a {@link NullPointerException}.
+ */
+public class ByteBufferDataWriter extends DataOutputStream {
+  private static final byte TRUE = (byte) 1;
+  private static final byte FALSE = (byte) 0;
+
+  private ByteBuffer buffer;
+  private final NativeDataTarget target;
+
+  /**
+   * @param handler the target whose output buffer is written to and which is
+   *          notified on flush and close
+   */
+  public ByteBufferDataWriter(NativeDataTarget handler) {
+    if (null != handler) {
+      this.buffer = handler.getOutputBuffer().getByteBuffer();
+    }
+    this.target = handler;
+  }
+
+  /**
+   * Flushes the buffer if it already holds data and cannot take
+   * {@code length} more bytes. An empty buffer is never flushed here.
+   */
+  private void checkSizeAndFlushNecessary(int length) throws IOException {
+    if (buffer.position() > 0 && buffer.remaining() < length) {
+      flush();
+    }
+  }
+
+  @Override
+  public synchronized void write(int v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put((byte) v);
+  }
+
+  /**
+   * @return {@code true} if fewer than {@code dataLength} bytes are free in
+   *         the buffer
+   */
+  @Override
+  public boolean shortOfSpace(int dataLength) throws IOException {
+    return buffer.remaining() < dataLength;
+  }
+
+  /**
+   * Writes {@code len} bytes of {@code b}, flushing as often as needed when
+   * the data is larger than the free space of the buffer.
+   */
+  @Override
+  public synchronized void write(byte b[], int off, int len) throws IOException {
+    int remain = len;
+    int offset = off;
+    while (remain > 0) {
+      if (!buffer.hasRemaining()) {
+        flush();
+        continue;
+      }
+      final int chunk = Math.min(buffer.remaining(), remain);
+      buffer.put(b, offset, chunk);
+      remain -= chunk;
+      offset += chunk;
+    }
+  }
+
+  /**
+   * Asks the target to send the buffered data, then rewinds the buffer to
+   * position 0.
+   */
+  @Override
+  public void flush() throws IOException {
+    target.sendData();
+    buffer.position(0);
+  }
+
+  /**
+   * Flushes any pending data and tells the target that sending is finished.
+   */
+  @Override
+  public void close() throws IOException {
+    if (hasUnFlushedData()) {
+      flush();
+    }
+    target.finishSendData();
+  }
+
+  @Override
+  public final void writeBoolean(boolean v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put(v ? TRUE : FALSE);
+  }
+
+  @Override
+  public final void writeByte(int v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put((byte) v);
+  }
+
+  @Override
+  public final void writeShort(int v) throws IOException {
+    checkSizeAndFlushNecessary(2);
+    buffer.putShort((short) v);
+  }
+
+  /**
+   * Writes the low 16 bits of {@code v}, high byte first.
+   * NOTE(review): this is big-endian regardless of the buffer's byte order,
+   * while {@link #writeChars(String)} uses {@code putChar} and therefore the
+   * buffer's order -- confirm the buffer is always big-endian.
+   */
+  @Override
+  public final void writeChar(int v) throws IOException {
+    checkSizeAndFlushNecessary(2);
+    buffer.put((byte) ((v >>> 8) & 0xFF));
+    buffer.put((byte) ((v >>> 0) & 0xFF));
+  }
+
+  @Override
+  public final void writeInt(int v) throws IOException {
+    checkSizeAndFlushNecessary(4);
+    buffer.putInt(v);
+  }
+
+  @Override
+  public final void writeLong(long v) throws IOException {
+    checkSizeAndFlushNecessary(8);
+    buffer.putLong(v);
+  }
+
+  @Override
+  public final void writeFloat(float v) throws IOException {
+    // writeInt performs the space check itself.
+    writeInt(Float.floatToIntBits(v));
+  }
+
+  @Override
+  public final void writeDouble(double v) throws IOException {
+    // writeLong performs the space check itself.
+    writeLong(Double.doubleToLongBits(v));
+  }
+
+  /**
+   * Writes the low byte of every char of {@code s}, flushing whenever the
+   * buffer is full.
+   */
+  @Override
+  public final void writeBytes(String s) throws IOException {
+    int remain = s.length();
+    int offset = 0;
+    while (remain > 0) {
+      if (!buffer.hasRemaining()) {
+        flush();
+        continue;
+      }
+      final int chunk = Math.min(buffer.remaining(), remain);
+      for (int i = 0; i < chunk; i++) {
+        buffer.put((byte) s.charAt(offset + i));
+      }
+      remain -= chunk;
+      offset += chunk;
+    }
+  }
+
+  /**
+   * Writes every char of {@code s} as two bytes, flushing whenever fewer
+   * than two bytes are free in the buffer.
+   */
+  @Override
+  public final void writeChars(String s) throws IOException {
+    int remain = s.length();
+    int offset = 0;
+    while (remain > 0) {
+      // Two free bytes are enough for one char; only flush below that.
+      if (buffer.remaining() < 2) {
+        flush();
+        continue;
+      }
+      final int chunk = Math.min(buffer.remaining() / 2, remain);
+      for (int i = 0; i < chunk; i++) {
+        buffer.putChar(s.charAt(offset + i));
+      }
+      remain -= chunk;
+      offset += chunk;
+    }
+  }
+
+  /**
+   * Writes {@code str} in the modified UTF-8 format described by
+   * {@link DataOutput#writeUTF(String)}.
+   *
+   * @throws UTFDataFormatException if the encoded form exceeds 65535 bytes
+   */
+  @Override
+  public final void writeUTF(String str) throws IOException {
+    writeUTF(str, this);
+  }
+
+  /**
+   * Encodes {@code str} as a two-byte length followed by its modified UTF-8
+   * bytes and writes the result through {@link #write(byte[], int, int)}.
+   *
+   * @param out unused; the data is always written to this stream
+   * @return the total number of bytes written, including the length prefix
+   */
+  private int writeUTF(String str, DataOutput out) throws IOException {
+    final int strlen = str.length();
+
+    // First pass: compute the encoded length.
+    int utflen = 0;
+    for (int i = 0; i < strlen; i++) {
+      final int c = str.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        utflen++;
+      } else if (c > 0x07FF) {
+        utflen += 3;
+      } else {
+        utflen += 2;
+      }
+    }
+
+    if (utflen > 65535) {
+      throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+    }
+
+    final byte[] bytearr = new byte[utflen + 2];
+    int count = 0;
+    bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+    bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+    // Second pass: encode each char as 1, 2 or 3 bytes.
+    for (int i = 0; i < strlen; i++) {
+      final int c = str.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        bytearr[count++] = (byte) c;
+      } else if (c > 0x07FF) {
+        bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+        bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+        bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+      } else {
+        bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+        bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+      }
+    }
+    write(bytearr, 0, utflen + 2);
+    return utflen + 2;
+  }
+
+  /** @return {@code true} if the buffer position is not 0 */
+  @Override
+  public boolean hasUnFlushedData() {
+    return buffer.position() != 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
new file mode 100644
index 0000000..0c1bc31
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataInput;
+import java.io.InputStream;
+
+/**
+ * Base type for the input streams of this package: an {@link InputStream}
+ * that also implements {@link DataInput} and can report whether unread data
+ * is left.
+ */
+public abstract class DataInputStream extends InputStream implements DataInput {
+
+  /**
+   * Check whether there is data in the stream that has not been read yet
+   *
+   * @return true if unread data is available
+   */
+  public abstract boolean hasUnReadData();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
new file mode 100644
index 0000000..6cd6782
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Base type for the output streams of this package: an {@link OutputStream}
+ * that also implements {@link DataOutput} and exposes the state of its
+ * buffer.
+ */
+public abstract class DataOutputStream extends OutputStream implements DataOutput {
+
+  /**
+   * Check whether this stream lacks the space to store the given number of
+   * bytes
+   *
+   * @param length
+   *          number of bytes the caller wants to store
+   * @return true if there is NOT enough space for {@code length} bytes,
+   *         false if they fit
+   * @throws IOException
+   *           if the check fails
+   */
+  public abstract boolean shortOfSpace(int length) throws IOException;
+
+  /**
+   * Check whether there is unflushed data stored in the stream
+   *
+   * @return true if data has been written that was not flushed yet
+   */
+  public abstract boolean hasUnFlushedData();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
new file mode 100644
index 0000000..bd3c6bb
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * A process-wide pool of direct {@link ByteBuffer}s, keyed by capacity, so
+ * that direct buffers can be reused instead of being allocated again.
+ * <p>
+ * Returned buffers are held through {@link WeakReference}s: a pooled buffer
+ * that is no longer referenced anywhere else may be reclaimed, in which case
+ * {@link #borrowBuffer(int)} skips the cleared reference and allocates a new
+ * buffer if none is left.
+ */
+public class DirectBufferPool {
+
+  private static DirectBufferPool directBufferPool = null;
+  private static final Log LOG = LogFactory.getLog(DirectBufferPool.class);
+
+  /** Pooled buffers, grouped by their capacity. */
+  private final ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> bufferMap =
+      new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
+
+  private DirectBufferPool() {
+  }
+
+  /**
+   * @return the shared pool, created on first use
+   */
+  public static synchronized DirectBufferPool getInstance() {
+    if (null == directBufferPool) {
+      directBufferPool = new DirectBufferPool();
+    }
+    return directBufferPool;
+  }
+
+  /**
+   * Drops the shared pool; the next {@link #getInstance()} call creates a
+   * fresh, empty one. Synchronized on the same lock as
+   * {@link #getInstance()} so both agree on the current instance.
+   */
+  public static synchronized void destoryInstance() {
+    directBufferPool = null;
+  }
+
+  /**
+   * Hands out a direct buffer of exactly {@code capacity} bytes, reusing a
+   * pooled one when available.
+   *
+   * @param capacity the required buffer capacity in bytes
+   * @return a pooled buffer (cleared when it was returned) or a newly
+   *         allocated direct buffer
+   */
+  public synchronized ByteBuffer borrowBuffer(int capacity) throws IOException {
+    final Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
+    if (null != list) {
+      WeakReference<ByteBuffer> ref;
+      while ((ref = list.poll()) != null) {
+        final ByteBuffer buf = ref.get();
+        // A null referent means the buffer was reclaimed; try the next one.
+        if (buf != null) {
+          return buf;
+        }
+      }
+    }
+    return ByteBuffer.allocateDirect(capacity);
+  }
+
+  /**
+   * Clears {@code buffer} and puts it back into the pool under its capacity.
+   *
+   * @param buffer the direct buffer to return
+   * @throws IOException if {@code buffer} is {@code null} or not direct
+   */
+  public void returnBuffer(ByteBuffer buffer) throws IOException {
+    if (null == buffer || !buffer.isDirect()) {
+      throw new IOException("the buffer is null or the buffer returned is not direct buffer");
+    }
+
+    buffer.clear();
+    final int capacity = buffer.capacity();
+    Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
+    if (null == list) {
+      list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
+      // Another thread may have registered a queue in the meantime; use it.
+      final Queue<WeakReference<ByteBuffer>> prev = bufferMap.putIfAbsent(capacity, list);
+      if (prev != null) {
+        list = prev;
+      }
+    }
+    list.add(new WeakReference<ByteBuffer>(buffer));
+  }
+
+  /**
+   * @param capacity the buffer capacity to look up
+   * @return the number of references currently queued for {@code capacity}
+   *         (including ones whose buffer was already reclaimed); {@code 0}
+   *         if no buffer of that capacity was ever returned
+   */
+  int getBufCountsForCapacity(int capacity) {
+    final Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
+    return (null == list) ? 0 : list.size();
+  }
+
+}

Reply via email to