MD-670: Querying MapR-DB JSON Tables returns no results * Use the DocumentReader API to emit the "_id" field instead of handling it as a special case. * Update the DrillBuf reference field when reallocation happens. * Catch the correct exception when a schema change happens and include the field name in the warning message. + Get rid of unused code.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/869cfbdf Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/869cfbdf Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/869cfbdf Branch: refs/heads/master Commit: 869cfbdf0599c708db62fb7a4d308e22ccbe2fb7 Parents: 004aad9 Author: Aditya <adi...@mapr.com> Authored: Mon Feb 1 17:43:42 2016 -0800 Committer: Aditya Kishore <a...@apache.org> Committed: Fri Sep 9 10:08:34 2016 -0700 ---------------------------------------------------------------------- .../maprdb/json/MaprDBJsonRecordReader.java | 94 +++++++------------- .../drill/maprdb/tests/MaprDBTestsSuite.java | 5 ++ .../drill/maprdb/tests/json/TestSimpleJson.java | 62 ++++++------- 3 files changed, 66 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java index fd8bf93..b5e4ceb 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java @@ -37,7 +37,6 @@ import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec; -import org.apache.drill.exec.store.maprdb.util.CommonFns; import org.apache.drill.exec.vector.BaseValueVector; import 
org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; @@ -49,9 +48,7 @@ import org.ojai.DocumentReader; import org.ojai.DocumentReader.EventType; import org.ojai.DocumentStream; import org.ojai.FieldPath; -import org.ojai.Value; import org.ojai.store.QueryCondition; -import org.ojai.store.QueryCondition.Op; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -86,9 +83,11 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private DocumentStream<Document> documentStream; private Iterator<DocumentReader> documentReaderIterators; - + private boolean includeId; + private String currentFieldName; + public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context) { buffer = context.getManagedBuffer(); @@ -98,22 +97,6 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { setColumns(projectedColumns); } - private void addKeyCondition(QueryCondition condition, Op op, byte[] key) { - if (!CommonFns.isNullOrEmpty(key)) { - Value value = IdCodec.decode(key); - switch (value.getType()) { - case STRING: - condition.is(ID_FIELD, op, value.getString()); - return; - case BINARY: - condition.is(ID_FIELD, op, value.getBinary()); - return; - default: - throw new UnsupportedOperationException(""); - } - } - } - @Override protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) { Set<SchemaPath> transformed = Sets.newLinkedHashSet(); @@ -145,7 +128,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { try { table = MapRDB.getTable(tableName); - table.setOption(TableOption.EXCLUDEID, true); + table.setOption(TableOption.EXCLUDEID, !includeId); documentStream = table.find(condition, projectedFields); documentReaderIterators = documentStream.documentReaders().iterator(); } catch (DBException e) { @@ -171,25 +154,11 @@ public class 
MaprDBJsonRecordReader extends AbstractRecordReader { throw new IllegalStateException("The document did not start with START_MAP!"); } try { - MapWriter map = writer.rootAsMap(); - if (includeId && reader.getId() != null) { - switch (reader.getId().getType()) { - case BINARY: - writeBinary(map.varBinary(ID_KEY), reader.getId().getBinary()); - break; - case STRING: - writeString(map.varChar(ID_KEY), reader.getId().getString()); - break; - default: - throw new UnsupportedOperationException(reader.getId().getType() + - " is not a supported type for _id field."); - } - } - writeToMap(reader, map); + writeToMap(reader, writer.rootAsMap()); recordCount++; - } catch (IllegalStateException e) { - logger.warn(String.format("Possible schema change at _id: %s", - IdCodec.asString(reader.getId())), e); + } catch (IllegalArgumentException e) { + logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'", + IdCodec.asString(reader.getId()), currentFieldName), e); } } @@ -199,62 +168,60 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } private void writeToMap(DBDocumentReaderBase reader, MapWriter map) { - String fieldName = null; map.start(); outside: while (true) { EventType event = reader.next(); - if (event == null) break outside; - fieldName = reader.getFieldName(); + if (event == null || event == EventType.END_MAP) break outside; + + currentFieldName = reader.getFieldName(); switch (event) { case NULL: - map.varChar(fieldName).write(null); // treat as VARCHAR for now + map.varChar(currentFieldName).write(null); // treat as VARCHAR for now case BINARY: - writeBinary(map.varBinary(fieldName), reader.getBinary()); + writeBinary(map.varBinary(currentFieldName), reader.getBinary()); break; case BOOLEAN: - map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0); + map.bit(currentFieldName).writeBit(reader.getBoolean() ? 
1 : 0); break; case STRING: - writeString(map.varChar(fieldName), reader.getString()); + writeString(map.varChar(currentFieldName), reader.getString()); break; case BYTE: - map.tinyInt(fieldName).writeTinyInt(reader.getByte()); + map.tinyInt(currentFieldName).writeTinyInt(reader.getByte()); break; case SHORT: - map.smallInt(fieldName).writeSmallInt(reader.getShort()); + map.smallInt(currentFieldName).writeSmallInt(reader.getShort()); break; case INT: - map.integer(fieldName).writeInt(reader.getInt()); + map.integer(currentFieldName).writeInt(reader.getInt()); break; case LONG: - map.bigInt(fieldName).writeBigInt(reader.getLong()); + map.bigInt(currentFieldName).writeBigInt(reader.getLong()); break; case FLOAT: - map.float4(fieldName).writeFloat4(reader.getFloat()); + map.float4(currentFieldName).writeFloat4(reader.getFloat()); break; case DOUBLE: - map.float8(fieldName).writeFloat8(reader.getDouble()); + map.float8(currentFieldName).writeFloat8(reader.getDouble()); break; case DECIMAL: throw new UnsupportedOperationException("Decimals are currently not supported."); case DATE: - map.date(fieldName).writeDate(reader.getDate().toDate().getTime()); + map.date(currentFieldName).writeDate(reader.getDate().toDate().getTime()); break; case TIME: - map.time(fieldName).writeTime(reader.getTimeInt()); + map.time(currentFieldName).writeTime(reader.getTimeInt()); break; case TIMESTAMP: - map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong()); + map.timeStamp(currentFieldName).writeTimeStamp(reader.getTimestampLong()); break; case INTERVAL: throw new UnsupportedOperationException("Interval is currently not supported."); case START_MAP: - writeToMap(reader, map.map(fieldName)); + writeToMap(reader, map.map(currentFieldName)); break; - case END_MAP: - break outside; case START_ARRAY: - writeToList(reader, map.list(fieldName)); + writeToList(reader, map.list(currentFieldName)); break; case END_ARRAY: throw new IllegalStateException("Shouldn't get a END_ARRAY inside a 
map"); @@ -269,7 +236,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { list.startList(); outside: while (true) { EventType event = reader.next(); - if (event == null) break outside; + if (event == null || event == EventType.END_ARRAY) break outside; + switch (event) { case NULL: list.varChar().write(null); // treat as VARCHAR for now @@ -321,8 +289,6 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { case START_ARRAY: writeToList(reader, list.list()); break; - case END_ARRAY: - break outside; default: throw new UnsupportedOperationException("Unsupported type: " + event); } @@ -331,14 +297,14 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) { - buffer.reallocIfNeeded(buf.remaining()); + buffer = buffer.reallocIfNeeded(buf.remaining()); buffer.setBytes(0, buf, buf.position(), buf.remaining()); binaryWriter.writeVarBinary(0, buf.remaining(), buffer); } private void writeString(VarCharWriter varCharWriter, String string) { final byte[] strBytes = Bytes.toBytes(string); - buffer.reallocIfNeeded(strBytes.length); + buffer = buffer.reallocIfNeeded(strBytes.length); buffer.setBytes(0, strBytes); varCharWriter.writeVarChar(0, strBytes.length, buffer); } http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java index 0f54796..e81aa09 100644 --- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java @@ -108,6 +108,11 @@ public class MaprDBTestsSuite { " \"location\": \"/tmp\"," + " 
\"writable\": false," + " \"defaultInputFormat\": \"maprdb\"" + + " }," + + " \"root\": {" + + " \"location\": \"/\"," + + " \"writable\": false," + + " \"defaultInputFormat\": \"maprdb\"" + " }" + " }," + " \"formats\": {" + http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java index f4c7e89..f05b87a 100644 --- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java @@ -67,10 +67,10 @@ public class TestSimpleJson extends BaseTestQuery { + " name = 'Sprint'" ; runSQLAndVerifyCount(sql, 1); - + final String[] expectedPlan = {"condition=\\(name = \"Sprint\"\\)"}; final String[] excludedPlan = {}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -85,13 +85,13 @@ public class TestSimpleJson extends BaseTestQuery { + " name LIKE 'S%'" ; runSQLAndVerifyCount(sql, 3); - + final String[] expectedPlan = {"condition=\\(name MATCHES \"\\^\\\\\\\\QS\\\\\\\\E\\.\\*\\$\"\\)"}; final String[] excludedPlan = {}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } - + @Test public void testPushdownStringNotEqual() throws Exception { setColumnWidths(new int[] {25, 40, 40, 40}); @@ -103,10 +103,10 @@ public class TestSimpleJson extends BaseTestQuery { + " name <> 'Sprint'" ; runSQLAndVerifyCount(sql, 9); - + final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)"}; final String[] excludedPlan = {}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -121,10 +121,10 @@ public class TestSimpleJson extends 
BaseTestQuery { + " zip = 85260" ; runSQLAndVerifyCount(sql, 1); - + final String[] expectedPlan = {"condition=\\(zip = \\{\"\\$numberLong\":85260\\}\\)"}; final String[] excludedPlan = {}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -141,10 +141,10 @@ public class TestSimpleJson extends BaseTestQuery { + " city = 'Las Vegas'" ; runSQLAndVerifyCount(sql, 4); - + final String[] expectedPlan = {"condition=\\(\\(zip = \\{\"\\$numberLong\":85260\\}\\) or \\(city = \"Las Vegas\"\\)\\)"}; final String[] excludedPlan = {}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -159,10 +159,10 @@ public class TestSimpleJson extends BaseTestQuery { + " _id = 'jFTZmywe7StuZ2hEjxyA'" ; runSQLAndVerifyCount(sql, 1); - + final String[] expectedPlan = {"condition=\\(_id = \"jFTZmywe7StuZ2hEjxyA\"\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -179,10 +179,10 @@ public class TestSimpleJson extends BaseTestQuery { + " name = 'Subway'" ; runSQLAndVerifyCount(sql, 1); - + final String[] expectedPlan = {"condition=\\(\\(_id = \"jFTZmywe7StuZ2hEjxyA\"\\) and \\(name = \"Subway\"\\)\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -197,10 +197,10 @@ public class TestSimpleJson extends BaseTestQuery { + " b.`attributes.Ambience.casual` = false" ; runSQLAndVerifyCount(sql, 1); - + final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = false\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -215,10 +215,10 @@ public class TestSimpleJson extends BaseTestQuery { + " b.`attributes.Attire` = 'casual'" ; runSQLAndVerifyCount(sql, 4); - + final String[] expectedPlan = {"condition=\\(attributes.Attire = \"casual\"\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, 
expectedPlan, excludedPlan); } @Test @@ -233,10 +233,10 @@ public class TestSimpleJson extends BaseTestQuery { + " business.`attributes.Ambience.casual` IS NULL" ; runSQLAndVerifyCount(sql, 7); - + final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = null\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -252,10 +252,10 @@ public class TestSimpleJson extends BaseTestQuery { + " b.`attributes.Ambience.casual` IS NOT NULL" ; runSQLAndVerifyCount(sql, 3); - + final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual != null\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -270,10 +270,10 @@ public class TestSimpleJson extends BaseTestQuery { + " b.`attributes.Accepts Credit Cards` IS NULL" ; runSQLAndVerifyCount(sql, 3); - + final String[] expectedPlan = {"condition=\\(attributes.Accepts Credit Cards = null\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -287,10 +287,10 @@ public class TestSimpleJson extends BaseTestQuery { + " stars > 4.0" ; runSQLAndVerifyCount(sql, 2); - + final String[] expectedPlan = {"condition=\\(stars > 4\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } @@ -305,13 +305,13 @@ public class TestSimpleJson extends BaseTestQuery { + " stars > 4.1" ; runSQLAndVerifyCount(sql, 1); - + final String[] expectedPlan = {"condition=\\(\\(attributes.Good For.lunch = true\\) and \\(stars > 4.1\\)\\)"}; final String[] excludedPlan ={}; - + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } - + /* @Test public void testPushDownSubField5() throws Exception { @@ -363,7 +363,7 @@ public class TestSimpleJson extends BaseTestQuery { runSQLAndVerifyCount(sql, 1); } */ - + protected List<QueryDataBatch> runHBaseSQLlWithResults(String 
sql) throws Exception { System.out.println("Running query:\n" + sql); return testSqlWithResults(sql);