sohami closed pull request #1375: DRILL-6594: Data batches for Project operator
are not being split properly and exceed the maximum specified
URL: https://github.com/apache/drill/pull/1375
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index b9240d68be0..84a3f46f886 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -95,30 +95,31 @@ public OutputWidthCalculator getCalculator() {
}
/**
- * VarLenReadExpr captures the name of a variable length column that is
used (read) in an expression.
- * The captured name will be used to lookup the average entry size for the
column in the corresponding
+ * VarLenReadExpr captures the inputColumnName and the readExpression used
to read a variable length column.
+ * The captured inputColumnName will be used to lookup the average entry
size for the column in the corresponding.
+ * If inputColumnName is null then the readExpression is used to get the
name of the column.
* {@link org.apache.drill.exec.record.RecordBatchSizer}
*/
public static class VarLenReadExpr extends OutputWidthExpression {
ValueVectorReadExpression readExpression;
- String name;
+ String inputColumnName;
public VarLenReadExpr(ValueVectorReadExpression readExpression) {
this.readExpression = readExpression;
- this.name = null;
+ this.inputColumnName = null;
}
- public VarLenReadExpr(String name) {
+ public VarLenReadExpr(String inputColumnName) {
this.readExpression = null;
- this.name = name;
+ this.inputColumnName = inputColumnName;
}
public ValueVectorReadExpression getReadExpression() {
return readExpression;
}
- public String getName() {
- return name;
+ public String getInputColumnName() {
+ return inputColumnName;
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index cb587952304..70908bf2fcb 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -205,7 +205,7 @@ public OutputWidthExpression visitFixedLenExpr(FixedLenExpr
fixedLenExpr, Output
@Override
public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr
varLenReadExpr, OutputWidthVisitorState state)
throws
RuntimeException {
- String columnName = varLenReadExpr.getName();
+ String columnName = varLenReadExpr.getInputColumnName();
if (columnName == null) {
TypedFieldId fieldId =
varLenReadExpr.getReadExpression().getTypedFieldId();
columnName = TypedFieldId.getPath(fieldId,
state.manager.getIncomingBatch());
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
index c0e0cb1c9e6..e18c827128b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
@@ -20,18 +20,13 @@
public class OutputWidthVisitorState {
ProjectMemoryManager manager;
- ProjectMemoryManager.OutputColumnType outputColumnType;
- public OutputWidthVisitorState(ProjectMemoryManager manager,
ProjectMemoryManager.OutputColumnType outputColumnType) {
+ public OutputWidthVisitorState(ProjectMemoryManager manager) {
this.manager = manager;
- this.outputColumnType = outputColumnType;
}
public ProjectMemoryManager getManager() {
return manager;
}
- public ProjectMemoryManager.OutputColumnType getOutputColumnType() {
- return outputColumnType;
- }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index f461b092281..03c849cfef6 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.project;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -88,15 +89,12 @@ public RecordBatch getIncomingBatch() {
}
class ColumnWidthInfo {
- //MaterializedField materializedField;
OutputWidthExpression outputExpression;
int width;
WidthType widthType;
OutputColumnType outputColumnType;
- String name;
- ColumnWidthInfo(ValueVector vv,
- OutputWidthExpression outputWidthExpression,
+ ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
OutputColumnType outputColumnType,
WidthType widthType,
int fieldWidth) {
@@ -104,8 +102,6 @@ public RecordBatch getIncomingBatch() {
this.width = fieldWidth;
this.outputColumnType = outputColumnType;
this.widthType = widthType;
- String columnName = vv.getField().getName();
- this.name = columnName;
}
public OutputWidthExpression getOutputExpression() { return
outputExpression; }
@@ -116,7 +112,6 @@ public RecordBatch getIncomingBatch() {
public int getWidth() { return width; }
- public String getName() { return name; }
}
void ShouldNotReachHere() {
@@ -180,43 +175,44 @@ public static int
getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
}
- void addTransferField(ValueVector vvOut, String path) {
- addField(vvOut, null, OutputColumnType.TRANSFER, path);
+ void addTransferField(ValueVector vvIn, String inputColumnName, String
outputColumnName) {
+ addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName,
outputColumnName);
}
- void addNewField(ValueVector vv, LogicalExpression logicalExpression) {
- addField(vv, logicalExpression, OutputColumnType.NEW, null);
+ void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+ addField(vvOut, logicalExpression, OutputColumnType.NEW, null,
vvOut.getField().getName());
}
- void addField(ValueVector vv, LogicalExpression logicalExpression,
OutputColumnType outputColumnType, String path) {
+ void addField(ValueVector vv, LogicalExpression logicalExpression,
OutputColumnType outputColumnType,
+ String inputColumnName, String outputColumnName) {
if(isFixedWidth(vv)) {
addFixedWidthField(vv);
} else {
- addVariableWidthField(vv, logicalExpression, outputColumnType,
path);
+ addVariableWidthField(vv, logicalExpression, outputColumnType,
inputColumnName, outputColumnName);
}
}
private void addVariableWidthField(ValueVector vv, LogicalExpression
logicalExpression,
- OutputColumnType outputColumnType,
String path) {
+ OutputColumnType outputColumnType,
String inputColumnName, String outputColumnName) {
variableWidthColumnCount++;
ColumnWidthInfo columnWidthInfo;
//Variable width transfers
if(outputColumnType == OutputColumnType.TRANSFER) {
- String columnName = path;
- VarLenReadExpr readExpr = new VarLenReadExpr(columnName);
- columnWidthInfo = new ColumnWidthInfo(vv, readExpr,
outputColumnType,
+ VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+ columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
WidthType.VARIABLE, -1); //fieldWidth has to be obtained
from the RecordBatchSizer
} else if (isComplex(vv.getField().getType())) {
addComplexField(vv);
return;
} else {
// Walk the tree of LogicalExpressions to get a tree of
OutputWidthExpressions
- OutputWidthVisitorState state = new OutputWidthVisitorState(this,
outputColumnType);
+ OutputWidthVisitorState state = new OutputWidthVisitorState(this);
OutputWidthExpression outputWidthExpression =
logicalExpression.accept(new OutputWidthVisitor(), state);
- columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression,
outputColumnType,
+ columnWidthInfo = new ColumnWidthInfo(outputWidthExpression,
outputColumnType,
WidthType.VARIABLE, -1); //fieldWidth has to be obtained
from the OutputWidthExpression
}
- outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo);
+ ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName,
columnWidthInfo);
+ Preconditions.checkState(existingInfo == null);
}
void addComplexField(ValueVector vv) {
@@ -258,8 +254,8 @@ public void update() {
setRecordBatchSizer(batchSizer);
rowWidth = 0;
int totalVariableColumnWidth = 0;
- for (String expr : outputColumnSizes.keySet()) {
- ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr);
+ for (String outputColumnName : outputColumnSizes.keySet()) {
+ ColumnWidthInfo columnWidthInfo =
outputColumnSizes.get(outputColumnName);
int width = -1;
if (columnWidthInfo.isFixedWidth()) {
// fixed width columns are accumulated in
totalFixedWidthColumnWidth
@@ -269,12 +265,10 @@ public void update() {
//As the tree is walked, the RecordBatchSizer and function
annotations
//are looked-up to come up with the final FixedLenExpr
OutputWidthExpression savedWidthExpr =
columnWidthInfo.getOutputExpression();
- OutputColumnType columnType =
columnWidthInfo.getOutputColumnType();
- OutputWidthVisitorState state = new
OutputWidthVisitorState(this, columnType);
+ OutputWidthVisitorState state = new
OutputWidthVisitorState(this);
OutputWidthExpression reducedExpr = savedWidthExpr.accept(new
OutputWidthVisitor(), state);
- assert reducedExpr instanceof FixedLenExpr;
width = ((FixedLenExpr)reducedExpr).getWidth();
- assert width >= 0;
+ Preconditions.checkState(width >= 0);
}
totalVariableColumnWidth += width;
}
@@ -301,7 +295,7 @@ public void update() {
logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC
{}, width {}, total fixed width {}"
+ ", total variable width {}, total complex width {},
batchSizer time {} ms, update time {} ms"
+ ", manager {}, incoming {}",outPutRowCount,
batchSizer.rowCount(), incomingBatch.getRecordCount(),
- totalFixedWidthColumnWidth, totalVariableColumnWidth,
totalComplexColumnWidth,
+ rowWidth, totalFixedWidthColumnWidth,
totalVariableColumnWidth, totalComplexColumnWidth,
(batchSizerEndTime - updateStartTime),(updateEndTime -
updateStartTime), this, incomingBatch);
logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4bc63c0b1b7..dd933250a2b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -113,11 +113,6 @@ private void clear() {
public ProjectRecordBatch(final Project pop, final RecordBatch incoming,
final FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
-
- // get the output batch size from config.
- int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-
- memoryManager = new ProjectMemoryManager(configuredBatchSize);
}
@Override
@@ -367,6 +362,9 @@ private boolean isWildcard(final NamedExpression ex) {
private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws
SchemaChangeException {
long setupNewSchemaStartTime = System.currentTimeMillis();
+ // get the output batch size from config.
+ int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ memoryManager = new ProjectMemoryManager(configuredBatchSize);
memoryManager.init(incomingBatch, this);
if (allocationVectors != null) {
for (final ValueVector v : allocationVectors) {
@@ -431,7 +429,7 @@ private void setupNewSchemaFromInput(RecordBatch
incomingBatch) throws SchemaCha
final ValueVector vvOut =
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
vvIn.getField().getType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
- memoryManager.addTransferField(vvIn, vvIn.getField().getName());
+ memoryManager.addTransferField(vvIn, vvIn.getField().getName(),
vvOut.getField().getName());
transfers.add(tp);
}
} else if (value != null && value > 1) { // subsequent wildcards
should do a copy of incoming valuevectors
@@ -513,7 +511,7 @@ private void setupNewSchemaFromInput(RecordBatch
incomingBatch) throws SchemaCha
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
vectorRead.getMajorType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
- memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id,
incomingBatch));
+ memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id,
incomingBatch), vvOut.getField().getName());
transfers.add(tp);
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
} else if (expr instanceof DrillFuncHolderExpr &&
@@ -540,13 +538,13 @@ private void setupNewSchemaFromInput(RecordBatch
incomingBatch) throws SchemaCha
memoryManager.addComplexField(null); // this will just add an estimate
to the row width
} else {
// need to do evaluation.
- final ValueVector vector = container.addOrGet(outputField, callBack);
- allocationVectors.add(vector);
+ final ValueVector ouputVector = container.addOrGet(outputField,
callBack);
+ allocationVectors.add(ouputVector);
final TypedFieldId fid =
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
- final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+ final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
final ValueVectorWriteExpression write = new
ValueVectorWriteExpression(fid, expr, useSetSafe);
final HoldingContainer hc = cg.addExpr(write,
ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
- memoryManager.addNewField(vector, write);
+ memoryManager.addNewField(ouputVector, write);
// We cannot do multiple transfers from the same vector. However we
still need to instantiate the output vector.
if (expr instanceof ValueVectorReadExpression) {
@@ -555,7 +553,7 @@ private void setupNewSchemaFromInput(RecordBatch
incomingBatch) throws SchemaCha
final TypedFieldId id = vectorRead.getFieldId();
final ValueVector vvIn =
incomingBatch.getValueAccessorById(id.getIntermediateClass(),
id.getFieldIds()).getValueVector();
- vvIn.makeTransferPair(vector);
+ vvIn.makeTransferPair(ouputVector);
}
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services