Repository: kylin Updated Branches: refs/heads/master a2af43d40 -> 4a29d92e8
KYLIN-2929, speed up dump performance, write dump file to disk in lazy way Signed-off-by: Billy Liu <billy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4a29d92e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4a29d92e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4a29d92e Branch: refs/heads/master Commit: 4a29d92e8cb5e92dc97ac18181417893c0d79d9b Parents: a2af43d Author: 冯宇 <hzfen...@corp.netease.com> Authored: Mon Jan 29 14:37:39 2018 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Sun Feb 4 22:31:31 2018 +0800 ---------------------------------------------------------------------- .../kylin/gridtable/GTAggregateScanner.java | 70 ++++++++++++++++---- 1 file changed, 57 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4a29d92e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 9fba336..6dc5de3 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -18,6 +18,8 @@ package org.apache.kylin.gridtable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -25,6 +27,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; @@ -64,6 +68,7 @@ import 
com.google.common.collect.Maps; public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class); + private static final int MAX_BUFFER_SIZE = 64 * 1024 * 1024; final GTInfo info; final ImmutableBitSet dimensions; // dimensions to return, can be more than group by @@ -73,7 +78,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { final IGTScanner inputScanner; final BufferedMeasureCodec measureCodec; final AggregationCache aggrCache; - final long spillThreshold; // 0 means no memory control && no spill + long spillThreshold; // 0 means no memory control && no spill final int storagePushDownLimit;//default to be Int.MAX final StorageLimitLevel storageLimitLevel; final boolean spillEnabled; @@ -281,6 +286,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { final int keyLength; final boolean[] compareMask; boolean compareAll = true; + long sumSpilledSize = 0; ByPassChecker byPassChecker = null; final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() { @@ -425,6 +431,18 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { Dump dump = new Dump(aggBufMap, estMemSize); dump.flush(); dumps.add(dump); + sumSpilledSize += dump.size(); + // when spilled data is too much, we can modify it by other strategy. + // this means, all spilled data is bigger than half of original spillThreshold. 
+ if(sumSpilledSize > spillThreshold) { + for(Dump current : dumps) { + current.spill(); + } + spillThreshold += sumSpilledSize; + sumSpilledSize = 0; + } else { + spillThreshold -= dump.size(); + } } catch (Exception e) { throw new RuntimeException("AggregationCache failed to spill", e); } @@ -643,7 +661,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { final File dumpedFile; SortedMap<byte[], MeasureAggregator[]> buffMap; final long estMemSize; - + byte[] spillBuffer; DataInputStream dis; public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap, long estMemSize) throws IOException { @@ -660,7 +678,11 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath())); } - dis = new DataInputStream(new FileInputStream(dumpedFile)); + if(spillBuffer == null) { + dis = new DataInputStream(new FileInputStream(dumpedFile)); + } else { + dis = new DataInputStream(new ByteArrayInputStream(spillBuffer)); + } final int count = dis.readInt(); return new Iterator<Pair<byte[], byte[]>>() { int cursorIdx = 0; @@ -697,39 +719,61 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { } } + public void spill() throws IOException { + if(spillBuffer == null) return; + OutputStream ops = new FileOutputStream(dumpedFile); + InputStream ips = new ByteArrayInputStream(spillBuffer); + IOUtils.copy(ips, ops); + spillBuffer = null; + IOUtils.closeQuietly(ips); + IOUtils.closeQuietly(ops); + + logger.info("Spill buffer to disk, location: {}, size = {}.", dumpedFile.getAbsolutePath(), + dumpedFile.length()); + } + + public int size() { + return spillBuffer == null ? 
0 : spillBuffer.length; + } + public void flush() throws IOException { logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath()); - + ByteArrayOutputStream baos = new ByteArrayOutputStream(MAX_BUFFER_SIZE); if (buffMap != null) { - DataOutputStream dos = null; + DataOutputStream bos = new DataOutputStream(baos); Object[] aggrResult = null; try { - dos = new DataOutputStream(new FileOutputStream(dumpedFile)); - dos.writeInt(buffMap.size()); + bos.writeInt(buffMap.size()); + for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) { MeasureAggregators aggs = new MeasureAggregators(entry.getValue()); aggrResult = new Object[metrics.trueBitCount()]; aggs.collectStates(aggrResult); ByteBuffer metricsBuf = measureCodec.encode(aggrResult); - dos.writeInt(entry.getKey().length); - dos.write(entry.getKey()); - dos.writeInt(metricsBuf.position()); - dos.write(metricsBuf.array(), 0, metricsBuf.position()); + + bos.writeInt(entry.getKey().length); + bos.write(entry.getKey()); + bos.writeInt(metricsBuf.position()); + bos.write(metricsBuf.array(), 0, metricsBuf.position()); } } finally { buffMap = null; - IOUtils.closeQuietly(dos); + IOUtils.closeQuietly(bos); } } + spillBuffer = baos.toByteArray(); + IOUtils.closeQuietly(baos); + logger.info("Accurately spill data size = {}", spillBuffer.length); } public void terminate() throws IOException { buffMap = null; if (dis != null) - dis.close(); + IOUtils.closeQuietly(dis); if (dumpedFile != null && dumpedFile.exists()) dumpedFile.delete(); + spillBuffer = null; } }