MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc cleanup
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1081d9ce Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1081d9ce Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1081d9ce Branch: refs/heads/MR-2841 Commit: 1081d9cee23aa661d7c9165bc9855865a38b528e Parents: cce7d1e Author: Todd Lipcon <t...@apache.org> Authored: Wed Sep 3 12:02:47 2014 -0700 Committer: Todd Lipcon <t...@apache.org> Committed: Wed Sep 3 12:22:38 2014 -0700 ---------------------------------------------------------------------- .../CHANGES.MAPREDUCE-2841.txt | 1 + .../hadoop/mapred/nativetask/Command.java | 8 +++++ .../mapred/nativetask/CommandDispatcher.java | 11 ++----- .../hadoop/mapred/nativetask/Constants.java | 18 ++++++----- .../hadoop/mapred/nativetask/DataChannel.java | 3 ++ .../hadoop/mapred/nativetask/DataReceiver.java | 3 ++ .../mapred/nativetask/HadoopPlatform.java | 6 ++-- .../mapred/nativetask/ICombineHandler.java | 3 ++ .../mapred/nativetask/INativeComparable.java | 5 +++ .../mapred/nativetask/INativeHandler.java | 2 ++ .../mapred/nativetask/NativeBatchProcessor.java | 3 +- .../mapred/nativetask/NativeDataSource.java | 10 ++---- .../mapred/nativetask/NativeDataTarget.java | 14 +++------ .../NativeMapOutputCollectorDelegator.java | 11 ++++--- .../hadoop/mapred/nativetask/NativeRuntime.java | 10 +++--- .../hadoop/mapred/nativetask/Platform.java | 15 ++++----- .../hadoop/mapred/nativetask/Platforms.java | 7 +++-- .../mapred/nativetask/StatusReportChecker.java | 33 ++++++++++---------- .../hadoop/mapred/nativetask/TaskContext.java | 29 +++++++++-------- .../mapred/nativetask/buffer/BufferType.java | 5 +-- .../nativetask/buffer/ByteBufferDataReader.java | 10 ++---- .../nativetask/buffer/ByteBufferDataWriter.java | 10 +++--- .../nativetask/buffer/DataInputStream.java | 2 ++ .../nativetask/buffer/DataOutputStream.java | 11 +++---- .../mapred/nativetask/buffer/InputBuffer.java | 2 ++ .../mapred/nativetask/buffer/OutputBuffer.java | 2 ++ .../nativetask/handlers/BufferPullee.java | 4 ++- .../nativetask/handlers/BufferPuller.java | 26 ++++++++++++--- .../nativetask/handlers/BufferPushee.java | 2 ++ .../nativetask/handlers/BufferPusher.java | 3 +- .../nativetask/handlers/CombinerHandler.java | 19 +++++------ .../mapred/nativetask/handlers/IDataLoader.java | 3 ++ .../handlers/NativeCollectorOnlyHandler.java | 22 ++++++++----- .../serde/BoolWritableSerializer.java | 2 ++ .../serde/ByteWritableSerializer.java | 2 ++ .../serde/BytesWritableSerializer.java | 2 ++ .../nativetask/serde/DefaultSerializer.java | 4 +++ .../serde/DoubleWritableSerializer.java | 2 ++ .../serde/FloatWritableSerializer.java | 3 ++ .../mapred/nativetask/serde/IKVSerializer.java | 12 ++++--- .../nativetask/serde/INativeSerializer.java | 4 +++ .../nativetask/serde/IntWritableSerializer.java | 2 ++ .../mapred/nativetask/serde/KVSerializer.java | 17 ++++++---- .../serde/LongWritableSerializer.java | 2 ++ .../nativetask/serde/NativeSerialization.java | 2 ++ .../serde/NullWritableSerializer.java | 2 ++ .../serde/SerializationFramework.java | 3 ++ .../mapred/nativetask/serde/TextSerializer.java | 2 ++ .../serde/VIntWritableSerializer.java | 3 +- .../serde/VLongWritableSerializer.java | 3 +- .../mapred/nativetask/util/BytesUtil.java | 2 ++ .../mapred/nativetask/util/ConfigUtil.java | 5 ++- .../nativetask/util/LocalJobOutputFiles.java | 2 ++ .../nativetask/util/NativeTaskOutput.java | 2 ++ .../nativetask/util/NativeTaskOutputFiles.java | 3 +- .../mapred/nativetask/util/OutputUtil.java | 5 +-- .../mapred/nativetask/util/ReadWriteBuffer.java | 5 +-- .../mapred/nativetask/util/SizedWritable.java | 4 ++- .../mapred/nativetask/TestTaskContext.java | 4 +-- 59 files changed, 264 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt index 7c9558e..269a2f6 100644 --- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt +++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt @@ -19,3 +19,4 @@ MAPREDUCE-5977. Fix or suppress native-task gcc warnings (Manu Zhang via todd) MAPREDUCE-6054. native-task: Speed up tests (todd) MAPREDUCE-6058. native-task: KVTest and LargeKVTest should check mr job is sucessful (Binglin Chang) MAPREDUCE-6056. native-task: move system test working dir to target dir and cleanup test config xml files (Manu Zhang via bchang) +MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc cleanup (todd) http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java index 80a5658..6256fd1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.mapred.nativetask; +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private public class Command { private int id; @@ -46,4 +49,9 @@ public class Command { } return false; } + + @Override + public int hashCode() { + return id; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java index 52af730..7984794 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java @@ -19,20 +19,15 @@ package org.apache.hadoop.mapred.nativetask; import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; /** * a CommandDispatcher receives {@link Command} from upstream * and performs corresponding operations */ +@InterfaceAudience.Private public interface CommandDispatcher { - - /** - * - * @param command - * @param parameter - * @return - * @throws IOException - */ public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java index e71326f..ff1a95c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java @@ -15,9 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.mapred.nativetask; +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private public class Constants { public static final String MAP_SORT_CLASS = "map.sort.class"; @@ -40,17 +42,17 @@ public class Constants { public static final String NATIVE_OUTPUT_FILE_NAME = "native.output.file.name"; public static final String NATIVE_PROCESSOR_BUFFER_KB = "native.processor.buffer.kb"; - public static int NATIVE_PROCESSOR_BUFFER_KB_DEFAULT = 64; - public static int NATIVE_ASYNC_PROCESSOR_BUFFER_KB_DEFAULT = 1024; + public static final int NATIVE_PROCESSOR_BUFFER_KB_DEFAULT = 64; + public static final int NATIVE_ASYNC_PROCESSOR_BUFFER_KB_DEFAULT = 1024; public static final String NATIVE_STATUS_UPDATE_INTERVAL = "native.update.interval"; - public static int NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL = 3000; + public static final int NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL = 3000; public static final String SERIALIZATION_FRAMEWORK = "SerializationFramework"; - public static int SIZEOF_PARTITION_LENGTH = 4; - public static int SIZEOF_KEY_LENGTH = 4; - public static int SIZEOF_VALUE_LENGTH = 4; - public static int SIZEOF_KV_LENGTH = SIZEOF_KEY_LENGTH + SIZEOF_VALUE_LENGTH; + public static final int SIZEOF_PARTITION_LENGTH = 4; + public static final int SIZEOF_KEY_LENGTH = 4; + public static final int SIZEOF_VALUE_LENGTH = 4; + public static final int SIZEOF_KV_LENGTH = SIZEOF_KEY_LENGTH + SIZEOF_VALUE_LENGTH; public static final String NATIVE_CLASS_LIBRARY = "native.class.library"; public static final String NATIVE_CLASS_LIBRARY_CUSTOM = "native.class.library.custom"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java index e8132bd..03469ee 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java @@ -18,6 +18,9 @@ package org.apache.hadoop.mapred.nativetask; +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private public enum DataChannel { /** * We will only read data from this channel http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java index c47cdac..85f78ca 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java @@ -20,10 +20,13 @@ package org.apache.hadoop.mapred.nativetask; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; + /** * a DataReceiver pulls in arriving data, an example * is {@link org.apache.hadoop.mapred.nativetask.handlers.BufferPuller} */ +@InterfaceAudience.Private public interface DataReceiver { /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java index 7599bb8..da6f252 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.BytesWritable; @@ -36,6 +37,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.nativetask.serde.*; +@InterfaceAudience.Private public class HadoopPlatform extends Platform { private static final Log LOG = LogFactory.getLog(HadoopPlatform.class); @@ -61,7 +63,7 @@ public class HadoopPlatform extends Platform { } @Override - public boolean support(String keyClassName, INativeSerializer serializer, JobConf job) { + public boolean support(String keyClassName, INativeSerializer<?> serializer, JobConf job) { if (keyClassNames.contains(keyClassName) && serializer instanceof INativeComparable) { return true; @@ -71,7 +73,7 @@ public class HadoopPlatform extends Platform { } @Override - public boolean define(Class comparatorClass) { + public boolean define(Class<?> comparatorClass) { return false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java index 8f50863..e4e706b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java @@ -19,9 +19,12 @@ package org.apache.hadoop.mapred.nativetask; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; + /** * interacts with native side to support Java Combiner */ +@InterfaceAudience.Private public interface ICombineHandler { /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java index ae07f3b..e69248f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java @@ -18,6 +18,9 @@ package org.apache.hadoop.mapred.nativetask; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + /** * * Any key type that is comparable at native side must implement this interface @@ -45,6 +48,8 @@ package org.apache.hadoop.mapred.nativetask; * return NativeObjectFactory::BytesComparator(src + 4, sl, dest + 4, dl); * } */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public interface INativeComparable { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java index 3e6e3ac..9506c7f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java @@ -19,12 +19,14 @@ package org.apache.hadoop.mapred.nativetask; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; /** * A Handler accept input, and give output can be used to transfer command and data */ +@InterfaceAudience.Private public interface INativeHandler extends NativeDataTarget, NativeDataSource { public String name(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java index 837da0e..3c3811f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.nativetask.buffer.BufferType; @@ -30,11 +31,11 @@ import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer; import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; import org.apache.hadoop.mapred.nativetask.util.ConfigUtil; -import org.apache.hadoop.util.DirectBufferPool; /** * used to create channel, transfer data and command between Java and native */ +@InterfaceAudience.Private public class NativeBatchProcessor implements INativeHandler { private static Log LOG = LogFactory.getLog(NativeBatchProcessor.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java index d0bfa93..f802b3f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java @@ -20,33 +20,27 @@ package org.apache.hadoop.mapred.nativetask; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; -import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; /** * NativeDataSource loads data from upstream */ +@InterfaceAudience.Private public interface NativeDataSource { /** * get input buffer - * - * @return */ public InputBuffer getInputBuffer(); /** * set listener. When data from upstream arrives, the listener will be activated. - * - * @param handler */ void setDataReceiver(DataReceiver handler); /** * load data from upstream - * - * @return - * @throws IOException */ public void loadData() throws IOException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java index 7a780eb..d91070c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java @@ -20,31 +20,27 @@ package org.apache.hadoop.mapred.nativetask; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer; /** * NativeDataTarge sends data to downstream */ +@InterfaceAudience.Private public interface NativeDataTarget { /** - * send a signal to indicate that the data has been stored in output buffer - * - * @throws IOException + * Sends a signal to indicate that the data has been stored in output buffer */ public void sendData() throws IOException; /** - * Send a signal that there is no more data - * - * @throws IOException + * Sends a signal that there is no more data */ public void finishSendData() throws IOException; /** - * get the output buffer. - * - * @return + * Gets the output buffer. */ public OutputBuffer getOutputBuffer(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java index beb3851..224b95b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.mapred.nativetask; -import java.io.File; import java.io.IOException; import com.google.common.base.Charsets; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapred.JobConf; @@ -34,13 +34,12 @@ import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer; import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.util.QuickSort; -import org.apache.hadoop.util.RunJar; /** * native map output collector wrapped in Java interface */ +@InterfaceAudience.Private public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollector<K, V> { private static Log LOG = LogFactory.getLog(NativeMapOutputCollectorDelegator.class); @@ -67,6 +66,7 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect handler.flush(); } + @SuppressWarnings("unchecked") @Override public void init(Context context) throws IOException, ClassNotFoundException { this.job = context.getJobConf(); @@ -79,7 +79,8 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect throw new InvalidJobConfException(message); } - Class comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, RawComparator.class); + Class<?> comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, + RawComparator.class); if (comparatorClass != null && !Platforms.define(comparatorClass)) { String message = "Native output collector doesn't support customized java comparator " + job.get(MRJobConfig.KEY_COMPARATOR); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java index 5ce3c89..395cc89 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java @@ -23,6 +23,7 @@ import java.io.IOException; import com.google.common.base.Charsets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.FloatWritable; @@ -34,11 +35,12 @@ import org.apache.hadoop.mapred.nativetask.util.ConfigUtil; import org.apache.hadoop.util.VersionInfo; /** - * This class stands for the native runtime It has three functions: 1. Create native handlers for map, reduce, - * outputcollector, and etc 2. Configure native task with provided MR configs 3. Provide file system api to native - * space, so that it can use File system like HDFS. - * + * This class stands for the native runtime It has three functions: + * 1. Create native handlers for map, reduce, outputcollector, etc + * 2. Configure native task with provided MR configs + * 3. Provide file system api to native space, so that it can use File system like HDFS. */ +@InterfaceAudience.Private public class NativeRuntime { private static Log LOG = LogFactory.getLog(NativeRuntime.class); private static boolean nativeLibraryLoaded = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java index aef53ce..7e41989 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java @@ -18,13 +18,11 @@ package org.apache.hadoop.mapred.nativetask; import java.io.IOException; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer; import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization; @@ -39,6 +37,8 @@ import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization; * that supports all key types of Hadoop and users could implement their custom * platform. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public abstract class Platform { private final NativeSerialization serialization; protected Set<String> keyClassNames = new HashSet<String>(); @@ -67,7 +67,7 @@ public abstract class Platform { * @param key key serializer class * @throws IOException */ - protected void registerKey(String keyClassName, Class key) throws IOException { + protected void registerKey(String keyClassName, Class<?> key) throws IOException { serialization.register(keyClassName, key); keyClassNames.add(keyClassName); } @@ -85,7 +85,8 @@ public abstract class Platform { * @return true if the platform has implemented native comparators of the key and * false otherwise */ - protected abstract boolean support(String keyClassName, INativeSerializer serializer, JobConf job); + protected abstract boolean support(String keyClassName, + INativeSerializer<?> serializer, JobConf job); /** @@ -98,5 +99,5 @@ public abstract class Platform { * @param keyComparator comparator set with mapreduce.job.output.key.comparator.class * @return */ - protected abstract boolean define(Class keyComparator); + protected abstract boolean define(Class<?> keyComparator); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java index d0a8496..9fad3a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java @@ -22,6 +22,7 @@ import java.util.ServiceLoader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer; @@ -33,6 +34,7 @@ import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization; * it is also the facade to check for key type support and other * platform methods */ +@InterfaceAudience.Private public class Platforms { private static final Log LOG = LogFactory.getLog(Platforms.class); @@ -48,7 +50,8 @@ public class Platforms { } } - public static boolean support(String keyClassName, INativeSerializer serializer, JobConf job) { + public static boolean support(String keyClassName, + INativeSerializer<?> serializer, JobConf job) { synchronized (platforms) { for (Platform platform : platforms) { if (platform.support(keyClassName, serializer, job)) { @@ -61,7 +64,7 @@ public class Platforms { return false; } - public static boolean define(Class keyComparator) { + public static boolean define(Class<?> keyComparator) { synchronized (platforms) { for (Platform platform : platforms) { if (platform.define(keyComparator)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java index 1fa59dc..f152074 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java @@ -21,17 +21,18 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapred.Task.Counter; import org.apache.hadoop.mapred.Task.TaskReporter; +import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; /** * Will periodically check status from native and report to MR framework. * */ -public class StatusReportChecker implements Runnable { +class StatusReportChecker implements Runnable { private static Log LOG = LogFactory.getLog(StatusReportChecker.class); - public static int INTERVAL = 1000; // milli-seconds + public static final int INTERVAL = 1000; // milliseconds private Thread checker; private final TaskReporter reporter; @@ -68,19 +69,19 @@ public class StatusReportChecker implements Runnable { } protected void initUsedCounters() { - reporter.getCounter(Counter.MAP_INPUT_RECORDS); - reporter.getCounter(Counter.MAP_OUTPUT_RECORDS); - reporter.getCounter(Counter.MAP_INPUT_BYTES); - reporter.getCounter(Counter.MAP_OUTPUT_BYTES); - reporter.getCounter(Counter.MAP_OUTPUT_MATERIALIZED_BYTES); - reporter.getCounter(Counter.COMBINE_INPUT_RECORDS); - reporter.getCounter(Counter.COMBINE_OUTPUT_RECORDS); - reporter.getCounter(Counter.REDUCE_INPUT_RECORDS); - reporter.getCounter(Counter.REDUCE_OUTPUT_RECORDS); - reporter.getCounter(Counter.REDUCE_INPUT_GROUPS); - reporter.getCounter(Counter.SPILLED_RECORDS); - reporter.getCounter(Counter.MAP_OUTPUT_BYTES); - reporter.getCounter(Counter.MAP_OUTPUT_RECORDS); + reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS); + reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); + reporter.getCounter(FileInputFormatCounter.BYTES_READ); + reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); + reporter.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES); + reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); + reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); + reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS); + reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS); + reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS); + reporter.getCounter(TaskCounter.SPILLED_RECORDS); + reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); + reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); } public synchronized void start() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java index a4430ed..555ced1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java @@ -17,20 +17,23 @@ */ package org.apache.hadoop.mapred.nativetask; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Task.TaskReporter; import org.apache.hadoop.mapred.TaskAttemptID; +@InterfaceAudience.Private public class TaskContext { private final JobConf conf; - private Class iKClass; - private Class iVClass; - private Class oKClass; - private Class oVClass; + private Class<?> iKClass; + private Class<?> iVClass; + private Class<?> oKClass; + private Class<?> oVClass; private final TaskReporter reporter; private final TaskAttemptID taskAttemptID; - public TaskContext(JobConf conf, Class iKClass, Class iVClass, Class oKClass, Class oVClass, TaskReporter reporter, + public TaskContext(JobConf conf, Class<?> iKClass, Class<?> iVClass, + Class<?> oKClass, Class<?> oVClass, TaskReporter reporter, TaskAttemptID id) { this.conf = conf; this.iKClass = iKClass; @@ -41,35 +44,35 @@ public class TaskContext { this.taskAttemptID = id; } - public Class getInputKeyClass() { + public Class<?> getInputKeyClass() { return iKClass; } - public void setInputKeyClass(Class klass) { + public void setInputKeyClass(Class<?> klass) { this.iKClass = klass; } - public Class getInputValueClass() { + public Class<?> getInputValueClass() { return iVClass; } - public void setInputValueClass(Class klass) { + public void setInputValueClass(Class<?> klass) { this.iVClass = klass; } - public Class getOuputKeyClass() { + public Class<?> getOutputKeyClass() { return this.oKClass; } - public void setOutputKeyClass(Class klass) { + public void setOutputKeyClass(Class<?> klass) { this.oKClass = klass; } - public Class getOutputValueClass() { + public Class<?> getOutputValueClass() { return this.oVClass; } - public void setOutputValueClass(Class klass) { + public void setOutputValueClass(Class<?> klass) { this.oVClass = klass; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java index a52be1f..bbdcd54 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java @@ -18,9 +18,10 @@ package org.apache.hadoop.mapred.nativetask.buffer; -public enum BufferType { +import org.apache.hadoop.classification.InterfaceAudience; +@InterfaceAudience.Private +public enum BufferType { DIRECT_BUFFER, - HEAP_BUFFER }; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java index 24f402d..72c65f5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java @@ -17,22 +17,17 @@ */ package org.apache.hadoop.mapred.nativetask.buffer; -import com.google.common.base.Charsets; - -import java.io.DataInput; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.io.PushbackInputStream; -import java.io.UTFDataFormatException; import java.nio.ByteBuffer; +import org.apache.hadoop.classification.InterfaceAudience; /** * read data from a input buffer */ +@InterfaceAudience.Private public class ByteBufferDataReader extends DataInputStream { private ByteBuffer byteBuffer; - private char lineCache[]; private java.io.DataInputStream javaReader; public ByteBufferDataReader(InputBuffer buffer) { @@ -130,6 +125,7 @@ public class ByteBufferDataReader extends DataInputStream { return byteBuffer.getDouble(); } + @SuppressWarnings("deprecation") @Override public String readLine() throws IOException { return javaReader.readLine(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java index aa87e0d..3d8f78b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java @@ -17,23 +17,21 @@ */ package org.apache.hadoop.mapred.nativetask.buffer; -import java.io.DataOutput; import java.io.IOException; -import java.io.UTFDataFormatException; import java.nio.ByteBuffer; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.primitives.UnsignedInteger; -import com.google.common.primitives.UnsignedInts; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.nativetask.NativeDataTarget; +import com.google.common.base.Preconditions; + /** * DataOutputStream implementation which buffers data in a fixed-size * ByteBuffer. * When the byte buffer has filled up, synchronously passes the buffer * to a downstream NativeDataTarget. */ +@InterfaceAudience.Private public class ByteBufferDataWriter extends DataOutputStream { private final ByteBuffer buffer; private final NativeDataTarget target; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java index 0c1bc31..be17ef4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java @@ -19,7 +19,9 @@ package org.apache.hadoop.mapred.nativetask.buffer; import java.io.DataInput; import java.io.InputStream; +import org.apache.hadoop.classification.InterfaceAudience; +@InterfaceAudience.Private public abstract class DataInputStream extends InputStream implements DataInput { public abstract boolean hasUnReadData(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java index 6cd6782..38f9dcb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java @@ -21,22 +21,19 @@ import java.io.DataOutput; import java.io.IOException; import java.io.OutputStream; -public abstract class DataOutputStream extends OutputStream implements DataOutput { +import org.apache.hadoop.classification.InterfaceAudience; +@InterfaceAudience.Private +public abstract class DataOutputStream extends OutputStream implements DataOutput { /** * Check whether this buffer has enough space to store length of bytes * - * @param length - * , length of bytes - * @return - * @throws IOException + * @param length length of bytes */ public abstract boolean shortOfSpace(int length) throws IOException; /** * Check whether there is unflushed data stored in the stream - * - * @return */ public abstract boolean hasUnFlushedData(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java index 82193b8..2120c6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java @@ -19,12 +19,14 @@ package org.apache.hadoop.mapred.nativetask.buffer; import org.apache.hadoop.util.DirectBufferPool; +import org.apache.hadoop.classification.InterfaceAudience; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +@InterfaceAudience.Private public class InputBuffer implements Closeable { static DirectBufferPool bufferPool = new DirectBufferPool(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java index 3c54948..8f33946 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java @@ -20,7 +20,9 @@ package org.apache.hadoop.mapred.nativetask.buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.hadoop.classification.InterfaceAudience; +@InterfaceAudience.Private public class OutputBuffer { protected ByteBuffer byteBuffer; private final BufferType type; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java index 50d7816..1047bbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred.nativetask.handlers; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.nativetask.Constants; import org.apache.hadoop.mapred.nativetask.NativeDataTarget; @@ -30,9 +31,10 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable; /** * load data into a buffer signaled by a {@link BufferPuller} */ +@InterfaceAudience.Private public class BufferPullee<IK, IV> implements IDataLoader { - public static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH; + public static final int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH; private final SizedWritable<IK> tmpInputKey; private final SizedWritable<IV> tmpInputValue; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java index 704b664..18523ef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java @@ -1,10 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.mapred.nativetask.handlers; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.nativetask.Constants; @@ -13,13 +32,12 @@ import org.apache.hadoop.mapred.nativetask.NativeDataSource; import org.apache.hadoop.mapred.nativetask.buffer.BufferType; import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader; import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; -import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework; -import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; import org.apache.hadoop.util.Progress; /** * actively signal a {@link BufferPullee} to load data into buffer and receive */ +@InterfaceAudience.Private public class BufferPuller implements RawKeyValueIterator, DataReceiver { private static Log LOG = LogFactory.getLog(BufferPuller.class); @@ -108,8 +126,8 @@ public class BufferPuller implements RawKeyValueIterator, DataReceiver { valueBytes = new byte[valueLength]; } - nativeReader.read(keyBytes, 0, keyLength); - nativeReader.read(valueBytes, 0, valueLength); + IOUtils.readFully(nativeReader, keyBytes, 0, keyLength); + IOUtils.readFully(nativeReader, valueBytes, 0, valueLength); keyBuffer.reset(keyBytes, keyLength); valueBuffer.reset(valueBytes, valueLength); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java index 8decad8..fe78325 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.nativetask.Constants; import org.apache.hadoop.mapred.nativetask.buffer.BufferType; @@ -34,6 +35,7 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable; /** * collect data when signaled */ +@InterfaceAudience.Private public class BufferPushee<OK, OV> implements Closeable { private static Log LOG = LogFactory.getLog(BufferPushee.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java index 3713078..10d619b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.nativetask.NativeDataTarget; import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter; @@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable; /** * actively push data into a buffer and signal a {@link BufferPushee} to collect it */ +@InterfaceAudience.Private public class BufferPusher<K, V> implements OutputCollector<K, V> { private static Log LOG = LogFactory.getLog(BufferPusher.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java index 6a57683..dad7af9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.mapred.nativetask.handlers; -import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS; - import java.io.IOException; import org.apache.commons.logging.Log; @@ -37,13 +35,13 @@ import org.apache.hadoop.mapred.nativetask.TaskContext; import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework; import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskCounter; -public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher { - - public static String NAME = "NativeTask.CombineHandler"; +class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher { + public static final String NAME = "NativeTask.CombineHandler"; private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class); - public static Command LOAD = new Command(1, "Load"); - public static Command COMBINE = new Command(4, "Combine"); + public static final Command LOAD = new Command(1, "Load"); + public static final Command COMBINE = new Command(4, "Combine"); public final CombinerRunner<K, V> combinerRunner; private final INativeHandler nativeHandler; @@ -66,13 +64,16 @@ public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz); } - final Counter combineInputCounter = context.getTaskReporter().getCounter(COMBINE_INPUT_RECORDS); + final Counter combineInputCounter = context.getTaskReporter().getCounter( + TaskCounter.COMBINE_INPUT_RECORDS); final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(conf, context.getTaskAttemptId(), combineInputCounter, context.getTaskReporter(), null); final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, conf, DataChannel.INOUT); - final BufferPusher<K, V> pusher = new BufferPusher<K, V>(context.getInputKeyClass(), context.getInputValueClass(), + @SuppressWarnings("unchecked") + final BufferPusher<K, V> pusher = new BufferPusher<K, V>((Class<K>)context.getInputKeyClass(), + (Class<V>)context.getInputValueClass(), nativeHandler); final BufferPuller puller = new BufferPuller(nativeHandler); return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java index ff472a6..d2cbf1f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java @@ -20,9 +20,12 @@ package org.apache.hadoop.mapred.nativetask.handlers; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; + /** * an IDataLoader loads data on demand */ +@InterfaceAudience.Private public interface IDataLoader { /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java index 678e13d..284c3dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.TaskAttemptID; @@ -41,14 +42,19 @@ import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; * Java Record Reader + Java Mapper + Native Collector */ @SuppressWarnings("unchecked") +@InterfaceAudience.Private public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Closeable { - public static String NAME = "NativeTask.MCollectorOutputHandler"; + public static final String NAME = "NativeTask.MCollectorOutputHandler"; private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class); - public static Command GET_OUTPUT_PATH = new Command(100, "GET_OUTPUT_PATH"); - public static Command GET_OUTPUT_INDEX_PATH = new Command(101, "GET_OUTPUT_INDEX_PATH"); - public static Command GET_SPILL_PATH = new Command(102, "GET_SPILL_PATH"); - public static Command GET_COMBINE_HANDLER = new Command(103, "GET_COMBINE_HANDLER"); + public static final Command GET_OUTPUT_PATH = + new Command(100, "GET_OUTPUT_PATH"); + public static final Command GET_OUTPUT_INDEX_PATH = + new Command(101, "GET_OUTPUT_INDEX_PATH"); + public static final Command GET_SPILL_PATH = + new Command(102, "GET_SPILL_PATH"); + public static final Command GET_COMBINE_HANDLER = + new Command(103, "GET_COMBINE_HANDLER"); private NativeTaskOutput output; private int spillNumber = 0; @@ -63,7 +69,7 @@ public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Clos ICombineHandler combinerHandler = null; try { final TaskContext combineContext = context.copyOf(); - combineContext.setInputKeyClass(context.getOuputKeyClass()); + combineContext.setInputKeyClass(context.getOutputKeyClass()); combineContext.setInputValueClass(context.getOutputValueClass()); combinerHandler = CombinerHandler.create(combineContext); @@ -76,7 +82,9 @@ public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Clos } final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, context.getConf(), DataChannel.OUT); - final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(context.getOuputKeyClass(), context.getOutputValueClass(), + final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>( + (Class<K>)context.getOutputKeyClass(), + (Class<V>)context.getOutputValueClass(), nativeHandler); return new NativeCollectorOnlyHandler<K, V>(context, nativeHandler, kvPusher, combinerHandler); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java index 9a026be..a4e8a56 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java @@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.nativetask.INativeComparable; +@InterfaceAudience.Private public class BoolWritableSerializer extends DefaultSerializer implements INativeComparable { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java index 1ec2fdb..b029e67 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java @@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.nativetask.INativeComparable; +@InterfaceAudience.Private public class ByteWritableSerializer extends DefaultSerializer implements INativeComparable { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java index 2bd18d7..08264f9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java @@ -22,9 +22,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.nativetask.INativeComparable; +@InterfaceAudience.Private public class BytesWritableSerializer implements INativeComparable, INativeSerializer<BytesWritable> { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java index d4fc7e0..fcef779 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java @@ -24,8 +24,12 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Writable; +@InterfaceAudience.Public +@InterfaceStability.Evolving public class DefaultSerializer implements INativeSerializer<Writable> { static class ModifiedByteArrayOutputStream extends ByteArrayOutputStream { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java index 8de0fba..a171f6f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java @@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.nativetask.INativeComparable; +@InterfaceAudience.Private public class DoubleWritableSerializer extends DefaultSerializer implements INativeComparable { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java index 4a2366c..bf2c959 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java @@ -20,9 +20,12 @@ package org.apache.hadoop.mapred.nativetask.serde; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.nativetask.INativeComparable; + +@InterfaceAudience.Private public class FloatWritableSerializer extends DefaultSerializer implements INativeComparable { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java index 64c5810..8506d17 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapred.nativetask.serde; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream; import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream; import org.apache.hadoop.mapred.nativetask.util.SizedWritable; @@ -27,6 +28,7 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable; /** * serializes key-value pair */ +@InterfaceAudience.Private public interface IKVSerializer { /** @@ -35,7 +37,7 @@ public interface IKVSerializer { * @param value * @throws IOException */ - public void updateLength(SizedWritable key, SizedWritable value) throws IOException; + public void updateLength(SizedWritable<?> key, SizedWritable<?> value) throws IOException; /** * @@ -45,7 +47,8 @@ public interface IKVSerializer { * @return bytes written * @throws IOException */ - public int serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException; + public int serializeKV(DataOutputStream out, SizedWritable<?> key, + SizedWritable<?> value) throws IOException; /** * serialize partitionId as well @@ -56,7 +59,8 @@ public interface IKVSerializer { * @return * @throws IOException */ - public int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value) + public int serializePartitionKV(DataOutputStream out, int partitionId, + SizedWritable<?> key, SizedWritable<?> value) throws IOException; /** @@ -67,5 +71,5 @@ public interface IKVSerializer { * @return bytes read * @throws IOException */ - public int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException; + public int deserializeKV(DataInputStream in, SizedWritable<?> key, SizedWritable<?> value) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java index f61d12d..c970394 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java @@ -21,6 +21,8 @@ package org.apache.hadoop.mapred.nativetask.serde; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * an INativeSerializer serializes and deserializes data transferred between @@ -30,6 +32,8 @@ import java.io.IOException; * you have to make sure the native side can serialize it correctly. * */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public interface INativeSerializer<T> { /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java index e7e19a9..e79d182 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java @@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.nativetask.INativeComparable; +@InterfaceAudience.Private public class IntWritableSerializer extends DefaultSerializer implements INativeComparable { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1081d9ce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java index 4b76df4..d650a4d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java @@ -22,18 +22,21 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.nativetask.Constants; import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream; import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream; import org.apache.hadoop.mapred.nativetask.util.SizedWritable; -public class KVSerializer<K, V> implements IKVSerializer { +@InterfaceAudience.Private +public class KVSerializer<K, V> implements IKVSerializer { + private static final Log LOG = LogFactory.getLog(KVSerializer.class); - public static int KV_HEAD_LENGTH = Constants.SIZEOF_KV_LENGTH; + public static final int KV_HEAD_LENGTH = Constants.SIZEOF_KV_LENGTH; private final INativeSerializer<Writable> keySerializer; private final INativeSerializer<Writable> valueSerializer; @@ -45,19 +48,20 @@ public class KVSerializer<K, V> implements IKVSerializer { } @Override - public void updateLength(SizedWritable key, SizedWritable value) throws IOException { + public void updateLength(SizedWritable<?> key, SizedWritable<?> value) throws IOException { key.length = keySerializer.getLength(key.v); value.length = valueSerializer.getLength(value.v); return; } @Override - public int serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException { + public int serializeKV(DataOutputStream out, SizedWritable<?> key, SizedWritable<?> value) throws IOException { return serializePartitionKV(out, -1, key, value); } @Override - public int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value) + public int serializePartitionKV(DataOutputStream out, int partitionId, + SizedWritable<?> key, SizedWritable<?> value) throws IOException { if (key.length == SizedWritable.INVALID_LENGTH || value.length == SizedWritable.INVALID_LENGTH) { @@ -90,7 +94,8 @@ public class KVSerializer<K, V> implements IKVSerializer { } @Override - public int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException { + public int deserializeKV(DataInputStream in, SizedWritable<?> key, + SizedWritable<?> value) throws IOException { if (!in.hasUnReadData()) { return 0;