Repository: drill Updated Branches: refs/heads/master 5c57b50f2 -> 073ea6819
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java index e249c19..d2ff805 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.ops.OperExecContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.record.BatchSchema; @@ -90,8 +91,15 @@ public class TestSortImpl extends DrillTest { .setQueryId(queryId) .build(); SortConfig sortConfig = new SortConfig(opContext.getConfig()); + DrillbitEndpoint ep = DrillbitEndpoint.newBuilder() + .setAddress("foo.bar.com") + .setUserPort(1234) + .setControlPort(1235) + .setDataPort(1236) + .setVersion("1.11") + .build(); SpillSet spillSet = new SpillSet(opContext.getConfig(), handle, - popConfig); + popConfig, ep); PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext); SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder); return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch); http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java index 4200073..f525020 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -119,6 +119,7 @@ public class TestDrillbitResilience extends DrillTest { } try { + @SuppressWarnings("resource") final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet); drillbits.put(name, drillbit); } catch (final DrillbitStartupException e) { @@ -132,6 +133,7 @@ public class TestDrillbitResilience extends DrillTest { * @param name name of the drillbit */ private static void stopDrillbit(final String name) { + @SuppressWarnings("resource") final Drillbit drillbit = drillbits.get(name); if (drillbit == null) { throw new IllegalStateException("No Drillbit named \"" + name + "\" found"); @@ -168,8 +170,8 @@ public class TestDrillbitResilience extends DrillTest { * @param name name of the drillbit * @return endpoint of the drillbit */ + @SuppressWarnings("resource") private static DrillbitEndpoint getEndpoint(final String name) { - @SuppressWarnings("resource") final Drillbit drillbit = drillbits.get(name); if (drillbit == null) { throw new IllegalStateException("No Drillbit named \"" + name + "\" found."); @@ -508,9 +510,11 @@ public class TestDrillbitResilience extends DrillTest { } /** - * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, this method fails if the completed state - * is not as expected, or if an exception is thrown. 
The completed state could be COMPLETED or CANCELED. This state - * is set when {@link WaitUntilCompleteListener#queryCompleted} is called. + * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, + * this method fails if the completed state is not as expected, or if an + * exception is thrown. The completed state could be COMPLETED or CANCELED. + * This state is set when {@link WaitUntilCompleteListener#queryCompleted} is + * called. */ private static void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) { final QueryState actualState = result.getFirst(); @@ -758,8 +762,8 @@ public class TestDrillbitResilience extends DrillTest { } /** - * Test cancelling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen. - * Specifically tests cancelling fragment which has {@link MergingRecordBatch} blocked waiting for data. + * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen. + * Specifically tests canceling fragment which has {@link MergingRecordBatch} blocked waiting for data. */ @Test @Repeat(count = NUM_RUNS) @@ -776,8 +780,8 @@ public class TestDrillbitResilience extends DrillTest { } /** - * Test cancelling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen. - * Specifically tests cancelling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data. + * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen. + * Specifically tests canceling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data. 
*/ @Test @Repeat(count = NUM_RUNS) @@ -931,7 +935,13 @@ public class TestDrillbitResilience extends DrillTest { @Test // DRILL-3065 public void failsAfterMSorterSorting() { - final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name"; + + // Note: must use an input table that returns more than one + // batch. The sort uses an optimization for single-batch inputs + // which bypasses the code where this particular fault is + // injected. + + final String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name"; final Class<? extends Exception> typeOfException = RuntimeException.class; final long before = countAllocatedMemory(); @@ -946,7 +956,13 @@ public class TestDrillbitResilience extends DrillTest { @Test // DRILL-3085 public void failsAfterMSorterSetup() { - final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name"; + + // Note: must use an input table that returns more than one + // batch. The sort uses an optimization for single-batch inputs + // which bypasses the code where this particular fault is + // injected. + + final String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name"; final Class<? 
extends Exception> typeOfException = RuntimeException.class; final long before = countAllocatedMemory(); http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java index 5a06ec2..4e44464 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java @@ -17,8 +17,13 @@ */ package org.apache.drill.test; +import java.io.BufferedReader; import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; import java.util.List; import java.util.Properties; @@ -231,4 +236,119 @@ public class ClientFixture implements AutoCloseable { public RowSetBuilder rowSetBuilder(BatchSchema schema) { return new RowSetBuilder(allocator(), schema); } + + /** + * Very simple parser for semi-colon separated lists of SQL statements which + * handles quoted semicolons. Drill can execute only one statement at a time + * (without a trailing semi-colon.) This parser breaks up a statement list + * into single statements. 
Input:<code><pre> + * USE a.b; + * ALTER SESSION SET `foo` = ";"; + * SELECT * FROM bar WHERE x = "\";"; + * </pre></code>Output: + * <ul> + * <li><tt>USE a.b</tt></li> + * <li><tt>ALTER SESSION SET `foo` = ";"</tt></li> + * <li><tt>SELECT * FROM bar WHERE x = "\";"</tt></li></ul> + */ + + public static class StatementParser { + private final Reader in; + private StringBuilder buf; + + public StatementParser(Reader in) { + this.in = in; + } + + public String parseNext() throws IOException { + boolean eof = false; + buf = new StringBuilder(); + for (;;) { + int c = in.read(); + if (c == -1) { + eof = true; + break; + } + if (c == ';') { + break; + } + buf.append((char) c); + if (c == '"' || c == '\'' || c == '`') { + int quote = c; + boolean escape = false; + for (;;) { + c = in.read(); + if (c == -1) { + throw new IllegalArgumentException("Mismatched quote: " + (char) c); + } + buf.append((char) c); + if (! escape && c == quote) { + break; + } + escape = c == '\\'; + } + } + } + String stmt = buf.toString().trim(); + if (stmt.isEmpty() && eof) { + return null; + } + return stmt; + } + } + + private boolean trace = false; + + public void enableTrace(boolean flag) { + this.trace = flag; + } + + public int exec(Reader in) throws IOException { + StatementParser parser = new StatementParser(in); + int count = 0; + for (;;) { + String stmt = parser.parseNext(); + if (stmt == null) { + if (trace) { + System.out.println("----"); + } + return count; + } + if (stmt.isEmpty()) { + continue; + } + if (trace) { + System.out.println("----"); + System.out.println(stmt); + } + runSqlSilently(stmt); + count++; + } + } + + /** + * Execute a set of statements from a file. 
+ * @param source the file containing the set of statements, separated by semicolons + * @return the number of statements executed + */ + + public int exec(File source) throws FileNotFoundException, IOException { + try (Reader in = new BufferedReader(new FileReader(source))) { + return exec(in); + } + } + + /** + * Execute a set of statements from a string. + * @param stmts the set of statements, separated by semicolons + * @return the number of statements executed + */ + + public int exec(String stmts) { + try (Reader in = new StringReader(stmts)) { + return exec(in); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java index 1dafef7..d9e344a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java @@ -545,7 +545,7 @@ public class ProfileParser { p = Pattern.compile("rowcount = ([\\d.E]+), cumulative cost = \\{([\\d.E]+) rows, ([\\d.E]+) cpu, ([\\d.E]+) io, ([\\d.E]+) network, ([\\d.E]+) memory\\}, id = (\\d+)"); m = p.matcher(tail); - if (! m.matches()) { + if (! 
m.find()) { throw new IllegalStateException("Could not parse costs: " + tail); } estRows = Double.parseDouble(m.group(1)); http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java index f2a27c8..37fcdfd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java @@ -17,6 +17,11 @@ */ package org.apache.drill.test; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -48,6 +53,7 @@ import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.test.BufferingQueryEventListener.QueryEvent; +import org.apache.drill.test.ClientFixture.StatementParser; import org.apache.drill.test.rowSet.DirectRowSet; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.RowSetReader; @@ -230,6 +236,25 @@ public class QueryBuilder { return sql(String.format(query, args)); } + /** + * Parse a single SQL statement (with optional ending semi-colon) from + * the file provided. 
+ * @param file the file containing exactly one SQL statement, with + * optional ending semi-colon + * @return this builder + */ + + public QueryBuilder sql(File file) throws FileNotFoundException, IOException { + try (BufferedReader in = new BufferedReader(new FileReader(file))) { + StatementParser parser = new StatementParser(in); + String sql = parser.parseNext(); + if (sql == null) { + throw new IllegalArgumentException("No query found"); + } + return sql(sql); + } + } + public QueryBuilder physical(String plan) { return query(QueryType.PHYSICAL, plan); } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java index 5139086..a9feafd 100644 --- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java +++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java @@ -81,7 +81,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { if (BaseAllocator.DEBUG) { historicalLog.recordEvent("create()"); } - } public DrillBuf reallocIfNeeded(final int size) { @@ -184,15 +183,15 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { /** * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the - * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If + * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the target allocator. 
If * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not * transfer any ownership to the newly created DrillBuf. - * + * <p> * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a * reference count of 1 (in the case that this is the first time this memory is being associated with the new * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in * the case that the provided allocator already had an association to this underlying memory. - * + * <p> * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible * due to the fact that the original owning allocator may have allocated this memory out of a local reservation * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done @@ -218,6 +217,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { } /** + * Visible only for memory allocation calculations. + * + * @return the ledger held by this buffer + */ + public BufferLedger getLedger() { return ledger; } + + /** * The outcome of a Transfer. */ public class TransferResult { @@ -236,7 +242,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { this.allocationFit = allocationFit; this.buffer = buffer; } - } @Override @@ -269,9 +274,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { throw new IllegalStateException( String.format("DrillBuf[%d] refCnt has gone negative. 
Buffer Info: %s", id, toVerboseString())); } - return refCnt == 0; - } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java index 34647f9..833a604 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java +++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java @@ -34,6 +34,7 @@ import org.apache.drill.exec.memory.BaseAllocator.Verbosity; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.ops.BufferManager; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -246,7 +247,6 @@ public class AllocationManager { owningLedger = target; return overlimit; } - } /** @@ -387,11 +387,10 @@ public class AllocationManager { } return buf; - } /** - * What is the total size (in bytes) of memory underlying this ledger. + * The total size (in bytes) of memory underlying this ledger. * * @return Size in bytes */ @@ -400,7 +399,7 @@ public class AllocationManager { } /** - * How much memory is accounted for by this ledger. This is either getSize() if this is the owning ledger for the + * Amount of memory accounted for by this ledger. This is either getSize() if this is the owning ledger for the * memory or zero in the case that this is not the owning ledger associated with this memory. * * @return Amount of accounted(owned) memory associated with this ledger. @@ -418,17 +417,17 @@ public class AllocationManager { /** * Package visible for debugging/verification only. 
*/ - UnsafeDirectLittleEndian getUnderlying() { + @VisibleForTesting + protected UnsafeDirectLittleEndian getUnderlying() { return underlying; } /** * Package visible for debugging/verification only. */ - boolean isOwningLedger() { + @VisibleForTesting + protected boolean isOwningLedger() { return this == owningLedger; } - } - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java index 8efa3ee..4e17eda 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java +++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java @@ -841,6 +841,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato @Override public void write(DrillBuf buf, OutputStream out) throws IOException { + assert(buf.readerIndex() == 0); write(buf, buf.readableBytes(), out); } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/includes/vv_imports.ftl ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/includes/vv_imports.ftl b/exec/vector/src/main/codegen/includes/vv_imports.ftl index 87a2106..efca346 100644 --- a/exec/vector/src/main/codegen/includes/vv_imports.ftl +++ b/exec/vector/src/main/codegen/includes/vv_imports.ftl @@ -43,12 +43,14 @@ import org.apache.drill.exec.vector.complex.writer.*; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; 
import org.apache.drill.exec.exception.OutOfMemoryException; import java.util.Arrays; import java.util.Random; import java.util.List; +import java.util.Set; import java.io.Closeable; import java.io.InputStream; http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java index 1e83a4f..5a53e21 100644 --- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java @@ -266,8 +266,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } @Override - public int getPayloadByteCount() { - return getAccessor().getValueCount() * VALUE_WIDTH; + public int getPayloadByteCount(int valueCount) { + return valueCount * ${type.width}; } private class TransferImpl implements TransferPair{ http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java index 0f8d90c..a2c0deb 100644 --- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java @@ -15,12 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.NullableVectorDefinitionSetter; import java.lang.Override; import java.lang.UnsupportedOperationException; +import java.util.Set; <@pp.dropOutputFile /> <#list vv.types as type> @@ -177,15 +179,16 @@ public final class ${className} extends BaseDataValueVector implements <#if type } @Override - public int getAllocatedByteCount() { - return bits.getAllocatedByteCount() + values.getAllocatedByteCount(); + public void collectLedgers(Set<BufferLedger> ledgers) { + bits.collectLedgers(ledgers); + values.collectLedgers(ledgers); } @Override - public int getPayloadByteCount() { + public int getPayloadByteCount(int valueCount) { // For nullable, we include all values, null or not, in computing // the value length. - return bits.getPayloadByteCount() + values.getPayloadByteCount(); + return bits.getPayloadByteCount(valueCount) + values.getPayloadByteCount(valueCount); } <#if type.major == "VarLen"> @@ -225,7 +228,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type public void allocateNew(int valueCount) { try { values.allocateNew(valueCount); - bits.allocateNew(valueCount+1); + bits.allocateNew(valueCount); } catch(OutOfMemoryException e) { clear(); throw e; http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/UnionVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java index 207e55a..2c732f4 100644 --- a/exec/vector/src/main/codegen/templates/UnionVector.java +++ b/exec/vector/src/main/codegen/templates/UnionVector.java @@ -29,9 +29,12 @@ package org.apache.drill.exec.vector.complex; <#include "/@includes/vv_imports.ftl" /> import 
java.util.Iterator; +import java.util.Set; + import org.apache.drill.exec.vector.complex.impl.ComplexCopier; import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.expr.BasicTypeHelper; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; /* * This class is generated using freemarker and the ${.template_name} template. @@ -203,19 +206,18 @@ public class UnionVector implements ValueVector { } @Override - public int getAllocatedByteCount() { + public void collectLedgers(Set<BufferLedger> ledgers) { // Most vectors are held inside the internal map. - int count = internalMap.getAllocatedByteCount(); + internalMap.collectLedgers(ledgers); if (bit != null) { - count += bit.getAllocatedByteCount(); + bit.collectLedgers(ledgers); } - return count; } @Override - public int getPayloadByteCount() { - return internalMap.getPayloadByteCount(); + public int getPayloadByteCount(int valueCount) { + return internalMap.getPayloadByteCount(valueCount); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java index 9a9e178..0eb8906 100644 --- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java @@ -17,8 +17,10 @@ */ import java.lang.Override; +import java.util.Set; import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.VariableWidthVector; @@ -247,27 +249,26 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } @Override - 
public int getAllocatedByteCount() { - return offsetVector.getAllocatedByteCount() + super.getAllocatedByteCount(); + public void collectLedgers(Set<BufferLedger> ledgers) { + offsetVector.collectLedgers(ledgers); + super.collectLedgers(ledgers); } @Override - public int getPayloadByteCount() { - UInt${type.width}Vector.Accessor a = offsetVector.getAccessor(); - int count = a.getValueCount(); - if (count == 0) { + public int getPayloadByteCount(int valueCount) { + if (valueCount == 0) { return 0; - } else { - // If 1 or more values, then the last value is set to - // the offset of the next value, which is the same as - // the length of existing values. - // In addition to the actual data bytes, we must also - // include the "overhead" bytes: the offset vector entries - // that accompany each column value. Thus, total payload - // size is consumed text bytes + consumed offset vector - // bytes. - return a.get(count-1) + offsetVector.getPayloadByteCount(); } + // If 1 or more values, then the last value is set to + // the offset of the next value, which is the same as + // the length of existing values. + // In addition to the actual data bytes, we must also + // include the "overhead" bytes: the offset vector entries + // that accompany each column value. Thus, total payload + // size is consumed text bytes + consumed offset vector + // bytes. 
+ return offsetVector.getAccessor().get(valueCount) + + offsetVector.getPayloadByteCount(valueCount); } private class TransferImpl implements TransferPair{ @@ -308,7 +309,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V if (size > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size"); } - allocationSizeInBytes = (int)size; + allocationSizeInBytes = (int) size; offsetVector.setInitialCapacity(valueCount + 1); } @@ -385,7 +386,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached."); } - logger.trace("Reallocating VarChar, new size {}",newAllocationSize); + logger.trace("Reallocating VarChar, new size {}", newAllocationSize); final DrillBuf newBuf = allocator.buffer((int)newAllocationSize); newBuf.setBytes(0, data, 0, data.capacity()); data.release(); http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java index 77d457b..1401373 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,41 +21,42 @@ import org.apache.drill.exec.vector.complex.RepeatedFixedWidthVectorLike; import org.apache.drill.exec.vector.complex.RepeatedVariableWidthVectorLike; public class AllocationHelper { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class); - public static void allocate(ValueVector v, int valueCount, int bytesPerValue) { - allocate(v, valueCount, bytesPerValue, 5); + public static void allocate(ValueVector vector, int valueCount, int bytesPerValue) { + allocate(vector, valueCount, bytesPerValue, 5); } - public static void allocatePrecomputedChildCount(ValueVector v, int valueCount, int bytesPerValue, int childValCount){ - if(v instanceof FixedWidthVector) { - ((FixedWidthVector) v).allocateNew(valueCount); - } else if (v instanceof VariableWidthVector) { - ((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount); - } else if(v instanceof RepeatedFixedWidthVectorLike) { - ((RepeatedFixedWidthVectorLike) v).allocateNew(valueCount, childValCount); - } else if(v instanceof RepeatedVariableWidthVectorLike) { - ((RepeatedVariableWidthVectorLike) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount); + public static void allocatePrecomputedChildCount(ValueVector vector, int valueCount, int bytesPerValue, int childValCount) { + if (vector instanceof FixedWidthVector) { + ((FixedWidthVector) vector).allocateNew(valueCount); + } else if (vector instanceof VariableWidthVector) { + ((VariableWidthVector) vector).allocateNew(valueCount * bytesPerValue, valueCount); + } else if (vector instanceof RepeatedFixedWidthVectorLike) { + ((RepeatedFixedWidthVectorLike) vector).allocateNew(valueCount, childValCount); + } else if (vector instanceof RepeatedVariableWidthVectorLike && childValCount > 0 && bytesPerValue > 0) { + // Assertion thrown if byte count is zero in the full allocateNew, + // so use 
default version instead. + ((RepeatedVariableWidthVectorLike) vector).allocateNew(childValCount * bytesPerValue, valueCount, childValCount); } else { - v.allocateNew(); + vector.allocateNew(); } } - public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){ - allocatePrecomputedChildCount(v, valueCount, bytesPerValue, repeatedPerTop * valueCount); + public static void allocate(ValueVector vector, int valueCount, int bytesPerValue, int repeatedPerTop){ + allocatePrecomputedChildCount(vector, valueCount, bytesPerValue, repeatedPerTop * valueCount); } /** * Allocates the exact amount if v is fixed width, otherwise falls back to dynamic allocation - * @param v value vector we are trying to allocate + * @param vector value vector we are trying to allocate * @param valueCount size we are trying to allocate * @throws org.apache.drill.exec.memory.OutOfMemoryException if it can't allocate the memory */ - public static void allocateNew(ValueVector v, int valueCount) { - if (v instanceof FixedWidthVector) { - ((FixedWidthVector) v).allocateNew(valueCount); + public static void allocateNew(ValueVector vector, int valueCount) { + if (vector instanceof FixedWidthVector) { + ((FixedWidthVector) vector).allocateNew(valueCount); } else { - v.allocateNew(); + vector.allocateNew(); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java index 5ce58ed..e98a417 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java @@ -18,7 +18,11 @@ package org.apache.drill.exec.vector; import 
io.netty.buffer.DrillBuf; + +import java.util.Set; + import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.record.MaterializedField; @@ -87,9 +91,6 @@ public abstract class BaseDataValueVector extends BaseValueVector { public void reset() {} @Override - public int getAllocatedByteCount() { return data.capacity(); } - - @Override public void exchange(ValueVector other) { BaseDataValueVector target = (BaseDataValueVector) other; DrillBuf temp = data; @@ -99,4 +100,11 @@ public abstract class BaseDataValueVector extends BaseValueVector { getMutator().exchange(target.getMutator()); // No state in an Accessor to reset } + + public void collectLedgers(Set<BufferLedger> ledgers) { + BufferLedger ledger = data.getLedger(); + if (ledger != null) { + ledgers.add(ledger); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java index 0062e77..a7c81de 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -528,8 +528,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public int getPayloadByteCount() { - // One byte per value - return valueCount; + public int getPayloadByteCount(int valueCount) { + return getSizeFromCount(valueCount); } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java index bd8566d..3136e32 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java @@ -22,9 +22,11 @@ import io.netty.buffer.DrillBuf; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.drill.exec.expr.holders.ObjectHolder; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.MaterializedField; @@ -229,13 +231,10 @@ public class ObjectVector extends BaseValueVector { } @Override - public int getAllocatedByteCount() { - // Values not stored in direct memory? - return 0; - } + public void collectLedgers(Set<BufferLedger> ledgers) {} @Override - public int getPayloadByteCount() { + public int getPayloadByteCount(int valueCount) { // Values not stored in direct memory? 
return 0; } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java index 3bc43fa..2926862 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -18,10 +18,12 @@ package org.apache.drill.exec.vector; import java.io.Closeable; +import java.util.Set; import io.netty.buffer.DrillBuf; import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; @@ -204,16 +206,19 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> { void copyEntry(int toIndex, ValueVector from, int fromIndex); /** - * Return the total memory consumed by all buffers within this vector. + * Add the ledgers underlying the buffers underlying the components of the + * vector to the set provided. Used to determine actual memory allocation. + * + * @param ledgers set of ledgers to which to add ledgers for this vector */ - int getAllocatedByteCount(); + void collectLedgers(Set<BufferLedger> ledgers); /** * Return the number of value bytes consumed by actual data. */ - int getPayloadByteCount(); + int getPayloadByteCount(int valueCount); /** * Exchange state with another value vector of the same type. 
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java index 9a0b6be..5786487 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java @@ -18,11 +18,13 @@ package org.apache.drill.exec.vector; import java.util.Iterator; +import java.util.Set; import com.google.common.collect.Iterators; import io.netty.buffer.DrillBuf; import org.apache.drill.common.types.Types; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.MaterializedField; @@ -158,11 +160,10 @@ public class ZeroVector implements ValueVector { public void copyEntry(int toIndex, ValueVector from, int fromIndex) { } @Override - public int getAllocatedByteCount() { return 0; } + public void exchange(ValueVector other) { } - @Override - public int getPayloadByteCount() { return 0; } + public void collectLedgers(Set<BufferLedger> ledgers) {} @Override - public void exchange(ValueVector other) { } + public int getPayloadByteCount(int valueCount) { return 0; } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java index baba086..30db41e 100644 --- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java @@ -22,11 +22,13 @@ import io.netty.buffer.DrillBuf; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.drill.common.collections.MapWithOrdinal; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.BasicTypeHelper; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.vector.ValueVector; @@ -117,6 +119,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector { * * @return resultant {@link org.apache.drill.exec.vector.ValueVector} */ + @SuppressWarnings("unchecked") @Override public <T extends ValueVector> T addOrGet(String name, TypeProtos.MajorType type, Class<T> clazz) { final ValueVector existing = getChild(name); @@ -277,21 +280,18 @@ public abstract class AbstractMapVector extends AbstractContainerVector { } @Override - public int getAllocatedByteCount() { - int count = 0; - + public void collectLedgers(Set<BufferLedger> ledgers) { for (final ValueVector v : vectors.values()) { - count += v.getAllocatedByteCount(); + v.collectLedgers(ledgers); } - return count; } @Override - public int getPayloadByteCount() { + public int getPayloadByteCount(int valueCount) { int count = 0; for (final ValueVector v : vectors.values()) { - count += v.getPayloadByteCount(); + count += v.getPayloadByteCount(valueCount); } return count; } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java ---------------------------------------------------------------------- diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java index 5b8f44d..2b41b8b 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java @@ -21,12 +21,14 @@ import io.netty.buffer.DrillBuf; import java.util.Collections; import java.util.Iterator; +import java.util.Set; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeRuntimeException; import org.apache.drill.exec.expr.BasicTypeHelper; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.vector.AddOrGetResult; @@ -182,6 +184,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements return vector == DEFAULT_DATA_VECTOR ? 
0:1; } + @SuppressWarnings("unchecked") @Override public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) { boolean created = false; @@ -210,13 +213,15 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements } @Override - public int getAllocatedByteCount() { - return offsets.getAllocatedByteCount() + vector.getAllocatedByteCount(); + public void collectLedgers(Set<BufferLedger> ledgers) { + offsets.collectLedgers(ledgers); + vector.collectLedgers(ledgers); } @Override - public int getPayloadByteCount() { - return offsets.getPayloadByteCount() + vector.getPayloadByteCount(); + public int getPayloadByteCount(int valueCount) { + int entryCount = offsets.getAccessor().get(valueCount); + return offsets.getPayloadByteCount(valueCount) + vector.getPayloadByteCount(entryCount); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java index 7f0e939..c61fd00 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java @@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.MaterializedField; @@ -41,6 +42,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader; import 
org.apache.drill.exec.vector.complex.writer.FieldWriter; import java.util.List; +import java.util.Set; public class ListVector extends BaseRepeatedValueVector { @@ -323,12 +325,15 @@ public class ListVector extends BaseRepeatedValueVector { } @Override - public int getAllocatedByteCount() { - return offsets.getAllocatedByteCount() + bits.getAllocatedByteCount() + super.getAllocatedByteCount(); + public void collectLedgers(Set<BufferLedger> ledgers) { + offsets.collectLedgers(ledgers); + bits.collectLedgers(ledgers); + super.collectLedgers(ledgers); } @Override - public int getPayloadByteCount() { - return offsets.getPayloadByteCount() + bits.getPayloadByteCount() + super.getPayloadByteCount(); + public int getPayloadByteCount(int valueCount) { + return offsets.getPayloadByteCount(valueCount) + bits.getPayloadByteCount(valueCount) + + super.getPayloadByteCount(valueCount); } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index 969c141..ab2c3d8 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -30,6 +31,7 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.holders.ComplexHolder; import org.apache.drill.exec.expr.holders.RepeatedListHolder; import org.apache.drill.exec.memory.BufferAllocator; +import 
org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; @@ -415,6 +417,7 @@ public class RepeatedListVector extends AbstractContainerVector public void allocateNew(int valueCount, int innerValueCount) { clear(); getOffsetVector().allocateNew(valueCount + 1); + getOffsetVector().getMutator().setSafe(0, 0); getMutator().reset(); } @@ -435,14 +438,13 @@ public class RepeatedListVector extends AbstractContainerVector copyFromSafe(fromIndex, toIndex, (RepeatedListVector) from); } - @Override - public int getAllocatedByteCount() { - return delegate.getAllocatedByteCount(); + public void collectLedgers(Set<BufferLedger> ledgers) { + delegate.collectLedgers(ledgers); } @Override - public int getPayloadByteCount() { - return delegate.getPayloadByteCount(); + public int getPayloadByteCount(int valueCount) { + return delegate.getPayloadByteCount(valueCount); } @Override @@ -450,5 +452,4 @@ public class RepeatedListVector extends AbstractContainerVector // TODO: Figure out how to test this scenario, then what to do... 
throw new UnsupportedOperationException("Exchange() not yet supported for repeated lists"); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index 7ff36a7..be73fc8 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -33,6 +34,7 @@ import org.apache.drill.exec.expr.BasicTypeHelper; import org.apache.drill.exec.expr.holders.ComplexHolder; import org.apache.drill.exec.expr.holders.RepeatedMapHolder; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; @@ -100,7 +102,7 @@ public class RepeatedMapVector extends AbstractMapVector public void allocateNew(int groupCount, int innerValueCount) { clear(); try { - offsets.allocateNew(groupCount + 1); + allocateOffsetsNew(groupCount); for (ValueVector v : getChildren()) { AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, innerValueCount); } @@ -108,10 +110,14 @@ public class RepeatedMapVector extends AbstractMapVector clear(); throw e; } - offsets.zeroVector(); mutator.reset(); } + public void allocateOffsetsNew(int groupCount) { + 
offsets.allocateNew(groupCount + 1); + offsets.zeroVector(); + } + public Iterator<String> fieldNameIterator() { return getChildFieldNames().iterator(); } @@ -128,11 +134,7 @@ public class RepeatedMapVector extends AbstractMapVector if (getAccessor().getValueCount() == 0) { return 0; } - long bufferSize = offsets.getBufferSize(); - for (final ValueVector v : (Iterable<ValueVector>) this) { - bufferSize += v.getBufferSize(); - } - return (int) bufferSize; + return offsets.getBufferSize() + super.getBufferSize(); } @Override @@ -141,7 +143,7 @@ public class RepeatedMapVector extends AbstractMapVector return 0; } - long bufferSize = 0; + long bufferSize = offsets.getBufferSizeFor(valueCount); for (final ValueVector v : (Iterable<ValueVector>) this) { bufferSize += v.getBufferSizeFor(valueCount); } @@ -451,7 +453,7 @@ public class RepeatedMapVector extends AbstractMapVector bufOffset += vectorLength; } - assert bufOffset == buffer.capacity(); + assert bufOffset == buffer.writerIndex(); } @@ -598,7 +600,8 @@ public class RepeatedMapVector extends AbstractMapVector } @Override - public int getAllocatedByteCount() { - return super.getAllocatedByteCount( ) + offsets.getAllocatedByteCount(); + public void collectLedgers(Set<BufferLedger> ledgers) { + super.collectLedgers(ledgers); + offsets.collectLedgers(ledgers); } }