IGNITE-4405: Hadoop: implemented "readLine" method for HadoopDataInStream and HadoopDirectDataInput classes. This closes #1358.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a61b0eaf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a61b0eaf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a61b0eaf Branch: refs/heads/ignite-3477 Commit: a61b0eaff1817d84c0659e8a7e095f29e22800e1 Parents: 7d82d6a Author: tledkov-gridgain <tled...@gridgain.com> Authored: Wed Dec 28 14:09:38 2016 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Wed Dec 28 14:09:38 2016 +0300 ---------------------------------------------------------------------- .../shuffle/direct/HadoopDirectDataInput.java | 34 +++- .../shuffle/streams/HadoopDataInStream.java | 34 +++- .../shuffle/streams/HadoopOffheapBuffer.java | 18 ++ .../streams/HadoopDataStreamSelfTest.java | 177 +++++++++++++++++-- 4 files changed, 244 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java index e3a713a..6f0e2b0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop.shuffle.direct; import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.SB; import org.jetbrains.annotations.NotNull; import java.io.DataInput; @@ -150,8 +151,37 @@ public class HadoopDirectDataInput extends InputStream implements DataInput { /** {@inheritDoc} */ @Override public String readLine() throws IOException { - // TODO: Create ticket! - throw new UnsupportedOperationException(); + if (pos == buf.length) + return null; + + SB sb = new SB(); + + while (pos < buf.length) { + char c = (char)readByte(); + + switch (c) { + case '\n': + return sb.toString(); + + case '\r': + if (pos == buf.length) + return sb.toString(); + + c = (char)readByte(); + + if (c == '\n') + return sb.toString(); + else + pos--; + + return sb.toString(); + + default: + sb.a(c); + } + } + + return sb.toString(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java index 3b5fa15..261daee 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; +import org.apache.ignite.internal.util.typedef.internal.SB; /** * Data input stream. @@ -52,6 +53,7 @@ public class HadoopDataInStream extends InputStream implements DataInput { /** * @param size Size. * @return Old pointer. + * @throws IOException On error. */ protected long move(long size) throws IOException { long ptr = buf.move(size); @@ -156,7 +158,37 @@ public class HadoopDataInStream extends InputStream implements DataInput { /** {@inheritDoc} */ @Override public String readLine() throws IOException { - throw new UnsupportedOperationException(); + if (buf.remaining() == 0) + return null; + + SB sb = new SB(); + + while (buf.remaining() > 0) { + char c = (char)readByte(); + + switch (c) { + case '\n': + return sb.toString(); + + case '\r': + if (buf.remaining() == 0) + return sb.toString(); + + c = (char)readByte(); + + if (c == '\n') + return sb.toString(); + else + buf.moveBackward(1); + + return sb.toString(); + + default: + sb.a(c); + } + } + + return sb.toString(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java index acc9be6..d15e7eb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java @@ -106,6 +106,24 @@ public class HadoopOffheapBuffer { } /** + * @param size Size move on. + * @return Old position pointer or {@code 0} if move goes beyond the begin of the buffer. + */ + public long moveBackward(long size) { + assert size > 0 : size; + + long oldPos = posPtr; + long newPos = oldPos - size; + + if (newPos < bufPtr) + return 0; + + posPtr = newPos; + + return oldPos; + } + + /** * @param ptr Pointer. * @return {@code true} If the given pointer is inside of this buffer. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java index 612e892..c7d4dce 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java @@ -17,30 +17,173 @@ package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput; +import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutput; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; /** * */ public class HadoopDataStreamSelfTest extends GridCommonAbstractTest { + private static final int BUFF_SIZE = 4 * 1024; + /** + * @throws IOException If failed. + */ public void testStreams() throws IOException { GridUnsafeMemory mem = new GridUnsafeMemory(0); HadoopDataOutStream out = new HadoopDataOutStream(mem); - int size = 4 * 1024; + final long ptr = mem.allocate(BUFF_SIZE); - final long ptr = mem.allocate(size); + out.buffer().set(ptr, BUFF_SIZE); - out.buffer().set(ptr, size); + write(out); + HadoopDataInStream in = new HadoopDataInStream(mem); + + in.buffer().set(ptr, out.buffer().pointer() - ptr); + + checkRead(in); + } + + /** + * @throws IOException If failed. + */ + public void testDirectStreams() throws IOException { + HadoopDirectDataOutput out = new HadoopDirectDataOutput(BUFF_SIZE); + + write(out); + + byte [] inBuf = Arrays.copyOf(out.buffer(), out.position()); + + HadoopDirectDataInput in = new HadoopDirectDataInput(inBuf); + + checkRead(in); + } + + /** + * @throws IOException If failed. + */ + public void testReadline() throws IOException { + checkReadLine("String1\rString2\r\nString3\nString4"); + checkReadLine("String1\rString2\r\nString3\nString4\r\n"); + checkReadLine("String1\rString2\r\nString3\nString4\r"); + checkReadLine("\nA\rB\r\nC\nD\n"); + checkReadLine("\rA\rB\r\nC\nD\n"); + checkReadLine("\r\nA\rB\r\nC\nD\n"); + checkReadLine("\r\r\nA\r\r\nC\nD\n"); + checkReadLine("\r\r\r\n\n\n"); + checkReadLine("\r\n"); + checkReadLine("\r"); + checkReadLine("\n"); + } + + /** + * @param val String value. + * @throws IOException On error. + */ + private void checkReadLine(String val) throws IOException { + List<String> expected = readLineByDataInputStream(val); + List<String> dataInp = readLineByHadoopDataInStream(val); + List<String> directDataInp = readLineByHadoopDirectDataInput(val); + + assertEquals(expected, dataInp); + assertEquals(expected, directDataInp); + } + + /** + * @param val String value. + * @return List of strings are returned by readLine(). + * @throws IOException On error. + */ + List<String> readLineByDataInputStream(String val) throws IOException { + ByteArrayOutputStream byteArrayOs = new ByteArrayOutputStream(); + + byteArrayOs.write(val.getBytes()); + + byteArrayOs.close(); + + try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(byteArrayOs.toByteArray()))) { + return readLineStrings(in); + } + } + + /** + * @param val String value. + * @return List of strings are returned by readLine(). + * @throws IOException On error. + */ + List<String> readLineByHadoopDataInStream(String val) throws IOException { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + + HadoopDataOutStream out = new HadoopDataOutStream(mem); + + final long ptr = mem.allocate(BUFF_SIZE); + + out.buffer().set(ptr, BUFF_SIZE); + + out.write(val.getBytes()); + + HadoopDataInStream in = new HadoopDataInStream(mem); + + in.buffer().set(ptr, out.buffer().pointer() - ptr); + + return readLineStrings(in); + } + + /** + * @param val String value. + * @return List of strings are returned by readLine(). + * @throws IOException On error. + */ + List<String> readLineByHadoopDirectDataInput(String val) throws IOException { + + HadoopDirectDataOutput out = new HadoopDirectDataOutput(BUFF_SIZE); + + out.write(val.getBytes()); + + byte [] inBuf = Arrays.copyOf(out.buffer(), out.position()); + + HadoopDirectDataInput in = new HadoopDirectDataInput(inBuf); + + return readLineStrings(in); + } + + /** + * @param in Data input. + * @return List of strings are returned by readLine(). + * @throws IOException On error. + */ + @NotNull private List<String> readLineStrings(DataInput in) throws IOException { + List<String> strs = new ArrayList<>(); + + for (String str = in.readLine(); str != null; str = in.readLine()) + strs.add(str); + + return strs; + } + + /** + * @param out Data output. + * @throws IOException On error. + */ + private void write(DataOutput out) throws IOException { out.writeBoolean(false); out.writeBoolean(true); out.writeBoolean(false); @@ -84,20 +227,22 @@ public class HadoopDataStreamSelfTest extends GridCommonAbstractTest { out.writeLong(Long.MIN_VALUE); out.writeLong(0); out.writeLong(-1L); - out.write(new byte[]{1,2,3}); - out.write(new byte[]{0,1,2,3}, 1, 2); + out.write(new byte[] {1, 2, 3}); + out.write(new byte[] {0, 1, 2, 3}, 1, 2); out.writeUTF("mom washes rum"); + } - HadoopDataInStream in = new HadoopDataInStream(mem); - - in.buffer().set(ptr, out.buffer().pointer()); - + /** + * @param in Data input. + * @throws IOException On error. + */ + private void checkRead(DataInput in) throws IOException { assertEquals(false, in.readBoolean()); assertEquals(true, in.readBoolean()); assertEquals(false, in.readBoolean()); - assertEquals(17, in.read()); - assertEquals(121, in.read()); - assertEquals(0xfa, in.read()); + assertEquals(17, in.readUnsignedByte()); + assertEquals(121, in.readUnsignedByte()); + assertEquals(0xfa, in.readUnsignedByte()); assertEquals(17, in.readByte()); assertEquals(121, in.readByte()); assertEquals((byte)0xfa, in.readByte()); @@ -138,15 +283,15 @@ public class HadoopDataStreamSelfTest extends GridCommonAbstractTest { byte[] b = new byte[3]; - in.read(b); + in.readFully(b); - assertTrue(Arrays.equals(new byte[]{1,2,3}, b)); + assertTrue(Arrays.equals(new byte[] {1, 2, 3}, b)); b = new byte[4]; - in.read(b, 1, 2); + in.readFully(b, 1, 2); - assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b)); + assertTrue(Arrays.equals(new byte[] {0, 1, 2, 0}, b)); assertEquals("mom washes rum", in.readUTF()); }