Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r244007049
  
    --- Diff: 
datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java
 ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.cache.Cache;
    +import org.apache.carbondata.core.cache.CacheProvider;
    +import org.apache.carbondata.core.cache.CacheType;
    +import org.apache.carbondata.core.datamap.DataMapDistributable;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.DataMapMeta;
    +import org.apache.carbondata.core.datamap.DataMapStoreManager;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.TableDataMap;
    +import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
    +import 
org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.features.TableOperation;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.events.Event;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * Min Max DataMap Factory
    + */
    +@InterfaceAudience.Internal
    +public class MinMaxDataMapFactory extends CoarseGrainDataMapFactory {
    +  private static final Logger LOGGER =
    +      
LogServiceFactory.getLogService(MinMaxDataMapFactory.class.getName());
    +  private DataMapMeta dataMapMeta;
    +  private String dataMapName;
    +  // segmentId -> list of index files
    +  private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();
    +  private Cache<MinMaxDataMapCacheKeyValue.Key, 
MinMaxDataMapCacheKeyValue.Value> cache;
    +
    +  public MinMaxDataMapFactory(CarbonTable carbonTable, DataMapSchema 
dataMapSchema)
    +      throws MalformedDataMapCommandException {
    +    super(carbonTable, dataMapSchema);
    +
    +    // this is an example for datamap, we can choose the columns and 
operations that
    +    // will be supported by this datamap. Furthermore, we can add 
cache-support for this datamap.
    +
    +    this.dataMapName = dataMapSchema.getDataMapName();
    +    List<CarbonColumn> indexedColumns = 
carbonTable.getIndexedColumns(dataMapSchema);
    +
    +    // operations that will be supported on the indexed columns
    +    List<ExpressionType> optOperations = new ArrayList<>();
    +    optOperations.add(ExpressionType.NOT);
    +    optOperations.add(ExpressionType.EQUALS);
    +    optOperations.add(ExpressionType.NOT_EQUALS);
    +    optOperations.add(ExpressionType.GREATERTHAN);
    +    optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.LESSTHAN);
    +    optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.IN);
    +    this.dataMapMeta = new DataMapMeta(indexedColumns, optOperations);
    +
    +    // init cache. note that the createCache ensures the singleton of the 
cache
    +    try {
    +      this.cache = CacheProvider.getInstance()
    +          .createCache(new CacheType("minmax_cache"), 
MinMaxDataMapCache.class.getName());
    +    } catch (Exception e) {
    +      LOGGER.error("Failed to create cache for minmax datamap", e);
    +      throw new MalformedDataMapCommandException(e.getMessage());
    +    }
    +  }
    +
    +  /**
    +   * createWriter will return the MinMaxDataWriter.
    +   *
    +   * @param segment
    +   * @param shardName
    +   * @return
    +   */
    +  @Override
    +  public DataMapWriter createWriter(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    if (LOGGER.isDebugEnabled()) {
    +      LOGGER.debug(String.format(
    +          "Data of MinMaxDataMap %s for table %s will be written to %s",
    +          dataMapName, getCarbonTable().getTableName(), shardName));
    +    }
    +    return new MinMaxDataMapDirectWriter(getCarbonTable().getTablePath(), 
dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, 
segmentProperties);
    +  }
    +
    +  @Override
    +  public DataMapBuilder createBuilder(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    return new MinMaxDataMapBuilder(getCarbonTable().getTablePath(), 
dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, 
segmentProperties);
    +  }
    +
    +  /**
    +   * getDataMaps Factory method Initializes the Min Max Data Map and 
returns.
    +   *
    +   * @param segment
    +   * @return
    +   * @throws IOException
    +   */
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws 
IOException {
    +    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
    +    Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
    +    if (shardPaths == null) {
    +      String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
    +          getCarbonTable().getTablePath(), segment.getSegmentNo(), 
dataMapName);
    +      CarbonFile[] carbonFiles = 
FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    +      shardPaths = new HashSet<>();
    +      for (CarbonFile carbonFile : carbonFiles) {
    +        shardPaths.add(carbonFile.getAbsolutePath());
    +      }
    +      segmentMap.put(segment.getSegmentNo(), shardPaths);
    +    }
    +
    +    for (String shard : shardPaths) {
    +      MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
    +      dataMap.init(new MinMaxDataMapModel(shard, cache, 
segment.getConfiguration()));
    +      dataMap.initOthers(getCarbonTable(), 
dataMapMeta.getIndexedColumns());
    +      dataMaps.add(dataMap);
    +    }
    +    return dataMaps;
    +  }
    +
    +  @Override
    +  public DataMapMeta getMeta() {
    +    return this.dataMapMeta;
    +  }
    +
    +  @Override
    +  public DataMapLevel getDataMapLevel() {
    +    return DataMapLevel.CG;
    +  }
    +
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable 
distributable)
    +      throws IOException {
    +    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
    +    MinMaxIndexDataMap minMaxIndexDataMap = new MinMaxIndexDataMap();
    +    String indexPath = ((MinMaxDataMapDistributable) 
distributable).getIndexPath();
    +    minMaxIndexDataMap.init(
    +        new MinMaxDataMapModel(indexPath, cache, 
FileFactory.getConfiguration()));
    +    minMaxIndexDataMap.initOthers(getCarbonTable(), 
dataMapMeta.getIndexedColumns());
    +    coarseGrainDataMaps.add(minMaxIndexDataMap);
    +    return coarseGrainDataMaps;
    +  }
    +
    +  /**
    +   * returns all the directories of lucene index files for query
    +   * Note: copied from BloomFilterDataMapFactory, will extract to a common 
interface
    +   */
    +  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) 
{
    +    List<CarbonFile> indexDirs = new ArrayList<>();
    +    List<TableDataMap> dataMaps;
    +    try {
    +      // there can be multiple bloom datamaps present on a table, so get 
all datamaps and form
    +      // the path till the index file directories in all datamaps folders 
present in each segment
    +      dataMaps = 
DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    +    } catch (IOException ex) {
    +      LOGGER.error(String.format(
    +          "failed to get datamaps for tablePath %s, segmentId %s", 
tablePath, segmentId), ex);
    +      throw new RuntimeException(ex);
    +    }
    +    if (dataMaps.size() > 0) {
    +      for (TableDataMap dataMap : dataMaps) {
    +        if 
(dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
    +          List<CarbonFile> indexFiles;
    +          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, 
segmentId,
    +              dataMap.getDataMapSchema().getDataMapName());
    +          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath);
    +          indexFiles = Arrays.asList(dirPath.listFiles(new 
CarbonFileFilter() {
    +            @Override
    +            public boolean accept(CarbonFile file) {
    +              return file.isDirectory();
    +            }
    +          }));
    +          indexDirs.addAll(indexFiles);
    +        }
    +      }
    +    }
    +    return indexDirs.toArray(new CarbonFile[0]);
    +  }
    +
    +  @Override
    +  public List<DataMapDistributable> toDistributable(Segment segment) {
    +    List<DataMapDistributable> dataMapDistributableList = new 
ArrayList<>();
    +    CarbonFile[] indexDirs =
    +        getAllIndexDirs(getCarbonTable().getTablePath(), 
segment.getSegmentNo());
    +    if (segment.getFilteredIndexShardNames().size() == 0) {
    +      for (CarbonFile indexDir : indexDirs) {
    +        DataMapDistributable bloomDataMapDistributable =
    +            new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
    +        dataMapDistributableList.add(bloomDataMapDistributable);
    +      }
    +      return dataMapDistributableList;
    +    }
    +    for (CarbonFile indexDir : indexDirs) {
    +      // Filter out the tasks which are filtered through CG datamap.
    +      if 
(!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
    +        continue;
    +      }
    +      DataMapDistributable bloomDataMapDistributable =
    +          new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
    +      dataMapDistributableList.add(bloomDataMapDistributable);
    +    }
    +    return dataMapDistributableList;
    +  }
    +
    +  @Override
    +  public void fireEvent(Event event) {
    +
    +  }
    +
    +  @Override
    +  public void clear(Segment segment) {
    +    Set<String> shards = segmentMap.remove(segment.getSegmentNo());
    +    if (null != shards) {
    +      for (String shard : shards) {
    +        cache.invalidate(new MinMaxDataMapCacheKeyValue.Key(shard));
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public synchronized void clear() {
    +    if (segmentMap.size() > 0) {
    +      List<String> segments = new ArrayList<>(segmentMap.keySet());
    +      for (String segmentId : segments) {
    +        clear(new Segment(segmentId, null, null));
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void deleteDatamapData(Segment segment) throws IOException {
    +    try {
    +      String segmentId = segment.getSegmentNo();
    +      String datamapPath = CarbonTablePath
    +          .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, 
dataMapName);
    +      if (FileFactory.isFileExist(datamapPath)) {
    +        CarbonFile file = FileFactory.getCarbonFile(datamapPath);
    +        CarbonUtil.deleteFoldersAndFilesSilent(file);
    +      }
    +    } catch (InterruptedException ex) {
    +      throw new IOException("Failed to delete datamap for segment_" + 
segment.getSegmentNo());
    +    }
    +  }
    +
    +  @Override
    +  public void deleteDatamapData() {
    +    SegmentStatusManager ssm =
    +        new 
SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
    +    try {
    +      List<Segment> validSegments = 
ssm.getValidAndInvalidSegments().getValidSegments();
    +      for (Segment segment : validSegments) {
    +        deleteDatamapData(segment);
    +      }
    +    } catch (IOException e) {
    +      LOGGER.error("drop datamap failed, failed to delete datamap 
directory");
    +    }
    +  }
    +
    +  @Override
    +  public boolean willBecomeStale(TableOperation operation) {
    +    switch (operation) {
    +      case ALTER_DROP:
    +      case ALTER_CHANGE_DATATYPE:
    +      case DELETE:
    +      case UPDATE:
    +      case PARTITION:
    +        return true;
    +      default:
    +        return false;
    +    }
    +  }
    +
    +  @Override
    +  public boolean isOperationBlocked(TableOperation operation, Object... 
targets) {
    +    switch (operation) {
    +      case ALTER_DROP: {
    --- End diff --
    
    `{` seems not necessary, to avoid using break?


---

Reply via email to