MD-670: Querying MapR-DB JSON Tables returns no results

* Use DocumentReader API to emit "_id" field instead of handling it as a 
special case.
* Update the DrillBuf reference field when reallocation happen.
* Catch the correct exception when schema change happens and include the field 
name in the warning message.
+ Get rid of unused code.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/869cfbdf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/869cfbdf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/869cfbdf

Branch: refs/heads/master
Commit: 869cfbdf0599c708db62fb7a4d308e22ccbe2fb7
Parents: 004aad9
Author: Aditya <adi...@mapr.com>
Authored: Mon Feb 1 17:43:42 2016 -0800
Committer: Aditya Kishore <a...@apache.org>
Committed: Fri Sep 9 10:08:34 2016 -0700

----------------------------------------------------------------------
 .../maprdb/json/MaprDBJsonRecordReader.java     | 94 +++++++-------------
 .../drill/maprdb/tests/MaprDBTestsSuite.java    |  5 ++
 .../drill/maprdb/tests/json/TestSimpleJson.java | 62 ++++++-------
 3 files changed, 66 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
index fd8bf93..b5e4ceb 100644
--- 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
+++ 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
-import org.apache.drill.exec.store.maprdb.util.CommonFns;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
@@ -49,9 +48,7 @@ import org.ojai.DocumentReader;
 import org.ojai.DocumentReader.EventType;
 import org.ojai.DocumentStream;
 import org.ojai.FieldPath;
-import org.ojai.Value;
 import org.ojai.store.QueryCondition;
-import org.ojai.store.QueryCondition.Op;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -86,9 +83,11 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
   private DocumentStream<Document> documentStream;
 
   private Iterator<DocumentReader> documentReaderIterators;
-  
+
   private boolean includeId;
 
+  private String currentFieldName;
+
   public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) {
     buffer = context.getManagedBuffer();
@@ -98,22 +97,6 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
     setColumns(projectedColumns);
   }
 
-  private void addKeyCondition(QueryCondition condition, Op op, byte[] key) {
-    if (!CommonFns.isNullOrEmpty(key)) {
-      Value value = IdCodec.decode(key);
-      switch (value.getType()) {
-      case STRING:
-        condition.is(ID_FIELD, op, value.getString());
-        return;
-      case BINARY:
-        condition.is(ID_FIELD, op, value.getBinary());
-        return;
-      default:
-        throw new UnsupportedOperationException("");
-      }
-    }
-  }
-
   @Override
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
columns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
@@ -145,7 +128,7 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
 
     try {
       table = MapRDB.getTable(tableName);
-      table.setOption(TableOption.EXCLUDEID, true);
+      table.setOption(TableOption.EXCLUDEID, !includeId);
       documentStream = table.find(condition, projectedFields);
       documentReaderIterators = documentStream.documentReaders().iterator();
     } catch (DBException e) {
@@ -171,25 +154,11 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
         throw new IllegalStateException("The document did not start with 
START_MAP!");
       }
       try {
-        MapWriter map = writer.rootAsMap();
-        if (includeId && reader.getId() != null) {
-          switch (reader.getId().getType()) {
-          case BINARY:
-            writeBinary(map.varBinary(ID_KEY), reader.getId().getBinary());
-            break;
-          case STRING:
-            writeString(map.varChar(ID_KEY), reader.getId().getString());
-            break;
-          default:
-            throw new UnsupportedOperationException(reader.getId().getType() +
-                " is not a supported type for _id field.");
-          }
-        }
-        writeToMap(reader, map);
+        writeToMap(reader, writer.rootAsMap());
         recordCount++;
-      } catch (IllegalStateException e) {
-        logger.warn(String.format("Possible schema change at _id: %s",
-            IdCodec.asString(reader.getId())), e);
+      } catch (IllegalArgumentException e) {
+        logger.warn(String.format("Possible schema change at _id: '%s', field: 
'%s'",
+            IdCodec.asString(reader.getId()), currentFieldName), e);
       }
     }
 
@@ -199,62 +168,60 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
   }
 
   private void writeToMap(DBDocumentReaderBase reader, MapWriter map) {
-    String fieldName = null;
     map.start();
     outside: while (true) {
       EventType event = reader.next();
-      if (event == null) break outside;
-      fieldName = reader.getFieldName();
+      if (event == null || event == EventType.END_MAP) break outside;
+
+      currentFieldName = reader.getFieldName();
       switch (event) {
       case NULL:
-        map.varChar(fieldName).write(null); // treat as VARCHAR for now
+        map.varChar(currentFieldName).write(null); // treat as VARCHAR for now
       case BINARY:
-        writeBinary(map.varBinary(fieldName), reader.getBinary());
+        writeBinary(map.varBinary(currentFieldName), reader.getBinary());
         break;
       case BOOLEAN:
-        map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+        map.bit(currentFieldName).writeBit(reader.getBoolean() ? 1 : 0);
         break;
       case STRING:
-        writeString(map.varChar(fieldName), reader.getString());
+        writeString(map.varChar(currentFieldName), reader.getString());
         break;
       case BYTE:
-        map.tinyInt(fieldName).writeTinyInt(reader.getByte());
+        map.tinyInt(currentFieldName).writeTinyInt(reader.getByte());
         break;
       case SHORT:
-        map.smallInt(fieldName).writeSmallInt(reader.getShort());
+        map.smallInt(currentFieldName).writeSmallInt(reader.getShort());
         break;
       case INT:
-        map.integer(fieldName).writeInt(reader.getInt());
+        map.integer(currentFieldName).writeInt(reader.getInt());
         break;
       case LONG:
-        map.bigInt(fieldName).writeBigInt(reader.getLong());
+        map.bigInt(currentFieldName).writeBigInt(reader.getLong());
         break;
       case FLOAT:
-        map.float4(fieldName).writeFloat4(reader.getFloat());
+        map.float4(currentFieldName).writeFloat4(reader.getFloat());
         break;
       case DOUBLE:
-        map.float8(fieldName).writeFloat8(reader.getDouble());
+        map.float8(currentFieldName).writeFloat8(reader.getDouble());
         break;
       case DECIMAL:
         throw new UnsupportedOperationException("Decimals are currently not 
supported.");
       case DATE:
-        map.date(fieldName).writeDate(reader.getDate().toDate().getTime());
+        
map.date(currentFieldName).writeDate(reader.getDate().toDate().getTime());
         break;
       case TIME:
-        map.time(fieldName).writeTime(reader.getTimeInt());
+        map.time(currentFieldName).writeTime(reader.getTimeInt());
         break;
       case TIMESTAMP:
-        map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
+        
map.timeStamp(currentFieldName).writeTimeStamp(reader.getTimestampLong());
         break;
       case INTERVAL:
         throw new UnsupportedOperationException("Interval is currently not 
supported.");
       case START_MAP:
-        writeToMap(reader, map.map(fieldName));
+        writeToMap(reader, map.map(currentFieldName));
         break;
-      case END_MAP:
-        break outside;
       case START_ARRAY:
-        writeToList(reader, map.list(fieldName));
+        writeToList(reader, map.list(currentFieldName));
         break;
       case END_ARRAY:
         throw new IllegalStateException("Shouldn't get a END_ARRAY inside a 
map");
@@ -269,7 +236,8 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
     list.startList();
     outside: while (true) {
       EventType event = reader.next();
-      if (event == null) break outside;
+      if (event == null || event == EventType.END_ARRAY) break outside;
+
       switch (event) {
       case NULL:
         list.varChar().write(null); // treat as VARCHAR for now
@@ -321,8 +289,6 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
       case START_ARRAY:
         writeToList(reader, list.list());
         break;
-      case END_ARRAY:
-        break outside;
       default:
         throw new UnsupportedOperationException("Unsupported type: " + event);
       }
@@ -331,14 +297,14 @@ public class MaprDBJsonRecordReader extends 
AbstractRecordReader {
   }
 
   private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) {
-    buffer.reallocIfNeeded(buf.remaining());
+    buffer = buffer.reallocIfNeeded(buf.remaining());
     buffer.setBytes(0, buf, buf.position(), buf.remaining());
     binaryWriter.writeVarBinary(0, buf.remaining(), buffer);
   }
 
   private void writeString(VarCharWriter varCharWriter, String string) {
     final byte[] strBytes = Bytes.toBytes(string);
-    buffer.reallocIfNeeded(strBytes.length);
+    buffer = buffer.reallocIfNeeded(strBytes.length);
     buffer.setBytes(0, strBytes);
     varCharWriter.writeVarChar(0, strBytes.length, buffer);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
----------------------------------------------------------------------
diff --git 
a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
 
b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
index 0f54796..e81aa09 100644
--- 
a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
+++ 
b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
@@ -108,6 +108,11 @@ public class MaprDBTestsSuite {
               "      \"location\": \"/tmp\"," +
               "      \"writable\": false," +
               "      \"defaultInputFormat\": \"maprdb\"" +
+              "    }," +
+              "    \"root\": {" +
+              "      \"location\": \"/\"," +
+              "      \"writable\": false," +
+              "      \"defaultInputFormat\": \"maprdb\"" +
               "    }" +
               "  }," +
               "  \"formats\": {" +

http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
----------------------------------------------------------------------
diff --git 
a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
 
b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
index f4c7e89..f05b87a 100644
--- 
a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
+++ 
b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
@@ -67,10 +67,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " name = 'Sprint'"
         ;
     runSQLAndVerifyCount(sql, 1);
-    
+
     final String[] expectedPlan = {"condition=\\(name = \"Sprint\"\\)"};
     final String[] excludedPlan = {};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -85,13 +85,13 @@ public class TestSimpleJson extends BaseTestQuery {
         + " name LIKE 'S%'"
         ;
     runSQLAndVerifyCount(sql, 3);
-    
+
     final String[] expectedPlan = {"condition=\\(name MATCHES 
\"\\^\\\\\\\\QS\\\\\\\\E\\.\\*\\$\"\\)"};
     final String[] excludedPlan = {};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
-    
+
   @Test
   public void testPushdownStringNotEqual() throws Exception {
     setColumnWidths(new int[] {25, 40, 40, 40});
@@ -103,10 +103,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " name <> 'Sprint'"
         ;
     runSQLAndVerifyCount(sql, 9);
-    
+
     final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)"};
     final String[] excludedPlan = {};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -121,10 +121,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " zip = 85260"
         ;
     runSQLAndVerifyCount(sql, 1);
-    
+
     final String[] expectedPlan = {"condition=\\(zip = 
\\{\"\\$numberLong\":85260\\}\\)"};
     final String[] excludedPlan = {};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -141,10 +141,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " city = 'Las Vegas'"
         ;
     runSQLAndVerifyCount(sql, 4);
-    
+
     final String[] expectedPlan = {"condition=\\(\\(zip = 
\\{\"\\$numberLong\":85260\\}\\) or \\(city = \"Las Vegas\"\\)\\)"};
     final String[] excludedPlan = {};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -159,10 +159,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " _id = 'jFTZmywe7StuZ2hEjxyA'"
         ;
     runSQLAndVerifyCount(sql, 1);
-    
+
     final String[] expectedPlan = {"condition=\\(_id = 
\"jFTZmywe7StuZ2hEjxyA\"\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -179,10 +179,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " name = 'Subway'"
         ;
     runSQLAndVerifyCount(sql, 1);
-    
+
     final String[] expectedPlan = {"condition=\\(\\(_id = 
\"jFTZmywe7StuZ2hEjxyA\"\\) and \\(name = \"Subway\"\\)\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -197,10 +197,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " b.`attributes.Ambience.casual` = false"
         ;
     runSQLAndVerifyCount(sql, 1);
-    
+
     final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = 
false\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -215,10 +215,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " b.`attributes.Attire` = 'casual'"
         ;
     runSQLAndVerifyCount(sql, 4);
-    
+
     final String[] expectedPlan = {"condition=\\(attributes.Attire = 
\"casual\"\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
   @Test
@@ -233,10 +233,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " business.`attributes.Ambience.casual` IS NULL"
         ;
     runSQLAndVerifyCount(sql, 7);
-    
+
     final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = 
null\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -252,10 +252,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " b.`attributes.Ambience.casual` IS NOT NULL"
         ;
     runSQLAndVerifyCount(sql, 3);
-    
+
     final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual != 
null\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -270,10 +270,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " b.`attributes.Accepts Credit Cards` IS NULL"
         ;
     runSQLAndVerifyCount(sql, 3);
-    
+
     final String[] expectedPlan = {"condition=\\(attributes.Accepts Credit 
Cards = null\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -287,10 +287,10 @@ public class TestSimpleJson extends BaseTestQuery {
         + " stars > 4.0"
         ;
     runSQLAndVerifyCount(sql, 2);
-    
+
     final String[] expectedPlan = {"condition=\\(stars > 4\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
 
@@ -305,13 +305,13 @@ public class TestSimpleJson extends BaseTestQuery {
         + " stars > 4.1"
         ;
     runSQLAndVerifyCount(sql, 1);
-    
+
     final String[] expectedPlan = {"condition=\\(\\(attributes.Good For.lunch 
= true\\) and \\(stars > 4.1\\)\\)"};
     final String[] excludedPlan ={};
-    
+
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
-  
+
   /*
   @Test
   public void testPushDownSubField5() throws Exception {
@@ -363,7 +363,7 @@ public class TestSimpleJson extends BaseTestQuery {
     runSQLAndVerifyCount(sql, 1);
   }
   */
-  
+
   protected List<QueryDataBatch> runHBaseSQLlWithResults(String sql) throws 
Exception {
     System.out.println("Running query:\n" + sql);
     return testSqlWithResults(sql);

Reply via email to