MD-572: Column names in MapR-DB JSON tables are case-sensitive. Add an `enablePushdown` format option (default true) so that pushdown of both filters and projects can be disabled.
This will allow Drill to handle both of these operators in a case-insensitive way. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/156819d0 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/156819d0 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/156819d0 Branch: refs/heads/master Commit: 156819d0470b95a2822c03a498ed5c5b872d2022 Parents: c74d75c Author: Aditya <adi...@mapr.com> Authored: Tue Mar 8 16:57:52 2016 -0800 Committer: Aditya Kishore <a...@apache.org> Committed: Fri Sep 9 10:08:39 2016 -0700 ---------------------------------------------------------------------- .../store/mapr/db/MapRDBFormatPluginConfig.java | 16 +- .../store/mapr/db/MapRDBPushFilterIntoScan.java | 4 +- .../store/mapr/db/json/JsonTableGroupScan.java | 11 +- .../mapr/db/json/MaprDBJsonRecordReader.java | 382 +++++++------------ .../drill/maprdb/tests/json/TestSimpleJson.java | 32 +- 5 files changed, 200 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java index 82b360c..7295265 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java @@ -27,8 +27,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT) public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { - private boolean 
allTextMode = false; - private boolean readAllNumbersAsDouble = false; + public boolean allTextMode = false; + public boolean readAllNumbersAsDouble = false; + public boolean enablePushdown = true; @Override public int hashCode() { @@ -42,6 +43,8 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { return false; } else if (allTextMode != other.allTextMode) { return false; + } else if (enablePushdown != other.enablePushdown) { + return false; } return true; @@ -65,4 +68,13 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { readAllNumbersAsDouble = read; } + public boolean isEnablePushdown() { + return enablePushdown; + } + + @JsonProperty("enablePushdown") + public void setEnablePushdown(boolean enablePushdown) { + this.enablePushdown = enablePushdown; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java index 7292182..6a286a8 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java @@ -112,7 +112,8 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul FilterPrel filter, final ProjectPrel project, ScanPrel scan, JsonTableGroupScan groupScan, RexNode condition) { - if (groupScan.isFilterPushedDown()) { + if (groupScan.isDisablePushdown() // Do not pushdown filter if it is disabled in plugin configuration + || groupScan.isFilterPushedDown()) { // see below /* * The rule can get triggered again due to the transformed "scan => filter" 
sequence * created by the earlier execution of this rule when we could not do a complete @@ -202,4 +203,5 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel))); } } + } http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java index 9e23af7..0c8ffda 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java @@ -112,7 +112,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan { regionsToScan = new TreeMap<TabletFragmentInfo, String>(); for (TabletInfo tabletInfo : tabletInfos) { TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo; - if (!foundStartRegion + if (!foundStartRegion && !isNullOrEmpty(scanSpec.getStartRow()) && !tabletInfoImpl.containsRow(scanSpec.getStartRow())) { continue; @@ -171,6 +171,15 @@ public class JsonTableGroupScan extends MapRDBGroupScan { return scanSpec.getTableName(); } + public boolean isDisablePushdown() { + return !formatPluginConfig.isEnablePushdown(); + } + + @JsonIgnore + public boolean canPushdownProjects(List<SchemaPath> columns) { + return formatPluginConfig.isEnablePushdown(); + } + @Override public String toString() { return "JsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns + "]"; http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java 
---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index 7fbcd1b..cb86e32 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -18,10 +18,8 @@ package org.apache.drill.exec.store.mapr.db.json; import static org.ojai.DocumentConstants.ID_KEY; -import io.netty.buffer.DrillBuf; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -42,11 +40,8 @@ import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig; import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; -import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; -import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; -import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter; -import org.apache.drill.exec.vector.complex.writer.VarCharWriter; import org.ojai.DocumentReader; import org.ojai.DocumentReader.EventType; import org.ojai.DocumentStream; @@ -54,7 +49,6 @@ import org.ojai.FieldPath; import org.ojai.FieldSegment; import org.ojai.Value; import org.ojai.store.QueryCondition; -import org.ojai.types.OTime; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -68,10 +62,13 @@ import com.mapr.db.ojai.DBDocumentReaderBase; import com.mapr.db.util.ByteBufs; import 
com.mapr.org.apache.hadoop.hbase.util.Bytes; +import io.netty.buffer.DrillBuf; + public class MaprDBJsonRecordReader extends AbstractRecordReader { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class); public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY); + private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24; private Table table; private QueryCondition condition; @@ -79,7 +76,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private final String tableName; private OperatorContext operatorContext; - private VectorContainerWriter writer; + private VectorContainerWriter vectorWriter; private DrillBuf buffer; @@ -91,8 +88,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private boolean idOnly; private final boolean unionEnabled; private final boolean readNumbersAsDouble; + private boolean disablePushdown; private final boolean allTextMode; - private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24; public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPluginConfig formatPluginConfig, @@ -114,12 +111,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble(); allTextMode = formatPluginConfig.isAllTextMode(); + disablePushdown = !formatPluginConfig.isEnablePushdown(); } @Override protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) { Set<SchemaPath> transformed = Sets.newLinkedHashSet(); - if (!isStarQuery()) { + if (!isStarQuery() && !disablePushdown) { Set<FieldPath> projectedFieldsSet = Sets.newTreeSet(); for (SchemaPath column : columns) { if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) { @@ -154,7 +152,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { @Override public void 
setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { - this.writer = new VectorContainerWriter(output, unionEnabled); + this.vectorWriter = new VectorContainerWriter(output, unionEnabled); this.operatorContext = context; try { @@ -172,8 +170,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { Stopwatch watch = Stopwatch.createUnstarted(); watch.start(); - writer.allocate(); - writer.reset(); + vectorWriter.allocate(); + vectorWriter.reset(); int recordCount = 0; DBDocumentReaderBase reader = null; @@ -182,24 +180,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { try { reader = nextDocumentReader(); if (reader == null) break; - writer.setPosition(recordCount); + + vectorWriter.setPosition(recordCount); + MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap()); if (idOnly) { Value id = reader.getId(); - MapWriter map = writer.rootAsMap(); - try { switch(id.getType()) { case STRING: - writeString(map.varChar(ID_KEY), id.getString()); - recordCount++; + writeString(writer, ID_KEY, id.getString()); break; case BINARY: - if (allTextMode) { - writeString(map.varChar(ID_KEY), new String(id.getBinary().array(), Charset.forName("UTF-8"))); - } else { - writeBinary(map.varBinary(ID_KEY), id.getBinary()); - } - recordCount++; + writeBinary(writer, ID_KEY, id.getBinary()); break; default: throw new UnsupportedOperationException(id.getType() + @@ -213,9 +205,9 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { if (reader.next() != EventType.START_MAP) { throw dataReadError("The document did not start with START_MAP!"); } - writeToMap(reader, writer.rootAsMap()); - recordCount++; + writeToListOrMap(writer, reader); } + recordCount++; } catch (UserException e) { throw UserException.unsupportedError(e) .addContext(String.format("Table: %s, document id: '%s'", @@ -225,132 +217,74 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } } - 
writer.setValueCount(recordCount); + vectorWriter.setValueCount(recordCount); logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), recordCount); return recordCount; } - private void writeToMap(DBDocumentReaderBase reader, MapWriter map) { - map.start(); + private void writeToListOrMap(MapOrListWriterImpl writer, DBDocumentReaderBase reader) { + String fieldName = null; + writer.start(); outside: while (true) { EventType event = reader.next(); - if (event == null || event == EventType.END_MAP) break outside; + if (event == null + || event == EventType.END_MAP + || event == EventType.END_ARRAY) { + break outside; + } else if (reader.inMap()) { + fieldName = reader.getFieldName(); + } - String fieldName = reader.getFieldName(); try { switch (event) { case NULL: break; // not setting the field will leave it as null case BINARY: - if (allTextMode) { - writeString(map.varChar(fieldName), new String(reader.getBinary().array(), Charset.forName("UTF-8"))); - } else { - writeBinary(map.varBinary(fieldName), reader.getBinary()); - } + writeBinary(writer, fieldName, reader.getBinary()); break; case BOOLEAN: - if (allTextMode) { - writeString(map.varChar(fieldName), String.valueOf(reader.getBoolean())); - } else { - map.bit(fieldName).writeBit(reader.getBoolean() ? 
1 : 0); - } + writeBoolean(writer, fieldName, reader); break; case STRING: - writeString(map.varChar(fieldName), reader.getString()); + writeString(writer, fieldName, reader.getString()); break; case BYTE: - if (allTextMode) { - writeString(map.varChar(fieldName), String.valueOf(reader.getByte())); - } else if (readNumbersAsDouble) { - map.float8(fieldName).writeFloat8(reader.getByte()); - } else { - map.tinyInt(fieldName).writeTinyInt(reader.getByte()); - } + writeByte(writer, fieldName, reader); break; case SHORT: - if (allTextMode) { - writeString(map.varChar(fieldName), String.valueOf(reader.getShort())); - } else if (readNumbersAsDouble) { - map.float8(fieldName).writeFloat8(reader.getShort()); - } else { - map.smallInt(fieldName).writeSmallInt(reader.getShort()); - } + writeShort(writer, fieldName, reader); break; case INT: - if (allTextMode) { - writeString(map.varChar(fieldName), String.valueOf(reader.getInt())); - } else if (readNumbersAsDouble) { - map.float8(fieldName).writeFloat8(reader.getInt()); - } else { - map.integer(fieldName).writeInt(reader.getInt()); - } + writeInt(writer, fieldName, reader); break; case LONG: - if (allTextMode) { - writeString(map.varChar(fieldName), String.valueOf(reader.getLong())); - } else if (readNumbersAsDouble) { - map.float8(fieldName).writeFloat8(reader.getLong()); - } else { - map.bigInt(fieldName).writeBigInt(reader.getLong()); - } + writeLong(writer, fieldName, reader); break; case FLOAT: - if (allTextMode) { - writeString(map.varChar(fieldName), String.valueOf(reader.getFloat())); - } else if (readNumbersAsDouble) { - map.float8(fieldName).writeFloat8(reader.getFloat()); - } else { - map.float4(fieldName).writeFloat4(reader.getFloat()); - } + writeFloat(writer, fieldName, reader); break; case DOUBLE: - if (allTextMode) { - writeString(map.varChar(fieldName), String.valueOf(reader.getDouble())); - } else { - map.float8(fieldName).writeFloat8(reader.getDouble()); - } + writeDouble(writer, fieldName, reader); break; 
case DECIMAL: throw unsupportedError("Decimal type is currently not supported."); case DATE: - if (allTextMode) { - writeString(map.varChar(fieldName), reader.getDate().toString()); - } else { - - long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY; - map.date(fieldName).writeDate(milliSecondsSinceEpoch); - } + writeDate(writer, fieldName, reader); break; case TIME: - if (allTextMode) { - writeString(map.varChar(fieldName), reader.getTime().toString()); - } else { - OTime t = reader.getTime(); - int h = t.getHour(); - int m = t.getMinute(); - int s = t.getSecond(); - int ms = t.getMilliSecond(); - int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000; - map.time(fieldName).writeTime(millisOfDay); - } + writeTime(writer, fieldName, reader); break; case TIMESTAMP: - if (allTextMode) { - writeString(map.varChar(fieldName), reader.getTimestamp().toString()); - } else { - map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong()); - } + writeTimeStamp(writer, fieldName, reader); break; case INTERVAL: throw unsupportedError("Interval type is currently not supported."); case START_MAP: - writeToMap(reader, map.map(fieldName)); + writeToListOrMap((MapOrListWriterImpl) (reader.inMap() ? 
writer.map(fieldName) : writer.listoftmap(fieldName)), reader); break; case START_ARRAY: - writeToList(reader, map.list(fieldName)); + writeToListOrMap((MapOrListWriterImpl) writer.list(fieldName), reader); break; - case END_ARRAY: - throw dataReadError("Encountered an END_ARRAY event inside a map."); default: throw unsupportedError("Unsupported type: %s encountered during the query.", event); } @@ -359,145 +293,115 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { IdCodec.asString(reader.getId()), fieldName), e); } } - map.end(); + writer.end(); } - private void writeToList(DBDocumentReaderBase reader, ListWriter list) { - list.startList(); - outside: while (true) { - EventType event = reader.next(); - if (event == null || event == EventType.END_ARRAY) break outside; - - switch (event) { - case NULL: - throw unsupportedError("Null values are not supported in lists."); - case BINARY: - if (allTextMode) { - writeString(list.varChar(), new String(reader.getBinary().array(), Charset.forName("UTF-8"))); - } else { - writeBinary(list.varBinary(), reader.getBinary()); - } - break; - case BOOLEAN: - if (allTextMode) { - writeString(list.varChar(), String.valueOf(reader.getBoolean())); - } else { - list.bit().writeBit(reader.getBoolean() ? 
1 : 0); - } - break; - case STRING: - writeString(list.varChar(), reader.getString()); - break; - case BYTE: - if (allTextMode) { - writeString(list.varChar(), String.valueOf(reader.getByte())); - } else if (readNumbersAsDouble) { - list.float8().writeFloat8(reader.getByte()); - } else { - list.tinyInt().writeTinyInt(reader.getByte()); - } - break; - case SHORT: - if (allTextMode) { - writeString(list.varChar(), String.valueOf(reader.getShort())); - } else if (readNumbersAsDouble) { - list.float8().writeFloat8(reader.getShort()); - } else { - list.smallInt().writeSmallInt(reader.getShort()); - } - break; - case INT: - if (allTextMode) { - writeString(list.varChar(), String.valueOf(reader.getInt())); - } else if (readNumbersAsDouble) { - list.float8().writeFloat8(reader.getInt()); - } else { - list.integer().writeInt(reader.getInt()); - } - break; - case LONG: - if (allTextMode) { - writeString(list.varChar(), String.valueOf(reader.getLong())); - } else if (readNumbersAsDouble) { - list.float8().writeFloat8(reader.getLong()); - } else { - list.bigInt().writeBigInt(reader.getLong()); - } - break; - case FLOAT: - if (allTextMode) { - writeString(list.varChar(), String.valueOf(reader.getFloat())); - } else if (readNumbersAsDouble) { - list.float8().writeFloat8(reader.getFloat()); - } else { - list.float4().writeFloat4(reader.getFloat()); - } - break; - case DOUBLE: - if (allTextMode) { - writeString(list.varChar(), String.valueOf(reader.getDouble())); - } else { - list.float8().writeFloat8(reader.getDouble()); - } - break; - case DECIMAL: - throw unsupportedError("Decimals are currently not supported."); - case DATE: - if (allTextMode) { - writeString(list.varChar(), reader.getDate().toString()); - } else { - long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY; - list.date().writeDate(milliSecondsSinceEpoch); - } - break; - case TIME: - if (allTextMode) { - writeString(list.varChar(), reader.getTime().toString()); - } else { - 
OTime t = reader.getTime(); - int h = t.getHour(); - int m = t.getMinute(); - int s = t.getSecond(); - int ms = t.getMilliSecond(); - int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000; - list.time().writeTime(millisOfDay); - } - break; - case TIMESTAMP: - if (allTextMode) { - writeString(list.varChar(), reader.getTimestamp().toString()); - } else { - list.timeStamp().writeTimeStamp(reader.getTimestampLong()); - } - break; - case INTERVAL: - throw unsupportedError("Interval is currently not supported."); - case START_MAP: - writeToMap(reader, list.map()); - break; - case END_MAP: - throw dataReadError("Encountered an END_MAP event inside a list."); - case START_ARRAY: - writeToList(reader, list.list()); - break; - default: - throw unsupportedError("Unsupported type: %s encountered during the query.%s", event); - } + private void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, fieldName, reader.getTimestamp().toUTCString()); + } else { + ((writer.map != null) ? writer.map.timeStamp(fieldName) : writer.list.timeStamp()).writeTimeStamp(reader.getTimestampLong()); + } + } + + private void writeTime(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, reader.getTime().toTimeStr(), fieldName); + } else { + ((writer.map != null) ? writer.map.time(fieldName) : writer.list.time()).writeTime(reader.getTimeInt()); + } + } + + private void writeDate(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, reader.getDate().toDateStr(), fieldName); + } else { + long milliSecondsSinceEpoch = reader.getDateInt() * MILLISECONDS_IN_A_DAY; + ((writer.map != null) ? 
writer.map.date(fieldName) : writer.list.date()).writeDate(milliSecondsSinceEpoch); + } + } + + private void writeDouble(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, String.valueOf(reader.getDouble()), fieldName); + } else { + writer.float8(fieldName).writeFloat8(reader.getDouble()); + } + } + + private void writeFloat(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, String.valueOf(reader.getFloat()), fieldName); + } else if (readNumbersAsDouble) { + writer.float8(fieldName).writeFloat8(reader.getFloat()); + } else { + writer.float4(fieldName).writeFloat4(reader.getFloat()); + } + } + + private void writeLong(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, String.valueOf(reader.getLong()), fieldName); + } else if (readNumbersAsDouble) { + writer.float8(fieldName).writeFloat8(reader.getLong()); + } else { + writer.bigInt(fieldName).writeBigInt(reader.getLong()); + } + } + + private void writeInt(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, String.valueOf(reader.getInt()), fieldName); + } else if (readNumbersAsDouble) { + writer.float8(fieldName).writeFloat8(reader.getInt()); + } else { + writer.integer(fieldName).writeInt(reader.getInt()); + } + } + + private void writeShort(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, String.valueOf(reader.getShort()), fieldName); + } else if (readNumbersAsDouble) { + writer.float8(fieldName).writeFloat8(reader.getShort()); + } else { + ((writer.map != null) ? 
writer.map.smallInt(fieldName) : writer.list.smallInt()).writeSmallInt(reader.getShort()); } - list.endList(); } - private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) { - buffer = buffer.reallocIfNeeded(buf.remaining()); - buffer.setBytes(0, buf, buf.position(), buf.remaining()); - binaryWriter.writeVarBinary(0, buf.remaining(), buffer); + private void writeByte(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, String.valueOf(reader.getByte()), fieldName); + } else if (readNumbersAsDouble) { + writer.float8(fieldName).writeFloat8(reader.getByte()); + } else { + ((writer.map != null) ? writer.map.tinyInt(fieldName) : writer.list.tinyInt()).writeTinyInt(reader.getByte()); + } + } + + private void writeBoolean(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) { + if (allTextMode) { + writeString(writer, String.valueOf(reader.getBoolean()), fieldName); + } else { + writer.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0); + } + } + + private void writeBinary(MapOrListWriterImpl writer, String fieldName, ByteBuffer buf) { + if (allTextMode) { + writeString(writer, fieldName, Bytes.toString(buf)); + } else { + buffer = buffer.reallocIfNeeded(buf.remaining()); + buffer.setBytes(0, buf, buf.position(), buf.remaining()); + writer.binary(fieldName).writeVarBinary(0, buf.remaining(), buffer); + } } - private void writeString(VarCharWriter varCharWriter, String string) { - final byte[] strBytes = Bytes.toBytes(string); + private void writeString(MapOrListWriterImpl writer, String fieldName, String value) { + final byte[] strBytes = Bytes.toBytes(value); buffer = buffer.reallocIfNeeded(strBytes.length); buffer.setBytes(0, strBytes); - varCharWriter.writeVarChar(0, strBytes.length, buffer); + writer.varChar(fieldName).writeVarChar(0, strBytes.length, buffer); } private UserException unsupportedError(String format, Object... 
args) { http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java index 225fb2f..2bf2c31 100644 --- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java @@ -39,7 +39,16 @@ import com.mapr.tests.annotations.ClusterTest; public class TestSimpleJson extends BaseJsonTest { @Test - public void testMe() throws Exception { + public void testSelectStar() throws Exception { + final String sql = "SELECT\n" + + " *\n" + + "FROM\n" + + " hbase.`business` business"; + runSQLAndVerifyCount(sql, 10); + } + + @Test + public void testSelectId() throws Exception { setColumnWidths(new int[] {23}); final String sql = "SELECT\n" + " _id\n" @@ -58,6 +67,24 @@ public class TestSimpleJson extends BaseJsonTest { } @Test + public void testPushdownDisabled() throws Exception { + setColumnWidths(new int[] {25, 40, 40, 40}); + final String sql = "SELECT\n" + + " _id, name, categories, full_address\n" + + "FROM\n" + + " table(hbase.`business`(type => 'maprdb', enablePushdown => false)) business\n" + + "WHERE\n" + + " name <> 'Sprint'" + ; + runSQLAndVerifyCount(sql, 9); + + final String[] expectedPlan = {"condition=null", "columns=\\[`\\*`\\]"}; + final String[] excludedPlan = {"condition=\\(name != \"Sprint\"\\)", "columns=\\[`name`, `_id`, `categories`, `full_address`\\]"}; + + PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); + } + + @Test public void testPushdownStringEqual() throws Exception { setColumnWidths(new int[] {25, 40, 40, 40}); final String sql = "SELECT\n" @@ -131,7 +158,7 @@ 
public class TestSimpleJson extends BaseJsonTest { ; runSQLAndVerifyCount(sql, 9); - final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)"}; + final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)", "columns=\\[`name`, `_id`, `categories`, `full_address`\\]"}; final String[] excludedPlan = {}; PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); @@ -409,4 +436,5 @@ public class TestSimpleJson extends BaseJsonTest { PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan); } + }