This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch branch-1.6 in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 43a8086e15ec34648c02f9022975e683fc139f25 Author: kunal642 <kunalkapoor...@gmail.com> AuthorDate: Thu Jun 27 14:32:11 2019 +0530 [CARBONDATA-3454] optimized index server output for count(*) Optimised the output for count(*) queries so that only a long is sent back to the driver to reduce the network transfer cost for index server This closes #3308 --- .../apache/carbondata/core/datamap/DataMapJob.java | 2 + .../carbondata/core/datamap/DataMapUtil.java | 13 ++- .../core/datamap/DistributableDataMapFormat.java | 34 +++++-- .../core/indexstore/ExtendedBlocklet.java | 68 ++++++++----- .../core/indexstore/ExtendedBlockletWrapper.java | 27 +++-- .../ExtendedBlockletWrapperContainer.java | 19 ++-- .../carbondata/hadoop/api/CarbonInputFormat.java | 52 ++++++++-- .../hadoop/api/CarbonTableInputFormat.java | 22 ++-- .../carbondata/indexserver/DataMapJobs.scala | 15 ++- .../indexserver/DistributedCountRDD.scala | 111 +++++++++++++++++++++ .../indexserver/DistributedPruneRDD.scala | 29 ++---- .../indexserver/DistributedRDDUtils.scala | 13 +++ .../carbondata/indexserver/IndexServer.scala | 19 ++++ 13 files changed, 319 insertions(+), 105 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java index 9eafe7c..326282d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java @@ -35,4 +35,6 @@ public interface DataMapJob extends Serializable { List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat); + Long executeCountJob(DistributableDataMapFormat dataMapFormat); + } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java index dd9debc..bca7409 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java @@ -230,7 +230,7 @@ public class DataMapUtil { List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level, List<String> segmentsToBeRefreshed) throws IOException { return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments, - invalidSegments, level, false, segmentsToBeRefreshed); + invalidSegments, level, false, segmentsToBeRefreshed, false); } /** @@ -241,7 +241,8 @@ public class DataMapUtil { public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable, FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune, List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level, - Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException { + Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob) + throws IOException { List<String> invalidSegmentNo = new ArrayList<>(); for (Segment segment : invalidSegments) { invalidSegmentNo.add(segment.getSegmentNo()); @@ -250,9 +251,11 @@ public class DataMapUtil { DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo, partitionsToPrune, false, level, isFallbackJob); - List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat); - // Apply expression on the blocklets. 
- return prunedBlocklets; + if (isCountJob) { + dataMapFormat.setCountStarJob(); + dataMapFormat.setIsWriteToFile(false); + } + return dataMapJob.execute(dataMapFormat); } public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments( diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java index 8426fcb..b430c5d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java @@ -28,7 +28,6 @@ import java.util.UUID; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; @@ -91,6 +90,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl private boolean isWriteToFile = true; + private boolean isCountStarJob = false; + DistributableDataMapFormat() { } @@ -103,7 +104,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl this.dataMapToClear = dataMapToClear; } - DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf, + public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf, List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions, boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob) throws IOException { @@ -136,7 +137,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl return new RecordReader<Void, ExtendedBlocklet>() { private 
Iterator<ExtendedBlocklet> blockletIterator; private ExtendedBlocklet currBlocklet; - private List<DataMap> dataMaps; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) @@ -149,7 +149,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl if (dataMapLevel == null) { TableDataMap defaultDataMap = DataMapStoreManager.getInstance() .getDataMap(table, distributable.getDistributable().getDataMapSchema()); - dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable()); blocklets = defaultDataMap .prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions); blocklets = DataMapUtil @@ -192,11 +191,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl @Override public void close() throws IOException { - if (null != dataMaps) { - for (DataMap dataMap : dataMaps) { - dataMap.finish(); - } - } } }; } @@ -247,6 +241,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl out.writeUTF(taskGroupDesc); out.writeUTF(queryId); out.writeBoolean(isWriteToFile); + out.writeBoolean(isCountStarJob); } @Override @@ -292,6 +287,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl this.taskGroupDesc = in.readUTF(); this.queryId = in.readUTF(); this.isWriteToFile = in.readBoolean(); + this.isCountStarJob = in.readBoolean(); } private void initReadCommittedScope() throws IOException { @@ -398,9 +394,29 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl return validSegments; } + public List<Segment> getValidSegments() { + return validSegments; + } + public void createDataMapChooser() throws IOException { if (null != filterResolverIntf) { this.dataMapChooser = new DataMapChooser(table); } } + + public void setCountStarJob() { + this.isCountStarJob = true; + } + + public boolean isCountStarJob() { + return this.isCountStarJob; + } + + public List<PartitionSpec> 
getPartitions() { + return partitions; + } + + public ReadCommittedScope getReadCommittedScope() { + return readCommittedScope; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java index a85423b..611e969 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java @@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet { private CarbonInputSplit inputSplit; + private Long count; + + private String segmentNo; + public ExtendedBlocklet() { } @@ -78,6 +82,9 @@ public class ExtendedBlocklet extends Blocklet { } public String getSegmentId() { + if (segmentNo != null) { + return segmentNo; + } return this.inputSplit.getSegmentId(); } @@ -92,8 +99,12 @@ public class ExtendedBlocklet extends Blocklet { return getFilePath(); } - public String getDataMapWriterPath() { - return this.inputSplit.getDataMapWritePath(); + public Long getRowCount() { + if (count != null) { + return count; + } else { + return (long) inputSplit.getRowCount(); + } } public void setDataMapWriterPath(String dataMapWriterPath) { @@ -161,30 +172,35 @@ public class ExtendedBlocklet extends Blocklet { * @param uniqueLocation * @throws IOException */ - public void serializeData(DataOutput out, Map<String, Short> uniqueLocation) + public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob) throws IOException { super.write(out); - if (dataMapUniqueId == null) { - out.writeBoolean(false); + if (isCountJob) { + out.writeLong(inputSplit.getRowCount()); + out.writeUTF(inputSplit.getSegmentId()); } else { - out.writeBoolean(true); - out.writeUTF(dataMapUniqueId); - } - out.writeBoolean(inputSplit != null); - if (inputSplit != null) { - // creating byte array output stream to get the size of input split serializeData size - 
ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(ebos); - inputSplit.setFilePath(null); - inputSplit.setBucketId(null); - if (inputSplit.isBlockCache()) { - inputSplit.updateFooteroffset(); - inputSplit.updateBlockLength(); - inputSplit.setWriteDetailInfo(false); + if (dataMapUniqueId == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(dataMapUniqueId); + } + out.writeBoolean(inputSplit != null); + if (inputSplit != null) { + // creating byte array output stream to get the size of input split serializeData size + ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(ebos); + inputSplit.setFilePath(null); + inputSplit.setBucketId(null); + if (inputSplit.isBlockCache()) { + inputSplit.updateFooteroffset(); + inputSplit.updateBlockLength(); + inputSplit.setWriteDetailInfo(false); + } + inputSplit.serializeFields(dos, uniqueLocation); + out.writeInt(ebos.size()); + out.write(ebos.getBuffer(), 0, ebos.size()); } - inputSplit.serializeFields(dos, uniqueLocation); - out.writeInt(ebos.size()); - out.write(ebos.getBuffer(), 0 , ebos.size()); } } @@ -195,9 +211,15 @@ public class ExtendedBlocklet extends Blocklet { * @param tablePath * @throws IOException */ - public void deserializeFields(DataInput in, String[] locations, String tablePath) + public void deserializeFields(DataInput in, String[] locations, String tablePath, + boolean isCountJob) throws IOException { super.readFields(in); + if (isCountJob) { + count = in.readLong(); + segmentNo = in.readUTF(); + return; + } if (in.readBoolean()) { dataMapUniqueId = in.readUTF(); } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java index ab051ea..f722f32 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java @@ -56,25 +56,20 @@ public class ExtendedBlockletWrapper implements Writable, Serializable { private static final Logger LOGGER = LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName()); - + private static final int BUFFER_SIZE = 8 * 1024 * 1024; + private static final int BLOCK_SIZE = 256 * 1024 * 1024; private boolean isWrittenToFile; - private int dataSize; - private byte[] bytes; - private static final int BUFFER_SIZE = 8 * 1024 * 1024; - - private static final int BLOCK_SIZE = 256 * 1024 * 1024; - public ExtendedBlockletWrapper() { } public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath, - String queryId, boolean isWriteToFile) { + String queryId, boolean isWriteToFile, boolean isCountJob) { Map<String, Short> uniqueLocations = new HashMap<>(); - byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList); + byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob); int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD, CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024; @@ -122,13 +117,13 @@ public class ExtendedBlockletWrapper implements Writable, Serializable { } private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations, - List<ExtendedBlocklet> extendedBlockletList) { + List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) { ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream(); DataOutputStream stream = new DataOutputStream(bos); try { for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) { extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, "")); - 
extendedBlocklet.serializeData(stream, uniqueLocations); + extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob); } return new SnappyCompressor().compressByte(bos.toByteArray()); } catch (IOException e) { @@ -142,6 +137,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable { * Below method will be used to write the data to stream[file/memory] * Data Format * <number of splits><number of unique location[short]><locations><serialize data len><data> + * * @param stream * @param data * @param uniqueLocation @@ -158,7 +154,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable { final Map.Entry<String, Short> next = iterator.next(); uniqueLoc[next.getValue()] = next.getKey(); } - stream.writeShort((short)uniqueLoc.length); + stream.writeShort((short) uniqueLoc.length); for (String loc : uniqueLoc) { stream.writeUTF(loc); } @@ -170,12 +166,14 @@ public class ExtendedBlockletWrapper implements Writable, Serializable { * deseralize the blocklet data from file or stream * data format * <number of splits><number of unique location[short]><locations><serialize data len><data> + * * @param tablePath * @param queryId * @return * @throws IOException */ - public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException { + public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob) + throws IOException { byte[] data; if (bytes != null) { if (isWrittenToFile) { @@ -218,7 +216,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable { try { for (int i = 0; i < numberOfBlocklet; i++) { ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet(); - extendedBlocklet.deserializeFields(eDIS, locations, tablePath); + extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob); extendedBlockletList.add(extendedBlocklet); } } finally { @@ -248,4 +246,5 @@ public class ExtendedBlockletWrapper implements Writable, Serializable { } 
this.dataSize = in.readInt(); } + } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java index 0c52297..40acf9e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java @@ -62,8 +62,8 @@ public class ExtendedBlockletWrapperContainer implements Writable { this.isFallbackJob = isFallbackJob; } - public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId) - throws IOException { + public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId, + boolean isCountJob) throws IOException { if (!isFallbackJob) { int numOfThreads = CarbonProperties.getNumOfThreadsForPruning(); ExecutorService executorService = Executors @@ -85,8 +85,8 @@ public class ExtendedBlockletWrapperContainer implements Writable { List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>(); for (int i = 0; i < split.length; i++) { end += split[i]; - futures.add(executorService - .submit(new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId))); + futures.add(executorService.submit( + new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId, isCountJob))); start += split[i]; } executorService.shutdown(); @@ -109,7 +109,8 @@ public class ExtendedBlockletWrapperContainer implements Writable { } else { List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>(); for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) { - extendedBlocklets.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId)); + extendedBlocklets + .addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId, isCountJob)); } return extendedBlocklets; } @@ -125,18 +126,22 @@ public class ExtendedBlockletWrapperContainer 
implements Writable { private String queryId; + private boolean isCountJob; + public ExtendedBlockletDeserializerThread(int start, int end, String tablePath, - String queryId) { + String queryId, boolean isCountJob) { this.start = start; this.end = end; this.tablePath = tablePath; this.queryId = queryId; + this.isCountJob = isCountJob; } @Override public List<ExtendedBlocklet> call() throws Exception { List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>(); for (int i = start; i < end; i++) { - extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId)); + extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId, + isCountJob)); } return extendedBlocklets; } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 45041e4..56ccabc 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.datamap.DataMapJob; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.DataMapUtil; +import org.apache.carbondata.core.datamap.DistributableDataMapFormat; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; @@ -412,10 +413,42 @@ m filterExpression */ @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException; - List<ExtendedBlocklet> getDistributedSplit(CarbonTable table, + /** + * This method will execute a distributed job(DistributedDataMapJob) to get the count for the + * table. If the DistributedDataMapJob fails for some reason then an embedded job is fired to + * get the count. 
+ */ + Long getDistributedCount(CarbonTable table, + List<PartitionSpec> partitionNames, List<Segment> validSegments) throws IOException { + DistributableDataMapFormat dataMapFormat = + new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(), + partitionNames, false, null, false); + dataMapFormat.setIsWriteToFile(false); + try { + DataMapJob dataMapJob = + (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME); + if (dataMapJob == null) { + throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob"); + } + return dataMapJob.executeCountJob(dataMapFormat); + } catch (Exception e) { + LOG.error("Failed to get count from index server. Initializing fallback", e); + DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob(); + return dataMapJob.executeCountJob(dataMapFormat); + } + } + + List<ExtendedBlocklet> getDistributedBlockRowCount(CarbonTable table, + List<PartitionSpec> partitionNames, List<Segment> validSegments, + List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) throws IOException { + return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments, + segmentsToBeRefreshed, true); + } + + private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table, FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames, List<Segment> validSegments, List<Segment> invalidSegments, - List<String> segmentsToBeRefreshed) throws IOException { + List<String> segmentsToBeRefreshed, boolean isCountJob) throws IOException { try { DataMapJob dataMapJob = (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME); @@ -424,7 +457,7 @@ m filterExpression } return DataMapUtil .executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments, - invalidSegments, null, segmentsToBeRefreshed); + invalidSegments, null, false, segmentsToBeRefreshed, isCountJob); } catch (Exception e) { // Check if fallback is disabled for testing 
purposes then directly throw exception. if (CarbonProperties.getInstance().isFallBackDisabled()) { @@ -432,10 +465,9 @@ m filterExpression } LOG.error("Exception occurred while getting splits using index server. Initiating Fall " + "back to embedded mode", e); - return DataMapUtil - .executeDataMapJob(table, filterResolverIntf, - DataMapUtil.getEmbeddedJob(), partitionNames, validSegments, invalidSegments, null, - true, segmentsToBeRefreshed); + return DataMapUtil.executeDataMapJob(table, filterResolverIntf, + DataMapUtil.getEmbeddedJob(), partitionNames, validSegments, + invalidSegments, null, true, segmentsToBeRefreshed, isCountJob); } } @@ -545,7 +577,7 @@ m filterExpression try { prunedBlocklets = getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds, - invalidSegments, segmentsToBeRefreshed); + invalidSegments, segmentsToBeRefreshed, false); } catch (Exception e) { // Check if fallback is disabled then directly throw exception otherwise try driver // pruning. 
@@ -580,7 +612,7 @@ m filterExpression if (distributedCG && dataMapJob != null) { cgPrunedBlocklets = DataMapUtil .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune, - segmentIds, invalidSegments, DataMapLevel.CG, true, new ArrayList<String>()); + segmentIds, invalidSegments, DataMapLevel.CG, new ArrayList<String>()); } else { cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune); } @@ -616,7 +648,7 @@ m filterExpression // Prune segments from already pruned blocklets fgPrunedBlocklets = DataMapUtil .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune, - segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(), true, + segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(), new ArrayList<String>()); // note that the 'fgPrunedBlocklets' has extra datamap related info compared with // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets' diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 3b7a800..74a4d6e 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.PartitionInfo; @@ -562,15 +563,14 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { if 
(CarbonProperties.getInstance() .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) { try { - List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit( - getDistributedSplit(table, null, partitions, filteredSegment, - allSegments.getInvalidSegments(), toBeCleanedSegments)); - for (InputSplit extendedBlocklet : extendedBlocklets) { - CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet; + List<ExtendedBlocklet> extendedBlocklets = + getDistributedBlockRowCount(table, partitions, filteredSegment, + allSegments.getInvalidSegments(), toBeCleanedSegments); + for (ExtendedBlocklet blocklet : extendedBlocklets) { String filePath = blocklet.getFilePath().replace("\\", "/"); String blockName = filePath.substring(filePath.lastIndexOf("/") + 1); blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName, - (long) blocklet.getRowCount()); + blocklet.getRowCount()); } } catch (Exception e) { // Check if fallback is disabled then directly throw exception otherwise try driver @@ -615,15 +615,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } } } else { - long totalRowCount = 0L; + long totalRowCount; if (CarbonProperties.getInstance() .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) { - List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit( - getDistributedSplit(table, null, partitions, filteredSegment, - allSegments.getInvalidSegments(), new ArrayList<String>())); - for (InputSplit extendedBlocklet : extendedBlocklets) { - totalRowCount += ((CarbonInputSplit) extendedBlocklet).getRowCount(); - } + totalRowCount = + getDistributedCount(table, partitions, filteredSegment); } else { TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table); totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap); diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala index 01b8824..1fee051 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala @@ -65,7 +65,8 @@ class DistributedDataMapJob extends AbstractDataMapJob { dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor) dataMapFormat.setFilterResolverIntf(filterInf) IndexServer.getClient.getSplits(dataMapFormat) - .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId) + .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat + .getQueryId, dataMapFormat.isCountStarJob) } finally { val tmpPath = CarbonUtil .getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath, @@ -106,7 +107,11 @@ class DistributedDataMapJob extends AbstractDataMapJob { filterInf.getFilterExpression.getFilterExpressionType == ExpressionType.UNKNOWN) { return filterProcessor.changeUnknownResloverToTrue(tableIdentifer) } - return filterInf; + filterInf + } + + override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = { + IndexServer.getClient.getCount(dataMapFormat).get() } } @@ -122,7 +127,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob { dataMapFormat.setIsWriteToFile(false) dataMapFormat.setFallbackJob() val splits = IndexServer.getSplits(dataMapFormat).getExtendedBlockets(dataMapFormat - .getCarbonTable.getTablePath, dataMapFormat.getQueryId) + .getCarbonTable.getTablePath, dataMapFormat.getQueryId, dataMapFormat.isCountStarJob) // Fire a job to clear the cache from executors as Embedded mode does not maintain the cache. 
IndexServer.invalidateSegmentCache(dataMapFormat.getCarbonTable, dataMapFormat .getValidSegmentIds.asScala.toArray) @@ -130,4 +135,8 @@ class EmbeddedDataMapJob extends AbstractDataMapJob { splits } + override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = { + IndexServer.getCount(dataMapFormat).get() + } + } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala new file mode 100644 index 0000000..4a080fa --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.indexserver + +import java.util.concurrent.Executors + +import scala.collection.JavaConverters._ +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} +import scala.concurrent.duration.Duration + +import org.apache.hadoop.mapred.TaskAttemptID +import org.apache.hadoop.mapreduce.{InputSplit, TaskType} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.CacheProvider +import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat} +import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory} +import org.apache.carbondata.spark.rdd.CarbonRDD + +/** + * An RDD which will get the count for the table. 
+ */
+class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: DistributableDataMapFormat)
+  extends CarbonRDD[(String, String)](ss, Nil) {
+
+  @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedCountRDD]
+    .getName)
+
+  /**
+   * Returns the locations recorded on the [[DataMapRDDPartition]], or no preference when the
+   * partition carries none.
+   */
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    val locations = split.asInstanceOf[DataMapRDDPartition].getLocations
+    if (locations != null) {
+      locations.toSeq
+    } else {
+      Seq()
+    }
+  }
+
+  /**
+   * Counts the rows of all the datamap splits assigned to this partition.
+   *
+   * The splits are processed concurrently on a thread pool created for this task with
+   * `getNumOfThreadsForExecutorPruning` threads; when there are more splits than threads they
+   * are first grouped with [[DistributedRDDUtils.groupSplits]]. The pool is shut down before
+   * returning (also when a future fails) so repeated tasks do not leave threads behind.
+   *
+   * @return a single pair: `<host>_<executorId>_<cacheSize>` and the summed row count as a
+   *         string
+   */
+  override def internalCompute(split: Partition,
+      context: TaskContext): Iterator[(String, String)] = {
+    val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
+    val numOfThreads = CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
+    val service = Executors
+      .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
+    implicit val ec: ExecutionContextExecutor = ExecutionContext
+      .fromExecutor(service)
+    val results = try {
+      val futures = if (inputSplits.length <= numOfThreads) {
+        inputSplits.map { inputSplit => generateFuture(Seq(inputSplit)) }
+      } else {
+        DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map {
+          splits => generateFuture(splits)
+        }
+      }
+      // scalastyle:off awaitresult
+      Await.result(Future.sequence(futures), Duration.Inf).flatten
+      // scalastyle:on awaitresult
+    } finally {
+      // The pool is private to this task; threads of a fixed pool exist until it is shut down.
+      service.shutdownNow()
+    }
+    val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+      SparkEnv.get.blockManager.blockManagerId.executorId
+    }"
+    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+      CacheProvider.getInstance().getCarbonCache.getCurrentSize
+    } else {
+      0L
+    }
+    Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
+  }
+
+  /**
+   * Reuses the partitions computed by [[DistributedPruneRDD]] for the same datamap format.
+   */
+  override protected def internalGetPartitions: Array[Partition] = {
+    new DistributedPruneRDD(ss, dataMapFormat).partitions
+  }
+
+  /**
+   * Asynchronously fetches the block row counts for one group of splits.
+   *
+   * Each split's segment is given the format's read-committed scope before the lookup.
+   * `split` must be non-empty because the datamap is resolved from `split.head`.
+   * NOTE(review): assumes every split in a group shares the first split's datamap schema.
+   */
+  private def generateFuture(split: Seq[InputSplit])
+    (implicit executionContext: ExecutionContext) = {
+    Future {
+      val segments = split.map { inputSplit =>
+        val distributable = inputSplit.asInstanceOf[DataMapDistributableWrapper]
+        distributable.getDistributable.getSegment
+          .setReadCommittedScope(dataMapFormat.getReadCommittedScope)
+        distributable.getDistributable.getSegment
+      }
+      val defaultDataMap = DataMapStoreManager.getInstance
+        .getDataMap(dataMapFormat.getCarbonTable, split.head
+          .asInstanceOf[DataMapDistributableWrapper].getDistributable.getDataMapSchema)
+      defaultDataMap.getBlockRowCount(segments.toList.asJava, dataMapFormat
+        .getPartitions, defaultDataMap).asScala
+    }
+  }
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala index d8b9c19..76d33b4 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala @@ -17,15 +17,12 @@ package org.apache.carbondata.indexserver -import java.text.SimpleDateFormat -import java.util.Date import java.util.concurrent.Executors import scala.collection.JavaConverters._ import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.Duration -import org.apache.commons.lang.StringUtils import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID} import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -35,9 +32,8 @@ import org.apache.spark.sql.hive.DistributionUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.CacheProvider -import
org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, DistributableDataMapFormat, TableDataMap} +import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat} import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper -import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper} import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory} @@ -65,21 +61,13 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD] .getName) - private val jobTrackerId: String = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - formatter.format(new Date()) - } - var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _ - private def groupSplits(xs: Seq[InputSplit], n: Int) = { - val (quot, rem) = (xs.size / n, xs.size % n) - val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1)) - (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList - } + var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _ override def internalCompute(split: Partition, context: TaskContext): Iterator[(String, ExtendedBlockletWrapper)] = { - val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) + val attemptId = new TaskAttemptID(DistributedRDDUtils.generateTrackerId, + id, TaskType.MAP, split.index, 0) val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId) val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit if (dataMapFormat.isJobToClearDataMaps) { @@ -118,7 +106,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS split => generateFuture(Seq(split), 
attemptContext) } } else { - groupSplits(inputSplits, numOfThreads).map { + DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map { splits => generateFuture(splits, attemptContext) } } @@ -139,14 +127,13 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS }" val value = (executorIP + "_" + cacheSize.toString, new ExtendedBlockletWrapper(f.toList .asJava, dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId, - dataMapFormat.isWriteToFile)) + dataMapFormat.isWriteToFile, dataMapFormat.isCountStarJob)) Iterator(value) } } - private def generateFuture(split: Seq[InputSplit], - attemptContextImpl: TaskAttemptContextImpl) - (implicit executionContext: ExecutionContext) = { + private def generateFuture(split: Seq[InputSplit], attemptContextImpl: TaskAttemptContextImpl) + (implicit executionContext: ExecutionContext): Future[Seq[ExtendedBlocklet]] = { Future { split.flatMap { inputSplit => val blocklets = new java.util.ArrayList[ExtendedBlocklet]() diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala index 933ec15..4819779 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala @@ -16,6 +16,8 @@ */ package org.apache.carbondata.indexserver +import java.text.SimpleDateFormat +import java.util.Date import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -325,4 +327,15 @@ object DistributedRDDUtils { } }
+  /**
+   * Distributes `xs` over at most `n` order-preserving groups whose sizes differ by at most one
+   * (the smaller groups come first).
+   *
+   * When there are no more splits than groups, every split gets its own group and an empty
+   * input yields no groups, instead of failing on a zero group size.
+   *
+   * @param xs the splits to distribute
+   * @param n  the wanted number of groups; must be positive
+   * @throws IllegalArgumentException if `n` is not positive
+   */
+  def groupSplits(xs: Seq[InputSplit], n: Int): List[Seq[InputSplit]] = {
+    require(n > 0, s"number of groups must be positive, but was $n")
+    if (xs.size <= n) {
+      // quot would be 0 here (or 1 with no remainder), and grouped(0) throws
+      xs.map(Seq(_)).toList
+    } else {
+      val (quot, rem) = (xs.size / n, xs.size % n)
+      val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
+      (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
+    }
+  }
+
+  /**
+   * Builds a job-tracker id for task attempt ids from the current time at minute precision
+   * (`yyyyMMddHHmm`). A new formatter is created per call because `SimpleDateFormat` is not
+   * thread-safe.
+   */
+  def generateTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala index fdaa3d1..abee487 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala @@ -23,6 +23,7 @@ import java.util.UUID import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.LongWritable import org.apache.hadoop.ipc.{ProtocolInfo, RPC} import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation} @@ -58,6 +59,9 @@ trait ServerInterface { */ def invalidateSegmentCache(carbonTable: CarbonTable, segmentIds: Array[String], jobGroupId: String = ""): Unit + + def getCount(request: DistributableDataMapFormat): LongWritable + } /** @@ -99,6 +103,21 @@ object IndexServer extends ServerInterface { }) }
+  /**
+   * Returns the total row count computed by a [[DistributedCountRDD]] for `request`.
+   *
+   * For non-fallback jobs the Spark job group/description are set from the request and the
+   * recorded executor cache sizes are refreshed from the RDD output.
+   */
+  def getCount(request: DistributableDataMapFormat): LongWritable = {
+    doAs {
+      if (!request.isFallbackJob) {
+        sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+        sparkSession.sparkContext
+          .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+      }
+      val splits = new DistributedCountRDD(sparkSession, request).collect()
+      if (!request.isFallbackJob) {
+        DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+      }
+      new LongWritable(splits.map(_._2.toLong).sum)
+    }
+  }
+
   def getSplits(request: DistributableDataMapFormat): ExtendedBlockletWrapperContainer = {
     doAs {
       if (!request.isFallbackJob) {