http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
new file mode 100644
index 0000000..e8132bd
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Direction of the data exchanged over a native handler's channel.
+ */
+public enum DataChannel {
+  /**
+   * Data is only read from this channel.
+   */
+  IN,
+  /**
+   * Data is only written to this channel.
+   */
+  OUT,
+  /**
+   * Data is both read from and written to this channel.
+   */
+  INOUT,
+  /**
+   * There is no data exchange.
+   */
+  NONE
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
new file mode 100644
index 0000000..c47cdac
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * A DataReceiver pulls in data as it arrives; an example is
+ * {@link org.apache.hadoop.mapred.nativetask.handlers.BufferPuller}.
+ */
+public interface DataReceiver {
+
+  /**
+   * Signals the receiver that data has arrived. The data itself is
+   * transferred out of band (e.g. through a buffer shared with the native
+   * side), not through this call.
+   *
+   * @return an implementation-defined status flag
+   * @throws IOException if the receiver fails to process the data
+   */
+  boolean receiveData() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
new file mode 100644
index 0000000..1c4ede5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.*;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link Platform} that registers native serializers for the built-in Hadoop key types.
+ */
+public class HadoopPlatform extends Platform {
+  private static final Logger LOG = Logger.getLogger(HadoopPlatform.class);
+
+  public HadoopPlatform() throws IOException {
+  }
+
+  @Override
+  public void init() throws IOException {
+    registerKey(NullWritable.class.getName(), NullWritableSerializer.class);
+    registerKey(Text.class.getName(), TextSerializer.class);
+    registerKey(LongWritable.class.getName(), LongWritableSerializer.class);
+    registerKey(IntWritable.class.getName(), IntWritableSerializer.class);
+    registerKey(Writable.class.getName(), DefaultSerializer.class);
+    registerKey(BytesWritable.class.getName(), BytesWritableSerializer.class);
+    registerKey(BooleanWritable.class.getName(), BoolWritableSerializer.class);
+    registerKey(ByteWritable.class.getName(), ByteWritableSerializer.class);
+    registerKey(FloatWritable.class.getName(), FloatWritableSerializer.class);
+    registerKey(DoubleWritable.class.getName(), DoubleWritableSerializer.class);
+    registerKey(VIntWritable.class.getName(), VIntWritableSerializer.class);
+    registerKey(VLongWritable.class.getName(), VLongWritableSerializer.class);
+
+    LOG.info("Hadoop platform inited");
+  }
+
+  @Override
+  public boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
+    // Supported only if the key class is listed in keyClassNames (presumably filled by
+    // registerKey() in init()) and its serializer can be compared natively.
+    return keyClassNames.contains(keyClassName) && serializer instanceof INativeComparable;
+  }
+
+  @Override
+  public boolean define(Class comparatorClass) {
+    return false;
+  }
+
+  @Override
+  public String name() {
+    return "Hadoop";
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
new file mode 100644
index 0000000..8f50863
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * Interacts with the native side to support a Java combiner.
+ */
+public interface ICombineHandler {
+
+  /**
+   * Runs the combiner.
+   * @throws IOException if combining fails
+   */
+  void combine() throws IOException;
+
+  /**
+   * @return the id of this handler
+   */
+  long getId();
+
+  /**
+   * Closes the handler together with its buffer pullers and pushers.
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
new file mode 100644
index 0000000..ae07f3b
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Marker for key types that are comparable on the native side. Platforms
+ * check it on the key's serializer (e.g. {@code HadoopPlatform#support}).
+ *
+ * A native comparator function must have the ComparatorPtr type:
+ *
+ * typedef int (*ComparatorPtr)(const char * src, uint32_t srcLength,
+ *                              const char * dest, uint32_t destLength);
+ *
+ * Keys are in serialized form on the native side. The function is passed
+ * the keys' locations and lengths so that it can compare them with the same
+ * logic as their Java comparator.
+ *
+ * For example, a HiveKey (see {@code HiveKey#write}) is serialized as an int
+ * field (containing the length of the raw bytes) followed by the raw bytes.
+ * When comparing two HiveKeys, we first read the length field and then
+ * compare the raw bytes using the BytesComparator provided by our library.
+ * We pass the location and length of the raw bytes into BytesComparator:
+ * <pre>
+ * int HivePlatform::HiveKeyComparator(const char * src, uint32_t srcLength,
+ *                                     const char * dest, uint32_t destLength) {
+ *   uint32_t sl = bswap(*(uint32_t*)src);
+ *   uint32_t dl = bswap(*(uint32_t*)dest);
+ *   return NativeObjectFactory::BytesComparator(src + 4, sl, dest + 4, dl);
+ * }
+ * </pre>
+ */
+public interface INativeComparable {
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
new file mode 100644
index 0000000..3e6e3ac
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * A handler that accepts input and produces output; it is used to transfer
+ * commands and data.
+ */
+public interface INativeHandler extends NativeDataTarget, NativeDataSource {
+
+  /** @return the name of this handler */
+  String name();
+
+  /** @return the handle of the underlying native handler object */
+  long getNativeHandler();
+
+  /**
+   * Initializes the native handler.
+   * @param conf configuration for the handler
+   * @throws IOException if initialization fails
+   */
+  void init(Configuration conf) throws IOException;
+
+  /** Closes the native handler. */
+  void close() throws IOException;
+
+  /**
+   * Sends a command downstream.
+   *
+   * @param command the command to send
+   * @param parameter the command parameter, or null
+   * @return the result of the command
+   * @throws IOException if the command fails
+   */
+  ReadWriteBuffer call(Command command, ReadWriteBuffer parameter) throws IOException;
+
+  /** @param handler dispatcher for commands received from downstream */
+  void setCommandDispatcher(CommandDispatcher handler);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
new file mode 100644
index 0000000..fd68ea6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.DirectBufferPool;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+
+/**
+ * Creates channels and transfers data and commands between Java and native.
+ */
+public class NativeBatchProcessor implements INativeHandler {
+  private static final Log LOG = LogFactory.getLog(NativeBatchProcessor.class);
+
+  private final String nativeHandlerName;
+  private long nativeHandlerAddr;
+
+  private boolean isInputFinished = false;
+
+  // << Field used directly in Native, the name must NOT be changed
+  private ByteBuffer rawOutputBuffer;
+  private ByteBuffer rawInputBuffer;
+  // >>
+
+  private InputBuffer in;
+  private OutputBuffer out;
+
+  private CommandDispatcher commandDispatcher;
+  private DataReceiver dataReceiver;
+
+  static {
+    if (NativeRuntime.isNativeLibraryLoaded()) {
+      InitIDs();
+    }
+  }
+
+  /**
+   * Creates and initializes a handler with direct buffers matching {@code channel}.
+   */
+  public static INativeHandler create(String nativeHandlerName,
+      Configuration conf, DataChannel channel) throws IOException {
+    final int bufferSize = conf.getInt(Constants.NATIVE_PROCESSOR_BUFFER_KB,
+        1024) * 1024;
+    LOG.info("NativeHandler: direct buffer size: " + bufferSize);
+
+    OutputBuffer out = null;
+    InputBuffer in = null;
+    switch (channel) {
+    case IN:
+      in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case OUT:
+      out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case INOUT:
+      in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case NONE: // no data exchange, so no buffers are needed
+      break;
+    }
+
+    final INativeHandler handler = new NativeBatchProcessor(nativeHandlerName,
+        in, out);
+    handler.init(conf);
+    return handler;
+  }
+
+  protected NativeBatchProcessor(String nativeHandlerName, InputBuffer input,
+      OutputBuffer output) throws IOException {
+    this.nativeHandlerName = nativeHandlerName;
+
+    if (null != input) {
+      this.in = input;
+      this.rawInputBuffer = input.getByteBuffer();
+    }
+    if (null != output) {
+      this.out = output;
+      this.rawOutputBuffer = output.getByteBuffer();
+    }
+  }
+
+  @Override
+  public void setCommandDispatcher(CommandDispatcher handler) {
+    this.commandDispatcher = handler;
+  }
+
+  @Override
+  public void init(Configuration conf) throws IOException {
+    this.nativeHandlerAddr = NativeRuntime
+        .createNativeObject(nativeHandlerName);
+    if (this.nativeHandlerAddr == 0) {
+      throw new RuntimeException("Native object create failed, class: "
+          + nativeHandlerName);
+    }
+    setupHandler(nativeHandlerAddr, ConfigUtil.toBytes(conf));
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (nativeHandlerAddr != 0) {
+      NativeRuntime.releaseNativeObject(nativeHandlerAddr);
+      nativeHandlerAddr = 0;
+    }
+    if (null != in && null != in.getByteBuffer() && in.getByteBuffer().isDirect()) {
+      DirectBufferPool.getInstance().returnBuffer(in.getByteBuffer());
+    }
+  }
+
+  @Override
+  public long getNativeHandler() {
+    return nativeHandlerAddr;
+  }
+
+  @Override
+  public ReadWriteBuffer call(Command command, ReadWriteBuffer parameter)
+      throws IOException {
+    final byte[] bytes = nativeCommand(nativeHandlerAddr, command.id(),
+        null == parameter ? null : parameter.getBuff());
+
+    final ReadWriteBuffer result = new ReadWriteBuffer(bytes);
+    result.setWritePoint(bytes.length);
+    return result;
+  }
+
+  @Override
+  public void sendData() throws IOException {
+    nativeProcessInput(nativeHandlerAddr, rawOutputBuffer.position());
+    rawOutputBuffer.position(0);
+  }
+
+  @Override
+  public void finishSendData() throws IOException {
+    if (null == rawOutputBuffer || isInputFinished) {
+      return;
+    }
+
+    sendData();
+    nativeFinish(nativeHandlerAddr);
+    isInputFinished = true;
+  }
+
+  /**
+   * Dispatches a command to the Java {@link CommandDispatcher}. It has no Java
+   * callers here; presumably the native side invokes it through JNI, so its
+   * name and signature must not change.
+   * @return the serialized result, or null if there is no dispatcher or result
+   */
+  private byte[] sendCommandToJava(int command, byte[] data) throws IOException {
+    try {
+      final Command cmd = new Command(command);
+      ReadWriteBuffer param = null;
+      if (null != data) {
+        param = new ReadWriteBuffer();
+        param.reset(data);
+        param.setWritePoint(data.length);
+      }
+      if (null == commandDispatcher) {
+        return null;
+      }
+      final ReadWriteBuffer result = commandDispatcher.onCall(cmd, param);
+      return null == result ? null : result.getBuff();
+    } catch (Exception e) {
+      LOG.error("Failed to dispatch command " + command, e);
+      // Propagate IOExceptions as-is rather than wrapping them a second time.
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Called by the native side: exposes the first {@code length} bytes of the
+   * input buffer and notifies the data receiver so native processing can go on.
+   */
+  private void flushOutput(int length) throws IOException {
+
+    if (null != rawInputBuffer) {
+      rawInputBuffer.position(0);
+      rawInputBuffer.limit(length);
+
+      if (null != dataReceiver) {
+        try {
+          dataReceiver.receiveData();
+        } catch (IOException e) {
+          LOG.error("DataReceiver failed to receive data flushed by native side", e);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Caches JNI field and method ids.
+   */
+  private static native void InitIDs();
+
+  /**
+   * Sets up the native-side BatchHandler.
+   */
+  private native void setupHandler(long nativeHandlerAddr, byte[][] configs);
+
+  /**
+   * Lets the native side process {@code length} bytes from the output buffer.
+   *
+   * @param handler native handler address
+   * @param length number of bytes to process
+   */
+  private native void nativeProcessInput(long handler, int length);
+
+  /**
+   * Notifies the native side that input is finished.
+   *
+   * @param handler native handler address
+   */
+  private native void nativeFinish(long handler);
+
+  /**
+   * Sends a control message to the native side.
+   *
+   * @param handler native handler address
+   * @param cmd command id
+   * @param parameter serialized command parameter, may be null
+   * @return the result bytes returned by the native side
+   */
+  private native byte[] nativeCommand(long handler, int cmd, byte[] parameter);
+
+  /**
+   * Loads data from the native side.
+   *
+   * @param handler native handler address
+   */
+  private native void nativeLoadData(long handler);
+
+  protected void finishOutput() {
+    // No-op; not called from this class.
+  }
+
+  @Override
+  public InputBuffer getInputBuffer() {
+    return this.in;
+  }
+
+  @Override
+  public OutputBuffer getOutputBuffer() {
+    return this.out;
+  }
+
+  @Override
+  public void loadData() throws IOException {
+    nativeLoadData(nativeHandlerAddr);
+  }
+
+  @Override
+  public void setDataReceiver(DataReceiver handler) {
+    this.dataReceiver = handler;
+  }
+
+  @Override
+  public String name() {
+    return nativeHandlerName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
new file mode 100644
index 0000000..d0bfa93
--- /dev/null
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * NativeDataSource loads data from upstream.
+ */
+public interface NativeDataSource {
+
+  /**
+   * @return the input buffer
+   */
+  InputBuffer getInputBuffer();
+
+  /**
+   * Sets the listener that is activated when data from upstream arrives.
+   *
+   * @param handler the receiver to notify
+   */
+  void setDataReceiver(DataReceiver handler);
+
+  /**
+   * Loads data from upstream.
+   *
+   * @throws IOException if loading fails
+   */
+  void loadData() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
new file mode 100644
index 0000000..7a780eb
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+
+/**
+ * NativeDataTarget sends data downstream.
+ */
+public interface NativeDataTarget {
+
+  /**
+   * Signals that data has been stored in the output buffer.
+   *
+   * @throws IOException if the data cannot be sent
+   */
+  void sendData() throws IOException;
+
+  /**
+   * Signals that there is no more data.
+   *
+   * @throws IOException if the signal cannot be delivered
+   */
+  void finishSendData() throws IOException;
+
+  /**
+   * Returns the output buffer.
+   *
+   * @return the output buffer that holds the data to be sent
+   */
+  OutputBuffer getOutputBuffer();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
new file mode 100644
index 0000000..a48b5c3
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.hadoop.mapred.nativetask;

import java.io.File;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputCollector;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.nativetask.handlers.NativeCollectorOnlyHandler;
import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.RunJar;

/**
 * Native map output collector wrapped in the Java {@link MapOutputCollector} interface.
 *
 * <p>{@link #init(Context)} checks that the job configuration can be handled natively and
 * throws {@link InvalidJobConfException} when it cannot. The collect/flush/close calls are
 * delegated to a {@link NativeCollectorOnlyHandler}.
 */
public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollector<K, V> {

  private static final Log LOG = LogFactory.getLog(NativeMapOutputCollectorDelegator.class);
  private JobConf job;
  private NativeCollectorOnlyHandler<K, V> handler;

  /** Background thread that periodically pulls status from the native side. */
  private StatusReportChecker updater;

  @Override
  public void collect(K key, V value, int partition) throws IOException, InterruptedException {
    handler.collect(key, value, partition);
  }

  /**
   * Closes the native handler and stops the status updater thread.
   *
   * <p>The updater is stopped in a {@code finally} block so that its thread does not outlive
   * this collector when closing the handler fails.
   */
  @Override
  public void close() throws IOException, InterruptedException {
    try {
      handler.close();
    } finally {
      if (null != updater) {
        updater.stop();
      }
    }
  }

  @Override
  public void flush() throws IOException, InterruptedException, ClassNotFoundException {
    handler.flush();
  }

  /**
   * Validates the job configuration and starts the native collector.
   *
   * @param context collector context supplying the job conf, reporter and map task
   * @throws InvalidJobConfException if the job has no reducers, uses a custom Java comparator
   *     no platform defines, an unsupported compression codec, a sort class other than
   *     {@link QuickSort}, secure shuffle, an unsupported key type, or if the native library
   *     is not loaded
   * @throws IOException if the key serializer lookup or native handler creation fails
   * @throws ClassNotFoundException if a configured class cannot be loaded
   */
  @Override
  public void init(Context context) throws IOException, ClassNotFoundException {
    this.job = context.getJobConf();

    Platforms.init(job);

    checkJobConfSupported();
    checkKeyTypeSupported();

    if (!NativeRuntime.isNativeLibraryLoaded()) {
      throw invalidJobConf(
          "Nativeruntime cannot be loaded, please check the libnativetask.so is in hadoop library dir");
    }
    NativeRuntime.configure(job);

    final long updateInterval = job.getLong(Constants.NATIVE_STATUS_UPDATE_INTERVAL,
        Constants.NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL);
    updater = new StatusReportChecker(context.getReporter(), updateInterval);
    updater.start();

    this.handler = null;
    try {
      @SuppressWarnings("unchecked")
      final Class<K> oKClass = (Class<K>) job.getMapOutputKeyClass();
      @SuppressWarnings("unchecked")
      final Class<V> oVClass = (Class<V>) job.getMapOutputValueClass();
      final TaskAttemptID id = context.getMapTask().getTaskID();
      final TaskContext taskContext = new TaskContext(job, null, null, oKClass, oVClass,
          context.getReporter(), id);
      handler = NativeCollectorOnlyHandler.create(taskContext);
    } catch (final IOException e) {
      // The status thread was already started above; don't leak it when init fails.
      stopUpdaterQuietly();
      String message = "Native output collector cannot be loaded;";
      LOG.error(message);
      throw new IOException(message, e);
    }

    LOG.info("Native output collector can be successfully enabled!");
  }

  /** Rejects job-level settings (reducers, comparator, codec, sort, shuffle) we cannot honour. */
  private void checkJobConfSupported() throws InvalidJobConfException {
    if (job.getNumReduceTasks() == 0) {
      throw invalidJobConf("There is no reducer, no need to use native output collector");
    }

    final Class<?> comparatorClass =
        job.getClass(MRJobConfig.KEY_COMPARATOR, null, RawComparator.class);
    if (comparatorClass != null && !Platforms.define(comparatorClass)) {
      throw invalidJobConf("Native output collector don't support customized java comparator "
          + job.get(MRJobConfig.KEY_COMPARATOR));
    }

    if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false)
        && !isCodecSupported(job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC))) {
      throw invalidJobConf("Native output collector don't support compression codec "
          + job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC) + ", We support Gzip, Lz4, snappy");
    }

    if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) {
      throw invalidJobConf("Native-Task don't support sort class " + job.get(Constants.MAP_SORT_CLASS));
    }

    if (job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false)) {
      throw invalidJobConf("Native-Task don't support secure shuffle");
    }
  }

  /** Checks that the map output key type has a native serializer and a native comparator. */
  private void checkKeyTypeSupported() throws IOException {
    final Class<?> keyCls = job.getMapOutputKeyClass();
    try {
      @SuppressWarnings("rawtypes")
      final INativeSerializer serializer = NativeSerialization.getInstance().getSerializer(keyCls);
      if (null == serializer) {
        throw invalidJobConf("Key type not supported. Cannot find serializer for " + keyCls.getName());
      } else if (!Platforms.support(keyCls.getName(), serializer, job)) {
        throw invalidJobConf(
            "Native output collector don't support this key, this key is not comparable in native "
                + keyCls.getName());
      }
    } catch (final InvalidJobConfException e) {
      // InvalidJobConfException is an IOException: rethrow it untouched so the specific reason
      // above is not replaced by the generic message below.
      throw e;
    } catch (final IOException e) {
      String message = "Cannot find serializer for " + keyCls.getName();
      LOG.error(message);
      throw new IOException(message, e);
    }
  }

  /** Logs {@code message} at error level and wraps it in an {@link InvalidJobConfException}. */
  private static InvalidJobConfException invalidJobConf(String message) {
    LOG.error(message);
    return new InvalidJobConfException(message);
  }

  /** Stops the status updater, if any, preserving the thread's interrupt status. */
  private void stopUpdaterQuietly() {
    if (updater == null) {
      return;
    }
    try {
      updater.stop();
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      updater = null;
    }
  }

  private boolean isCodecSupported(String string) {
    return "org.apache.hadoop.io.compress.SnappyCodec".equals(string)
        || "org.apache.hadoop.io.compress.GzipCodec".equals(string)
        || "org.apache.hadoop.io.compress.Lz4Codec".equals(string);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java new file mode 100644 index 0000000..53b1acd --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.hadoop.mapred.nativetask;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
import org.apache.hadoop.mapred.nativetask.util.SnappyUtil;
import org.apache.hadoop.util.VersionInfo;

/**
 * Java facade over the native (JNI) runtime. According to its design it has three functions:
 * <ol>
 * <li>create native handlers for map, reduce, output collector, etc.;</li>
 * <li>configure the native task with the provided MR configuration;</li>
 * <li>provide a file system API to native space, so that it can use file systems like HDFS.</li>
 * </ol>
 *
 * <p>The JNI library is loaded once in the static initializer. Failure there is logged, not
 * thrown. Every JNI-backed method first calls {@link #assertNativeLibraryLoaded()}.
 */
public class NativeRuntime {
  private static final Log LOG = LogFactory.getLog(NativeRuntime.class);
  private static boolean nativeLibraryLoaded = false;

  private static Configuration conf = new Configuration();

  static {
    try {
      if (!SnappyUtil.isNativeSnappyLoaded(conf)) {
        throw new IOException("Snappy library cannot be loaded");
      }
      LOG.info("Snappy native library is available");
      System.loadLibrary("nativetask");
      LOG.info("Nativetask JNI library loaded.");
      nativeLibraryLoaded = true;
    } catch (final Throwable t) {
      // Deliberately non-fatal: callers (e.g. NativeMapOutputCollectorDelegator) check
      // isNativeLibraryLoaded() before using the native runtime.
      LOG.error("Failed to load nativetask JNI library with error: " + t, t);
      LOG.info("java.library.path=" + System.getProperty("java.library.path"));
      LOG.info("LD_LIBRARY_PATH=" + System.getenv("LD_LIBRARY_PATH"));
    }
  }

  private static void assertNativeLibraryLoaded() {
    if (!nativeLibraryLoaded) {
      throw new RuntimeException("Native runtime library not loaded");
    }
  }

  /** @return true if both native Snappy and the nativetask JNI library were loaded */
  public static boolean isNativeLibraryLoaded() {
    return nativeLibraryLoaded;
  }

  /**
   * Copies {@code jobConf}, tags it with the Hadoop version and pushes it to the native side.
   *
   * @param jobConf job configuration to hand to the native runtime
   * @throws RuntimeException if the native library is not loaded
   */
  public static void configure(Configuration jobConf) {
    assertNativeLibraryLoaded();
    conf = new Configuration(jobConf);
    conf.set(Constants.NATIVE_HADOOP_VERSION, VersionInfo.getVersion());
    JNIConfigure(ConfigUtil.toBytes(conf));
  }

  /**
   * Creates a native object; used to create native handlers.
   *
   * @param clazz name of the native class to instantiate
   * @return handle of the created native object, or 0 if it could not be created
   */
  public static synchronized long createNativeObject(String clazz) {
    assertNativeLibraryLoaded();
    final long ret = JNICreateNativeObject(BytesUtil.toBytes(clazz));
    if (ret == 0) {
      LOG.warn("Can't create NativeObject for class " + clazz + ", probably not exist.");
    }
    return ret;
  }

  /**
   * Registers a customized native library.
   *
   * @param libraryName name/path of the library to register
   * @param clazz class to register from that library
   * @return the native return code; a non-zero value is treated as a failure and logged
   */
  public static synchronized long registerLibrary(String libraryName, String clazz) {
    assertNativeLibraryLoaded();
    final long ret = JNIRegisterModule(BytesUtil.toBytes(libraryName), BytesUtil.toBytes(clazz));
    if (ret != 0) {
      LOG.warn("Can't register library " + libraryName + " for class " + clazz
          + ", native return code " + ret);
    }
    return ret;
  }

  /**
   * Destroys a native object; used to destroy native handlers.
   *
   * @param addr handle previously returned by {@link #createNativeObject(String)}
   */
  public static synchronized void releaseNativeObject(long addr) {
    assertNativeLibraryLoaded();
    JNIReleaseNativeObject(addr);
  }

  /**
   * Pulls the status report from native space and forwards it to {@code reporter}.
   *
   * <p>The encoding is: progress ({@link FloatWritable}), status ({@link Text}), counter count
   * ({@link IntWritable}), then that many (group {@link Text}, name {@link Text}, increment
   * {@link LongWritable}) triples.
   *
   * @param reporter task reporter to update; also used as the lock for the update
   * @throws IOException if the status bytes cannot be decoded
   */
  public static void reportStatus(TaskReporter reporter) throws IOException {
    assertNativeLibraryLoaded();
    synchronized (reporter) {
      final byte[] statusBytes = JNIUpdateStatus();
      final DataInputBuffer ib = new DataInputBuffer();
      ib.reset(statusBytes, statusBytes.length);

      final FloatWritable progress = new FloatWritable();
      progress.readFields(ib);
      reporter.setProgress(progress.get());

      final Text status = new Text();
      status.readFields(ib);
      if (status.getLength() > 0) {
        reporter.setStatus(status.toString());
      }

      final IntWritable numCounters = new IntWritable();
      numCounters.readFields(ib);
      final Text group = new Text();
      final Text name = new Text();
      final LongWritable amount = new LongWritable();
      for (int i = 0; i < numCounters.get(); i++) {
        group.readFields(ib);
        name.readFields(ib);
        amount.readFields(ib);
        reporter.incrCounter(group.toString(), name.toString(), amount.get());
      }
    }
  }


  /*******************************************************
   *** The following are JNI Apis
   ********************************************************/

  /**
   * Configures the native runtime with the MapReduce job configuration.
   *
   * @param configs serialized configuration entries
   */
  private native static void JNIConfigure(byte[][] configs);

  /**
   * Creates a native object in native space.
   *
   * @param clazz encoded native class name
   * @return handle of the native object, 0 on failure
   */
  private native static long JNICreateNativeObject(byte[] clazz);

  /**
   * Creates the default native object for a certain type.
   *
   * @param type encoded type name
   * @return handle of the native object
   */
  @Deprecated
  private native static long JNICreateDefaultNativeObject(byte[] type);

  /**
   * Destroys a native object in native space.
   *
   * @param addr handle of the native object
   */
  private native static void JNIReleaseNativeObject(long addr);

  /**
   * Gets a status update from the native side. Encoding: progress:float, status:Text,
   * counter number:int, then counters as an array of [group:Text, name:Text, incrCount:Long].
   *
   * @return encoded status bytes
   */
  private native static byte[] JNIUpdateStatus();

  /**
   * Not used.
   */
  private native static void JNIRelease();

  /**
   * Registers a native module; used by {@link #registerLibrary(String, String)}.
   *
   * @param path encoded library name/path
   * @param name encoded class name
   * @return native return code, non-zero on failure
   */
  private native static int JNIRegisterModule(byte[] path, byte[] name);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java new file mode 100644 index 0000000..aef53ce --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.hadoop.mapred.nativetask;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;

/**
 * Base class for platforms. A platform is a framework running on top of MapReduce, such as
 * Hadoop, Hive, Pig or Mahout. Each framework defines its own key and value types across a
 * MapReduce job.
 *
 * <p>For each platform we should implement:
 * <ul>
 * <li>serializers, so that data can be exchanged with the native side, and</li>
 * <li>native comparators, so that the native output collectors can sort and write the
 * keys out.</li>
 * </ul>
 *
 * <p>{@link HadoopPlatform} already supports all key types of Hadoop. Users can implement their
 * own custom platform.
 */
public abstract class Platform {
  private final NativeSerialization serialization;
  /** Names of the map output key classes registered through {@link #registerKey}. */
  protected Set<String> keyClassNames = new HashSet<String>();

  public Platform() {
    this.serialization = NativeSerialization.getInstance();
  }

  /**
   * Initializes the platform. Implementations should call {@link #registerKey} here for every
   * key type they support.
   *
   * @throws IOException if initialization fails
   */
  public abstract void init() throws IOException;

  /**
   * @return name of the platform, useful for logging and debugging
   */
  public abstract String name();


  /**
   * Associates a map output key class with its native serializer and records it as belonging
   * to this platform.
   *
   * @param keyClassName map output key class name
   * @param key serializer class for that key type
   * @throws IOException propagated from {@code NativeSerialization#register}
   */
  protected void registerKey(String keyClassName, Class key) throws IOException {
    serialization.register(keyClassName, key);
    keyClassNames.add(keyClassName);
  }

  /**
   * Tells whether this platform supports a specific key. A supported key should satisfy at
   * least two conditions:
   * <ol>
   * <li>the key belongs to the platform, and</li>
   * <li>the associated serializer implements the {@link INativeComparable} interface.</li>
   * </ol>
   *
   * @param keyClassName map output key class name
   * @param serializer serializer associated with the key via {@link #registerKey}
   * @param job job configuration
   * @return true if the platform has implemented native comparators for the key,
   *     false otherwise
   */
  protected abstract boolean support(String keyClassName, INativeSerializer serializer, JobConf job);


  /**
   * Tells whether this platform has defined the given custom Java comparator.
   *
   * <p>NativeTask doesn't support custom Java comparators (set with
   * {@code mapreduce.job.output.key.comparator.class}). However, a platform (e.g. Pig) may set
   * that property and also implement matching native comparators, in which case the job should
   * not be rejected.
   *
   * @param keyComparator comparator class set with
   *     {@code mapreduce.job.output.key.comparator.class}
   * @return true if this platform defines {@code keyComparator} (so the job can still run
   *     natively), false otherwise
   */
  protected abstract boolean define(Class keyComparator);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java new file mode 100644 index 0000000..154bbc8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.hadoop.mapred.nativetask;

import java.io.IOException;
import java.util.ServiceLoader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
import org.apache.log4j.Logger;


/**
 * Loads and initializes every {@link Platform} found on the classpath through
 * {@link ServiceLoader}. It is also the facade for key-type support checks and other
 * platform queries.
 *
 * <p>All iteration over the shared {@link ServiceLoader} is synchronized on it, because
 * {@code ServiceLoader} instances are not safe for concurrent use.
 */
public class Platforms {

  private static final Logger LOG = Logger.getLogger(Platforms.class);
  private static final ServiceLoader<Platform> platforms = ServiceLoader.load(Platform.class);

  /**
   * Clears the registered serializers and re-runs {@link Platform#init()} on every platform.
   *
   * @param conf currently unused
   * @throws IOException if a platform fails to initialize
   */
  public static void init(Configuration conf) throws IOException {
    synchronized (platforms) {
      // Reset under the same lock as the re-initialization, so that the other methods of this
      // class never run between the reset and the platforms re-registering their keys.
      NativeSerialization.getInstance().reset();
      for (Platform platform : platforms) {
        platform.init();
      }
    }
  }

  /**
   * @param keyClassName map output key class name
   * @param serializer serializer found for the key class
   * @param job job configuration
   * @return true if any platform supports the key natively
   */
  public static boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
    synchronized (platforms) {
      for (Platform platform : platforms) {
        if (platform.support(keyClassName, serializer, job)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("platform " + platform.name() + " support key class " + keyClassName);
          }
          return true;
        }
      }
    }
    return false;
  }

  /**
   * @param keyComparator custom Java key comparator configured for the job
   * @return true if any platform defines that comparator natively
   */
  public static boolean define(Class keyComparator) {
    synchronized (platforms) {
      for (Platform platform : platforms) {
        if (platform.define(keyComparator)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("platform " + platform.name() + " define comparator "
                + keyComparator.getName());
          }
          return true;
        }
      }
    }
    return false;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java ---------------------------------------------------------------------- diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java new file mode 100644 index 0000000..1fa59dc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.hadoop.mapred.nativetask;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Task.Counter;
import org.apache.hadoop.mapred.Task.TaskReporter;

/**
 * Periodically pulls status from the native side (via {@link NativeRuntime#reportStatus}) and
 * reports it to the MR framework on a background daemon thread.
 */
public class StatusReportChecker implements Runnable {

  private static final Log LOG = LogFactory.getLog(StatusReportChecker.class);
  /** Default polling interval. */
  public static int INTERVAL = 1000; // milli-seconds

  /** Polling thread; non-null only between {@link #start()} and {@link #stop()}. */
  private Thread checker;
  private final TaskReporter reporter;
  private final long interval;

  public StatusReportChecker(TaskReporter reporter) {
    this(reporter, INTERVAL);
  }

  /**
   * @param reporter reporter that receives native progress, status and counters
   * @param interval polling interval in milliseconds
   */
  public StatusReportChecker(TaskReporter reporter, long interval) {
    this.reporter = reporter;
    this.interval = interval;
  }

  /**
   * Polls until interrupted, or until a status update fails with an {@link IOException}.
   */
  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(interval);
      } catch (final InterruptedException e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("StatusUpdater thread exiting " + "since it got interrupted");
        }
        break;
      }
      try {
        NativeRuntime.reportStatus(reporter);
      } catch (final IOException e) {
        LOG.warn("Update native status got exception", e);
        reporter.setStatus(e.toString());
        break;
      }
    }
  }

  /**
   * Touches every counter the native side may update, so each is created up front with its
   * proper display name.
   */
  protected void initUsedCounters() {
    reporter.getCounter(Counter.MAP_INPUT_RECORDS);
    reporter.getCounter(Counter.MAP_OUTPUT_RECORDS);
    reporter.getCounter(Counter.MAP_INPUT_BYTES);
    reporter.getCounter(Counter.MAP_OUTPUT_BYTES);
    reporter.getCounter(Counter.MAP_OUTPUT_MATERIALIZED_BYTES);
    reporter.getCounter(Counter.COMBINE_INPUT_RECORDS);
    reporter.getCounter(Counter.COMBINE_OUTPUT_RECORDS);
    reporter.getCounter(Counter.REDUCE_INPUT_RECORDS);
    reporter.getCounter(Counter.REDUCE_OUTPUT_RECORDS);
    reporter.getCounter(Counter.REDUCE_INPUT_GROUPS);
    reporter.getCounter(Counter.SPILLED_RECORDS);
  }

  /** Starts the polling thread if it is not already running. */
  public synchronized void start() {
    if (checker == null) {
      // init counters used by native side,
      // so they will have correct display name
      initUsedCounters();
      checker = new Thread(this);
      checker.setDaemon(true);
      checker.start();
    }
  }

  /**
   * Interrupts the polling thread and waits for it to exit. Afterwards {@link #start()} may be
   * called again.
   *
   * @throws InterruptedException if interrupted while waiting for the thread to exit
   */
  public synchronized void stop() throws InterruptedException {
    if (checker != null) {
      checker.interrupt();
      checker.join();
      // Clear the reference so that start() can create a fresh thread later.
      checker = null;
    }
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java new file mode 100644 index 0000000..a4430ed --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.hadoop.mapred.nativetask;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapred.TaskAttemptID;

/**
 * Mutable holder of per-task information handed to native handlers (for example
 * {@code NativeCollectorOnlyHandler.create}). It carries the job configuration, the input and
 * output key/value classes, the task reporter and the task attempt id.
 */
public class TaskContext {
  private final JobConf conf;
  private Class iKClass;
  private Class iVClass;
  private Class oKClass;
  private Class oVClass;
  private final TaskReporter reporter;
  private final TaskAttemptID taskAttemptID;

  /**
   * @param conf job configuration
   * @param iKClass input key class, may be null
   * @param iVClass input value class, may be null
   * @param oKClass output key class
   * @param oVClass output value class
   * @param reporter task reporter
   * @param id task attempt id
   */
  public TaskContext(JobConf conf, Class iKClass, Class iVClass, Class oKClass, Class oVClass, TaskReporter reporter,
      TaskAttemptID id) {
    this.conf = conf;
    this.iKClass = iKClass;
    this.iVClass = iVClass;
    this.oKClass = oKClass;
    this.oVClass = oVClass;
    this.reporter = reporter;
    this.taskAttemptID = id;
  }

  public Class getInputKeyClass() {
    return iKClass;
  }

  public void setInputKeyClass(Class klass) {
    this.iKClass = klass;
  }

  public Class getInputValueClass() {
    return iVClass;
  }

  public void setInputValueClass(Class klass) {
    this.iVClass = klass;
  }

  /** @return the output key class */
  public Class getOutputKeyClass() {
    return this.oKClass;
  }

  /**
   * Misspelled legacy name of {@link #getOutputKeyClass()}, kept so existing callers keep
   * compiling.
   *
   * @return the output key class
   */
  public Class getOuputKeyClass() {
    return getOutputKeyClass();
  }

  public void setOutputKeyClass(Class klass) {
    this.oKClass = klass;
  }

  public Class getOutputValueClass() {
    return this.oVClass;
  }

  public void setOutputValueClass(Class klass) {
    this.oVClass = klass;
  }

  public TaskReporter getTaskReporter() {
    return this.reporter;
  }

  public TaskAttemptID getTaskAttemptId() {
    return this.taskAttemptID;
  }

  public JobConf getConf() {
    return this.conf;
  }

  /**
   * @return a shallow copy; the conf, reporter and attempt id instances are shared
   */
  public TaskContext copyOf() {
    return new TaskContext(conf, iKClass, iVClass, oKClass, oVClass, reporter, taskAttemptID);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java ---------------------------------------------------------------------- diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java new file mode 100644 index 0000000..a52be1f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.buffer; + +public enum BufferType { + + DIRECT_BUFFER, + + HEAP_BUFFER +}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java new file mode 100644 index 0000000..5af7180 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.hadoop.mapred.nativetask.buffer;

import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;

/**
 * Reads data from the {@link ByteBuffer} of an {@link InputBuffer}.
 *
 * <p>Single-byte and bulk {@code read} methods follow the {@link InputStream} contract: unsigned
 * bytes, and {@code -1} at the end of the buffer. The other {@link DataInput} methods read
 * directly from the buffer. Whether multi-byte values are big- or little-endian depends on the
 * buffer's configured byte order.
 */
public class ByteBufferDataReader extends DataInputStream {
  private ByteBuffer byteBuffer;
  /** Reusable scratch array for {@link #readLine()}. */
  private char lineCache[];

  /**
   * @param buffer input buffer to read from; when null, {@link #reset(InputBuffer)} must be
   *     called before reading
   */
  public ByteBufferDataReader(InputBuffer buffer) {
    if (buffer != null) {
      this.byteBuffer = buffer.getByteBuffer();
    }
  }

  /** Switches this reader to the byte buffer of {@code buffer}. */
  public void reset(InputBuffer buffer) {
    this.byteBuffer = buffer.getByteBuffer();
  }

  /**
   * @return the next byte as an unsigned value in {@code 0..255}, or {@code -1} when the buffer
   *     is exhausted
   */
  @Override
  public int read() throws IOException {
    if (!byteBuffer.hasRemaining()) {
      return -1;
    }
    // Mask off sign extension so bytes >= 0x80 are not mistaken for EOF (-1).
    return byteBuffer.get() & 0xff;
  }

  /**
   * Reads up to {@code len} bytes.
   *
   * @return the number of bytes read, or {@code -1} if the buffer is exhausted
   */
  @Override
  public int read(byte b[], int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }
    final int remaining = byteBuffer.remaining();
    if (remaining == 0) {
      return -1;
    }
    final int count = Math.min(len, remaining);
    byteBuffer.get(b, off, count);
    return count;
  }

  @Override
  public void readFully(byte[] b) throws IOException {
    byteBuffer.get(b, 0, b.length);
  }

  @Override
  public void readFully(byte[] b, int off, int len) throws IOException {
    byteBuffer.get(b, off, len);
  }

  /**
   * Skips up to {@code n} bytes.
   *
   * @return the number of bytes actually skipped; 0 when {@code n} is not positive
   */
  @Override
  public int skipBytes(int n) throws IOException {
    if (n <= 0) {
      return 0;
    }
    final int skip = Math.min(n, byteBuffer.remaining());
    byteBuffer.position(byteBuffer.position() + skip);
    return skip;
  }

  @Override
  public boolean readBoolean() throws IOException {
    return byteBuffer.get() == 1;
  }

  @Override
  public byte readByte() throws IOException {
    return byteBuffer.get();
  }

  /**
   * @return the next byte as an unsigned value in {@code 0..255}
   * @throws EOFException if the buffer is exhausted
   */
  @Override
  public int readUnsignedByte() throws IOException {
    if (!byteBuffer.hasRemaining()) {
      throw new EOFException();
    }
    return byteBuffer.get() & 0xff;
  }

  @Override
  public short readShort() throws IOException {
    return byteBuffer.getShort();
  }

  /** @return the next two bytes as an unsigned value in {@code 0..65535} */
  @Override
  public int readUnsignedShort() throws IOException {
    return byteBuffer.getShort() & 0xffff;
  }

  @Override
  public char readChar() throws IOException {
    return byteBuffer.getChar();
  }

  @Override
  public int readInt() throws IOException {
    return byteBuffer.getInt();
  }

  @Override
  public long readLong() throws IOException {
    return byteBuffer.getLong();
  }

  @Override
  public float readFloat() throws IOException {
    return byteBuffer.getFloat();
  }

  @Override
  public double readDouble() throws IOException {
    return byteBuffer.getDouble();
  }

  /**
   * Reads bytes up to a line terminator ({@code "\n"}, {@code "\r"} or {@code "\r\n"}), mapping
   * each byte to one char.
   *
   * @return the line without its terminator, or null if the buffer was already exhausted
   */
  @Override
  public String readLine() throws IOException {
    char[] buf = lineCache;

    if (buf == null) {
      buf = lineCache = new char[128];
    }

    int room = buf.length;
    int offset = 0;
    int c;

    loop: while (true) {
      switch (c = read()) {
        case -1:
        case '\n':
          break loop;

        case '\r':
          // Swallow the '\n' of a "\r\n" pair. Peek with an absolute get so that the byte
          // following a lone '\r' is left unread for the next call.
          if (byteBuffer.hasRemaining() && byteBuffer.get(byteBuffer.position()) == '\n') {
            byteBuffer.position(byteBuffer.position() + 1);
          }
          break loop;

        default:
          if (--room < 0) {
            buf = new char[offset + 128];
            room = buf.length - offset - 1;
            System.arraycopy(lineCache, 0, buf, 0, offset);
            lineCache = buf;
          }
          buf[offset++] = (char) c;
          break;
      }
    }
    if ((c == -1) && (offset == 0)) {
      return null;
    }
    return String.copyValueOf(buf, 0, offset);
  }

  @Override
  public final String readUTF() throws IOException {
    return readUTF(this);
  }

  /**
   * Decodes a modified-UTF-8 string: a 2-byte unsigned length followed by that many bytes,
   * using the same format as {@link java.io.DataInputStream#readUTF(DataInput)}.
   */
  private static String readUTF(DataInput in) throws IOException {
    final int utflen = in.readUnsignedShort();
    final byte[] bytearr = new byte[utflen];
    final char[] chararr = new char[utflen];

    int c, char2, char3;
    int count = 0;
    int chararr_count = 0;

    in.readFully(bytearr, 0, utflen);

    // Fast path for the leading run of ASCII characters.
    while (count < utflen) {
      c = bytearr[count] & 0xff;
      if (c > 127) {
        break;
      }
      count++;
      chararr[chararr_count++] = (char) c;
    }

    while (count < utflen) {
      c = bytearr[count] & 0xff;
      switch (c >> 4) {
        case 0:
        case 1:
        case 2:
        case 3:
        case 4:
        case 5:
        case 6:
        case 7:
          /* 0xxxxxxx */
          count++;
          chararr[chararr_count++] = (char) c;
          break;
        case 12:
        case 13:
          /* 110x xxxx 10xx xxxx */
          count += 2;
          if (count > utflen) {
            throw new UTFDataFormatException("malformed input: partial character at end");
          }
          char2 = bytearr[count - 1];
          if ((char2 & 0xC0) != 0x80) {
            throw new UTFDataFormatException("malformed input around byte " + count);
          }
          chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
          break;
        case 14:
          /* 1110 xxxx 10xx xxxx 10xx xxxx */
          count += 3;
          if (count > utflen) {
            throw new UTFDataFormatException("malformed input: partial character at end");
          }
          char2 = bytearr[count - 2];
          char3 = bytearr[count - 1];
          if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
            throw new UTFDataFormatException("malformed input around byte " + (count - 1));
          }
          chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
          break;
        default:
          /* 10xx xxxx, 1111 xxxx */
          throw new UTFDataFormatException("malformed input around byte " + count);
      }
    }
    // The number of chars produced may be less than utflen
    return new String(chararr, 0, chararr_count);
  }

  @Override
  public void close() throws IOException {
    super.close();
  }

  /** @return true if a buffer is attached and still has unread bytes */
  @Override
  public boolean hasUnReadData() {
    return null != byteBuffer && byteBuffer.hasRemaining();
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java new file mode 100644 index 0000000..36b2fcf --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+
+/**
+ * Writes data into the output {@link ByteBuffer} of a {@link NativeDataTarget}.
+ *
+ * <p>When the buffer runs out of room the target is asked to send the buffered
+ * bytes ({@link NativeDataTarget#sendData()}) and writing restarts at position
+ * 0. {@link #close()} flushes whatever is left and then calls
+ * {@link NativeDataTarget#finishSendData()}.
+ *
+ * <p>NOTE(review): only {@link #write(int)} and {@link #write(byte[], int, int)}
+ * are {@code synchronized}; the other write methods are not, so instances
+ * should not be assumed to be thread-safe.
+ */
+public class ByteBufferDataWriter extends DataOutputStream {
+  private final static byte TRUE = (byte) 1;
+  private final static byte FALSE = (byte) 0;
+
+  private final ByteBuffer buffer;
+  private final NativeDataTarget target;
+
+  /**
+   * Creates a writer over {@code handler}'s output buffer.
+   *
+   * <p>NOTE(review): a {@code null} handler is accepted but leaves both the
+   * buffer and the target unset, so writing, flushing or closing such a writer
+   * will typically fail with {@link NullPointerException}.
+   *
+   * @param handler the target that owns the output buffer and is asked to send it
+   */
+  public ByteBufferDataWriter(NativeDataTarget handler) {
+    this.buffer = (null != handler) ? handler.getOutputBuffer().getByteBuffer() : null;
+    this.target = handler;
+  }
+
+  /**
+   * Flushes first if the buffer already holds data and cannot fit another
+   * {@code length} bytes, so that a primitive value is never split across two
+   * sends. An empty buffer is never flushed.
+   */
+  private void checkSizeAndFlushNecessary(int length) throws IOException {
+    if (buffer.position() > 0 && buffer.remaining() < length) {
+      flush();
+    }
+  }
+
+  @Override
+  public synchronized void write(int v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put((byte) v);
+  }
+
+  @Override
+  public boolean shortOfSpace(int dataLength) throws IOException {
+    return buffer.remaining() < dataLength;
+  }
+
+  /**
+   * Copies {@code len} bytes of {@code b} starting at {@code off}, filling the
+   * buffer and flushing it as often as needed.
+   */
+  @Override
+  public synchronized void write(byte[] b, int off, int len) throws IOException {
+    int remain = len;
+    int offset = off;
+    while (remain > 0) {
+      if (buffer.remaining() > 0) {
+        final int chunk = Math.min(buffer.remaining(), remain);
+        buffer.put(b, offset, chunk);
+        remain -= chunk;
+        offset += chunk;
+      } else {
+        flush();
+      }
+    }
+  }
+
+  /** Asks the target to send the buffered data, then rewinds the buffer to position 0. */
+  @Override
+  public void flush() throws IOException {
+    target.sendData();
+    buffer.position(0);
+  }
+
+  /** Flushes any pending bytes, then tells the target that sending is finished. */
+  @Override
+  public void close() throws IOException {
+    if (hasUnFlushedData()) {
+      flush();
+    }
+    target.finishSendData();
+  }
+
+  @Override
+  public final void writeBoolean(boolean v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put(v ? TRUE : FALSE);
+  }
+
+  @Override
+  public final void writeByte(int v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put((byte) v);
+  }
+
+  @Override
+  public final void writeShort(int v) throws IOException {
+    checkSizeAndFlushNecessary(2);
+    buffer.putShort((short) v);
+  }
+
+  /** Writes {@code v} as two bytes, high byte first, regardless of buffer order. */
+  @Override
+  public final void writeChar(int v) throws IOException {
+    checkSizeAndFlushNecessary(2);
+    buffer.put((byte) ((v >>> 8) & 0xFF));
+    buffer.put((byte) ((v >>> 0) & 0xFF));
+  }
+
+  @Override
+  public final void writeInt(int v) throws IOException {
+    checkSizeAndFlushNecessary(4);
+    buffer.putInt(v);
+  }
+
+  @Override
+  public final void writeLong(long v) throws IOException {
+    checkSizeAndFlushNecessary(8);
+    buffer.putLong(v);
+  }
+
+  @Override
+  public final void writeFloat(float v) throws IOException {
+    // writeInt performs the space check itself.
+    writeInt(Float.floatToIntBits(v));
+  }
+
+  @Override
+  public final void writeDouble(double v) throws IOException {
+    // writeLong performs the space check itself.
+    writeLong(Double.doubleToLongBits(v));
+  }
+
+  /** Writes the low-order byte of every char of {@code s}, as DataOutput specifies. */
+  @Override
+  public final void writeBytes(String s) throws IOException {
+    int remain = s.length();
+    int offset = 0;
+    while (remain > 0) {
+      if (buffer.remaining() > 0) {
+        final int chunk = Math.min(buffer.remaining(), remain);
+        for (int i = 0; i < chunk; i++) {
+          buffer.put((byte) s.charAt(offset + i));
+        }
+        remain -= chunk;
+        offset += chunk;
+      } else {
+        flush();
+      }
+    }
+  }
+
+  /**
+   * Writes every char of {@code s} as two bytes via {@link ByteBuffer#putChar}.
+   *
+   * <p>NOTE(review): putChar follows the buffer's byte order whereas
+   * {@link #writeChar(int)} is always big-endian; the two agree only while the
+   * buffer keeps the default big-endian order - confirm.
+   */
+  @Override
+  public final void writeChars(String s) throws IOException {
+    int remain = s.length();
+    int offset = 0;
+    while (remain > 0) {
+      // A char needs two bytes: keep filling while at least one more char fits.
+      if (buffer.remaining() >= 2) {
+        final int chunk = Math.min(buffer.remaining() / 2, remain);
+        for (int i = 0; i < chunk; i++) {
+          buffer.putChar(s.charAt(offset + i));
+        }
+        remain -= chunk;
+        offset += chunk;
+      } else {
+        flush();
+      }
+    }
+  }
+
+  @Override
+  public final void writeUTF(String str) throws IOException {
+    writeUTF(str, this);
+  }
+
+  /**
+   * Encodes {@code str} in modified UTF-8 (a two-byte big-endian length
+   * followed by 1-3 bytes per char, with U+0000 taking two bytes), the same
+   * format as {@link java.io.DataOutputStream#writeUTF(String)}, and writes it
+   * through {@link #write(byte[], int, int)}.
+   *
+   * @param str the string to encode
+   * @param out unused; the bytes are always written to this writer
+   * @return the number of bytes written, including the two-byte length prefix
+   * @throws UTFDataFormatException if the encoded form exceeds 65535 bytes
+   */
+  private int writeUTF(String str, DataOutput out) throws IOException {
+    final int strlen = str.length();
+    int utflen = 0;
+    int c, count = 0;
+
+    /* use charAt instead of copying String to char array */
+    for (int i = 0; i < strlen; i++) {
+      c = str.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        utflen++;
+      } else if (c > 0x07FF) {
+        utflen += 3;
+      } else {
+        utflen += 2;
+      }
+    }
+
+    if (utflen > 65535) {
+      throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+    }
+
+    final byte[] bytearr = new byte[utflen + 2];
+    bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+    bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+    // Fast path for a leading run of plain ASCII characters.
+    int i;
+    for (i = 0; i < strlen; i++) {
+      c = str.charAt(i);
+      if (!((c >= 0x0001) && (c <= 0x007F))) {
+        break;
+      }
+      bytearr[count++] = (byte) c;
+    }
+
+    for (; i < strlen; i++) {
+      c = str.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        bytearr[count++] = (byte) c;
+      } else if (c > 0x07FF) {
+        bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+        bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+        bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+      } else {
+        bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+        bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+      }
+    }
+    write(bytearr, 0, utflen + 2);
+    return utflen + 2;
+  }
+
+  @Override
+  public boolean hasUnFlushedData() {
+    return buffer.position() != 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java new file mode 100644 index 0000000..0c1bc31 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataInput;
+import java.io.InputStream;
+
+/**
+ * Base class for readers that expose their data both as a byte-oriented
+ * {@link InputStream} and through the primitive readers of {@link DataInput}.
+ */
+public abstract class DataInputStream
+    extends InputStream
+    implements DataInput {
+
+  /**
+   * Tells whether data is left that has not been read yet.
+   *
+   * @return {@code true} while unread data remains
+   */
+  public abstract boolean hasUnReadData();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
new file mode 100644
index 0000000..6cd6782
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An {@link OutputStream} that also implements {@link DataOutput} and can
+ * report on the data it is holding before a flush.
+ */
+public abstract class DataOutputStream extends OutputStream implements DataOutput {
+
+  /**
+   * Check whether this stream's buffer is too short to hold {@code length}
+   * more bytes.
+   *
+   * @param length number of bytes the caller intends to write
+   * @return {@code true} if fewer than {@code length} bytes of space remain,
+   *         {@code false} if they still fit
+   * @throws IOException if the implementation fails while checking
+   */
+  public abstract boolean shortOfSpace(int length) throws IOException;
+
+  /**
+   * Check whether there is unflushed data stored in the stream.
+   *
+   * @return {@code true} if data has been written but not yet flushed
+   */
+  public abstract boolean hasUnFlushedData();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
new file mode 100644
index 0000000..bd3c6bb
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.buffer; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + +/** + * as direct buffer memory is not collected by GC, we keep a pool + * to reuse direct buffers + */ +public class DirectBufferPool { + + private static DirectBufferPool directBufferPool = null; + private static Log LOG = LogFactory.getLog(DirectBufferPool.class); + private ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> bufferMap = new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>(); + + private DirectBufferPool() { + } + + public static synchronized DirectBufferPool getInstance() { + if (null == directBufferPool) { + directBufferPool = new DirectBufferPool(); + } + return directBufferPool; + } + + public static void destoryInstance(){ + directBufferPool = null; + } + + public synchronized ByteBuffer borrowBuffer(int capacity) throws IOException { + Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity); + if (null == list) { + return ByteBuffer.allocateDirect(capacity); + } + WeakReference<ByteBuffer> ref; + while ((ref = list.poll()) != null) { + ByteBuffer buf = ref.get(); + if (buf != null) { + return buf; + } + } + return ByteBuffer.allocateDirect(capacity); + } + + public void returnBuffer(ByteBuffer 
buffer) throws IOException { + if (null == buffer || !buffer.isDirect()) { + throw new IOException("the buffer is null or the buffer returned is not direct buffer"); + } + + buffer.clear(); + int capacity = buffer.capacity(); + Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity); + if (null == list) { + list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>(); + Queue<WeakReference<ByteBuffer>> prev = bufferMap.putIfAbsent(capacity, list); + if (prev != null) { + list = prev; + } + } + list.add(new WeakReference<ByteBuffer>(buffer)); + } + + int getBufCountsForCapacity(int capacity) { + return bufferMap.get(capacity).size(); + } + +}