http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/BytesUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/BytesUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/BytesUtil.java new file mode 100644 index 0000000..9653baa --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/BytesUtil.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.util; + +import java.io.UnsupportedEncodingException; + +public class BytesUtil { + + public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE; + public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE; + + public static byte[] toBytes(String str) { + if (str == null) { + return null; + } + try { + return str.getBytes("utf-8"); + } catch (final UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage()); + } + } + + public static String fromBytes(byte[] data) { + if (data == null) { + return null; + } + try { + return new String(data, "utf-8"); + } catch (final UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage()); + } + } + + /** + * Converts a byte array to an int value + * @param bytes byte array + * @return the int value + */ + public static int toInt(byte[] bytes) { + return toInt(bytes, 0, SIZEOF_INT); + } + + /** + * Converts a byte array to an int value + * @param bytes byte array + * @param offset offset into array + * @param length length of int (has to be {@link #SIZEOF_INT}) + * @return the int value + * @throws RuntimeException if length is not {@link #SIZEOF_INT} or + * if there's not enough room in the array at the offset indicated. + */ + public static int toInt(byte[] bytes, int offset, final int length) { + if (length != SIZEOF_INT || offset + length > bytes.length) { + throw new RuntimeException( + "toInt exception. length not equals to SIZE of Int or buffer overflow"); + } + int n = 0; + for (int i = offset; i< offset + length; i++) { + n <<= 4; + n ^= bytes[i] & 0xff; + } + return n; + } + + /** + * Converts a byte array to a long value. + * @param bytes array + * @return the long value + */ + public static long toLong(byte[] bytes) { + return toLong(bytes, 0, SIZEOF_LONG); + } + + /** + * Converts a byte array to a long value. 
+ * + * @param bytes array of bytes + * @param offset offset into array + * @return the long value + */ + public static long toLong(byte[] bytes, int offset) { + return toLong(bytes, offset, SIZEOF_LONG); + } + + /** + * Converts a byte array to a long value. + * + * @param bytes array of bytes + * @param offset offset into array + * @param length length of data (must be {@link #SIZEOF_LONG}) + * @return the long value + * @throws RuntimeException if length is not {@link #SIZEOF_LONG} or + * if there's not enough room in the array at the offset indicated. + */ + public static long toLong(byte[] bytes, int offset, final int length) { + if (length != SIZEOF_LONG || offset + length > bytes.length) { + throw new RuntimeException( + "toLong exception. length not equals to SIZE of Long or buffer overflow"); + } + long l = 0; + for (int i = offset; i < offset + length; i++) { + l <<= 8; + l ^= bytes[i] & 0xff; + } + return l; + } + + /** + * Presumes float encoded as IEEE 754 floating-point "single format" + * @param bytes byte array + * @return Float made from passed byte array. + */ + public static float toFloat(byte [] bytes) { + return toFloat(bytes, 0); + } + + /** + * Presumes float encoded as IEEE 754 floating-point "single format" + * @param bytes array to convert + * @param offset offset into array + * @return Float made from passed byte array. + */ + public static float toFloat(byte [] bytes, int offset) { + return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT)); + } + + /** + * @param bytes byte array + * @return Return double made from passed bytes. + */ + public static double toDouble(final byte [] bytes) { + return toDouble(bytes, 0); + } + + /** + * @param bytes byte array + * @param offset offset where double is + * @return Return double made from passed bytes. 
+ */ + public static double toDouble(final byte [] bytes, final int offset) { + return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_LONG)); + } + + /** + * Write a printable representation of a byte array. + * + * @param b byte array + * @return string + * @see #toStringBinary(byte[], int, int) + */ + public static String toStringBinary(final byte [] b) { + if (b == null) + return "null"; + return toStringBinary(b, 0, b.length); + } + + /** + * Write a printable representation of a byte array. Non-printable + * characters are hex escaped in the format \\x%02X, eg: + * \x00 \x05 etc + * + * @param b array to write out + * @param off offset to start at + * @param len length to write + * @return string output + */ + public static String toStringBinary(final byte [] b, int off, int len) { + StringBuilder result = new StringBuilder(); + // Just in case we are passed a 'len' that is > buffer length... + if (off >= b.length) return result.toString(); + if (off + len > b.length) len = b.length - off; + for (int i = off; i < off + len ; ++i ) { + int ch = b[i] & 0xFF; + if ( (ch >= '0' && ch <= '9') + || (ch >= 'A' && ch <= 'Z') + || (ch >= 'a' && ch <= 'z') + || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) { + result.append((char)ch); + } else { + result.append(String.format("\\x%02X", ch)); + } + } + return result.toString(); + } + + /** + * Convert a boolean to a byte array. True becomes -1 + * and false becomes 0. + * + * @param b value + * @return <code>b</code> encoded in a byte array. + */ + public static byte [] toBytes(final boolean b) { + return new byte[] { b ? (byte) -1 : (byte) 0 }; + } + + /** + * Convert an int value to a byte array. Big-endian. Same as what DataOutputStream.writeInt + * does. 
+ * + * @param val value + * @return the byte array + */ + public static byte[] toBytes(int val) { + byte [] b = new byte[4]; + for(int i = 3; i > 0; i--) { + b[i] = (byte) val; + val >>>= 8; + } + b[0] = (byte) val; + return b; + } + + /** + * Convert a long value to a byte array using big-endian. + * + * @param val value to convert + * @return the byte array + */ + public static byte[] toBytes(long val) { + byte [] b = new byte[8]; + for (int i = 7; i > 0; i--) { + b[i] = (byte) val; + val >>>= 8; + } + b[0] = (byte) val; + return b; + } + + /** + * @param f float value + * @return the float represented as byte [] + */ + public static byte [] toBytes(final float f) { + // Encode it as int + return toBytes(Float.floatToRawIntBits(f)); + } + + /** + * Serialize a double as the IEEE 754 double format output. The resultant + * array will be 8 bytes long. + * + * @param d value + * @return the double represented as byte [] + */ + public static byte [] toBytes(final double d) { + // Encode it as a long + return toBytes(Double.doubleToRawLongBits(d)); + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ConfigUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ConfigUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ConfigUtil.java new file mode 100644 index 0000000..50db44c --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ConfigUtil.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +public class ConfigUtil { + public static byte[][] toBytes(Configuration conf) { + List<byte[]> nativeConfigs = new ArrayList<byte[]>(); + for (Map.Entry<String, String> e : conf) { + nativeConfigs.add(BytesUtil.toBytes(e.getKey())); + nativeConfigs.add(BytesUtil.toBytes(e.getValue())); + } + return nativeConfigs.toArray(new byte[nativeConfigs.size()][]); + } + + public static String booleansToString(boolean[] value) { + StringBuilder sb = new StringBuilder(); + for (boolean b: value) { + sb.append(b ? 1 : 0); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/LocalJobOutputFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/LocalJobOutputFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/LocalJobOutputFiles.java new file mode 100644 index 0000000..77eae50 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/LocalJobOutputFiles.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.util; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskID; + +public class LocalJobOutputFiles implements NativeTaskOutput { + + static final String TASKTRACKER_OUTPUT = "output"; + static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out"; + static final String SPILL_FILE_FORMAT_STRING = "%s/spill%d.out"; + static final String SPILL_INDEX_FILE_FORMAT_STRING = "%s/spill%d.out.index"; + static final String OUTPUT_FILE_FORMAT_STRING = "%s/file.out"; + static final String OUTPUT_FILE_INDEX_FORMAT_STRING = "%s/file.out.index"; + + private JobConf conf; + private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); + + public LocalJobOutputFiles(Configuration conf, String id) { + this.conf = new JobConf(conf); + } + + /** + * Return the path to local map output file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputFile() throws IOException { + String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map output file name. 
+ * + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getOutputFileForWrite(long size) throws IOException { + String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return the path to a local map output index file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputIndexFile() throws IOException { + String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map output index file name. + * + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getOutputIndexFileForWrite(long size) throws IOException { + String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return a local map spill file created earlier. + * + * @param spillNumber + * the number + * @return path + * @throws IOException + */ + public Path getSpillFile(int spillNumber) throws IOException { + String path = String.format(SPILL_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map spill file name. 
+ * + * @param spillNumber + * the number + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getSpillFileForWrite(int spillNumber, long size) throws IOException { + String path = String.format(SPILL_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return a local map spill index file created earlier + * + * @param spillNumber + * the number + * @return path + * @throws IOException + */ + public Path getSpillIndexFile(int spillNumber) throws IOException { + String path = String +.format(SPILL_INDEX_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map spill index file name. + * + * @param spillNumber + * the number + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException { + String path = String +.format(SPILL_INDEX_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return a local reduce input file created earlier + * + * @param mapId + * a map task id + * @return path + * @throws IOException + */ + public Path getInputFile(int mapId) throws IOException { + return lDirAlloc.getLocalPathToRead( + String.format(REDUCE_INPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, Integer.valueOf(mapId)), + conf); + } + + /** + * Create a local reduce input file name. + * + * @param mapId + * a map task id + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf) throws IOException { + return lDirAlloc.getLocalPathForWrite( + String.format(REDUCE_INPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, mapId.getId()), size, + conf); + } + + /** Removes all of the files related to a task. 
*/ + public void removeAll() throws IOException { + conf.deleteLocalFiles(TASKTRACKER_OUTPUT); + } + + public String getOutputName(int partition) { + return String.format("part-%05d", partition); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutput.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutput.java new file mode 100644 index 0000000..3da8a0d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutput.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.util; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.TaskID; + +/** + * base class of output files manager. + */ +public interface NativeTaskOutput { + + /** + * Return the path to local map output file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputFile() throws IOException; + + /** + * Create a local map output file name. + * + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getOutputFileForWrite(long size) throws IOException; + + /** + * Return the path to a local map output index file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputIndexFile() throws IOException; + + /** + * Create a local map output index file name. + * + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getOutputIndexFileForWrite(long size) throws IOException; + + /** + * Return a local map spill file created earlier. + * + * @param spillNumber + * the number + * @return path + * @throws IOException + */ + public Path getSpillFile(int spillNumber) throws IOException; + + /** + * Create a local map spill file name. + * + * @param spillNumber + * the number + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getSpillFileForWrite(int spillNumber, long size) throws IOException; + + /** + * Return a local map spill index file created earlier + * + * @param spillNumber + * the number + * @return path + * @throws IOException + */ + public Path getSpillIndexFile(int spillNumber) throws IOException; + + /** + * Create a local map spill index file name. 
+ * + * @param spillNumber + * the number + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException; + + /** + * Return a local reduce input file created earlier + * + * @param mapId + * a map task id + * @return path + * @throws IOException + */ + public Path getInputFile(int mapId) throws IOException; + + /** + * Create a local reduce input file name. + * + * @param mapId + * a map task id + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf) throws IOException; + + /** Removes all of the files related to a task. */ + public void removeAll() throws IOException; + + public String getOutputName(int partition); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutputFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutputFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutputFiles.java new file mode 100644 index 0000000..18cbddb --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/NativeTaskOutputFiles.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.util; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskID; + +/** + * Manipulate the working area for the transient store for maps and reduces. + * + * This class is used by map and reduce tasks to identify the directories that they need to write + * to/read from for intermediate files. The callers of these methods are from child space and see + * mapreduce.cluster.local.dir as taskTracker/jobCache/jobId/attemptId This class should not be used + * from TaskTracker space. 
+ */ + +public class NativeTaskOutputFiles implements NativeTaskOutput { + + static final String TASKTRACKER_OUTPUT = "output"; + static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out"; + static final String SPILL_FILE_FORMAT_STRING = "%s/%s/spill%d.out"; + static final String SPILL_INDEX_FILE_FORMAT_STRING = "%s/%s/spill%d.out.index"; + static final String OUTPUT_FILE_FORMAT_STRING = "%s/%s/file.out"; + static final String OUTPUT_FILE_INDEX_FORMAT_STRING = "%s/%s/file.out.index"; + + private String id; + private JobConf conf; + private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); + + public NativeTaskOutputFiles(Configuration conf, String id) { + this.conf = new JobConf(conf); + this.id = id; + } + + /** + * Return the path to local map output file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputFile() throws IOException { + String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, id); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map output file name. + * + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getOutputFileForWrite(long size) throws IOException { + String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, id); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return the path to a local map output index file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputIndexFile() throws IOException { + String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT, id); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map output index file name. 
+ * + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getOutputIndexFileForWrite(long size) throws IOException { + String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT, id); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return a local map spill file created earlier. + * + * @param spillNumber + * the number + * @return path + * @throws IOException + */ + public Path getSpillFile(int spillNumber) throws IOException { + String path = String.format(SPILL_FILE_FORMAT_STRING, id, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map spill file name. + * + * @param spillNumber + * the number + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getSpillFileForWrite(int spillNumber, long size) throws IOException { + String path = String.format(SPILL_FILE_FORMAT_STRING, id, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return a local map spill index file created earlier + * + * @param spillNumber + * the number + * @return path + * @throws IOException + */ + public Path getSpillIndexFile(int spillNumber) throws IOException { + String path = String + .format(SPILL_INDEX_FILE_FORMAT_STRING, id, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathToRead(path, conf); + } + + /** + * Create a local map spill index file name. 
+ * + * @param spillNumber + * the number + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException { + String path = String + .format(SPILL_INDEX_FILE_FORMAT_STRING, id, TASKTRACKER_OUTPUT, spillNumber); + return lDirAlloc.getLocalPathForWrite(path, size, conf); + } + + /** + * Return a local reduce input file created earlier + * + * @param mapId + * a map task id + * @return path + * @throws IOException + */ + public Path getInputFile(int mapId) throws IOException { + return lDirAlloc.getLocalPathToRead( + String.format(REDUCE_INPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, Integer.valueOf(mapId)), + conf); + } + + /** + * Create a local reduce input file name. + * + * @param mapId + * a map task id + * @param size + * the size of the file + * @return path + * @throws IOException + */ + public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf) throws IOException { + return lDirAlloc.getLocalPathForWrite( + String.format(REDUCE_INPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, mapId.getId()), size, + conf); + } + + /** Removes all of the files related to a task. 
*/ + public void removeAll() throws IOException { + conf.deleteLocalFiles(TASKTRACKER_OUTPUT); + } + + public String getOutputName(int partition) { + return String.format("part-%05d", partition); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/OutputUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/OutputUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/OutputUtil.java new file mode 100644 index 0000000..bdef796 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/OutputUtil.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.util; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + + +public class OutputUtil { + + private static Log LOG = LogFactory.getLog(OutputUtil.class); + public static final String NATIVE_TASK_OUTPUT_MANAGER = "nativetask.output.manager"; + + public static NativeTaskOutput createNativeTaskOutput(Configuration conf, String id) { + Class<?> clazz = conf.getClass(OutputUtil.NATIVE_TASK_OUTPUT_MANAGER, + NativeTaskOutputFiles.class); + LOG.info(OutputUtil.NATIVE_TASK_OUTPUT_MANAGER + " = " + clazz.getName()); + try { + Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class); + ctor.setAccessible(true); + NativeTaskOutput instance = (NativeTaskOutput) ctor.newInstance(conf, id); + return instance; + } catch (Exception e) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ReadWriteBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ReadWriteBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ReadWriteBuffer.java new file mode 100644 index 0000000..5dcae14 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/ReadWriteBuffer.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.util; + +public class ReadWriteBuffer { + private byte[] _buff; + private int _writePoint; + private int _readPoint; + final int CACHE_LINE_SIZE = 16; + + public ReadWriteBuffer(int length) { + if (length > 0) { + _buff = new byte[length]; + } + } + + public ReadWriteBuffer() { + _buff = new byte[CACHE_LINE_SIZE]; + } + + public ReadWriteBuffer(byte[] bytes) { + _buff = bytes; + _writePoint = 0; + _readPoint = 0; + } + + public void reset(byte[] newBuff) { + _buff = newBuff; + _writePoint = 0; + _readPoint = 0; + } + + public void setReadPoint(int pos) { + _readPoint = pos; + } + + public void setWritePoint(int pos) { + _writePoint = pos; + } + + public byte[] getBuff() { + return _buff; + } + + public int getWritePoint() { + return _writePoint; + } + + public int getReadPoint() { + return _readPoint; + } + + public void writeInt(int v) { + checkWriteSpaceAndResizeIfNecessary(4); + + _buff[_writePoint + 0] = (byte) ((v >>> 0) & 0xFF); + _buff[_writePoint + 1] = (byte) ((v >>> 8) & 0xFF); + _buff[_writePoint + 2] = (byte) ((v >>> 16) & 0xFF); + _buff[_writePoint + 3] = (byte) ((v >>> 24) & 0xFF); + + _writePoint += 4; + } + + public void writeLong(long v) { + checkWriteSpaceAndResizeIfNecessary(8); + + _buff[_writePoint + 0] = 
(byte) (v >>> 0); + _buff[_writePoint + 1] = (byte) (v >>> 8); + _buff[_writePoint + 2] = (byte) (v >>> 16); + _buff[_writePoint + 3] = (byte) (v >>> 24); + _buff[_writePoint + 4] = (byte) (v >>> 32); + _buff[_writePoint + 5] = (byte) (v >>> 40); + _buff[_writePoint + 6] = (byte) (v >>> 48); + _buff[_writePoint + 7] = (byte) (v >>> 56); + + _writePoint += 8; + } + + public void writeBytes(byte b[], int off, int len) { + writeInt(len); + checkWriteSpaceAndResizeIfNecessary(len); + System.arraycopy(b, off, _buff, _writePoint, len); + _writePoint += len; + } + + public int readInt() { + final int ch4 = 0xff & (_buff[_readPoint + 0]); + final int ch3 = 0xff & (_buff[_readPoint + 1]); + final int ch2 = 0xff & (_buff[_readPoint + 2]); + final int ch1 = 0xff & (_buff[_readPoint + 3]); + _readPoint += 4; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + + public long readLong() { + final long result = ((_buff[_readPoint + 0] & 255) << 0) + ((_buff[_readPoint + 1] & 255) << 8) + + ((_buff[_readPoint + 2] & 255) << 16) + ((long) (_buff[_readPoint + 3] & 255) << 24) + + ((long) (_buff[_readPoint + 4] & 255) << 32) + ((long) (_buff[_readPoint + 5] & 255) << 40) + + ((long) (_buff[_readPoint + 6] & 255) << 48) + (((long) _buff[_readPoint + 7] << 56)); + + _readPoint += 8; + return result; + } + + public byte[] readBytes() { + final int length = readInt(); + final byte[] result = new byte[length]; + System.arraycopy(_buff, _readPoint, result, 0, length); + _readPoint += length; + return result; + } + + public void writeString(String str) { + final byte[] bytes = BytesUtil.toBytes(str); + writeBytes(bytes, 0, bytes.length); + } + + public String readString() { + final byte[] bytes = readBytes(); + return BytesUtil.fromBytes(bytes); + } + + private void checkWriteSpaceAndResizeIfNecessary(int toBeWritten) { + + if (_buff.length - _writePoint >= toBeWritten) { + return; + } + final int newLength = (toBeWritten + _writePoint > CACHE_LINE_SIZE) ? 
(toBeWritten + _writePoint) : CACHE_LINE_SIZE; + final byte[] newBuff = new byte[newLength]; + System.arraycopy(_buff, 0, newBuff, 0, _writePoint); + _buff = newBuff; + } + +}; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SizedWritable.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SizedWritable.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SizedWritable.java new file mode 100644 index 0000000..5dc099d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SizedWritable.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.util; + +import java.io.IOException; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +public class SizedWritable<T> { + public static int INVALID_LENGTH = -1; + + public int length = INVALID_LENGTH; + public Writable v; + + public SizedWritable(Class<?> klass) { + if (null != klass) { + v = (Writable) ReflectionUtils.newInstance(klass, null); + } + length = INVALID_LENGTH; + } + + public void readFields(DataInputBuffer key) throws IOException { + if (null != key) { + this.v.readFields(key); + this.length = INVALID_LENGTH; + } else { + throw new IOException("input key is null"); + } + + } + + public void reset(T w) { + this.v = (Writable) w; + this.length = INVALID_LENGTH; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java new file mode 100644 index 0000000..fb15960 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.SnappyCodec; + +public class SnappyUtil { + + public static boolean isNativeSnappyLoaded(Configuration conf) { + return SnappyCodec.isNativeCodeLoaded() && conf.getBoolean( + CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, + CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/COPYING ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/COPYING b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/COPYING new file mode 100644 index 0000000..c5d3420 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/COPYING @@ -0,0 +1,83 @@ +CityHash +--------------------------------------------------------------------- +// Copyright (c) 2011 Google, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +GoogleTest +--------------------------------------------------------------------- +Copyright 2008, Google Inc. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +LZ4 +--------------------------------------------------------------------- + LZ4 - Fast LZ compression algorithm + Header File + Copyright (C) 2011, Yann Collet. + BSD License + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.cc ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.cc new file mode 100644 index 0000000..36ff93b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.cc @@ -0,0 +1,307 @@ +// Copyright (c) 2011 Google, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +// +// CityHash Version 1, by Geoff Pike and Jyrki Alakuijala +// +// This file provides CityHash64() and related functions. +// +// It's probably possible to create even faster hash functions by +// writing a program that systematically explores some of the space of +// possible hash functions, by using SIMD instructions, or by +// compromising on hash quality. + +#include "city.h" + +#include <algorithm> + +using namespace std; + +#define UNALIGNED_LOAD64(p) (*(const uint64*)(p)) +#define UNALIGNED_LOAD32(p) (*(const uint32*)(p)) + +#if !defined(LIKELY) +#if defined(__GNUC__) +#define LIKELY(x) (__builtin_expect(!!(x), 1)) +#else +#define LIKELY(x) (x) +#endif +#endif + +// Some primes between 2^63 and 2^64 for various uses. +static const uint64 k0 = 0xc3a5c85c97cb3127ULL; +static const uint64 k1 = 0xb492b66fbe98f273ULL; +static const uint64 k2 = 0x9ae16a3b2f90404fULL; +static const uint64 k3 = 0xc949d7c7509e6557ULL; + +// Bitwise right rotate. Normally this will compile to a single +// instruction, especially if the shift is a manifest constant. +static uint64 Rotate(uint64 val, int shift) { + // Avoid shifting by 64: doing so yields an undefined result. + return shift == 0 ? val : ((val >> shift) | (val << (64 - shift))); +} + +// Equivalent to Rotate(), but requires the second arg to be non-zero. +// On x86-64, and probably others, it's possible for this to compile +// to a single instruction if both args are already in registers. 
+static uint64 RotateByAtLeast1(uint64 val, int shift) { + return (val >> shift) | (val << (64 - shift)); +} + +static uint64 ShiftMix(uint64 val) { + return val ^ (val >> 47); +} + +static uint64 HashLen16(uint64 u, uint64 v) { + return Hash128to64(uint128(u, v)); +} + +static uint64 HashLen0to16(const char *s, size_t len) { + if (len > 8) { + uint64 a = UNALIGNED_LOAD64(s); + uint64 b = UNALIGNED_LOAD64(s + len - 8); + return HashLen16(a, RotateByAtLeast1(b + len, len)) ^ b; + } + if (len >= 4) { + uint64 a = UNALIGNED_LOAD32(s); + return HashLen16(len + (a << 3), UNALIGNED_LOAD32(s + len - 4)); + } + if (len > 0) { + uint8 a = s[0]; + uint8 b = s[len >> 1]; + uint8 c = s[len - 1]; + uint32 y = static_cast<uint32>(a) + (static_cast<uint32>(b) << 8); + uint32 z = len + (static_cast<uint32>(c) << 2); + return ShiftMix(y * k2 ^ z * k3) * k2; + } + return k2; +} + +// This probably works well for 16-byte strings as well, but it may be overkill +// in that case. +static uint64 HashLen17to32(const char *s, size_t len) { + uint64 a = UNALIGNED_LOAD64(s) * k1; + uint64 b = UNALIGNED_LOAD64(s + 8); + uint64 c = UNALIGNED_LOAD64(s + len - 8) * k2; + uint64 d = UNALIGNED_LOAD64(s + len - 16) * k0; + return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d, + a + Rotate(b ^ k3, 20) - c + len); +} + +// Return a 16-byte hash for 48 bytes. Quick and dirty. +// Callers do best to use "random-looking" values for a and b. +static pair<uint64, uint64> WeakHashLen32WithSeeds( + uint64 w, uint64 x, uint64 y, uint64 z, uint64 a, uint64 b) { + a += w; + b = Rotate(b + a + z, 21); + uint64 c = a; + a += x; + a += y; + b += Rotate(a, 44); + return make_pair(a + z, b + c); +} + +// Return a 16-byte hash for s[0] ... s[31], a, and b. Quick and dirty. 
+static pair<uint64, uint64> WeakHashLen32WithSeeds( + const char* s, uint64 a, uint64 b) { + return WeakHashLen32WithSeeds(UNALIGNED_LOAD64(s), + UNALIGNED_LOAD64(s + 8), + UNALIGNED_LOAD64(s + 16), + UNALIGNED_LOAD64(s + 24), + a, + b); +} + +// Return an 8-byte hash for 33 to 64 bytes. +static uint64 HashLen33to64(const char *s, size_t len) { + uint64 z = UNALIGNED_LOAD64(s + 24); + uint64 a = UNALIGNED_LOAD64(s) + (len + UNALIGNED_LOAD64(s + len - 16)) * k0; + uint64 b = Rotate(a + z, 52); + uint64 c = Rotate(a, 37); + a += UNALIGNED_LOAD64(s + 8); + c += Rotate(a, 7); + a += UNALIGNED_LOAD64(s + 16); + uint64 vf = a + z; + uint64 vs = b + Rotate(a, 31) + c; + a = UNALIGNED_LOAD64(s + 16) + UNALIGNED_LOAD64(s + len - 32); + z = UNALIGNED_LOAD64(s + len - 8); + b = Rotate(a + z, 52); + c = Rotate(a, 37); + a += UNALIGNED_LOAD64(s + len - 24); + c += Rotate(a, 7); + a += UNALIGNED_LOAD64(s + len - 16); + uint64 wf = a + z; + uint64 ws = b + Rotate(a, 31) + c; + uint64 r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0); + return ShiftMix(r * k0 + vs) * k2; +} + +uint64 CityHash64(const char *s, size_t len) { + if (len <= 32) { + if (len <= 16) { + return HashLen0to16(s, len); + } else { + return HashLen17to32(s, len); + } + } else if (len <= 64) { + return HashLen33to64(s, len); + } + + // For strings over 64 bytes we hash the end first, and then as we + // loop we keep 56 bytes of state: v, w, x, y, and z. + uint64 x = UNALIGNED_LOAD64(s); + uint64 y = UNALIGNED_LOAD64(s + len - 16) ^ k1; + uint64 z = UNALIGNED_LOAD64(s + len - 56) ^ k0; + pair<uint64, uint64> v = WeakHashLen32WithSeeds(s + len - 64, len, y); + pair<uint64, uint64> w = WeakHashLen32WithSeeds(s + len - 32, len * k1, k0); + z += ShiftMix(v.second) * k1; + x = Rotate(z + x, 39) * k1; + y = Rotate(y, 33) * k1; + + // Decrease len to the nearest multiple of 64, and operate on 64-byte chunks. 
+ len = (len - 1) & ~static_cast<size_t>(63); + do { + x = Rotate(x + y + v.first + UNALIGNED_LOAD64(s + 16), 37) * k1; + y = Rotate(y + v.second + UNALIGNED_LOAD64(s + 48), 42) * k1; + x ^= w.second; + y ^= v.first; + z = Rotate(z ^ w.first, 33); + v = WeakHashLen32WithSeeds(s, v.second * k1, x + w.first); + w = WeakHashLen32WithSeeds(s + 32, z + w.second, y); + std::swap(z, x); + s += 64; + len -= 64; + } while (len != 0); + return HashLen16(HashLen16(v.first, w.first) + ShiftMix(y) * k1 + z, + HashLen16(v.second, w.second) + x); +} + +uint64 CityHash64WithSeed(const char *s, size_t len, uint64 seed) { + return CityHash64WithSeeds(s, len, k2, seed); +} + +uint64 CityHash64WithSeeds(const char *s, size_t len, + uint64 seed0, uint64 seed1) { + return HashLen16(CityHash64(s, len) - seed0, seed1); +} + +// A subroutine for CityHash128(). Returns a decent 128-bit hash for strings +// of any length representable in ssize_t. Based on City and Murmur. +static uint128 CityMurmur(const char *s, size_t len, uint128 seed) { + uint64 a = Uint128Low64(seed); + uint64 b = Uint128High64(seed); + uint64 c = 0; + uint64 d = 0; + ssize_t l = len - 16; + if (l <= 0) { // len <= 16 + c = b * k1 + HashLen0to16(s, len); + d = Rotate(a + (len >= 8 ? UNALIGNED_LOAD64(s) : c), 32); + } else { // len > 16 + c = HashLen16(UNALIGNED_LOAD64(s + len - 8) + k1, a); + d = HashLen16(b + len, c + UNALIGNED_LOAD64(s + len - 16)); + a += d; + do { + a ^= ShiftMix(UNALIGNED_LOAD64(s) * k1) * k1; + a *= k1; + b ^= a; + c ^= ShiftMix(UNALIGNED_LOAD64(s + 8) * k1) * k1; + c *= k1; + d ^= c; + s += 16; + l -= 16; + } while (l > 0); + } + a = HashLen16(a, c); + b = HashLen16(d, b); + return uint128(a ^ b, HashLen16(b, a)); +} + +uint128 CityHash128WithSeed(const char *s, size_t len, uint128 seed) { + if (len < 128) { + return CityMurmur(s, len, seed); + } + + // We expect len >= 128 to be the common case. Keep 56 bytes of state: + // v, w, x, y, and z. 
+ pair<uint64, uint64> v, w; + uint64 x = Uint128Low64(seed); + uint64 y = Uint128High64(seed); + uint64 z = len * k1; + v.first = Rotate(y ^ k1, 49) * k1 + UNALIGNED_LOAD64(s); + v.second = Rotate(v.first, 42) * k1 + UNALIGNED_LOAD64(s + 8); + w.first = Rotate(y + z, 35) * k1 + x; + w.second = Rotate(x + UNALIGNED_LOAD64(s + 88), 53) * k1; + + // This is the same inner loop as CityHash64(), manually unrolled. + do { + x = Rotate(x + y + v.first + UNALIGNED_LOAD64(s + 16), 37) * k1; + y = Rotate(y + v.second + UNALIGNED_LOAD64(s + 48), 42) * k1; + x ^= w.second; + y ^= v.first; + z = Rotate(z ^ w.first, 33); + v = WeakHashLen32WithSeeds(s, v.second * k1, x + w.first); + w = WeakHashLen32WithSeeds(s + 32, z + w.second, y); + std::swap(z, x); + s += 64; + x = Rotate(x + y + v.first + UNALIGNED_LOAD64(s + 16), 37) * k1; + y = Rotate(y + v.second + UNALIGNED_LOAD64(s + 48), 42) * k1; + x ^= w.second; + y ^= v.first; + z = Rotate(z ^ w.first, 33); + v = WeakHashLen32WithSeeds(s, v.second * k1, x + w.first); + w = WeakHashLen32WithSeeds(s + 32, z + w.second, y); + std::swap(z, x); + s += 64; + len -= 128; + } while (LIKELY(len >= 128)); + y += Rotate(w.first, 37) * k0 + z; + x += Rotate(v.first + z, 49) * k0; + // If 0 < len < 128, hash up to 4 chunks of 32 bytes each from the end of s. + for (size_t tail_done = 0; tail_done < len; ) { + tail_done += 32; + y = Rotate(y - x, 42) * k0 + v.second; + w.first += UNALIGNED_LOAD64(s + len - tail_done + 16); + x = Rotate(x, 49) * k0 + w.first; + w.first += v.first; + v = WeakHashLen32WithSeeds(s + len - tail_done, v.first, v.second); + } + // At this point our 48 bytes of state should contain more than + // enough information for a strong 128-bit hash. We use two + // different 48-byte-to-8-byte hashes to get a 16-byte final result. 
+ x = HashLen16(x, v.first); + y = HashLen16(y, w.first); + return uint128(HashLen16(x + v.second, w.second) + y, + HashLen16(x + w.second, y + v.second)); +} + +uint128 CityHash128(const char *s, size_t len) { + if (len >= 16) { + return CityHash128WithSeed(s + 16, + len - 16, + uint128(UNALIGNED_LOAD64(s) ^ k3, + UNALIGNED_LOAD64(s + 8))); + } else if (len >= 8) { + return CityHash128WithSeed(NULL, + 0, + uint128(UNALIGNED_LOAD64(s) ^ (len * k0), + UNALIGNED_LOAD64(s + len - 8) ^ k1)); + } else { + return CityHash128WithSeed(s, len, uint128(k0, k1)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.h ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.h new file mode 100644 index 0000000..7b290be --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/cityhash/city.h @@ -0,0 +1,90 @@ +// Copyright (c) 2011 Google, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
// CityHash Version 1, by Geoff Pike and Jyrki Alakuijala
//
// This file provides a few functions for hashing strings. On x86-64
// hardware in 2011, CityHash64() is faster than other high-quality
// hash functions, such as Murmur.  This is largely due to higher
// instruction-level parallelism.  CityHash64() and CityHash128() also perform
// well on hash-quality tests.
//
// CityHash128() is optimized for relatively long strings and returns
// a 128-bit hash.  For strings more than about 2000 bytes it can be
// faster than CityHash64().
//
// Functions in the CityHash family are not suitable for cryptography.
//
// WARNING: This code has not been tested on big-endian platforms!
// It is known to work well on little-endian platforms that have a small penalty
// for unaligned reads, such as current Intel and AMD moderate-to-high-end CPUs.
//
// By the way, for some hash functions, given strings a and b, the hash
// of a+b is easily derived from the hashes of a and b.  This property
// doesn't hold for any hash functions in this file.

#ifndef CITY_HASH_H_
#define CITY_HASH_H_

#include <stdlib.h>  // for size_t.
#include <stdint.h>
#include <utility>

typedef uint8_t uint8;
typedef uint32_t uint32;
typedef uint64_t uint64;
// A 128-bit value stored as (low 64 bits, high 64 bits).
typedef std::pair<uint64, uint64> uint128;

// Low 64 bits of x (the pair's first member).
inline uint64 Uint128Low64(const uint128& x) { return x.first; }
// High 64 bits of x (the pair's second member).
inline uint64 Uint128High64(const uint128& x) { return x.second; }

// Hash function for a byte array.
uint64 CityHash64(const char *buf, size_t len);

// Hash function for a byte array.  For convenience, a 64-bit seed is also
// hashed into the result.
uint64 CityHash64WithSeed(const char *buf, size_t len, uint64 seed);

// Hash function for a byte array.  For convenience, two seeds are also
// hashed into the result.
uint64 CityHash64WithSeeds(const char *buf, size_t len,
                           uint64 seed0, uint64 seed1);

// Hash function for a byte array.
uint128 CityHash128(const char *s, size_t len);

// Hash function for a byte array.  For convenience, a 128-bit seed is also
// hashed into the result.
uint128 CityHash128WithSeed(const char *s, size_t len, uint128 seed);

// Hash 128 input bits down to 64 bits of output.
// This is intended to be a reasonably good hash function.
inline uint64 Hash128to64(const uint128& x) {
  // Murmur-inspired hashing: two multiply / xor-shift rounds, then a final
  // multiply.
  const uint64 kMul = 0x9ddfea08eb382d69ULL;
  const uint64 low = Uint128Low64(x);
  const uint64 high = Uint128High64(x);
  uint64 mixed = (low ^ high) * kMul;
  mixed ^= (mixed >> 47);
  uint64 result = (high ^ mixed) * kMul;
  result ^= (result >> 47);
  return result * kMul;
}

#endif  // CITY_HASH_H_