IGNITE-4516: Hadoop: implemented optional compression for remote shuffle output. This closes #1399.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d62542b9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d62542b9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d62542b9 Branch: refs/heads/ignite-3477 Commit: d62542b9bbfff5a221915f2bd1d23ecfee9814cf Parents: 2774d87 Author: devozerov <voze...@gridgain.com> Authored: Thu Jan 5 11:30:42 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Thu Jan 5 11:30:42 2017 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopJobProperty.java | 7 +++ .../shuffle/HadoopDirectShuffleMessage.java | 34 +++++++++++- .../hadoop/shuffle/HadoopShuffleJob.java | 57 +++++++++++++++++--- .../shuffle/direct/HadoopDirectDataOutput.java | 14 +++++ .../direct/HadoopDirectDataOutputContext.java | 48 +++++++++++++++-- .../direct/HadoopDirectDataOutputState.java | 14 ++++- .../hadoop/impl/HadoopTeraSortTest.java | 32 +++++++++-- 7 files changed, 188 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java index 4398acd..4dd3bf5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -103,6 +103,13 @@ public enum HadoopJobProperty { SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"), /** + * Whether shuffle message should be compressed with GZIP. + * <p> + * Defaults to {@code false}. + */ + SHUFFLE_MSG_GZIP("ignite.shuffle.message.gzip"), + + /** * Whether to stripe mapper output for remote reducers. * <p> * Defaults to {@code false}. http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java index e81dc5f..f596100 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java @@ -57,6 +57,9 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { @GridDirectTransient private transient int bufLen; + /** Data length. */ + private int dataLen; + /** * Default constructor. */ @@ -72,8 +75,9 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { * @param cnt Count. * @param buf Buffer. * @param bufLen Buffer length. + * @param dataLen Data length. */ - public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen) { + public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen, int dataLen) { assert jobId != null; this.jobId = jobId; @@ -81,6 +85,7 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { this.cnt = cnt; this.buf = buf; this.bufLen = bufLen; + this.dataLen = dataLen; } /** @@ -111,6 +116,13 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { return buf; } + /** + * @return Data length. + */ + public int dataLength() { + return dataLen; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -147,6 +159,12 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { writer.incrementState(); + case 4: + if (!writer.writeInt("dataLen", dataLen)) + return false; + + writer.incrementState(); + } return true; @@ -194,6 +212,14 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { reader.incrementState(); + case 4: + dataLen = reader.readInt("dataLen"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(HadoopDirectShuffleMessage.class); @@ -206,7 +232,7 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 5; } /** {@inheritDoc} */ @@ -222,6 +248,8 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { out.writeInt(cnt); U.writeByteArray(out, buf); + + out.writeInt(dataLen); } /** {@inheritDoc} */ @@ -234,6 +262,8 @@ public class HadoopDirectShuffleMessage implements Message, HadoopMessage { buf = U.readByteArray(in); bufLen = buf != null ? buf.length : 0; + + dataLen = in.readInt(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 1c546a1..0394865 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -56,6 +56,8 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -63,10 +65,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.zip.GZIPInputStream; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_GZIP; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; @@ -79,6 +83,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable { private static final int DFLT_SHUFFLE_MSG_SIZE = 1024 * 1024; /** */ + private static final boolean DFLT_SHUFFLE_MSG_GZIP = false; + + /** */ private final HadoopJob job; /** */ @@ -130,6 +137,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** Message size. */ private final int msgSize; + /** Whether to GZIP shuffle messages. */ + private final boolean msgGzip; + /** Whether to strip mappers for remote execution. */ private final boolean stripeMappers; @@ -190,6 +200,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { stripeMappers = stripeMappers0; msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE); + msgGzip = get(job.info(), SHUFFLE_MSG_GZIP, DFLT_SHUFFLE_MSG_GZIP); locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt); @@ -360,22 +371,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @throws IgniteCheckedException Exception. */ public void onDirectShuffleMessage(T src, HadoopDirectShuffleMessage msg) throws IgniteCheckedException { - assert msg.buffer() != null; + byte[] buf = extractBuffer(msg); - HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get(); + assert buf != null; + + int rdc = msg.reducer(); + + HadoopTaskContext taskCtx = locReducersCtx.get(rdc).get(); HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null); - perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis()); + perfCntr.onShuffleMessage(rdc, U.currentTimeMillis()); - HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer()); + HadoopMultimap map = getOrCreateMap(locMaps, rdc); HadoopSerialization keySer = taskCtx.keySerialization(); HadoopSerialization valSer = taskCtx.valueSerialization(); // Add data from message to the map. try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) { - HadoopDirectDataInput in = new HadoopDirectDataInput(msg.buffer()); + HadoopDirectDataInput in = new HadoopDirectDataInput(buf); Object key = null; Object val = null; @@ -393,6 +408,31 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } /** + * Extract buffer from direct shuffle message. + * + * @param msg Message. + * @return Buffer. + */ + private byte[] extractBuffer(HadoopDirectShuffleMessage msg) throws IgniteCheckedException { + if (msgGzip) { + byte[] res = new byte[msg.dataLength()]; + + try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(msg.buffer()), res.length)) { + int len = in.read(res, 0, res.length); + + assert len == res.length; + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to uncompress direct shuffle message.", e); + } + + return res; + } + else + return msg.buffer(); + } + + /** * @param ack Shuffle ack. */ @SuppressWarnings("ConstantConditions") @@ -595,7 +635,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @param rmtDirectCtx Remote direct context. * @param reset Whether to perform reset. */ - private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx, boolean reset) { + private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx, + boolean reset) { if (rmtDirectCtx == null) return; @@ -612,7 +653,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { rmtDirectCtx.reset(); HadoopDirectShuffleMessage msg = new HadoopDirectShuffleMessage(job.id(), rmtRdcIdx, cnt, - state.buffer(), state.bufferLength()); + state.buffer(), state.bufferLength(), state.dataLength()); T nodeId = reduceAddrs[rmtRdcIdx]; @@ -983,7 +1024,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { HadoopDirectDataOutputContext rmtDirectCtx = rmtDirectCtxs[idx]; if (rmtDirectCtx == null) { - rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, taskCtx); + rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, msgGzip, taskCtx); rmtDirectCtxs[idx] = rmtDirectCtx; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java index 151e552..5840994 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java @@ -170,6 +170,13 @@ public class HadoopDirectDataOutput extends OutputStream implements DataOutput { } /** + * @return Buffer length (how much memory is allocated). + */ + public int bufferLength() { + return bufSize; + } + + /** * @return Position. */ public int position() { @@ -184,6 +191,13 @@ public class HadoopDirectDataOutput extends OutputStream implements DataOutput { } /** + * Reset the stream. + */ + public void reset() { + pos = 0; + } + + /** * Ensure that the given amount of bytes is available within the stream, then shift the position. * * @param cnt Count. http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java index bc70ef3..454278b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java @@ -18,16 +18,29 @@ package org.apache.ignite.internal.processors.hadoop.shuffle.direct; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import java.io.IOException; +import java.util.zip.GZIPOutputStream; + /** * Hadoop data output context for direct communication. */ public class HadoopDirectDataOutputContext { + /** Initial allocation size for GZIP output. We start with very low value, but then it will grow if needed. */ + private static final int GZIP_OUT_MIN_ALLOC_SIZE = 1024; + + /** GZIP buffer size. We should remove it when we implement efficient direct GZIP output. */ + private static final int GZIP_BUFFER_SIZE = 8096; + /** Flush size. */ private final int flushSize; + /** Whether to perform GZIP. */ + private final boolean gzip; + /** Key serialization. */ private final HadoopSerialization keySer; @@ -37,6 +50,9 @@ public class HadoopDirectDataOutputContext { /** Data output. */ private HadoopDirectDataOutput out; + /** Data output for GZIP. */ + private HadoopDirectDataOutput gzipOut; + /** Number of keys written. */ private int cnt; @@ -44,17 +60,22 @@ public class HadoopDirectDataOutputContext { * Constructor. * * @param flushSize Flush size. + * @param gzip Whether to perform GZIP. * @param taskCtx Task context. * @throws IgniteCheckedException If failed. */ - public HadoopDirectDataOutputContext(int flushSize, HadoopTaskContext taskCtx) + public HadoopDirectDataOutputContext(int flushSize, boolean gzip, HadoopTaskContext taskCtx) throws IgniteCheckedException { this.flushSize = flushSize; + this.gzip = gzip; keySer = taskCtx.keySerialization(); valSer = taskCtx.valueSerialization(); out = new HadoopDirectDataOutput(flushSize); + + if (gzip) + gzipOut = new HadoopDirectDataOutput(Math.max(flushSize / 8, GZIP_OUT_MIN_ALLOC_SIZE)); } /** @@ -85,16 +106,35 @@ public class HadoopDirectDataOutputContext { * @return State. */ public HadoopDirectDataOutputState state() { - return new HadoopDirectDataOutputState(out.buffer(), out.position()); + if (gzip) { + try { + try (GZIPOutputStream gzip = new GZIPOutputStream(gzipOut, GZIP_BUFFER_SIZE)) { + gzip.write(out.buffer(), 0, out.position()); + } + + return new HadoopDirectDataOutputState(gzipOut.buffer(), gzipOut.position(), out.position()); + } + catch (IOException e) { + throw new IgniteException("Failed to compress.", e); + } + } + else + return new HadoopDirectDataOutputState(out.buffer(), out.position(), out.position()); } /** * Reset buffer. */ public void reset() { - int allocSize = Math.max(flushSize, out.position()); + if (gzip) { + // In GZIP mode we do not expose normal output to the outside. Hence, no need for reallocation, just reset. + out.reset(); + + gzipOut = new HadoopDirectDataOutput(gzipOut.bufferLength()); + } + else + out = new HadoopDirectDataOutput(flushSize, out.bufferLength()); - out = new HadoopDirectDataOutput(flushSize, allocSize); cnt = 0; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java index a9c12e3..cadde7a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java @@ -27,15 +27,20 @@ public class HadoopDirectDataOutputState { /** Buffer length. */ private final int bufLen; + /** Data length. */ + private final int dataLen; + /** * Constructor. * * @param buf Buffer. * @param bufLen Buffer length. + * @param dataLen Data length. */ - public HadoopDirectDataOutputState(byte[] buf, int bufLen) { + public HadoopDirectDataOutputState(byte[] buf, int bufLen, int dataLen) { this.buf = buf; this.bufLen = bufLen; + this.dataLen = dataLen; } /** @@ -51,4 +56,11 @@ public class HadoopDirectDataOutputState { public int bufferLength() { return bufLen; } + + /** + * @return Original data length. + */ + public int dataLength() { + return dataLen; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d62542b9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java index b1fa91f..d237180 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java @@ -137,8 +137,10 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest { /** * Does actual test TeraSort job Through Ignite API + * + * @param gzip Whether to use GZIP. */ - protected final void teraSort() throws Exception { + protected final void teraSort(boolean gzip) throws Exception { System.out.println("TeraSort ==============================================================="); getFileSystem().delete(new Path(sortOutDir), true); @@ -164,6 +166,10 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest { jobConf.set("mapred.max.split.size", String.valueOf(splitSize)); jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), true); + jobConf.setInt(HadoopJobProperty.SHUFFLE_MSG_SIZE.propertyName(), 4096); + + if (gzip) + jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MSG_GZIP.propertyName(), true); jobConf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), TextPartiallyRawComparator.class.getName()); @@ -347,12 +353,32 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest { /** * Runs generate/sort/validate phases of the terasort sample. - * @throws Exception + * + * @throws Exception If failed. */ public void testTeraSort() throws Exception { + checkTeraSort(false); + } + + /** + * Runs generate/sort/validate phases of the terasort sample. + * + * @throws Exception If failed. + */ + public void testTeraSortGzip() throws Exception { + checkTeraSort(true); + } + + /** + * Check terasort. + * + * @param gzip GZIP flag. + * @throws Exception If failed. + */ + private void checkTeraSort(boolean gzip) throws Exception { teraGenerate(); - teraSort(); + teraSort(gzip); teraValidate(); }