http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform new file mode 100644 index 0000000..eef215d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform @@ -0,0 +1,14 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.hadoop.mapred.nativetask.HadoopPlatform
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java new file mode 100644 index 0000000..a76b1b2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +import junit.framework.TestCase; + +public class TestTaskContext extends TestCase { + + public void testTaskContext() { + TaskContext context = new TaskContext(null, null, null, null, null, null, null); + + context.setInputKeyClass(IntWritable.class); + assertEquals(IntWritable.class.getName(), context.getInputKeyClass().getName()); + + context.setInputValueClass(Text.class); + assertEquals(Text.class.getName(), context.getInputValueClass().getName()); + + context.setOutputKeyClass(LongWritable.class); + assertEquals(LongWritable.class.getName(), context.getOuputKeyClass().getName()); + + context.setOutputValueClass(FloatWritable.class); + assertEquals(FloatWritable.class.getName(), context.getOutputValueClass().getName()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java new file mode 100644 index 0000000..5dcac35 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.buffer; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.nativetask.DataReceiver; +import org.apache.hadoop.mapred.nativetask.NativeDataSource; +import org.apache.hadoop.mapred.nativetask.NativeDataTarget; +import org.apache.hadoop.mapred.nativetask.handlers.BufferPullee; +import org.apache.hadoop.mapred.nativetask.handlers.BufferPuller; +import org.apache.hadoop.mapred.nativetask.handlers.BufferPushee; +import org.apache.hadoop.mapred.nativetask.handlers.BufferPusher; +import org.apache.hadoop.mapred.nativetask.handlers.IDataLoader; +import org.apache.hadoop.mapred.nativetask.testutil.TestInput; +import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV; +import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; +import org.apache.hadoop.util.Progress; +import org.junit.Before; + +@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) +public class TestBufferPushPull extends TestCase { + + public static int BUFFER_LENGTH = 100; // 100 bytes + public static int INPUT_KV_COUNT = 1000; + private KV<BytesWritable, BytesWritable>[] dataInput; + + @Override + @Before + public void setUp() { + this.dataInput = TestInput.getMapInputs(INPUT_KV_COUNT); + } + + public void testPush() throws Exception { + final byte[] buff = new byte[BUFFER_LENGTH]; + + final InputBuffer input = new InputBuffer(buff); + + final OutputBuffer out = new OutputBuffer(buff); + + final Class<BytesWritable> iKClass = BytesWritable.class; + final Class<BytesWritable> iVClass = BytesWritable.class; + + final RecordWriterForPush writer = new RecordWriterForPush() { + @Override + public void write(BytesWritable key, BytesWritable value) throws IOException { + final KV expect = dataInput[count++]; + Assert.assertEquals(expect.key.toString(), key.toString()); + Assert.assertEquals(expect.value.toString(), value.toString()); + } + }; + + final BufferPushee pushee = new BufferPushee(iKClass, iVClass, writer); + + final PushTarget handler = new PushTarget(out) { + + @Override + public void sendData() throws IOException { + final int outputLength = out.length(); + input.rewind(0, outputLength); + out.rewind(); + pushee.collect(input); + } + }; + + final BufferPusher pusher = new BufferPusher(iKClass, iVClass, handler); + + writer.reset(); + for (int i = 0; i < INPUT_KV_COUNT; i++) { + pusher.collect(dataInput[i].key, dataInput[i].value); + } + pusher.close(); + pushee.close(); + } + + public void testPull() throws Exception { + final byte[] buff = new byte[BUFFER_LENGTH]; + + final InputBuffer input = new InputBuffer(buff); + + final OutputBuffer out = new OutputBuffer(buff); + + final Class<BytesWritable> iKClass = BytesWritable.class; + final Class<BytesWritable> iVClass = BytesWritable.class; + + final NativeHandlerForPull handler = new NativeHandlerForPull(input, out); + + final KeyValueIterator iter = new KeyValueIterator(); + final BufferPullee pullee = new BufferPullee(iKClass, iVClass, iter, handler); + handler.setDataLoader(pullee); + + final BufferPuller puller = new BufferPuller(handler); + handler.setDataReceiver(puller); + + int count = 0; + + while (puller.next()) { + final DataInputBuffer key = puller.getKey(); + final DataInputBuffer value = puller.getValue(); + + final BytesWritable keyBytes = new BytesWritable(); + final BytesWritable valueBytes = new BytesWritable(); + + keyBytes.readFields(key); + valueBytes.readFields(value); + + Assert.assertEquals(dataInput[count].key.toString(), keyBytes.toString()); + Assert.assertEquals(dataInput[count].value.toString(), valueBytes.toString()); + + count++; + } + + puller.close(); + pullee.close(); + } + + public abstract class PushTarget implements NativeDataTarget { + OutputBuffer out; + + PushTarget(OutputBuffer out) { + this.out = out; + } + + @Override + public abstract void sendData() throws IOException; + + @Override + public void finishSendData() throws IOException { + sendData(); + } + + @Override + public OutputBuffer getOutputBuffer() { + return out; + } + } + + public abstract class RecordWriterForPush implements RecordWriter<BytesWritable, BytesWritable> { + + protected int count = 0; + + RecordWriterForPush() { + } + + @Override + public abstract void write(BytesWritable key, BytesWritable value) throws IOException; + + @Override + public void close(Reporter reporter) throws IOException { + } + + public void reset() { + count = 0; + } + }; + + public static class NativeHandlerForPull implements NativeDataSource, NativeDataTarget { + + InputBuffer in; + private final OutputBuffer out; + + private IDataLoader dataLoader; + private DataReceiver dataReceiver; + + public NativeHandlerForPull(InputBuffer input, OutputBuffer out) { + this.in = input; + this.out = out; + } + + @Override + public InputBuffer getInputBuffer() { + return in; + } + + @Override + public void setDataReceiver(DataReceiver handler) { + this.dataReceiver = handler; + } + + @Override + public void loadData() throws IOException { + final int size = dataLoader.load(); + } + + public void setDataLoader(IDataLoader dataLoader) { + this.dataLoader = dataLoader; + } + + @Override + public void sendData() throws IOException { + final int len = out.length(); + out.rewind(); + in.rewind(0, len); + dataReceiver.receiveData(); + } + + @Override + public void finishSendData() throws IOException { + dataReceiver.receiveData(); + } + + @Override + public OutputBuffer getOutputBuffer() { + return this.out; + } + } + + public class KeyValueIterator implements RawKeyValueIterator { + int count = 0; + BytesWritable key; + BytesWritable value; + + @Override + public DataInputBuffer getKey() throws IOException { + return convert(key); + } + + @Override + public DataInputBuffer getValue() throws IOException { + return convert(value); + } + + private DataInputBuffer convert(BytesWritable b) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + b.write(new DataOutputStream(out)); + final byte[] array = out.toByteArray(); + final DataInputBuffer result = new DataInputBuffer(); + result.reset(array, array.length); + return result; + } + + @Override + public boolean next() throws IOException { + if (count < INPUT_KV_COUNT) { + key = dataInput[count].key; + value = dataInput[count].key; + count++; + return true; + } + return false; + } + + @Override + public void close() throws IOException { + } + + @Override + public Progress getProgress() { + return null; + } + }; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java new file mode 100644 index 0000000..424354b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.buffer; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.hadoop.mapred.nativetask.NativeDataTarget; + +import junit.framework.Assert; +import junit.framework.TestCase; + +public class TestByteBufferReadWrite extends TestCase{ + + + public void testReadWrite() throws IOException { + byte[] buff = new byte[10000]; + + InputBuffer input = new InputBuffer(buff); + MockDataTarget target = new MockDataTarget(buff); + ByteBufferDataWriter writer = new ByteBufferDataWriter(target); + + writer.write(1); + writer.write(new byte[] {2, 2}, 0, 2); + writer.writeBoolean(true); + writer.writeByte(4); + writer.writeShort(5); + writer.writeChar(6); + writer.writeInt(7); + writer.writeLong(8); + writer.writeFloat(9); + writer.writeDouble(10); + writer.writeBytes("goodboy"); + writer.writeChars("hello"); + writer.writeUTF("native task"); + + int length = target.getOutputBuffer().length(); + input.rewind(0, length); + ByteBufferDataReader reader = new ByteBufferDataReader(input); + + Assert.assertEquals(1, reader.read()); + byte[] two = new byte[2]; + reader.read(two); + Assert.assertTrue(two[0] == two[1] && two[0] == 2); + + + Assert.assertEquals(true, reader.readBoolean()); + Assert.assertEquals(4, reader.readByte()); + Assert.assertEquals(5, reader.readShort()); + Assert.assertEquals(6, reader.readChar()); + Assert.assertEquals(7, reader.readInt()); + Assert.assertEquals(8, reader.readLong()); + Assert.assertTrue(reader.readFloat() - 9 < 0.0001); + Assert.assertTrue(reader.readDouble() - 10 < 0.0001); + + byte[] goodboy = new byte["goodboy".length()]; + reader.read(goodboy); + Assert.assertEquals("goodboy", toString(goodboy)); + + char[] hello = new char["hello".length()]; + for (int i = 0; i < hello.length; i++) { + hello[i] = reader.readChar(); + } + + String helloString = new String(hello); + Assert.assertEquals("hello", helloString); + + Assert.assertEquals("native task", reader.readUTF()); + + Assert.assertEquals(0, input.remaining()); + } + + public void testShortOfSpace() throws IOException { + byte[] buff = new byte[10]; + MockDataTarget target = new MockDataTarget(buff); + ByteBufferDataWriter writer = new ByteBufferDataWriter(target); + Assert.assertEquals(false, writer.hasUnFlushedData()); + + writer.write(1); + writer.write(new byte[] {2, 2}, 0, 2); + Assert.assertEquals(true, writer.hasUnFlushedData()); + + Assert.assertEquals(true, writer.shortOfSpace(100)); + } + + public void testFlush() throws IOException { + byte[] buff = new byte[10]; + final Counter flushCount = new Counter(); + final Flag finishFlag = new Flag(); + MockDataTarget target = new MockDataTarget(buff) { + @Override + public void sendData() throws IOException { + flushCount.increase(); + } + + @Override + public void finishSendData() throws IOException { + finishFlag.set(true); + } + }; + + ByteBufferDataWriter writer = new ByteBufferDataWriter(target); + Assert.assertEquals(false, writer.hasUnFlushedData()); + + writer.write(1); + writer.write(new byte[100]); + + Assert.assertEquals(true, writer.hasUnFlushedData()); + writer.close(); + Assert.assertEquals(11, flushCount.get()); + Assert.assertEquals(true, finishFlag.get()); + + } + + private static String toString(byte[] str) throws UnsupportedEncodingException { + return new String(str, 0, str.length, "UTF-8"); + } + + private static class MockDataTarget implements NativeDataTarget { + + private OutputBuffer out; + + MockDataTarget(byte[] buffer) { + this.out = new OutputBuffer(buffer); + } + + @Override + public void sendData() throws IOException { + + } + + @Override + public void finishSendData() throws IOException { + + } + + @Override + public OutputBuffer getOutputBuffer() { + return out; + } + } + + private static class Counter { + private int count; + + public int get() { + return count; + } + + public void increase() { + count++; + } + } + + private static class Flag { + private boolean value; + + public void set(boolean status) { + this.value = status; + } + + public boolean get() { + return this.value; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java new file mode 100644 index 0000000..09c1ef5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.buffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.ArrayList; + +import org.junit.Test; + +public class TestDirectBufferPool { + + @Test + public void testGetInstance() throws Exception { + final int num = 100; + List<DirectBufferPool> pools = new ArrayList<DirectBufferPool>(); + Thread[] list = new Thread[num]; + for (int i = 0; i < num; i++) { + Thread t = getPoolThread(pools); + t.start(); + list[i] = t; + } + for (int i = 0; i < num; i++) { + try { + list[i].join(10000); + } catch (Exception e) { + e.printStackTrace(); + } + } + DirectBufferPool p1 = pools.get(0); + assertNotNull(p1); + for (int i = 1; i < pools.size(); i++) { + DirectBufferPool p2 = pools.get(i); + assertNotNull(p2); + assertSame(p1, p2); + } + } + + private Thread getPoolThread(final List<DirectBufferPool> pools) { + Thread t = new Thread() { + public void run() { + pools.add(DirectBufferPool.getInstance()); + } + }; + return t; + } + + + @Test + public void testBufBorrow() throws IOException { + final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); + ByteBuffer b1 = bufferPool.borrowBuffer(100); + assertTrue(b1.isDirect()); + assertEquals(0, b1.position()); + assertEquals(100, b1.capacity()); + bufferPool.returnBuffer(b1); + ByteBuffer b2 = bufferPool.borrowBuffer(100); + assertTrue(b2.isDirect()); + assertEquals(0, b2.position()); + assertEquals(100, b2.capacity()); + assertSame(b1, b2); + + ByteBuffer b3 = bufferPool.borrowBuffer(100); + assertTrue(b3.isDirect()); + assertEquals(0, b3.position()); + assertEquals(100, b3.capacity()); + assertNotSame(b2, b3); + bufferPool.returnBuffer(b2); + bufferPool.returnBuffer(b3); + } + + @Test + public void testBufReset() throws IOException { + final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); + ByteBuffer b1 = bufferPool.borrowBuffer(100); + assertTrue(b1.isDirect()); + assertEquals(0, b1.position()); + assertEquals(100, b1.capacity()); + b1.putInt(1); + assertEquals(4, b1.position()); + bufferPool.returnBuffer(b1); + ByteBuffer b2 = bufferPool.borrowBuffer(100); + assertSame(b1, b2); + assertTrue(b2.isDirect()); + assertEquals(0, b2.position()); + assertEquals(100, b2.capacity()); + } + + @Test + public void testBufReturn() throws IOException { + final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); + int numOfBufs = 100; + int capacity = 100; + final ByteBuffer[] bufs = new ByteBuffer[numOfBufs]; + for (int i = 0; i < numOfBufs; i++) { + bufs[i] = bufferPool.borrowBuffer(capacity); + } + + assertEquals(0, bufferPool.getBufCountsForCapacity(capacity)); + + + int numOfThreads = numOfBufs; + Thread[] list = new Thread[numOfThreads]; + for (int i = 0; i < numOfThreads; i++) { + Thread t = retBufThread(bufferPool, bufs, i); + t.start(); + list[i] = t; + } + for (int i = 0; i < numOfThreads; i++) { + try { + list[i].join(10000); + } catch (Exception e) { + e.printStackTrace(); + } + } + + assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity)); + } + + private Thread retBufThread(final DirectBufferPool bufferPool, final ByteBuffer[] bufs, final int i) { + Thread t = new Thread(new Runnable(){ + @Override + public void run() { + try { + bufferPool.returnBuffer(bufs[i]); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + return t; + } + + @Test + public void testBufException() { + final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); + boolean thrown = false; + try { + bufferPool.returnBuffer(null); + } catch (IOException e) { + thrown = true; + } + assertEquals(true, thrown); + + thrown = false; + ByteBuffer buf = ByteBuffer.allocate(100); + try { + bufferPool.returnBuffer(buf); + } catch (IOException e) { + thrown = true; + } + assertEquals(true, thrown); + } + + @Test + public void testBufWeakRefClear() throws IOException { + final DirectBufferPool bufferPool = DirectBufferPool.getInstance(); + int numOfBufs = 100; + int capacity = 100; + ByteBuffer[] list = new ByteBuffer[capacity]; + for (int i = 0; i < numOfBufs; i++) { + list[i] = bufferPool.borrowBuffer(capacity); + } + for (int i = 0; i < numOfBufs; i++) { + bufferPool.returnBuffer(list[i]); + list[i] = null; + } + + assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity)); + + for (int i = 0; i < 3; i++) { + System.gc(); + } + + ByteBuffer b = bufferPool.borrowBuffer(capacity); + assertEquals(0, bufferPool.getBufCountsForCapacity(capacity)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java new file mode 100644 index 0000000..7eb6467 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.buffer; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.junit.Assert; + +public class TestInputBuffer extends TestCase { + public void testInputBuffer() throws IOException { + final int size = 100; + final InputBuffer input1 = new InputBuffer(BufferType.DIRECT_BUFFER, size); + Assert.assertEquals(input1.getType(), BufferType.DIRECT_BUFFER); + + Assert.assertTrue(input1.position() == 0); + Assert.assertTrue(input1.length() == 0); + Assert.assertTrue(input1.remaining() == 0); + Assert.assertTrue(input1.capacity() == size); + + final InputBuffer input2 = new InputBuffer(BufferType.HEAP_BUFFER, size); + Assert.assertEquals(input2.getType(), BufferType.HEAP_BUFFER); + + Assert.assertTrue(input2.position() == 0); + Assert.assertTrue(input2.length() == 0); + Assert.assertTrue(input2.remaining() == 0); + Assert.assertTrue(input2.capacity() == size); + + final InputBuffer input3 = new InputBuffer(new byte[size]); + Assert.assertEquals(input3.getType(), BufferType.HEAP_BUFFER); + + Assert.assertTrue(input3.position() == 0); + Assert.assertTrue(input3.length() == 0); + Assert.assertTrue(input3.remaining() == 0); + Assert.assertEquals(input3.capacity(), size); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java new file mode 100644 index 0000000..39c25a6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.buffer; + +import junit.framework.TestCase; + +import org.junit.Assert; + +public class TestOutputBuffer extends TestCase { + public void testOutputBuffer() { + final int size = 100; + final OutputBuffer output1 = new OutputBuffer(BufferType.DIRECT_BUFFER, size); + Assert.assertEquals(output1.getType(), BufferType.DIRECT_BUFFER); + + Assert.assertTrue(output1.length() == 0); + Assert.assertEquals(output1.limit(), size); + + final OutputBuffer output2 = new OutputBuffer(BufferType.HEAP_BUFFER, size); + Assert.assertEquals(output2.getType(), BufferType.HEAP_BUFFER); + + Assert.assertTrue(output2.length() == 0); + Assert.assertEquals(output2.limit(), size); + + final OutputBuffer output3 = new OutputBuffer(new byte[size]); + Assert.assertEquals(output3.getType(), BufferType.HEAP_BUFFER); + + Assert.assertTrue(output3.length() == 0); + Assert.assertEquals(output3.limit(), size); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java new file mode 100644 index 0000000..7b337b7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.handlers; + +import java.io.IOException; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.mapred.Task.CombinerRunner; +import org.apache.hadoop.mapred.nativetask.Command; +import org.apache.hadoop.mapred.nativetask.INativeHandler; +import org.apache.hadoop.mapred.nativetask.buffer.BufferType; +import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; +import org.mockito.Matchers; +import org.mockito.Mockito; + +@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) +public class TestCombineHandler extends TestCase { + + private CombinerHandler handler; + private INativeHandler nativeHandler; + private BufferPusher pusher; + private BufferPuller puller; + private CombinerRunner combinerRunner; + + @Override + public void setUp() throws IOException { + + this.nativeHandler = Mockito.mock(INativeHandler.class); + this.pusher = Mockito.mock(BufferPusher.class); + this.puller = Mockito.mock(BufferPuller.class); + this.combinerRunner = Mockito.mock(CombinerRunner.class); + + Mockito.when(nativeHandler.getInputBuffer()).thenReturn(new InputBuffer(BufferType.HEAP_BUFFER, 100)); + } + + public void testCommandDispatcherSetting() throws IOException { + this.handler = new CombinerHandler(nativeHandler, combinerRunner, puller, pusher); + Mockito.verify(nativeHandler, Mockito.times(1)).setCommandDispatcher(Matchers.eq(handler)); + Mockito.verify(nativeHandler, Mockito.times(1)).setDataReceiver(Matchers.eq(puller)); + } + + public void testCombine() throws IOException, InterruptedException, ClassNotFoundException { + this.handler = new CombinerHandler(nativeHandler, combinerRunner, puller, pusher); + Assert.assertEquals(null, handler.onCall(CombinerHandler.COMBINE, null)); + handler.close(); + handler.close(); + + Mockito.verify(combinerRunner, Mockito.times(1)).combine(Matchers.eq(puller), Matchers.eq(pusher)); + + Mockito.verify(pusher, Mockito.times(1)).close(); + Mockito.verify(puller, Mockito.times(1)).close(); + Mockito.verify(nativeHandler, Mockito.times(1)).close(); + } + + public void testOnCall() throws IOException { + this.handler = new CombinerHandler(nativeHandler, combinerRunner, puller, pusher); + Assert.assertEquals(null, handler.onCall(new Command(-1), null)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java new file mode 100644 index 0000000..05d87cf --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.handlers; + +import java.io.IOException; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.nativetask.Command; +import org.apache.hadoop.mapred.nativetask.ICombineHandler; +import org.apache.hadoop.mapred.nativetask.INativeHandler; +import org.apache.hadoop.mapred.nativetask.TaskContext; +import org.apache.hadoop.mapred.nativetask.buffer.BufferType; +import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; +import org.apache.hadoop.mapred.nativetask.util.OutputUtil; +import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; +import org.mockito.Matchers; +import org.mockito.Mockito; + +@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) +public class TestNativeCollectorOnlyHandler extends TestCase { + + private NativeCollectorOnlyHandler handler; + private INativeHandler nativeHandler; + private BufferPusher pusher; + private ICombineHandler combiner; + private TaskContext taskContext; + private String localDir = "build/test/mapred/local"; + + @Override + public void setUp() throws IOException { + this.nativeHandler = Mockito.mock(INativeHandler.class); + this.pusher = Mockito.mock(BufferPusher.class); + this.combiner = Mockito.mock(ICombineHandler.class); + JobConf jobConf = new JobConf(); + jobConf.set(OutputUtil.NATIVE_TASK_OUTPUT_MANAGER, + "org.apache.hadoop.mapred.nativetask.util.LocalJobOutputFiles"); + jobConf.set("mapred.local.dir", localDir); + this.taskContext = new TaskContext(jobConf, + BytesWritable.class, BytesWritable.class, + BytesWritable.class, + BytesWritable.class, + null, + null); + + Mockito.when(nativeHandler.getInputBuffer()).thenReturn(new InputBuffer(BufferType.HEAP_BUFFER, 100)); + } + + public void testCollect() throws IOException { + this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner); + handler.collect(new BytesWritable(), new BytesWritable(), 100); + handler.close(); + handler.close(); + + Mockito.verify(pusher, Mockito.times(1)).collect(Matchers.any(BytesWritable.class), + Matchers.any(BytesWritable.class), Matchers.anyInt()); + + Mockito.verify(pusher, Mockito.times(1)).close(); + Mockito.verify(combiner, Mockito.times(1)).close(); + Mockito.verify(nativeHandler, Mockito.times(1)).close(); + } + + public void testGetCombiner() throws IOException { + this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner); + Mockito.when(combiner.getId()).thenReturn(100L); + final ReadWriteBuffer result = handler.onCall(NativeCollectorOnlyHandler.GET_COMBINE_HANDLER, null); + Assert.assertEquals(100L, result.readLong()); + } + + public void testOnCall() throws IOException { + this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner); + boolean thrown = false; + try { + handler.onCall(new Command(-1), null); + } catch(final IOException e) { + thrown = true; + } + Assert.assertTrue("exception thrown", thrown); + + final String expectedOutputPath = localDir + "/output/file.out"; + final String expectedOutputIndexPath = localDir + "/output/file.out.index"; + final String expectedSpillPath = localDir + "/output/spill0.out"; + + final String outputPath = handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_PATH, null).readString(); + Assert.assertEquals(expectedOutputPath, outputPath); + + final String outputIndexPath = handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_INDEX_PATH, null).readString(); + Assert.assertEquals(expectedOutputIndexPath, outputIndexPath); + + final String spillPath = handler.onCall(NativeCollectorOnlyHandler.GET_SPILL_PATH, null).readString(); + Assert.assertEquals(expectedSpillPath, spillPath); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java new file mode 100644 index 0000000..004e8b8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.nativetask.Constants; +import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream; +import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream; +import org.apache.hadoop.mapred.nativetask.testutil.TestInput; +import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV; +import org.apache.hadoop.mapred.nativetask.util.SizedWritable; +import org.junit.Assert; +import org.junit.Before; +import org.mockito.Matchers; +import org.mockito.Mockito; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class TestKVSerializer extends TestCase { + + int inputArraySize = 1000; // 1000 bytesWriable elements + int bufferSize = 100; // bytes + private KV<BytesWritable, BytesWritable>[] inputArray; + + final ByteArrayOutputStream result = new ByteArrayOutputStream(); + private SizedWritable key; + private SizedWritable value; + private KVSerializer serializer; + + @Override + @Before + public void setUp() throws IOException { + this.inputArray = TestInput.getMapInputs(inputArraySize); + this.key = new SizedWritable(BytesWritable.class); + this.value = new SizedWritable(BytesWritable.class); + + this.serializer = new KVSerializer(BytesWritable.class, BytesWritable.class); + + key.reset(inputArray[4].key); + value.reset(inputArray[4].value); + serializer.updateLength(key, value); + } + + public void testUpdateLength() throws IOException { + Mockito.mock(DataOutputStream.class); + + int kvLength = 0; + for (int i = 0; i < inputArraySize; i++) { + key.reset(inputArray[i].key); + value.reset(inputArray[i].value); + serializer.updateLength(key, value); + + // verify whether the size increase + Assert.assertTrue(key.length + value.length > kvLength); + kvLength = key.length + value.length; + } + } + + public void testSerializeKV() throws IOException { + final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class); + + Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true); + Mockito.when(dataOut.shortOfSpace(key.length + value.length + Constants.SIZEOF_KV_LENGTH)).thenReturn(true); + final int written = serializer.serializeKV(dataOut, key, value); + + // flush once, write 4 int, and 2 byte array + Mockito.verify(dataOut, Mockito.times(1)).flush(); + Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt()); + Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt()); + + Assert.assertEquals(written, key.length + value.length + Constants.SIZEOF_KV_LENGTH); + } + + public void testSerializeNoFlush() throws IOException { + final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class); + + // suppose there are enough space + Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true); + Mockito.when(dataOut.shortOfSpace(Matchers.anyInt())).thenReturn(false); + final int written = serializer.serializeKV(dataOut, key, value); + + // flush 0, write 4 int, and 2 byte array + Mockito.verify(dataOut, Mockito.times(0)).flush(); + Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt()); + Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt()); + + Assert.assertEquals(written, key.length + value.length + Constants.SIZEOF_KV_LENGTH); + } + + public void testSerializePartitionKV() throws IOException { + final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class); + + Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true); + Mockito.when( + dataOut + .shortOfSpace(key.length + value.length + Constants.SIZEOF_KV_LENGTH + Constants.SIZEOF_PARTITION_LENGTH)) + .thenReturn(true); + final int written = serializer.serializePartitionKV(dataOut, 100, key, value); + + // flush once, write 4 int, and 2 byte array + Mockito.verify(dataOut, Mockito.times(1)).flush(); + Mockito.verify(dataOut, Mockito.times(5)).writeInt(Matchers.anyInt()); + Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt()); + + Assert.assertEquals(written, key.length + value.length + Constants.SIZEOF_KV_LENGTH + + Constants.SIZEOF_PARTITION_LENGTH); + } + + public void testDeserializerNoData() throws IOException { + final DataInputStream in = Mockito.mock(DataInputStream.class); + Mockito.when(in.hasUnReadData()).thenReturn(false); + Assert.assertEquals(0, serializer.deserializeKV(in, key, value)); + } + + public void testDeserializer() throws IOException { + final DataInputStream in = Mockito.mock(DataInputStream.class); + Mockito.when(in.hasUnReadData()).thenReturn(true); + Assert.assertTrue(serializer.deserializeKV(in, key, value) > 0); + + Mockito.verify(in, Mockito.times(4)).readInt(); + Mockito.verify(in, Mockito.times(2)).readFully(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java new file mode 100644 index 0000000..4b67454 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +@SuppressWarnings({ "rawtypes", "deprecation" }) +public class TestNativeSerialization extends TestCase { + public void testRegisterAndGet() throws IOException { + final NativeSerialization serialization = NativeSerialization.getInstance(); + serialization.reset(); + + serialization.register(WritableKey.class.getName(), ComparableKeySerializer.class); + + INativeSerializer serializer = serialization.getSerializer(WritableKey.class); + Assert.assertEquals(ComparableKeySerializer.class.getName(), serializer.getClass().getName()); + + serializer = serialization.getSerializer(WritableValue.class); + Assert.assertEquals(DefaultSerializer.class.getName(), serializer.getClass().getName()); + + boolean ioExceptionThrown = false; + try { + serializer = serialization.getSerializer(NonWritableValue.class); + } catch (final IOException e) { + ioExceptionThrown = true; + } + Assert.assertTrue(ioExceptionThrown); + } + + public static class WritableKey implements Writable { + private int value; + + public WritableKey(int a) { + this.value = a; + } + + public int getLength() { + return 4; + } + + public int getValue() { + return value; + } + + public void setValue(int v) { + this.value = v; + } + + @Override + public void write(DataOutput out) throws IOException { + + } + + @Override + public void readFields(DataInput in) throws IOException { + } + } + + public static class WritableValue implements Writable { + + @Override + public void write(DataOutput out) throws IOException { + } + + @Override + public void readFields(DataInput in) throws IOException { + } + } + + public static class NonWritableValue { + } + + public static class ComparableKeySerializer implements INativeComparable, INativeSerializer<WritableKey> { + + @Override + public int getLength(WritableKey w) throws IOException { + return w.getLength(); + } + + @Override + public void serialize(WritableKey w, DataOutput out) throws IOException { + out.writeInt(w.getValue()); + } + + @Override + public void deserialize(DataInput in, int length, WritableKey w) throws IOException { + w.setValue(in.readInt()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java new file mode 100644 index 0000000..6713a4a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/TestInput.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.testutil; + +import java.util.Arrays; + +import org.apache.hadoop.io.BytesWritable; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class TestInput { + + public static class KV<K, V> { + public K key; + public V value; + } + + public static char[] CHAR_SET = new char[] { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', + 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', + 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', + '6', '7', '8', '9', '*', '/' }; + + public static KV[] getMapInputs(int size) { + + final KV[] dataInput = new KV[size]; + + for (int i = 0; i < size; i++) { + dataInput[i] = getSingleMapInput(i); + } + return dataInput; + } + + private static KV getSingleMapInput(int i) { + final char character = CHAR_SET[i % CHAR_SET.length]; + final byte b = (byte) character; + + final byte[] bytes = new byte[i]; + Arrays.fill(bytes, b); + final BytesWritable result = new BytesWritable(bytes); + final KV kv = new KV(); + kv.key = result; + kv.value = result; + return kv; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java new file mode 100644 index 0000000..8d74d63 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestBytesUtil.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.utils; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.mapred.nativetask.util.BytesUtil; + +@SuppressWarnings({ "deprecation" }) +public class TestBytesUtil extends TestCase { + + public void testBytesStringConversion() { + + final String str = "I am good!"; + final byte[] bytes = BytesUtil.toBytes(str); + + Assert.assertEquals(str, BytesUtil.fromBytes(bytes)); + } + + public void testBytesIntConversion() { + final int a = 1000; + final byte[] intBytes = BytesUtil.toBytes(a); + + Assert.assertEquals(a, BytesUtil.toInt(intBytes)); + } + + public void testBytesLongConversion() { + final long l = 1000000L; + final byte[] longBytes = BytesUtil.toBytes(l); + + Assert.assertEquals(l, BytesUtil.toLong(longBytes)); + } + + public void testBytesFloatConversion() { + final float f = 3.14f; + final byte[] floatBytes = BytesUtil.toBytes(f); + + Assert.assertEquals(f, BytesUtil.toFloat(floatBytes)); + } + + public void testBytesDoubleConversion() { + final double d = 3.14; + final byte[] doubleBytes = BytesUtil.toBytes(d); + + Assert.assertEquals(d, BytesUtil.toDouble(doubleBytes)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java new file mode 100644 index 0000000..6ea8092 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestReadWriteBuffer.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.utils; + +import junit.framework.TestCase; + +import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; +import org.junit.Assert; + +public class TestReadWriteBuffer extends TestCase { + + private static byte[] bytes = new byte[] { '0', 'a', 'b', 'c', 'd', '9' }; + + public void testReadWriteBuffer() { + + final ReadWriteBuffer buffer = new ReadWriteBuffer(); + + Assert.assertFalse(buffer.getBuff() == null); + + Assert.assertEquals(buffer.getWritePoint(), 0); + Assert.assertEquals(buffer.getReadPoint(), 0); + + buffer.writeInt(3); + + buffer.writeString("goodboy"); + + buffer.writeLong(10L); + buffer.writeBytes(bytes, 0, bytes.length); + buffer.writeLong(100L); + + Assert.assertEquals(buffer.getWritePoint(), 41); + Assert.assertEquals(buffer.getReadPoint(), 0); + Assert.assertTrue(buffer.getBuff().length >= 41); + + Assert.assertEquals(buffer.readInt(), 3); + Assert.assertEquals(buffer.readString(), "goodboy"); + Assert.assertEquals(buffer.readLong(), 10L); + + final byte[] read = buffer.readBytes(); + for (int i = 0; i < bytes.length; i++) { + Assert.assertEquals(bytes[i], read[i]); + } + + Assert.assertEquals(100L, buffer.readLong()); + Assert.assertEquals(41, buffer.getReadPoint()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java new file mode 100644 index 0000000..7b82eff --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/utils/TestSizedWritable.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.utils; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.nativetask.util.SizedWritable; +import org.junit.Assert; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class TestSizedWritable extends TestCase { + + public void testSizedWritable() { + final SizedWritable w = new SizedWritable(BytesWritable.class); + Assert.assertTrue(w.length == SizedWritable.INVALID_LENGTH); + Assert.assertFalse(w.v == null); + } +}