Repository: incubator-drill Updated Branches: refs/heads/master 71f5ad447 -> e764479c1
DRILL-1547: require writers to explicitly check for buffer bounds to avoid IndexOutOfBounds errors; make the writer hierarchy stop immediately in case of a write error Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e764479c Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e764479c Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e764479c Branch: refs/heads/master Commit: e764479c1fafacb1bd529841b19ed56b1c9e1998 Parents: 71f5ad4 Author: Hanifi Gunes <[email protected]> Authored: Tue Oct 21 23:37:20 2014 -0700 Committer: Aman Sinha <[email protected]> Committed: Thu Oct 23 11:57:16 2014 -0700 ---------------------------------------------------------------------- .../src/main/codegen/templates/BaseWriter.java | 1 + .../main/codegen/templates/ComplexWriters.java | 4 + .../src/main/codegen/templates/ListWriters.java | 8 +- .../src/main/codegen/templates/MapWriters.java | 6 + .../exec/store/easy/json/JSONRecordReader2.java | 3 - .../drill/exec/util/JsonStringArrayList.java | 11 ++ .../drill/exec/util/JsonStringHashMap.java | 11 ++ .../drill/exec/vector/complex/MapVector.java | 17 ++- .../exec/vector/complex/RepeatedMapVector.java | 4 +- .../exec/vector/complex/fn/JsonReader.java | 20 ++- .../vector/complex/impl/ComplexWriterImpl.java | 5 + .../complex/impl/VectorContainerWriter.java | 5 + .../exec/work/fragment/FragmentExecutor.java | 2 +- .../exec/vector/complex/fn/JsonReaderTests.java | 127 +++++++++++++++++++ .../vector/complex/fn/nested-with-nulls.json | 3 + .../resources/vector/complex/fn/sparse.json | 4 + 16 files changed, 218 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/codegen/templates/BaseWriter.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java index 69ab6cd..224148c 100644 --- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java +++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java @@ -31,6 +31,7 @@ public interface BaseWriter extends Positionable{ FieldWriter getParent(); boolean ok(); WriteState getState(); + int getValueCapacity(); public interface MapWriter extends BaseWriter{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/codegen/templates/ComplexWriters.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java index 2442434..c1e6052 100644 --- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java @@ -57,6 +57,10 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { return vector.getField(); } + public int getValueCapacity() { + return vector.getValueCapacity(); + } + public void checkValueCapacity() { inform(vector.getValueCapacity() > idx()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/codegen/templates/ListWriters.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java index 6ea9bb4..ff4c3d8 100644 --- a/exec/java-exec/src/main/codegen/templates/ListWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java @@ -71,7 +71,11 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ public void clear(){ writer.clear(); } - + + public int getValueCapacity() { + return innerVector==null ? 
0:innerVector.getValueCapacity(); + } + public void setValueCount(int count){ if(innerVector != null) innerVector.getMutator().setValueCount(count); } @@ -152,6 +156,8 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ <#if mode == "Repeated"> public void start(){ if(ok()){ + checkValueCapacity(); + if (!ok()) return; // update the repeated vector to state that there is current+1 objects. RepeatedListHolder h = new RepeatedListHolder(); container.getAccessor().get(idx(), h); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/codegen/templates/MapWriters.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java index dd7e653..1fdb4db 100644 --- a/exec/java-exec/src/main/codegen/templates/MapWriters.java +++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java @@ -54,6 +54,10 @@ public class ${mode}MapWriter extends AbstractFieldWriter{ this.container = container; } + public int getValueCapacity() { + return container.getValueCapacity(); + } + public MaterializedField getField() { return container.getField(); } @@ -105,6 +109,8 @@ public class ${mode}MapWriter extends AbstractFieldWriter{ <#if mode == "Repeated"> public void start(){ if(ok()){ + checkValueCapacity(); + if (!ok()) return; // update the repeated vector to state that there is current+1 objects. 
RepeatedMapHolder h = new RepeatedMapHolder(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java index 18cff5d..ff7d3f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java @@ -117,9 +117,6 @@ public class JSONRecordReader2 extends AbstractRecordReader { break outside; case WRITE_FAILED: - if (recordCount == 0) { - throw new DrillRuntimeException("Record is too big to fit into allocated ValueVector"); - } break outside; }; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java index d1f4c08..cea5676 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java @@ -37,6 +37,17 @@ public class JsonStringArrayList<E> extends ArrayList<E> { } @Override + public boolean equals(Object other) { + if (other instanceof JsonStringArrayList) { + return toString().equals(other.toString()); + } + if (other instanceof String) { + return toString().equals(other); + } + return false; + } + + @Override public final String toString() { try { return mapper.writeValueAsString(this); 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java index 7ec870f..6e83494 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java @@ -41,6 +41,17 @@ public class JsonStringHashMap<K, V> extends LinkedHashMap<K, V> { } @Override + public boolean equals(Object other) { + if (other instanceof JsonStringHashMap) { + return toString().equals(other.toString()); + } + if (other instanceof String) { + return toString().equals(other); + } + return false; + } + + @Override public final String toString() { try { return mapper.writeValueAsString(this); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index 037f1c7..3403fc3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.vector.complex; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; import io.netty.buffer.DrillBuf; import java.util.HashMap; @@ -24,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; + import 
org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -280,7 +284,18 @@ public class MapVector extends AbstractContainerVector { if (this.vectors.isEmpty()) { return 0; } - return vectors.values().iterator().next().getValueCapacity(); + + final Ordering<ValueVector> natural = new Ordering<ValueVector>() { + @Override + public int compare(@Nullable ValueVector left, @Nullable ValueVector right) { + return Ints.compare( + Preconditions.checkNotNull(left).getValueCapacity(), + Preconditions.checkNotNull(right).getValueCapacity() + ); + } + }; + + return natural.min(vectors.values()).getValueCapacity(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index 7a7b11d..2612924 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -316,7 +316,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat @Override public int getValueCapacity() { - return offsets.getValueCapacity(); + return offsets.getValueCapacity()-1; } @Override @@ -414,7 +414,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } public void get(int index, RepeatedMapHolder holder) { - assert index <= getValueCapacity(); + assert index < getValueCapacity()-1; holder.start = offsets.getAccessor().get(index); holder.end = offsets.getAccessor().get(index+1); } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 02871f6..17e266f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -139,8 +139,11 @@ public class JsonReader { return writeToVector(writer, t); } -private boolean writeToVector(ComplexWriter writer, JsonToken t) - throws JsonParseException, IOException { +private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonParseException, IOException { + if (!writer.ok()) { + return false; + } + switch (t) { case START_OBJECT: writeData(writer.rootAsMap()); @@ -193,6 +196,10 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) // map.start(); outside: while(true) { + if (!map.ok()) { + logger.warn("Error reported. Quit writing"); + break; + } JsonToken t = parser.nextToken(); if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) { return; @@ -243,11 +250,10 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) break; } case VALUE_NULL: - if (allTextMode) { + // do check value capacity only if vector is allocated. + if (map.getValueCapacity() > 0) { map.checkValueCapacity(); - break; } - map.checkValueCapacity(); // do nothing as we don't have a type. break; case VALUE_NUMBER_FLOAT: @@ -313,6 +319,10 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) private void writeData(ListWriter list) throws JsonParseException, IOException { list.start(); outside: while (true) { + if (!list.ok()) { + logger.warn("Error reported. 
Quit writing"); + break; + } switch (parser.nextToken()) { case START_ARRAY: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java index 9f934b7..920a4f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java @@ -51,6 +51,11 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri return container.getField(); } + @Override + public int getValueCapacity() { + return container.getValueCapacity(); + } + public void checkValueCapacity(){ inform(container.getValueCapacity() > idx()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java index e92626e..417d3ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java @@ -43,6 +43,11 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple return mapVector.getField(); } + @Override + public int getValueCapacity() { + return mapVector.getValueCapacity(); + } + public void checkValueCapacity(){ 
inform(mapVector.getValueCapacity() > idx()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index ecc8df2..bfefc8d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -114,7 +114,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } } } catch (AssertionError | Exception e) { - logger.debug("Error while initializing or executing fragment", e); + logger.warn("Error while initializing or executing fragment", e); context.fail(e); internalFail(e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/JsonReaderTests.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/JsonReaderTests.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/JsonReaderTests.java new file mode 100644 index 0000000..9a8eedf --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/JsonReaderTests.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.fn; + + +import java.util.List; +import java.util.Objects; + +import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.beans.QueryResult; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.vector.ValueVector; +import org.junit.Test; + +public class JsonReaderTests extends BaseTestQuery { + + static interface Function<T> { + void apply(T param); + } + + protected void query(final String query, final Function<RecordBatchLoader> testBody) throws Exception { + List<QueryResultBatch> batches = testSqlWithResults(query); + RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator()); + try { + QueryResultBatch batch = batches.get(0); + loader.load(batch.getHeader().getDef(), batch.getData()); + + testBody.apply(loader); + + } finally { + for (QueryResultBatch batch:batches) { + batch.release(); + } + loader.clear(); + } + } + + @Test + public void testIfDrillCanReadSparseRecords() throws Exception { + final String sql = "select * from cp.`vector/complex/fn/sparse.json`"; + query(sql, new Function<RecordBatchLoader>() { + @Override + public void apply(RecordBatchLoader loader) { + assert loader.getRecordCount() == 4 : "invalid record count returned"; + + //XXX: make sure value order matches vector order + final Object[][] values = new Object[][] { + {null, null}, + {1L, null}, + {null, 
2L}, + {3L, 3L} + }; + + Object[] row; + Object expected; + Object actual; + for (int r=0;r<values.length;r++) { + row = values[r]; + for (int c=0; c<values[r].length; c++) { + expected = row[c]; + actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r); + assert Objects.equals(expected, actual) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]", + r, c, + expected, + expected==null?"null":expected.getClass().getSimpleName(), + actual, + actual==null?"null":actual.getClass().getSimpleName()); + } + } + } + }); + } + + @Test + public void testIfDrillCanReadSparseNestedRecordsWithoutRaisingException() throws Exception { + final String sql = "select * from cp.`vector/complex/fn/nested-with-nulls.json`"; + query(sql, new Function<RecordBatchLoader>() { + @Override + public void apply(RecordBatchLoader loader) { + assert loader.getRecordCount() == 3 : "invalid record count returned"; + + //XXX: make sure value order matches vector order + final Object[][] values = new Object[][] { + {"[{},{},{},{\"name\":\"doe\"},{}]"}, + {"[]"}, + {"[{\"name\":\"john\",\"id\":10}]"}, + }; + + Object[] row; + Object expected; + Object actual; + for (int r=0;r<values.length;r++) { + row = values[r]; + for (int c = 0; c < values[r].length; c++) { + expected = row[c]; + actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r); + assert Objects.equals(actual, expected) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]", + r, c, + expected, + expected == null ? "null" : expected.getClass().getSimpleName(), + actual, + actual == null ? 
"null" : actual.getClass().getSimpleName()); + } + } + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/test/resources/vector/complex/fn/nested-with-nulls.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/vector/complex/fn/nested-with-nulls.json b/exec/java-exec/src/test/resources/vector/complex/fn/nested-with-nulls.json new file mode 100644 index 0000000..1514a47 --- /dev/null +++ b/exec/java-exec/src/test/resources/vector/complex/fn/nested-with-nulls.json @@ -0,0 +1,3 @@ +{"users":[{}, {"id": null, "name":null}, {}, {"name": "doe"}, {}]} +{} +{"users":[{"id": 10, "name":"john"}]} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e764479c/exec/java-exec/src/test/resources/vector/complex/fn/sparse.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/vector/complex/fn/sparse.json b/exec/java-exec/src/test/resources/vector/complex/fn/sparse.json new file mode 100644 index 0000000..d215c53 --- /dev/null +++ b/exec/java-exec/src/test/resources/vector/complex/fn/sparse.json @@ -0,0 +1,4 @@ +{"some":null, "field":null} +{"some":1, "field":null} +{"some":null, "field":2} +{"some":3, "field":3} \ No newline at end of file
