[CARBONDATA-2441][Datamap] Implement distribute interface for bloom datamap
Implement distribute interface for bloom datamap

This closes #2272


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/452c42b9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/452c42b9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/452c42b9

Branch: refs/heads/spark-2.3
Commit: 452c42b994e6b4c0056b3cf097ebdb579b8d1e70
Parents: 6b94971
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Sat May 5 13:51:34 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed May 9 10:27:24 2018 +0800

----------------------------------------------------------------------
 .../bloom/BloomCoarseGrainDataMapFactory.java   | 63 +++++++++++++++++++-
 .../bloom/BloomDataMapDistributable.java        | 37 ++++++++++++
 2 files changed, 98 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/452c42b9/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 4e62526..1d6eab7 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -29,13 +30,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -196,12 +200,67 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
   @Override
   public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
-    return null;
+    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
+    BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
+    String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
+    bloomCoarseGrainDataMap.init(new DataMapModel(indexPath));
+    coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
+    return coarseGrainDataMaps;
+  }
+
+  /**
+   * returns all the directories of bloom index files for query
+   * Note: copied from luceneDataMapFactory, will extract to a common interface
+   */
+  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
+    List<CarbonFile> indexDirs = new ArrayList<>();
+    List<TableDataMap> dataMaps;
+    try {
+      // a table can have several datamaps (bloom, lucene, ...), so fetch all of them and
+      // later keep only the index folders of the datamap served by this factory
+      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
+    } catch (IOException ex) {
+      LOGGER.error(ex, String.format("failed to get datamaps for tablePath %s, segmentId %s",
+          tablePath, segmentId));
+      throw new RuntimeException(ex);
+    }
+    String currentDataMapName = getDataMapSchema().getDataMapName();
+    for (TableDataMap dataMap : dataMaps) {
+      String dataMapName = dataMap.getDataMapSchema().getDataMapName();
+      // skip other datamaps (e.g. lucene): their folders are not shards of this datamap
+      if (!currentDataMapName.equals(dataMapName)) {
+        continue;
+      }
+      String dmPath = CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator
+          + dataMapName;
+      FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
+      final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
+      List<CarbonFile> indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
+        @Override
+        public boolean accept(CarbonFile file) {
+          return file.isDirectory();
+        }
+      }));
+      indexDirs.addAll(indexFiles);
+    }
+    return indexDirs.toArray(new CarbonFile[0]);
   }
 
   @Override
   public List<DataMapDistributable> toDistributable(Segment segment) {
-    return null;
+    List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
+    CarbonFile[] indexDirs =
+        getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo());
+    for (CarbonFile indexDir : indexDirs) {
+      // Filter out the tasks which are filtered through CG datamap.
+      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
+        continue;
+      }
+      DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
+          indexDir.getAbsolutePath());
+      dataMapDistributableList.add(bloomDataMapDistributable);
+    }
+    return dataMapDistributableList;
   }
 
   @Override


http://git-wip-us.apache.org/repos/asf/carbondata/blob/452c42b9/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java
new file mode 100644
index 0000000..86d6932
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+
+@InterfaceAudience.Internal
+class BloomDataMapDistributable extends DataMapDistributable {
+  /**
+   * parent folder (one index shard directory) of the bloom index files, set once at creation
+   */
+  private final String indexPath;
+
+  BloomDataMapDistributable(String indexPath) {
+    this.indexPath = indexPath;
+  }
+
+  public String getIndexPath() {
+    return indexPath;
+  }
+}