[CARBONDATA-3220] Support Presto to read stream segment data

Support Presto reading the streaming table:
1. refactor the old CarbonStreamRecordReader so that its code can be reused for Presto
2. change CarbondataPageSource to read streaming data through StreamRecordReader

This closes #3001

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d78db8f6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d78db8f6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d78db8f6

Branch: refs/heads/master
Commit: d78db8f6bd714e6a5111812b0c0473418201b23c
Parents: 8e6def9
Author: QiangCai <qiang...@qq.com>
Authored: Wed Jan 9 22:06:02 2019 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jan 10 16:59:54 2019 +0530

----------------------------------------------------------------------
 .../carbondata/hadoop/CarbonInputSplit.java     |   4 +-
 .../hadoop/stream/CarbonStreamInputFormat.java  | 159 +++
 .../hadoop/stream/CarbonStreamUtils.java        |  40 ++
 .../hadoop/stream/StreamBlockletReader.java     | 261 +++
 .../hadoop/stream/StreamRecordReader.java       | 614 +++++++++++++++++
 .../carbondata/presto/CarbonVectorBatch.java    |  10 +-
 .../carbondata/presto/CarbondataPageSource.java | 318 ++++++++-
 .../presto/CarbondataPageSourceProvider.java    | 142 +---
 .../presto/impl/CarbonLocalInputSplit.java      |  44 +-
 .../presto/impl/CarbonLocalMultiBlockSplit.java |   4 +
 .../presto/impl/CarbonTableReader.java          |   3 +-
 .../presto/readers/BooleanStreamReader.java     |  11 +
 .../readers/DecimalSliceStreamReader.java       |  12 +
 .../presto/readers/DoubleStreamReader.java      |  12 +
 .../presto/readers/IntegerStreamReader.java     |   7 +
 .../presto/readers/LongStreamReader.java        |  12 +
 .../presto/readers/ShortStreamReader.java       |  12 +
 .../presto/readers/SliceStreamReader.java       |  13 +
 .../presto/readers/TimestampStreamReader.java   |  12 +
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   2 +-
 .../stream/CarbonStreamRecordReader.java        | 684 ++-----------------
 .../streaming/CarbonStreamInputFormat.java      | 160 -----
 .../carbondata/streaming/CarbonStreamUtils.java |  40 --
 .../streaming/StreamBlockletReader.java         | 261 -------
 25 files changed, 1595 insertions(+), 1244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index de2451b..bcf703c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -195,7 +195,9 @@ public class CarbonInputSplit extends FileSplit
             blockletInfos, split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
         blockInfo.setDataMapWriterPath(split.dataMapWritePath);
-        blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
+        if (split.getDetailInfo() != null) {
+          blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
+        }
         tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
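The new CarbonStreamInputFormat below instantiates its record reader through reflection (see createRecordReader), apparently so that the hadoop module needs no compile-time dependency on CarbonStreamRecordReader. As a rough usage sketch only — the StreamScanSketch class is hypothetical, and queryModel, split and context are assumed to be prepared by the caller (e.g. via CarbonTableInputFormat) — scanning one streaming split row by row would look like this:

import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class StreamScanSketch {
  // Scans one streaming (ROW_V1) split row by row; queryModel, split and
  // context are assumptions prepared elsewhere by the caller.
  static void scan(QueryModel queryModel, InputSplit split, TaskAttemptContext context)
      throws Exception {
    CarbonStreamInputFormat format = new CarbonStreamInputFormat();
    format.setModel(queryModel);
    format.setIsVectorReader(false); // read row by row instead of vector batches
    format.setUseRawRow(false);      // decode values rather than return raw bytes
    RecordReader<Void, Object> reader = format.createRecordReader(split, context);
    try {
      reader.initialize(split, context);
      while (reader.nextKeyValue()) {
        Object[] row = (Object[]) reader.getCurrentValue(); // projected column values
        // consume row ...
      }
    } finally {
      reader.close();
    }
  }
}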
---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java new file mode 100644 index 0000000..e4819ee --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.stream; + +import java.io.IOException; +import java.lang.reflect.Constructor; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.scan.complextypes.ArrayQueryType; +import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType; +import org.apache.carbondata.core.scan.complextypes.StructQueryType; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.hadoop.InputMetricsStats; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +/** + * Stream input format + */ +public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> { + + public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size"; + public static final String READ_BUFFER_SIZE_DEFAULT = "65536"; + public static final String STREAM_RECORD_READER_INSTANCE = + "org.apache.carbondata.stream.CarbonStreamRecordReader"; + // return raw row for handoff + private boolean useRawRow = false; + + public void setUseRawRow(boolean useRawRow) { + this.useRawRow = useRawRow; + } + + public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { + this.inputMetricsStats = inputMetricsStats; + } + + public void setIsVectorReader(boolean vectorReader) { + isVectorReader = vectorReader; + } + + public void setModel(QueryModel model) { + this.model = model; + } + + // 
InputMetricsStats + private InputMetricsStats inputMetricsStats; + // vector reader + private boolean isVectorReader; + private QueryModel model; + + @Override + public RecordReader<Void, Object> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException { + try { + Constructor cons = CarbonStreamUtils + .getConstructorWithReflection(STREAM_RECORD_READER_INSTANCE, boolean.class, + InputMetricsStats.class, QueryModel.class, boolean.class); + return (RecordReader) CarbonStreamUtils + .getInstanceWithReflection(cons, isVectorReader, inputMetricsStats, model, useRawRow); + + } catch (Exception e) { + throw new IOException(e); + } + } + + public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable, + CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) + throws IOException { + GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length]; + for (int i = 0; i < carbonColumns.length; i++) { + if (carbonColumns[i].isComplex()) { + if (DataTypes.isArrayType(carbonColumns[i].getDataType())) { + queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(), + carbonColumns[i].getColName(), i); + } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) { + queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(), + carbonColumns[i].getColName(), i); + } else { + throw new UnsupportedOperationException( + carbonColumns[i].getDataType().getName() + " is not supported"); + } + + fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache); + } + } + + return queryTypes; + } + + private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType, + CarbonDimension dimension, int parentBlockIndex, + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException { + for (int i = 0; i < dimension.getNumberOfChild(); i++) { + CarbonDimension child = dimension.getListOfChildDimensions().get(i); + DataType dataType = child.getDataType(); + GenericQueryType queryType = null; + if (DataTypes.isArrayType(dataType)) { + queryType = + new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex); + + } else if (DataTypes.isStructType(dataType)) { + queryType = + new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex); + parentQueryType.addChildren(queryType); + } else { + boolean isDirectDictionary = + CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY); + boolean isDictionary = + CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY); + Dictionary dictionary = null; + if (isDictionary) { + String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties() + .get(CarbonCommonConstants.DICTIONARY_PATH); + DictionaryColumnUniqueIdentifier dictionarIdentifier = + new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(), + child.getColumnIdentifier(), child.getDataType(), dictionaryPath); + dictionary = cache.get(dictionarIdentifier); + } + queryType = + new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex, + child.getDataType(), 4, dictionary, + isDirectDictionary); + } + parentQueryType.addChildren(queryType); + if (child.getNumberOfChild() > 0) { + fillChildren(carbontable, queryType, child, parentBlockIndex, cache); + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java 
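CarbonStreamUtils, added next, is the small reflection helper the input format uses above. Both methods are generic; a minimal self-contained demonstration follows (the ReflectionSketch class is hypothetical, with java.lang.StringBuilder standing in for the reflected reader class):

import java.lang.reflect.Constructor;

import org.apache.carbondata.hadoop.stream.CarbonStreamUtils;

public class ReflectionSketch {
  public static void main(String[] args) throws Exception {
    // Look up a public constructor by class name and parameter types, then
    // invoke it -- the same pattern CarbonStreamInputFormat.createRecordReader
    // uses to load CarbonStreamRecordReader without a compile-time dependency.
    Constructor cons = CarbonStreamUtils.getConstructorWithReflection(
        "java.lang.StringBuilder", String.class);
    Object instance = CarbonStreamUtils.getInstanceWithReflection(cons, "streaming");
    System.out.println(instance); // prints "streaming"
  }
}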
---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java new file mode 100644 index 0000000..78669e7 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.hadoop.stream; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Util class which does utility function for stream module + */ +public class CarbonStreamUtils { + + public static Constructor getConstructorWithReflection(String className, + Class<?>... parameterTypes) + throws ClassNotFoundException, NoSuchMethodException { + Class loadedClass = Class.forName(className); + return loadedClass.getConstructor(parameterTypes); + + } + + public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws + IllegalAccessException, + InvocationTargetException, InstantiationException { + return cons.newInstance(initargs); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java new file mode 100644 index 0000000..dbcf72d --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.stream; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.BlockletHeader; + +/** + * stream blocklet reader + */ +public class StreamBlockletReader { + + private byte[] buffer; + private int offset; + private final byte[] syncMarker; + private final byte[] syncBuffer; + private final int syncLen; + private long pos = 0; + private final InputStream in; + private final long limitStart; + private final long limitEnd; + private boolean isAlreadySync = false; + private Compressor compressor; + private int rowNums = 0; + private int rowIndex = 0; + private boolean isHeaderPresent; + + public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, + boolean isHeaderPresent, String compressorName) { + this.syncMarker = syncMarker; + syncLen = syncMarker.length; + syncBuffer = new byte[syncLen]; + this.in = in; + limitStart = limit; + limitEnd = limitStart + syncLen; + this.isHeaderPresent = isHeaderPresent; + this.compressor = CompressorFactory.getInstance().getCompressor(compressorName); + } + + private void ensureCapacity(int capacity) { + if (buffer == null || capacity > buffer.length) { + buffer = new byte[capacity]; + } + } + + /** + * find the first position of sync_marker in input stream + */ + private boolean sync() throws IOException { + if (!readBytesFromStream(syncBuffer, 0, syncLen)) { + return false; + } + boolean skipHeader = false; + for (int i = 0; i < limitStart; i++) { + int j = 0; + for (; j < syncLen; j++) { + if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break; + } + if (syncLen == j) { + if (isHeaderPresent) { + if (skipHeader) { + return true; + } else { + skipHeader = true; + } + } else { + return true; + } + } + int value = in.read(); + if (-1 == value) { + return false; + } + syncBuffer[i % syncLen] = (byte) value; + pos++; + } + return false; + } + + public BlockletHeader readBlockletHeader() throws IOException { + int len = readIntFromStream(); + byte[] b = new byte[len]; + if (!readBytesFromStream(b, 0, len)) { + throw new EOFException("Failed to read blocklet header"); + } + BlockletHeader header = CarbonUtil.readBlockletHeader(b); + rowNums = header.getBlocklet_info().getNum_rows(); + rowIndex = 0; + return header; + } + + public void readBlockletData(BlockletHeader header) throws IOException { + ensureCapacity(header.getBlocklet_length()); + offset = 0; + int len = readIntFromStream(); + byte[] b = new byte[len]; + if (!readBytesFromStream(b, 0, len)) { + throw new EOFException("Failed to read blocklet data"); + } + compressor.rawUncompress(b, buffer); + } + + public void skipBlockletData(boolean reset) throws IOException { + int len = readIntFromStream(); + skip(len); + pos += len; + if (reset) { + this.rowNums = 0; + this.rowIndex = 0; + } + } + + private void skip(int len) throws IOException { + long remaining = len; + do { + long skipLen = in.skip(remaining); + remaining -= skipLen; + } while (remaining > 0); + } + + /** + * find the next blocklet + */ + public boolean nextBlocklet() throws IOException { + if (pos >= limitStart) { + return false; + } + if (isAlreadySync) { + if (!readBytesFromStream(syncBuffer, 0, syncLen)) { + return false; + } + } else { + isAlreadySync = true; + if (!sync()) { + return false; + } + } + + 
return pos < limitEnd; + } + + public boolean hasNext() throws IOException { + return rowIndex < rowNums; + } + + public void nextRow() { + rowIndex++; + } + + public int readIntFromStream() throws IOException { + int ch1 = in.read(); + int ch2 = in.read(); + int ch3 = in.read(); + int ch4 = in.read(); + if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException(); + pos += 4; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + + /** + * Reads <code>len</code> bytes of data from the input stream into + * an array of bytes. + * @return <code>true</code> if reading data successfully, or + * <code>false</code> if there is no more data because the end of the stream has been reached. + */ + public boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException { + int readLen = in.read(b, offset, len); + if (readLen < 0) { + return false; + } + pos += readLen; + if (readLen < len) { + return readBytesFromStream(b, offset + readLen, len - readLen); + } else { + return true; + } + } + + public boolean readBoolean() { + return (buffer[offset++]) != 0; + } + + public short readShort() { + short v = (short) ((buffer[offset + 1] & 255) + + ((buffer[offset]) << 8)); + offset += 2; + return v; + } + + public byte[] copy(int len) { + byte[] b = new byte[len]; + System.arraycopy(buffer, offset, b, 0, len); + return b; + } + + public int readInt() { + int v = ((buffer[offset + 3] & 255) + + ((buffer[offset + 2] & 255) << 8) + + ((buffer[offset + 1] & 255) << 16) + + ((buffer[offset]) << 24)); + offset += 4; + return v; + } + + public long readLong() { + long v = ((long)(buffer[offset + 7] & 255)) + + ((long) (buffer[offset + 6] & 255) << 8) + + ((long) (buffer[offset + 5] & 255) << 16) + + ((long) (buffer[offset + 4] & 255) << 24) + + ((long) (buffer[offset + 3] & 255) << 32) + + ((long) (buffer[offset + 2] & 255) << 40) + + ((long) (buffer[offset + 1] & 255) << 48) + + ((long) (buffer[offset]) << 56); + offset += 8; + return v; + } + + public double readDouble() { + return Double.longBitsToDouble(readLong()); + } + + public byte[] readBytes(int len) { + byte[] b = new byte[len]; + System.arraycopy(buffer, offset, b, 0, len); + offset += len; + return b; + } + + public void skipBytes(int len) { + offset += len; + } + + public int getRowNums() { + return rowNums; + } + + public void close() { + CarbonUtil.closeStreams(in); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java new file mode 100644 index 0000000..75e36be --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.stream; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonHeaderReader; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.format.BlockletHeader; +import org.apache.carbondata.format.FileHeader; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * 
Stream row record reader + */ +public class StreamRecordReader extends RecordReader<Void, Object> { + + // metadata + protected CarbonTable carbonTable; + private CarbonColumn[] storageColumns; + private boolean[] isRequired; + private DataType[] measureDataTypes; + private int dimensionCount; + private int measureCount; + + // input + private FileSplit fileSplit; + private Configuration hadoopConf; + protected StreamBlockletReader input; + protected boolean isFirstRow = true; + protected QueryModel model; + + // decode data + private BitSet allNonNull; + private boolean[] isNoDictColumn; + private DirectDictionaryGenerator[] directDictionaryGenerators; + private CacheProvider cacheProvider; + private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; + private GenericQueryType[] queryTypes; + private String compressorName; + + // vectorized reader + protected boolean isFinished = false; + + // filter + protected FilterExecuter filter; + private boolean[] isFilterRequired; + private Object[] filterValues; + protected RowIntf filterRow; + private int[] filterMap; + + // output + protected CarbonColumn[] projection; + private boolean[] isProjectionRequired; + private int[] projectionMap; + protected Object[] outputValues; + + // empty project, null filter + protected boolean skipScanData; + + // return raw row for handoff + private boolean useRawRow = false; + + public StreamRecordReader(QueryModel mdl, boolean useRawRow) { + this.model = mdl; + this.useRawRow = useRawRow; + + } + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException { + // input + if (split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + // metadata + hadoopConf = context.getConfiguration(); + if (model == null) { + CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + model = format.createQueryModel(split, context); + } + carbonTable = model.getTable(); + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getTableName()); + dimensionCount = dimensions.size(); + List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(carbonTable.getTableName()); + measureCount = measures.size(); + List<CarbonColumn> carbonColumnList = + carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName()); + storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]); + isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns); + directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { + directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(storageColumns[i].getDataType()); + } + } + measureDataTypes = new DataType[measureCount]; + for (int i = 0; i < measureCount; i++) { + measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType(); + } + + // decode data + allNonNull = new BitSet(storageColumns.length); + projection = model.getProjectionColumns(); + + isRequired = new boolean[storageColumns.length]; + boolean[] isFiltlerDimensions = model.getIsFilterDimensions(); + boolean[] isFiltlerMeasures = model.getIsFilterMeasures(); + isFilterRequired = new boolean[storageColumns.length]; + 
filterMap = new int[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].isDimension()) { + if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = storageColumns[i].getOrdinal(); + } + } else { + if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal(); + } + } + } + + isProjectionRequired = new boolean[storageColumns.length]; + projectionMap = new int[storageColumns.length]; + for (int j = 0; j < projection.length; j++) { + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].getColName().equals(projection[j].getColName())) { + isRequired[i] = true; + isProjectionRequired[i] = true; + projectionMap[i] = j; + break; + } + } + } + + // initialize filter + if (null != model.getFilterExpressionResolverTree()) { + initializeFilter(); + } else if (projection.length == 0) { + skipScanData = true; + } + + } + + private void initializeFilter() { + + List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + SegmentProperties segmentProperties = + new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); + Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>(); + + FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree(); + filter = + FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, complexDimensionInfoMap); + // for row filter, we need update column index + FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + + } + + private byte[] getSyncMarker(String filePath) throws IOException { + CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath); + FileHeader header = headerReader.readHeader(); + // legacy store does not have this member + if (header.isSetCompressor_name()) { + compressorName = header.getCompressor_name(); + } else { + compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName(); + } + return header.getSync_marker(); + } + + protected void initializeAtFirstRow() throws IOException { + filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount]; + filterRow = new RowImpl(); + filterRow.setValues(filterValues); + + outputValues = new Object[projection.length]; + + Path file = fileSplit.getPath(); + + byte[] syncMarker = getSyncMarker(file.toString()); + + FileSystem fs = file.getFileSystem(hadoopConf); + + int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE, + CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT)); + + FSDataInputStream fileIn = fs.open(file, bufferSize); + fileIn.seek(fileSplit.getStart()); + input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(), + fileSplit.getStart() == 0, compressorName); + + cacheProvider = CacheProvider.getInstance(); + cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY); + 
queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache); + } + + /** + * check next Row + */ + protected boolean nextRow() throws IOException { + // read row one by one + try { + boolean hasNext; + boolean scanMore = false; + do { + hasNext = input.hasNext(); + if (hasNext) { + if (skipScanData) { + input.nextRow(); + scanMore = false; + } else { + if (useRawRow) { + // read raw row for streaming handoff which does not require decode raw row + readRawRowFromStream(); + } else { + readRowFromStream(); + } + if (null != filter) { + scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax()); + } else { + scanMore = false; + } + } + } else { + if (input.nextBlocklet()) { + BlockletHeader header = input.readBlockletHeader(); + if (isScanRequired(header)) { + if (skipScanData) { + input.skipBlockletData(false); + } else { + input.readBlockletData(header); + } + } else { + input.skipBlockletData(true); + } + scanMore = true; + } else { + isFinished = true; + scanMore = false; + } + } + } while (scanMore); + return hasNext; + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in detail reader", e); + } + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (isFirstRow) { + isFirstRow = false; + initializeAtFirstRow(); + } + if (isFinished) { + return false; + } + + return nextRow(); + } + + @Override public Void getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override public Object getCurrentValue() throws IOException, InterruptedException { + return outputValues; + } + + protected boolean isScanRequired(BlockletHeader header) { + if (filter != null && header.getBlocklet_index() != null) { + BlockletMinMaxIndex minMaxIndex = CarbonMetadataUtil + .convertExternalMinMaxIndex(header.getBlocklet_index().getMin_max_index()); + if (minMaxIndex != null) { + BitSet bitSet = filter + .isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues(), + minMaxIndex.getIsMinMaxSet()); + if (bitSet.isEmpty()) { + return false; + } else { + return true; + } + } + } + return true; + } + + protected void readRowFromStream() { + input.nextRow(); + short nullLen = input.readShort(); + BitSet nullBitSet = allNonNull; + if (nullLen > 0) { + nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); + } + int colCount = 0; + // primitive type dimension + for (; colCount < isNoDictColumn.length; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + if (isNoDictColumn[colCount]) { + int v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = DataTypeUtil + .getDataBasedOnDataTypeForNoDictionaryColumn(b, + storageColumns[colCount].getDataType()); + } + } else { + input.skipBytes(v); + } + } else if (null != directDictionaryGenerators[colCount]) { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt()); + } else { + 
input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } else { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = input.readInt(); + } else { + input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } + } + } + // complex type dimension + for (; colCount < dimensionCount; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + short v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + queryTypes[colCount].getDataBasedOnDataType(ByteBuffer.wrap(b)); + } + } else { + input.skipBytes(v); + } + } + } + // measure + DataType dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN) { + if (isRequired[colCount]) { + boolean v = input.readBoolean(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(1); + } + } else if (dataType == DataTypes.SHORT) { + if (isRequired[colCount]) { + short v = input.readShort(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(2); + } + } else if (dataType == DataTypes.INT) { + if (isRequired[colCount]) { + int v = input.readInt(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(4); + } + } else if (dataType == DataTypes.LONG) { + if (isRequired[colCount]) { + long v = input.readLong(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (dataType == DataTypes.DOUBLE) { + if (isRequired[colCount]) { + double v = input.readDouble(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (DataTypes.isDecimal(dataType)) { + int len = input.readShort(); + if (isRequired[colCount]) { + BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v); + } + } else { + input.skipBytes(len); + } + } + } + } + } + + private void readRawRowFromStream() { + input.nextRow(); + short nullLen = input.readShort(); + 
BitSet nullBitSet = allNonNull; + if (nullLen > 0) { + nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); + } + int colCount = 0; + // primitive type dimension + for (; colCount < isNoDictColumn.length; colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } else { + if (isNoDictColumn[colCount]) { + int v = input.readShort(); + outputValues[colCount] = input.readBytes(v); + } else { + outputValues[colCount] = input.readInt(); + } + } + } + // complex type dimension + for (; colCount < dimensionCount; colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = null; + } else { + short v = input.readShort(); + outputValues[colCount] = input.readBytes(v); + } + } + // measure + DataType dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = null; + } else { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN) { + outputValues[colCount] = input.readBoolean(); + } else if (dataType == DataTypes.SHORT) { + outputValues[colCount] = input.readShort(); + } else if (dataType == DataTypes.INT) { + outputValues[colCount] = input.readInt(); + } else if (dataType == DataTypes.LONG) { + outputValues[colCount] = input.readLong(); + } else if (dataType == DataTypes.DOUBLE) { + outputValues[colCount] = input.readDouble(); + } else if (DataTypes.isDecimal(dataType)) { + int len = input.readShort(); + outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); + } + } + } + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() throws IOException { + if (null != input) { + input.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java index 140e46b..2f0c9eb 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java @@ -75,7 +75,7 @@ public class CarbonVectorBatch { } } - private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType, + public static CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType, StructField field, Dictionary dictionary) { if (dataType == DataTypes.BOOLEAN) { return new BooleanStreamReader(batchSize, field.getDataType(), dictionary); @@ -92,8 +92,12 @@ public class CarbonVectorBatch { } else if (dataType == DataTypes.STRING) { return new SliceStreamReader(batchSize, field.getDataType(), dictionary); } else if (DataTypes.isDecimal(dataType)) { - return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType, - dictionary); + if (dataType instanceof DecimalType) { + return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType, + dictionary); + } else { + return null; + } } else { return new ObjectStreamReader(batchSize, field.getDataType()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java 
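The CarbondataPageSource change below is the core of the Presto side: for ROW_V1 (streaming) splits it pulls decoded rows from StreamRecordReader and repacks them column-wise into Presto pages, using the now-public static CarbonVectorBatch.createDirectStreamReader from the hunk above. A condensed sketch of that loop — the RowPageSketch wrapper is hypothetical, while rowReader, fields, dataTypes, dictionaries and batchSize mirror the names in the diff; setup, close-on-empty and error handling are omitted:

import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.hadoop.stream.StreamRecordReader;
import org.apache.carbondata.presto.CarbonVectorBatch;
import org.apache.carbondata.presto.readers.PrestoVectorBlockBuilder;

class RowPageSketch {
  // Builds one Presto Page from up to batchSize decoded stream rows.
  static Page nextPage(StreamRecordReader rowReader, StructField[] fields,
      DataType[] dataTypes, Dictionary[] dictionaries, int batchSize) throws Exception {
    int columnCount = fields.length;
    CarbonColumnVectorImpl[] columns = new CarbonColumnVectorImpl[columnCount];
    for (int i = 0; i < columnCount; i++) {
      // one vector per projected column; each also knows how to emit a Block
      columns[i] = CarbonVectorBatch.createDirectStreamReader(
          batchSize, dataTypes[i], fields[i], dictionaries[i]);
    }
    int count = 0;
    while (count < batchSize && rowReader.nextKeyValue()) {
      Object[] values = (Object[]) rowReader.getCurrentValue();
      for (int i = 0; i < columnCount; i++) {
        columns[i].putObject(count, values[i]); // repack row values column-wise
      }
      count++;
    }
    Block[] blocks = new Block[columnCount];
    for (int i = 0; i < columnCount; i++) {
      blocks[i] = ((PrestoVectorBlockBuilder) columns[i]).buildBlock();
    }
    return new Page(count, blocks); // one page per batch of rows
  }
}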
---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java index 93de394..f289718 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java @@ -18,14 +18,46 @@ package org.apache.carbondata.presto; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; import java.util.Objects; +import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.executor.QueryExecutor; +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; +import org.apache.carbondata.core.statusmanager.FileFormat; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.stream.StreamRecordReader; +import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit; import org.apache.carbondata.presto.readers.PrestoVectorBlockBuilder; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables; +import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.HiveSplit; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.Page; @@ -33,6 +65,12 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.LazyBlock; import com.facebook.presto.spi.block.LazyBlockLoader; +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.log4j.Logger; import static com.google.common.base.Preconditions.checkState; @@ -44,18 +82,103 @@ class CarbondataPageSource implements ConnectorPageSource { private static final Logger LOGGER = LogServiceFactory.getLogService(CarbondataPageSource.class.getName()); + + private HiveSplit split; + private CarbonTable carbonTable; + private String queryId; + private Configuration hadoopConf; + private FileFormat fileFormat; private List<ColumnHandle> columnHandles; + private int columnCount = 0; private boolean closed; - private PrestoCarbonVectorizedRecordReader vectorReader; private long sizeOfData = 0; private int batchId; private long nanoStart; private long nanoEnd; + private CarbonDictionaryDecodeReadSupport readSupport; + + // columnar format split + private PrestoCarbonVectorizedRecordReader vectorReader; + private boolean isDirectVectorFill; + + // row format split + private StreamRecordReader rowReader; + private StructField[] fields; + private int batchSize = 100; + private Dictionary[] dictionaries; + private DataType[] dataTypes; + private boolean isFrstPage = true; - CarbondataPageSource(PrestoCarbonVectorizedRecordReader vectorizedRecordReader, - List<ColumnHandle> columnHandles) { + CarbondataPageSource(CarbonTable carbonTable, String queryId, HiveSplit split, + List<ColumnHandle> columnHandles, Configuration hadoopConf, boolean isDirectVectorFill) { + this.carbonTable = carbonTable; + this.queryId = queryId; + this.split = split; this.columnHandles = columnHandles; - vectorReader = vectorizedRecordReader; + this.hadoopConf = hadoopConf; + this.isDirectVectorFill = isDirectVectorFill; + initialize(); + } + + private void initialize() { + CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit + .convertSplit(split.getSchema().getProperty("carbonSplit")); + fileFormat = carbonInputSplit.getFileFormat(); + if (fileFormat.ordinal() == FileFormat.ROW_V1.ordinal()) { + initializeForRow(); + } else { + initializeForColumnar(); + } + } + + private void initializeForColumnar() { + readSupport = new CarbonDictionaryDecodeReadSupport(); + vectorReader = createReaderForColumnar(split, columnHandles, readSupport, hadoopConf); + } + + private void initializeForRow() { + QueryModel queryModel = createQueryModel(split, columnHandles, hadoopConf); + rowReader = new StreamRecordReader(queryModel, false); + List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions(); + List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures(); + fields = new StructField[queryDimension.size() + queryMeasures.size()]; + for (int i = 0; i < queryDimension.size(); i++) { + ProjectionDimension dim = queryDimension.get(i); + if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { + DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dim.getDimension().getDataType()); + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), generator.getReturnType()); + } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) { + fields[dim.getOrdinal()] = + new StructField(dim.getColumnName(), dim.getDimension().getDataType()); + } else if (dim.getDimension().isComplex()) { + fields[dim.getOrdinal()] = + new StructField(dim.getColumnName(), dim.getDimension().getDataType()); + } else { + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), DataTypes.INT); + } + } + + for (ProjectionMeasure msr : queryMeasures) { + 
DataType dataType = msr.getMeasure().getDataType(); + if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT + || dataType == DataTypes.LONG) { + fields[msr.getOrdinal()] = + new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); + } else if (DataTypes.isDecimal(dataType)) { + fields[msr.getOrdinal()] = + new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); + } else { + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE); + } + } + + this.columnCount = columnHandles.size(); + readSupport = new CarbonDictionaryDecodeReadSupport(); + readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); + this.dictionaries = readSupport.getDictionaries(); + this.dataTypes = readSupport.getDataTypes(); + } @Override public long getCompletedBytes() { @@ -71,6 +194,14 @@ class CarbondataPageSource implements ConnectorPageSource { } @Override public Page getNextPage() { + if (fileFormat.ordinal() == FileFormat.ROW_V1.ordinal()) { + return getNextPageForRow(); + } else { + return getNextPageForColumnar(); + } + } + + private Page getNextPageForColumnar() { if (nanoStart == 0) { nanoStart = System.nanoTime(); } @@ -111,6 +242,68 @@ class CarbondataPageSource implements ConnectorPageSource { } } + private Page getNextPageForRow() { + if (isFrstPage) { + isFrstPage = false; + initialReaderForRow(); + } + + if (nanoStart == 0) { + nanoStart = System.nanoTime(); + } + int count = 0; + try { + Block[] blocks = new Block[columnCount]; + CarbonColumnVectorImpl[] columns = new CarbonColumnVectorImpl[columnCount]; + for (int i = 0; i < columnCount; ++i) { + columns[i] = CarbonVectorBatch + .createDirectStreamReader(batchSize, dataTypes[i], fields[i], dictionaries[i]); + } + + while (rowReader.nextKeyValue()) { + Object[] values = (Object[]) rowReader.getCurrentValue(); + for (int index = 0; index < columnCount; index++) { + columns[index].putObject(count, values[index]); + } + count++; + if (count == batchSize) { + break; + } + } + if (count == 0) { + close(); + return null; + } else { + for (int index = 0; index < columnCount; index++) { + blocks[index] = ((PrestoVectorBlockBuilder) columns[index]).buildBlock(); + sizeOfData += blocks[index].getSizeInBytes(); + } + } + return new Page(count, blocks); + } catch (PrestoException e) { + closeWithSuppression(e); + throw e; + } catch (RuntimeException | InterruptedException | IOException e) { + closeWithSuppression(e); + throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e); + } + } + + private void initialReaderForRow() { + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); + String jobTrackerId = formatter.format(new Date()); + TaskAttemptID attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0); + TaskAttemptContextImpl attemptContext = + new TaskAttemptContextImpl(FileFactory.getConfiguration(), attemptId); + CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit + .convertSplit(split.getSchema().getProperty("carbonSplit")); + try { + rowReader.initialize(carbonInputSplit, attemptContext); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public long getSystemMemoryUsage() { return sizeOfData; } @@ -122,7 +315,12 @@ class CarbondataPageSource implements ConnectorPageSource { } closed = true; try { - vectorReader.close(); + if (vectorReader != null) { + vectorReader.close(); + } + if (rowReader != null) { + rowReader.close(); + } nanoEnd = 
System.nanoTime(); } catch (Exception e) { throw Throwables.propagate(e); @@ -144,6 +342,116 @@ class CarbondataPageSource implements ConnectorPageSource { } /** + * Create vector reader using the split. + */ + private PrestoCarbonVectorizedRecordReader createReaderForColumnar(HiveSplit carbonSplit, + List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport, + Configuration conf) { + QueryModel queryModel = createQueryModel(carbonSplit, columns, conf); + if (isDirectVectorFill) { + queryModel.setDirectVectorFill(true); + queryModel.setPreFetchData(false); + } + QueryExecutor queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration()); + try { + CarbonIterator iterator = queryExecutor.execute(queryModel); + readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); + PrestoCarbonVectorizedRecordReader reader = + new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel, + (AbstractDetailQueryResultIterator) iterator, readSupport); + reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index"))); + return reader; + } catch (Exception e) { + throw new RuntimeException("Failed to create reader ", e); + } + } + + /** + * @param carbondataSplit + * @param columns + * @return + */ + private QueryModel createQueryModel(HiveSplit carbondataSplit, + List<? extends ColumnHandle> columns, Configuration conf) { + + try { + CarbonProjection carbonProjection = getCarbonProjection(columns); + conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, ""); + String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath(); + CarbonTableInputFormat + .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable()); + CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo()); + conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath); + conf.set("query.id", queryId); + JobConf jobConf = new JobConf(conf); + CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable, + PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()), + carbonProjection); + TaskAttemptContextImpl hadoopAttemptContext = + new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0)); + CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit + .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit")); + QueryModel queryModel = + carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext); + queryModel.setQueryId(queryId); + queryModel.setVectorReader(true); + queryModel.setStatisticsRecorder( + CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId())); + + List<TableBlockInfo> tableBlockInfoList = + CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits()); + queryModel.setTableBlockInfos(tableBlockInfoList); + return queryModel; + } catch (IOException e) { + throw new RuntimeException("Unable to get the Query Model ", e); + } + } + + /** + * @param conf + * @param carbonTable + * @param filterExpression + * @param projection + * @return + */ + private CarbonTableInputFormat<Object> createInputFormat(Configuration conf, + CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) { + + AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); + CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + try { + CarbonTableInputFormat + .setTablePath(conf, 
identifier.appendWithLocalPrefix(identifier.getTablePath())); + CarbonTableInputFormat + .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName()); + CarbonTableInputFormat + .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName()); + } catch (Exception e) { + throw new RuntimeException("Unable to create the CarbonTableInputFormat", e); + } + CarbonTableInputFormat.setFilterPredicates(conf, filterExpression); + CarbonTableInputFormat.setColumnProjection(conf, projection); + + return format; + } + + /** + * @param columns + * @return + */ + private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) { + CarbonProjection carbonProjection = new CarbonProjection(); + // Convert all columns handles + ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder(); + for (ColumnHandle handle : columns) { + handles.add(Types.checkType(handle, HiveColumnHandle.class, "handle")); + carbonProjection.addColumn(((HiveColumnHandle) handle).getName()); + } + return carbonProjection; + } + + /** * Lazy Block Implementation for the Carbondata */ private final class CarbondataBlockLoader implements LazyBlockLoader<LazyBlock> { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java index c81e0c3..be088e1 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java @@ -17,27 +17,12 @@ package org.apache.carbondata.presto; -import java.io.IOException; import java.util.List; import java.util.Set; import static java.util.Objects.requireNonNull; -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.executor.QueryExecutor; -import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; -import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.CarbonProjection; -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; -import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit; import org.apache.carbondata.presto.impl.CarbonTableCacheModel; import org.apache.carbondata.presto.impl.CarbonTableReader; @@ -45,7 +30,6 @@ import static org.apache.carbondata.presto.Types.checkType; import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.HiveClientConfig; -import com.facebook.presto.hive.HiveColumnHandle; import com.facebook.presto.hive.HivePageSourceFactory; import com.facebook.presto.hive.HivePageSourceProvider; import com.facebook.presto.hive.HiveRecordCursorProvider; @@ 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index c81e0c3..be088e1 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -17,27 +17,12 @@
 package org.apache.carbondata.presto;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
 
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
@@ -45,7 +30,6 @@ import static org.apache.carbondata.presto.Types.checkType;
 
 import com.facebook.presto.hive.HdfsEnvironment;
 import com.facebook.presto.hive.HiveClientConfig;
-import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.hive.HivePageSourceFactory;
 import com.facebook.presto.hive.HivePageSourceProvider;
 import com.facebook.presto.hive.HiveRecordCursorProvider;
@@ -57,14 +41,9 @@ import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.facebook.presto.spi.type.TypeManager;
-import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -103,122 +82,11 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
         new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(),
             carbonSplit.getTable()),
         new Path(carbonSplit.getSchema().getProperty("tablePath")));
     configuration = carbonTableReader.updateS3Properties(configuration);
-    CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
-    PrestoCarbonVectorizedRecordReader carbonRecordReader =
-        createReader(carbonSplit, columns, readSupport, configuration);
-    return new CarbondataPageSource(carbonRecordReader, columns);
-  }
-
-  /**
-   * Create vector reader using the split.
-   */
-  private PrestoCarbonVectorizedRecordReader createReader(HiveSplit carbonSplit,
-      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport,
-      Configuration conf) {
-    QueryModel queryModel = createQueryModel(carbonSplit, columns, conf);
-    if (carbonTableReader.config.getPushRowFilter() == null ||
-        carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false")) {
-      queryModel.setDirectVectorFill(true);
-      queryModel.setPreFetchData(false);
-    }
-    QueryExecutor queryExecutor =
-        QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
-    try {
-      CarbonIterator iterator = queryExecutor.execute(queryModel);
-      readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
-      PrestoCarbonVectorizedRecordReader reader =
-          new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
-              (AbstractDetailQueryResultIterator) iterator, readSupport);
-      reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index")));
-      return reader;
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create reader ", e);
-    }
-  }
-
-  /**
-   * @param carbondataSplit
-   * @param columns
-   * @return
-   */
-  private QueryModel createQueryModel(HiveSplit carbondataSplit,
-      List<? extends ColumnHandle> columns, Configuration conf) {
-
-    try {
-      CarbonProjection carbonProjection = getCarbonProjection(columns);
-      CarbonTable carbonTable = getCarbonTable(carbondataSplit, conf);
-      conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
-      String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
-      CarbonTableInputFormat
-          .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
-      CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
-      conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
-      conf.set("query.id", queryId);
-      JobConf jobConf = new JobConf(conf);
-      CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
-          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
-          carbonProjection);
-      TaskAttemptContextImpl hadoopAttemptContext =
-          new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
-      CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
-          .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit"));
-      QueryModel queryModel =
-          carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
-      queryModel.setQueryId(queryId);
-      queryModel.setVectorReader(true);
-      queryModel.setStatisticsRecorder(
-          CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()));
-
-      List<TableBlockInfo> tableBlockInfoList =
-          CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits());
-      queryModel.setTableBlockInfos(tableBlockInfoList);
-      return queryModel;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get the Query Model ", e);
-    }
-  }
-
-  /**
-   * @param conf
-   * @param carbonTable
-   * @param filterExpression
-   * @param projection
-   * @return
-   */
-  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
-      CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
-
-    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-    CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-    try {
-      CarbonTableInputFormat
-          .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
-      CarbonTableInputFormat
-          .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
-      CarbonTableInputFormat
-          .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
-    } catch (Exception e) {
-      throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
-    }
-    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
-    CarbonTableInputFormat.setColumnProjection(conf, projection);
-
-    return format;
-  }
-
-  /**
-   * @param columns
-   * @return
-   */
-  private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
-    CarbonProjection carbonProjection = new CarbonProjection();
-    // Convert all columns handles
-    ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder();
-    for (ColumnHandle handle : columns) {
-      handles.add(checkType(handle, HiveColumnHandle.class, "handle"));
-      carbonProjection.addColumn(((HiveColumnHandle) handle).getName());
-    }
-    return carbonProjection;
+    CarbonTable carbonTable = getCarbonTable(carbonSplit, configuration);
+    boolean isDirectVectorFill = carbonTableReader.config.getPushRowFilter() == null ||
+        carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false");
+    return new CarbondataPageSource(
+        carbonTable, queryId, carbonSplit, columns, configuration, isDirectVectorFill);
   }
 
   /**
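
With reader construction moved into CarbondataPageSource, the provider's remaining decision is whether the scan may fill vectors directly: direct vector fill is enabled exactly when no row-level filter is pushed down. A standalone restatement of that predicate (the helper and class names here are ours, for illustration; the null-safe comparison is the point being shown):

    // Sketch of the isDirectVectorFill expression above.
    public class VectorFillDecision {
      static boolean isDirectVectorFill(String pushRowFilter) {
        return pushRowFilter == null || pushRowFilter.equalsIgnoreCase("false");
      }

      public static void main(String[] args) {
        System.out.println(isDirectVectorFill(null));    // true: no filter configured
        System.out.println(isDirectVectorFill("false")); // true: push-down disabled
        System.out.println(isDirectVectorFill("true"));  // false: row filter pushed down
      }
    }
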
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index 718cb33..f4f50a5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -44,9 +45,9 @@ public class CarbonLocalInputSplit {
   private short version;
   private String[] deleteDeltaFiles;
   private String blockletId;
-
-
   private String detailInfo;
+  private int fileFormatOrdinal;
+  private FileFormat fileFormat;
 
   /**
    * Number of BlockLets in a block
@@ -93,6 +94,14 @@ public class CarbonLocalInputSplit {
     return blockletId;
   }
 
+  @JsonProperty public int getFileFormatOrdinal() {
+    return fileFormatOrdinal;
+  }
+
+  public FileFormat getFileFormat() {
+    return fileFormat;
+  }
+
   public void setDetailInfo(BlockletDetailInfo blockletDetailInfo) {
     Gson gson = new Gson();
     detailInfo = gson.toJson(blockletDetailInfo);
@@ -107,7 +116,8 @@ public class CarbonLocalInputSplit {
       @JsonProperty("version") short version,
       @JsonProperty("deleteDeltaFiles") String[] deleteDeltaFiles,
      @JsonProperty("blockletId") String blockletId,
-      @JsonProperty("detailInfo") String detailInfo
+      @JsonProperty("detailInfo") String detailInfo,
+      @JsonProperty("fileFormatOrdinal") int fileFormatOrdinal
   ) {
     this.path = path;
     this.start = start;
@@ -120,7 +130,8 @@ public class CarbonLocalInputSplit {
     this.deleteDeltaFiles = deleteDeltaFiles;
     this.blockletId = blockletId;
     this.detailInfo = detailInfo;
-
+    this.fileFormatOrdinal = fileFormatOrdinal;
+    this.fileFormat = FileFormat.getByOrdinal(fileFormatOrdinal);
   }
 
   public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) {
@@ -132,18 +143,21 @@ public class CarbonLocalInputSplit {
         carbonLocalInputSplit.getNumberOfBlocklets(),
         ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()),
         carbonLocalInputSplit.getDeleteDeltaFiles());
-    Gson gson = new Gson();
-    BlockletDetailInfo blockletDetailInfo =
-        gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
-    if (null == blockletDetailInfo) {
-      throw new RuntimeException("Could not read blocklet details");
-    }
-    try {
-      blockletDetailInfo.readColumnSchema(blockletDetailInfo.getColumnSchemaBinary());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    inputSplit.setFormat(carbonLocalInputSplit.getFileFormat());
+    if (FileFormat.COLUMNAR_V3.ordinal() == inputSplit.getFileFormat().ordinal()) {
+      Gson gson = new Gson();
+      BlockletDetailInfo blockletDetailInfo =
+          gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
+      if (null == blockletDetailInfo) {
+        throw new RuntimeException("Could not read blocklet details");
+      }
+      try {
+        blockletDetailInfo.readColumnSchema(blockletDetailInfo.getColumnSchemaBinary());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      inputSplit.setDetailInfo(blockletDetailInfo);
     }
-    inputSplit.setDetailInfo(blockletDetailInfo);
     return inputSplit;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
index fd232ed..6702c5f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
@@ -70,6 +70,9 @@ public class CarbonLocalMultiBlockSplit {
       @JsonProperty("locations") String[] locations) {
     this.splitList = splitList;
     this.locations = locations;
+    if (!splitList.isEmpty()) {
+      this.fileFormat = splitList.get(0).getFileFormat();
+    }
   }
 
   public String getJsonString() {
@@ -87,6 +90,7 @@ public class CarbonLocalMultiBlockSplit {
     CarbonMultiBlockSplit carbonMultiBlockSplit =
         new CarbonMultiBlockSplit(carbonInputSplitList,
             carbonLocalMultiBlockSplit.getLocations());
+    carbonMultiBlockSplit.setFileFormat(carbonLocalMultiBlockSplit.getFileFormat());
     return carbonMultiBlockSplit;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 5ede272..1121a37 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -277,7 +277,8 @@ public class CarbonTableReader {
             carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
             carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
             carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
-            gson.toJson(carbonInputSplit.getDetailInfo())));
+            gson.toJson(carbonInputSplit.getDetailInfo()),
+            carbonInputSplit.getFileFormat().ordinal()));
       }
 
       // Use block distribution
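
The three split changes above make the file format survive the JSON hop from coordinator to worker: CarbonTableReader writes only the ordinal, the CarbonLocalInputSplit constructor restores the enum with FileFormat.getByOrdinal, and blocklet detail info is attached only for columnar splits, since streaming segments carry no blocklet metadata. The round-trip in isolation, as a sketch (the printed check is illustrative):

    import org.apache.carbondata.core.statusmanager.FileFormat;

    public class FileFormatRoundTrip {
      public static void main(String[] args) {
        FileFormat original = FileFormat.COLUMNAR_V3;
        // What CarbonTableReader serializes into the split JSON:
        int wireValue = original.ordinal();
        // What the CarbonLocalInputSplit constructor restores:
        FileFormat restored = FileFormat.getByOrdinal(wireValue);
        // Detail info is only meaningful for columnar (non-streaming) splits:
        boolean needsDetailInfo = FileFormat.COLUMNAR_V3.ordinal() == restored.ordinal();
        System.out.println(restored + ", needsDetailInfo = " + needsDetailInfo);
      }
    }
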
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index 0eee58a..37eb111 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -92,4 +92,15 @@ public class BooleanStreamReader extends CarbonColumnVectorImpl
     builder = type.createBlockBuilder(null, batchSize);
   }
 
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putBoolean(rowId, (boolean) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index ddc855a..da8d913 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -167,4 +167,16 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
         "Read decimal precision larger than column precision");
     return decimal;
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        decimalBlockWriter((BigDecimal) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index ed9a202..8c3a73f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -89,4 +89,16 @@ public class DoubleStreamReader extends CarbonColumnVectorImpl implements Presto
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putDouble(rowId, (double) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }
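
The putObject overrides added to these readers all share one dispatch shape: a null goes to putNull, a dictionary-encoded column stores the surrogate key via putInt, and only a plain column receives the reader's typed put (putBoolean, decimalBlockWriter, putDouble, and so on). A condensed standalone sketch of that shape, with functional parameters standing in for the vector's put* methods:

    import java.util.function.BiConsumer;
    import java.util.function.IntConsumer;

    public class PutObjectPattern {
      // Stand-in for the shared dispatch; the callbacks replace
      // CarbonColumnVectorImpl's putNull/putDouble/putInt.
      static void putObject(int rowId, Object value, boolean hasDictionary,
          BiConsumer<Integer, Object> putTyped,
          BiConsumer<Integer, Integer> putSurrogate, IntConsumer putNull) {
        if (value == null) {
          putNull.accept(rowId);                        // null bitmap entry
        } else if (hasDictionary) {
          putSurrogate.accept(rowId, (Integer) value);  // dictionary surrogate key
        } else {
          putTyped.accept(rowId, value);                // direct typed write
        }
      }

      public static void main(String[] args) {
        putObject(0, 3.14d, false,
            (r, v) -> System.out.println("putDouble(" + r + ", " + v + ")"),
            (r, v) -> System.out.println("putInt(" + r + ", " + v + ")"),
            r -> System.out.println("putNull(" + r + ")"));
      }
    }
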
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 52ddbb2..3b7e0bf 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -88,4 +88,11 @@ public class IntegerStreamReader extends CarbonColumnVectorImpl
     builder = type.createBlockBuilder(null, batchSize);
   }
 
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      putInt(rowId, (int) value);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 81fdf88..abaf0a0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -86,4 +86,16 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
       builder.appendNull();
     }
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putLong(rowId, (long) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 7411513..32498e0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -86,4 +86,16 @@ public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoV
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putShort(rowId, (short) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }
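
These overrides exist so a row-wise reader can populate the Presto vectors one value at a time instead of copying whole column chunks, which is what the streaming read path needs. A hypothetical driver loop (the interface and row data below are illustrative stand-ins, not the actual StreamRecordReader API):

    public class RowFillSketch {
      // Minimal stand-in for the stream readers above.
      interface ColumnVector { void putObject(int rowId, Object value); }

      public static void main(String[] args) {
        ColumnVector[] vectors = {
            (rowId, v) -> System.out.println("col0[" + rowId + "] = " + v),
            (rowId, v) -> System.out.println("col1[" + rowId + "] = " + v)
        };
        Object[][] rows = { { 1, "alice" }, { 2, null } };  // assumed row data
        for (int rowId = 0; rowId < rows.length; rowId++) {
          for (int col = 0; col < vectors.length; col++) {
            vectors[col].putObject(rowId, rows[rowId][col]);
          }
        }
      }
    }
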
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 1e4688f..3b3c78c 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
@@ -156,4 +157,16 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
           ((String) data).getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))));
     }
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionaryBlock == null) {
+        putByteArray(rowId, ByteUtil.toBytes((String) value));
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 1052a74..2b7f0c0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -87,4 +87,16 @@ public class TimestampStreamReader extends CarbonColumnVectorImpl
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putLong(rowId, (Long) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a32a8de..0ab6a3a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -61,11 +61,11 @@ import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.Util
-import org.apache.carbondata.streaming.CarbonStreamInputFormat
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan