LENS-1386 : Add support for separate tables for update periods in one storage
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/f0dadd79 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/f0dadd79 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/f0dadd79 Branch: refs/heads/lens-1381 Commit: f0dadd79bb626fe6f8bbf21569e3062aeb9be070 Parents: 0cd22b1 Author: Lavkesh Lahngir <lavk...@linux.com> Authored: Mon Feb 20 15:08:40 2017 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Mon Feb 20 15:08:40 2017 +0530 ---------------------------------------------------------------------- lens-api/src/main/resources/cube-0.1.xsd | 28 +- .../lens/cube/metadata/CubeFactTable.java | 68 +++- .../lens/cube/metadata/CubeMetastoreClient.java | 339 +++++++++++-------- .../lens/cube/metadata/MetastoreUtil.java | 6 + .../org/apache/lens/cube/metadata/Storage.java | 30 +- .../cube/metadata/TestCubeMetastoreClient.java | 151 ++++++++- .../metastore/CubeMetastoreServiceImpl.java | 182 ++++++---- .../apache/lens/server/metastore/JAXBUtils.java | 66 +++- .../server/metastore/TestMetastoreService.java | 186 +++++++++- 9 files changed, 811 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-api/src/main/resources/cube-0.1.xsd ---------------------------------------------------------------------- diff --git a/lens-api/src/main/resources/cube-0.1.xsd b/lens-api/src/main/resources/cube-0.1.xsd index f438f48..431d68b 100644 --- a/lens-api/src/main/resources/cube-0.1.xsd +++ b/lens-api/src/main/resources/cube-0.1.xsd @@ -681,8 +681,27 @@ </xs:complexType> <xs:complexType name="x_update_periods"> - <xs:sequence> + <xs:annotation> + <xs:documentation> + A list of update_period which contains either update period table descriptor or list of update_period enum. 
+ </xs:documentation> + </xs:annotation> + <xs:choice maxOccurs="1" minOccurs="0"> + <xs:element name="update_period_table_descriptor" type="x_update_period_table_descriptor" maxOccurs="unbounded" + minOccurs="0"/> <xs:element name="update_period" type="x_update_period" maxOccurs="unbounded" minOccurs="0"/> + </xs:choice> + </xs:complexType> + + <xs:complexType name="x_update_period_table_descriptor"> + <xs:annotation> + <xs:documentation> + An update period descriptor keeps an enum of update period and a storage table descriptor. + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element name="update_period" type="x_update_period" maxOccurs="1" minOccurs="1"/> + <xs:element name="table_desc" type="x_storage_table_desc" maxOccurs="1" minOccurs="1"/> </xs:sequence> </xs:complexType> @@ -1001,13 +1020,14 @@ <xs:complexType name="x_storage_table_element"> <xs:annotation> <xs:documentation> - Storage and storage table description and update periods + Storage and storage table description and update periods. table_desc is invalid when update_periods has a list + of update_period_table_descriptor instead of a list of enums. 
</xs:documentation> </xs:annotation> <xs:sequence> - <xs:element name="update_periods" type="x_update_periods" maxOccurs="1" minOccurs="0"/> + <xs:element name="update_periods" type="x_update_periods" maxOccurs="1" minOccurs="1"/> <xs:element name="storage_name" type="xs:string"/> - <xs:element type="x_storage_table_desc" name="table_desc"/> + <xs:element type="x_storage_table_desc" name="table_desc" maxOccurs="1" minOccurs="0"/> </xs:sequence> </xs:complexType> http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java index adb6c92..896a7a1 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java @@ -29,10 +29,14 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.metadata.Table; import com.google.common.collect.Lists; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j public class CubeFactTable extends AbstractCubeTable { + @Getter + // Map<StorageName, Map<update_period, storage_table_prefix>> + private final Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap; private String cubeName; private final Map<String, Set<UpdatePeriod>> storageUpdatePeriods; @@ -40,8 +44,10 @@ public class CubeFactTable extends AbstractCubeTable { super(hiveTable); this.storageUpdatePeriods = getUpdatePeriods(getName(), getProperties()); this.cubeName = getCubeName(getName(), getProperties()); + this.storagePrefixUpdatePeriodMap = getUpdatePeriodMap(getName(), getProperties()); } + public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns, Map<String, Set<UpdatePeriod>> 
storageUpdatePeriods) { this(cubeName, factName, columns, storageUpdatePeriods, 0L, new HashMap<String, String>()); @@ -54,9 +60,18 @@ public class CubeFactTable extends AbstractCubeTable { public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns, Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties) { + this(cubeName, factName, columns, storageUpdatePeriods, weight, properties, + new HashMap<String, Map<UpdatePeriod, String>>()); + + } + + public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns, + Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties, + Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap) { super(factName, columns, properties, weight); this.cubeName = cubeName; this.storageUpdatePeriods = storageUpdatePeriods; + this.storagePrefixUpdatePeriodMap = storagePrefixUpdatePeriodMap; addProperties(); } @@ -65,6 +80,18 @@ public class CubeFactTable extends AbstractCubeTable { super.addProperties(); addCubeNames(getName(), getProperties(), cubeName); addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods); + addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap); + } + + private void addStorageTableProperties(String name, Map<String, String> properties, + Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap) { + for (String storageName : storageUpdatePeriodMap.keySet()) { + String prefix = MetastoreUtil.getFactKeyPrefix(name) + "." + storageName; + for (Map.Entry updatePeriodEntry : storageUpdatePeriodMap.get(storageName).entrySet()) { + String updatePeriod = ((UpdatePeriod) updatePeriodEntry.getKey()).getName(); + properties.put(prefix + "." 
+ updatePeriod, (String) updatePeriodEntry.getValue()); + } + } } private static void addUpdatePeriodProperies(String name, Map<String, String> props, @@ -82,7 +109,29 @@ public class CubeFactTable extends AbstractCubeTable { props.put(MetastoreUtil.getFactCubeNameKey(factName), cubeName); } - private static Map<String, Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) { + private Map<String, Map<UpdatePeriod, String>> getUpdatePeriodMap(String factName, Map<String, String> props) { + Map<String, Map<UpdatePeriod, String>> ret = new HashMap<>(); + for (Map.Entry entry : storageUpdatePeriods.entrySet()) { + String storage = (String) entry.getKey(); + for (UpdatePeriod period : (Set<UpdatePeriod>) entry.getValue()) { + String storagePrefixKey = MetastoreUtil + .getUpdatePeriodStoragePrefixKey(factName.trim(), storage, period.getName()); + String storageTableNamePrefix = props.get(storagePrefixKey); + if (storageTableNamePrefix == null) { + storageTableNamePrefix = storage; + } + Map<UpdatePeriod, String> mapOfUpdatePeriods = ret.get(storage); + if (mapOfUpdatePeriods == null) { + mapOfUpdatePeriods = new HashMap<>(); + ret.put(storage, mapOfUpdatePeriods); + } + mapOfUpdatePeriods.put(period, storageTableNamePrefix); + } + } + return ret; + } + + private Map<String, Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) { Map<String, Set<UpdatePeriod>> storageUpdatePeriods = new HashMap<>(); String storagesStr = props.get(MetastoreUtil.getFactStorageListKey(name)); if (!StringUtils.isBlank(storagesStr)) { @@ -273,13 +322,16 @@ public class CubeFactTable extends AbstractCubeTable { /** * Add a storage with specified update periods - * * @param storage * @param updatePeriods + * @param updatePeriodStoragePrefix */ - void addStorage(String storage, Set<UpdatePeriod> updatePeriods) { + void addStorage(String storage, Set<UpdatePeriod> updatePeriods, + Map<UpdatePeriod, String> updatePeriodStoragePrefix) { 
storageUpdatePeriods.put(storage, updatePeriods); + storagePrefixUpdatePeriodMap.put(storage, updatePeriodStoragePrefix); addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods); + addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap); } /** @@ -289,6 +341,12 @@ public class CubeFactTable extends AbstractCubeTable { */ void dropStorage(String storage) { storageUpdatePeriods.remove(storage); + String prefix = MetastoreUtil.getFactKeyPrefix(getName()) + "." + storage; + for (Map.Entry updatePeriodEntry : storagePrefixUpdatePeriodMap.get(storage).entrySet()) { + String updatePeriod = ((UpdatePeriod)updatePeriodEntry.getKey()).getName(); + getProperties().remove(prefix + "." + updatePeriod); + } + storagePrefixUpdatePeriodMap.remove(storage); getProperties().remove(MetastoreUtil.getFactUpdatePeriodKey(getName(), storage)); String newStorages = StringUtils.join(storageUpdatePeriods.keySet(), ","); getProperties().put(MetastoreUtil.getFactStorageListKey(getName()), newStorages); @@ -351,5 +409,7 @@ public class CubeFactTable extends AbstractCubeTable { return Collections.min(Lists.newArrayList(getRelativeEndTime(), getAbsoluteEndTime())); } - + public String getTablePrefix(String storage, UpdatePeriod updatePeriod) { + return storagePrefixUpdatePeriodMap.get(storage).get(updatePeriod); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java index 6c9cde2..087c203 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java @@ -31,7 +31,7 @@ import 
org.apache.lens.cube.metadata.Storage.LatestInfo; import org.apache.lens.cube.metadata.Storage.LatestPartColumnInfo; import org.apache.lens.cube.metadata.timeline.PartitionTimeline; import org.apache.lens.cube.metadata.timeline.PartitionTimelineFactory; -import org.apache.lens.server.api.*; +import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.metastore.DataCompletenessChecker; import org.apache.lens.server.api.util.LensUtil; @@ -121,7 +121,13 @@ public class CubeMetastoreClient { if (ind <= 0) { throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName()); } - return storageTableName.substring(0, ind - StorageConstants.STORGAE_SEPARATOR.length()); + String name = storageTableName.substring(0, ind - StorageConstants.STORGAE_SEPARATOR.length()); + for (String storageName : fact.getStorages()) { + if (name.equalsIgnoreCase(storageName)) { + return storageName; + } + } + throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName()); } /** @@ -169,11 +175,11 @@ public class CubeMetastoreClient { UpdatePeriod updatePeriod = updatePeriodStr == null ? 
null : UpdatePeriod.valueOf(updatePeriodStr.toUpperCase()); List<PartitionTimeline> ret = Lists.newArrayList(); CubeFactTable fact = getCubeFact(factName); - List<String> keys = Lists.newArrayList(); + List<String> storageList = Lists.newArrayList(); if (storage != null) { - keys.add(storage); + storageList.add(storage); } else { - keys.addAll(fact.getStorages()); + storageList.addAll(fact.getStorages()); } String partCol = null; if (timeDimension != null) { @@ -186,9 +192,9 @@ public class CubeMetastoreClient { } partCol = baseCube.getPartitionColumnOfTimeDim(timeDimension); } - for (String key : keys) { + for (String storageName : storageList) { for (Map.Entry<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> entry : partitionTimelineCache - .get(factName, key).entrySet()) { + .get(factName, storageName).entrySet()) { if (updatePeriod == null || entry.getKey().equals(updatePeriod)) { for (Map.Entry<String, PartitionTimeline> entry1 : entry.getValue().entrySet()) { if (partCol == null || partCol.equals(entry1.getKey())) { @@ -201,25 +207,30 @@ public class CubeMetastoreClient { return ret; } - public void updatePartition(String fact, String storageName, Partition partition) + public void updatePartition(String fact, String storageName, Partition partition, UpdatePeriod updatePeriod) throws HiveException, InvalidOperationException, LensException { - updatePartitions(fact, storageName, Collections.singletonList(partition)); + Map<UpdatePeriod, List<Partition>> updatePeriodListMap = new HashMap<>(); + updatePeriodListMap.put(updatePeriod, Collections.singletonList(partition)); + updatePartitions(fact, storageName, updatePeriodListMap); } - public void updatePartitions(String factOrDimtableName, String storageName, List<Partition> partitions) - throws HiveException, InvalidOperationException, LensException { - List<Partition> partitionsToAlter = Lists.newArrayList(); - partitionsToAlter.addAll(partitions); - 
partitionsToAlter.addAll(getAllLatestPartsEquivalentTo(factOrDimtableName, storageName, partitions)); - getStorage(storageName).updatePartitions(getClient(), factOrDimtableName, partitionsToAlter); + public void updatePartitions(String factOrDimtableName, String storageName, + Map<UpdatePeriod, List<Partition>> partitions) throws HiveException, InvalidOperationException, LensException { + for (Map.Entry entry : partitions.entrySet()) { + List<Partition> partitionsToAlter = Lists.newArrayList(); + partitionsToAlter.addAll((List<Partition>) entry.getValue()); + String storageTableName = getStorageTableName(factOrDimtableName, storageName, (UpdatePeriod) entry.getKey()); + partitionsToAlter.addAll( + getAllLatestPartsEquivalentTo(factOrDimtableName, storageTableName, (List<Partition>) entry.getValue())); + getStorage(storageName).updatePartitions(storageTableName, getClient(), factOrDimtableName, partitionsToAlter); + } } - private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageName, + private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageTableName, List<Partition> partitions) throws HiveException, LensException { if (isFactTable(factOrDimtableName)) { return Lists.newArrayList(); } - String storageTableName = getFactOrDimtableStorageTableName(factOrDimtableName, storageName); Table storageTable = getTable(storageTableName); List<String> timePartCols = getTimePartColNamesOfTable(storageTable); List<Partition> latestParts = Lists.newArrayList(); @@ -279,6 +290,17 @@ public class CubeMetastoreClient { } } + public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns, + Map<String, Set<UpdatePeriod>> storageAggregatePeriods, double weight, Map<String, String> properties, + Map<String, StorageTableDesc> storageTableDescs, Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap) + throws LensException { + CubeFactTable factTable = new 
CubeFactTable(cubeName, factName, columns, storageAggregatePeriods, weight, + properties, storageUpdatePeriodMap); + createCubeTable(factTable, storageTableDescs); + // do a get to update cache + getCubeFact(factName); + + } /** * In-memory storage of {@link PartitionTimeline} objects for each valid @@ -327,48 +349,75 @@ public class CubeMetastoreClient { public TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> get(String fact, String storage) throws HiveException, LensException { // SUSPEND CHECKSTYLE CHECK DoubleCheckedLockingCheck - String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage)); - if (get(storageTableName) == null) { - synchronized (this) { - if (get(storageTableName) == null) { - Table storageTable = getTable(storageTableName); - if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) { - try { - loadTimelinesFromTableProperties(fact, storage); - } catch (Exception e) { - // Ideally this should never come. But since we have another source, - // let's piggyback on that for loading timeline - log.error("Error while loading timelines from table properties.", e); - loadTimelinesFromAllPartitions(fact, storage); - } - } else { - loadTimelinesFromAllPartitions(fact, storage); + // Unique key for the timeline cache, based on storageName and fact. 
+ String timeLineKey = (Storage.getPrefix(storage)+ fact).toLowerCase(); + synchronized (this) { + if (get(timeLineKey) == null) { + loadTimeLines(fact, storage, timeLineKey); + } + log.info("timeline for {} is: {}", storage, get(timeLineKey)); + // return the final value from memory + return get(timeLineKey); + // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck + } + } + + /** + * @param fact + * @param storage + */ + private void loadTimeLines(String fact, String storage, String timeLineKey) throws LensException, HiveException { + Set<String> uniqueStorageTables = new HashSet<>(); + Map<UpdatePeriod, String> updatePeriodTableName = new HashMap<>(); + for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) { + String storageTableName = getStorageTableName(fact, storage, updatePeriod); + updatePeriodTableName.put(updatePeriod, storageTableName); + Table storageTable = getTable(storageTableName); + if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) { + try { + loadTimelinesFromTableProperties(updatePeriod, storageTableName, timeLineKey); + } catch (Exception e) { + // Ideally this should never come. 
But since we have another source, + // let's piggyback on that for loading timeline + log.error("Error while loading timelines from table properties.", e); + ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey); + if (!uniqueStorageTables.contains(storageTableName)) { + uniqueStorageTables.add(storageTableName); + loadTimelinesFromAllPartitions(storageTableName, timeLineKey); } } + } else { + ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey); + if (!uniqueStorageTables.contains(storageTableName)) { + uniqueStorageTables.add(storageTableName); + loadTimelinesFromAllPartitions(storageTableName, timeLineKey); + } } - log.info("timeline for {} is: {}", storageTableName, get(storageTableName)); } - // return the final value from memory - return get(storageTableName); - // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck + for (Map.Entry entry : updatePeriodTableName.entrySet()) { + alterTablePartitionCache(timeLineKey, (UpdatePeriod) entry.getKey(), (String) entry.getValue()); + } } - private void loadTimelinesFromAllPartitions(String fact, String storage) throws HiveException, LensException { + private void ensureEntryForTimeLineKey(String fact, String storage, UpdatePeriod updatePeriod, + String storageTableName, String timeLineKey) throws LensException { // Not found in table properties either, compute from all partitions of the fact-storage table. 
// First make sure all combinations of update period and partition column have an entry even // if no partitions exist - String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage)); - log.info("loading from all partitions: {}", storageTableName); - Table storageTable = getTable(storageTableName); - if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get( - storage) != null) { - for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) { - for (String partCol : getTimePartColNamesOfTable(storageTable)) { - ensureEntry(storageTableName, updatePeriod, partCol); - } + if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get(storage) != null) { + log.info("loading from all partitions: {}", storageTableName); + Table storageTable = getTable(storageTableName); + for (String partCol : getTimePartColNamesOfTable(storageTable)) { + ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol); } } + + } + + private void loadTimelinesFromAllPartitions(String storageTableName, String timeLineKey) + throws HiveException, LensException { // Then add all existing partitions for batch addition in respective timelines. + Table storageTable = getTable(storageTableName); List<String> timeParts = getTimePartColNamesOfTable(storageTable); List<FieldSchema> partCols = storageTable.getPartCols(); for (Partition partition : getPartitionsByFilter(storageTableName, null)) { @@ -382,23 +431,17 @@ public class CubeMetastoreClient { } for (int i = 0; i < partCols.size(); i++) { if (timeParts.contains(partCols.get(i).getName())) { - addForBatchAddition(storageTableName, period, partCols.get(i).getName(), values.get(i)); + addForBatchAddition(timeLineKey, storageTableName, period, partCols.get(i).getName(), values.get(i)); } } } - // commit all batch addition for the storage table, - // which will in-turn commit all batch additions in all it's timelines. 
- commitAllBatchAdditions(storageTableName); } - private void loadTimelinesFromTableProperties(String fact, String storage) throws HiveException, LensException { - // found in table properties, load from there. - String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage)); + private void loadTimelinesFromTableProperties(UpdatePeriod updatePeriod, + String storageTableName, String timeLineKey) throws HiveException, LensException { log.info("loading from table properties: {}", storageTableName); - for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) { - for (String partCol : getTimePartColNamesOfTable(storageTableName)) { - ensureEntry(storageTableName, updatePeriod, partCol).init(getTable(storageTableName)); - } + for (String partCol : getTimePartColNamesOfTable(storageTableName)) { + ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol).init(getTable(storageTableName)); } } @@ -406,16 +449,17 @@ public class CubeMetastoreClient { * Adds given partition(for storageTable, updatePeriod, partitionColum=partition) for batch addition in an * appropriate timeline object. Ignore if partition is not valid. 
* - * @param storageTable storage table + * @param timeLineKey key for the timeLine map + * @param storageTableName hive table name * @param updatePeriod update period * @param partitionColumn partition column * @param partition partition */ - public void addForBatchAddition(String storageTable, UpdatePeriod updatePeriod, String partitionColumn, - String partition) { + public void addForBatchAddition(String timeLineKey, String storageTableName, UpdatePeriod updatePeriod, + String partitionColumn, String partition) { try { - ensureEntry(storageTable, updatePeriod, partitionColumn).addForBatchAddition(TimePartition.of(updatePeriod, - partition)); + ensureEntry(timeLineKey, storageTableName, updatePeriod, partitionColumn) + .addForBatchAddition(TimePartition.of(updatePeriod, partition)); } catch (LensException e) { // to take care of the case where partition name is something like `latest` log.error("Couldn't parse partition: {} with update period: {}, skipping.", partition, updatePeriod, e); @@ -427,42 +471,24 @@ public class CubeMetastoreClient { * <p></p> * kind of like mkdir -p * - * @param storageTable storage table + * @param timeLineKey key for the timeLine map * @param updatePeriod update period * @param partitionColumn partition column * @return timeline if already exists, or puts a new timeline and returns. 
*/ - public PartitionTimeline ensureEntry(String storageTable, UpdatePeriod updatePeriod, String partitionColumn) { - if (get(storageTable) == null) { - put(storageTable, new TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>>()); + public PartitionTimeline ensureEntry(String timeLineKey, String storagTableName, UpdatePeriod updatePeriod, + String partitionColumn) { + if (get(timeLineKey) == null) { + put(timeLineKey, new TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>>()); } - if (get(storageTable).get(updatePeriod) == null) { - get(storageTable).put(updatePeriod, new CaseInsensitiveStringHashMap<PartitionTimeline>()); + if (get(timeLineKey).get(updatePeriod) == null) { + get(timeLineKey).put(updatePeriod, new CaseInsensitiveStringHashMap<PartitionTimeline>()); } - if (get(storageTable).get(updatePeriod).get(partitionColumn) == null) { - get(storageTable).get(updatePeriod).put(partitionColumn, PartitionTimelineFactory.get( - CubeMetastoreClient.this, storageTable, updatePeriod, partitionColumn)); - } - return get(storageTable).get(updatePeriod).get(partitionColumn); - } - - /** - * commit all batch addition for all its timelines. 
- * - * @param storageTable storage table - * @throws HiveException - * @throws LensException - */ - public void commitAllBatchAdditions(String storageTable) throws HiveException, LensException { - if (get(storageTable) != null) { - for (UpdatePeriod updatePeriod : get(storageTable).keySet()) { - for (String partCol : get(storageTable).get(updatePeriod).keySet()) { - PartitionTimeline timeline = get(storageTable).get(updatePeriod).get(partCol); - timeline.commitBatchAdditions(); - } - } - alterTablePartitionCache(storageTable); + if (get(timeLineKey).get(updatePeriod).get(partitionColumn) == null) { + get(timeLineKey).get(updatePeriod).put(partitionColumn, PartitionTimelineFactory.get( + CubeMetastoreClient.this, storagTableName, updatePeriod, partitionColumn)); } + return get(timeLineKey).get(updatePeriod).get(partitionColumn); } /** check partition existence in the appropriate timeline if it exists */ @@ -478,9 +504,11 @@ public class CubeMetastoreClient { */ public PartitionTimeline get(String fact, String storage, UpdatePeriod updatePeriod, String partCol) throws HiveException, LensException { - return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null && get(fact, storage).get( - updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod).get(partCol) : null; + return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null + && get(fact, storage).get(updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod) + .get(partCol) : null; } + /** * returns the timeline corresponding to fact-storage table, updatePeriod, partCol. throws exception if not * exists, which would most probably mean the combination is incorrect. 
@@ -489,8 +517,8 @@ public class CubeMetastoreClient { throws HiveException, LensException { PartitionTimeline timeline = get(fact, storage, updatePeriod, partCol); if (timeline == null) { - throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(), - fact, storage, updatePeriod, partCol); + throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(), fact, storage, updatePeriod, + partCol); } return timeline; } @@ -519,8 +547,8 @@ public class CubeMetastoreClient { boolean updated = false; for (Map.Entry<String, Date> entry : timePartSpec.entrySet()) { TimePartition part = TimePartition.of(updatePeriod, entry.getValue()); - if (!partitionExistsByFilter(cubeTableName, storageName, StorageConstants.getPartFilter(entry.getKey(), - part.getDateString()))) { + if (!partitionExistsByFilter(cubeTableName, storageName, updatePeriod, + StorageConstants.getPartFilter(entry.getKey(), part.getDateString()))) { get(cubeTableName, storageName, updatePeriod, entry.getKey()).drop(part); updated = true; } @@ -565,10 +593,10 @@ public class CubeMetastoreClient { Hive.closeCurrent(); } - private void createOrAlterStorageHiveTable(Table parent, String storage, StorageTableDesc crtTblDesc) + private void createOrAlterStorageHiveTable(Table parent, String storageTableNamePrefix, StorageTableDesc crtTblDesc) throws LensException { try { - Table tbl = getStorage(storage).getStorageTable(getClient(), parent, crtTblDesc); + Table tbl = Storage.getStorageTable(storageTableNamePrefix, getClient(), parent, crtTblDesc); if (tableExists(tbl.getTableName())) { // alter table alterHiveTable(tbl.getTableName(), tbl); @@ -730,7 +758,7 @@ public class CubeMetastoreClient { * @param storageAggregatePeriods Aggregate periods for the storages * @param weight Weight of the cube * @param properties Properties of fact table - * @param storageTableDescs Map of storage to its storage table description + * @param storageTableDescs Map of storage table prefix to its storage 
table description * @throws LensException */ public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns, @@ -808,7 +836,7 @@ public class CubeMetastoreClient { * Create cube table defined and create all the corresponding storage tables * * @param cubeTable Can be fact or dimension table - * @param storageTableDescs Map of storage to its storage table description + * @param storageTableDescs Map of storage tableName prefix to its storage table description * @throws LensException */ public void createCubeTable(AbstractCubeTable cubeTable, Map<String, StorageTableDesc> storageTableDescs) @@ -836,14 +864,17 @@ public class CubeMetastoreClient { * @param fact The CubeFactTable * @param storage The storage * @param updatePeriods Update periods of the fact on the storage - * @param storageTableDesc The storage table description + * @param storageTableDescs The storage table description * @throws LensException */ public void addStorage(CubeFactTable fact, String storage, Set<UpdatePeriod> updatePeriods, - StorageTableDesc storageTableDesc) throws LensException { - fact.addStorage(storage, updatePeriods); - createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), - storage, storageTableDesc); + Map<String, StorageTableDesc> storageTableDescs, Map<UpdatePeriod, String> updatePeriodStoragePrefix) + throws LensException { + fact.addStorage(storage, updatePeriods, updatePeriodStoragePrefix); + for (Map.Entry entry : storageTableDescs.entrySet()) { + createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), + (String) entry.getKey(), (StorageTableDesc) entry.getValue()); + } alterCubeTable(fact.getName(), getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), fact); updateFactCache(fact.getName()); } @@ -860,8 +891,8 @@ public class CubeMetastoreClient { public void addStorage(CubeDimensionTable dim, String storage, UpdatePeriod dumpPeriod, StorageTableDesc 
storageTableDesc) throws LensException { dim.alterSnapshotDumpPeriod(storage, dumpPeriod); - createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), - storage, storageTableDesc); + createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), storage, + storageTableDesc); alterCubeTable(dim.getName(), getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), dim); updateDimCache(dim.getName()); } @@ -896,10 +927,19 @@ public class CubeMetastoreClient { return partsAdded; } + /** + * @param factOrDimTable + * @param storageName + * @param updatePeriod + * @param storagePartitionDescs + * @param type + * @return + * @throws HiveException + * @throws LensException + */ private List<Partition> addPartitions(String factOrDimTable, String storageName, UpdatePeriod updatePeriod, List<StoragePartitionDesc> storagePartitionDescs, CubeTableType type) throws HiveException, LensException { - String storageTableName = getStorageTableName(factOrDimTable.trim(), - Storage.getPrefix(storageName.trim())).toLowerCase(); + String storageTableName = getStorageTableName(factOrDimTable, storageName, updatePeriod); if (type == CubeTableType.DIM_TABLE) { // Adding partition in dimension table. Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap(); @@ -910,7 +950,7 @@ public class CubeMetastoreClient { } List<Partition> partsAdded = getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod, storagePartitionDescs, - latestInfos); + latestInfos, storageTableName); ListIterator<Partition> iter = partsAdded.listIterator(); while (iter.hasNext()) { if (iter.next().getSpec().values().contains(StorageConstants.LATEST_PARTITION_VALUE)) { @@ -928,10 +968,11 @@ public class CubeMetastoreClient { // Adding partition in fact table. 
if (storagePartitionDescs.size() > 0) { partsAdded = getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod, - storagePartitionDescs, null); + storagePartitionDescs, null, storageTableName); } // update hive table - alterTablePartitionCache(getStorageTableName(factOrDimTable, Storage.getPrefix(storageName))); + alterTablePartitionCache((Storage.getPrefix(storageName) + factOrDimTable).toLowerCase(), updatePeriod, + storageTableName); return partsAdded; } else { throw new LensException("Can't add partitions to anything other than fact or dimtable"); @@ -1018,20 +1059,20 @@ public class CubeMetastoreClient { } /** - * store back all timelines of given storage table to table properties + * store back all timelines of given storage to table properties * - * @param storageTableName storage table name + * @param timeLineKey key for the time line + * @param storageTableName Storage table name * @throws HiveException */ - private void alterTablePartitionCache(String storageTableName) throws HiveException, LensException { + private void alterTablePartitionCache(String timeLineKey, UpdatePeriod updatePeriod, String storageTableName) + throws HiveException, LensException { Table table = getTable(storageTableName); Map<String, String> params = table.getParameters(); - if (partitionTimelineCache.get(storageTableName) != null) { - for (UpdatePeriod updatePeriod : partitionTimelineCache.get(storageTableName).keySet()) { - for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(storageTableName) - .get(updatePeriod).entrySet()) { - entry.getValue().updateTableParams(table); - } + if (partitionTimelineCache.get(timeLineKey) != null) { + for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(timeLineKey).get(updatePeriod) + .entrySet()) { + entry.getValue().updateTableParams(table); } params.put(getPartitionTimelineCachePresenceKey(), "true"); alterHiveTable(storageTableName, table); @@ -1173,8 +1214,7 @@ public 
class CubeMetastoreClient { */ public void dropPartition(String cubeTableName, String storageName, Map<String, Date> timePartSpec, Map<String, String> nonTimePartSpec, UpdatePeriod updatePeriod) throws HiveException, LensException { - String storageTableName = getStorageTableName(cubeTableName.trim(), - Storage.getPrefix(storageName.trim())).toLowerCase(); + String storageTableName = getStorageTableName(cubeTableName.trim(), storageName, updatePeriod); Table hiveTable = getHiveTable(storageTableName); List<FieldSchema> partCols = hiveTable.getPartCols(); List<String> partColNames = new ArrayList<>(partCols.size()); @@ -1244,7 +1284,8 @@ public class CubeMetastoreClient { // dropping fact partition getStorage(storageName).dropPartition(getClient(), storageTableName, partVals, null, null); if (partitionTimelineCache.updateForDeletion(cubeTableName, storageName, updatePeriod, timePartSpec)) { - this.alterTablePartitionCache(storageTableName); + this.alterTablePartitionCache((Storage.getPrefix(storageName) + cubeTableName).toLowerCase(), updatePeriod, + storageTableName); } } } @@ -1277,7 +1318,7 @@ public class CubeMetastoreClient { public boolean factPartitionExists(String factName, String storageName, UpdatePeriod updatePeriod, Map<String, Date> partitionTimestamp, Map<String, String> partSpec) throws HiveException, LensException { - String storageTableName = getFactOrDimtableStorageTableName(factName, storageName); + String storageTableName = getStorageTableName(factName, storageName, updatePeriod); return partitionExists(storageTableName, updatePeriod, partitionTimestamp, partSpec); } @@ -1286,9 +1327,9 @@ public class CubeMetastoreClient { return partitionExists(storageTableName, getPartitionSpec(updatePeriod, partitionTimestamps)); } - public boolean partitionExistsByFilter(String cubeTableName, String storageName, String filter) throws LensException { - return partitionExistsByFilter(getStorageTableName(cubeTableName, Storage.getPrefix(storageName)), - filter); 
+ public boolean partitionExistsByFilter(String cubeTableName, String storageName, UpdatePeriod updatePeriod, + String filter) throws LensException { + return partitionExistsByFilter(getStorageTableName(cubeTableName, storageName, updatePeriod), filter); } public boolean partitionExistsByFilter(String storageTableName, String filter) throws LensException { @@ -1354,7 +1395,7 @@ public class CubeMetastoreClient { boolean latestPartitionExists(String factOrDimTblName, String storageName, String latestPartCol) throws HiveException, LensException { - String storageTableName = getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName)); + String storageTableName = MetastoreUtil.getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName)); if (isDimensionTable(factOrDimTblName)) { return dimTableLatestPartitionExists(storageTableName); } else { @@ -2225,18 +2266,30 @@ public class CubeMetastoreClient { */ public void dropStorageFromFact(String factName, String storage) throws LensException { CubeFactTable cft = getFactTable(factName); + dropHiveTablesForStorage(factName, storage); cft.dropStorage(storage); - dropHiveTable(getFactOrDimtableStorageTableName(factName, storage)); alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft); updateFactCache(factName); } + private void dropHiveTablesForStorage(String factName, String storage) throws LensException{ + CubeFactTable cft = getFactTable(factName); + Set<String> droppedTables = new HashSet<>(); + for (Map.Entry updatePeriodEntry : cft.getStoragePrefixUpdatePeriodMap().get(storage).entrySet()) { + UpdatePeriod updatePeriod = (UpdatePeriod) updatePeriodEntry.getKey(); + String storageTableName = getStorageTableName(factName, storage, updatePeriod); + if (!droppedTables.contains(storageTableName)) { + dropHiveTable(storageTableName); + } + droppedTables.add(storageTableName); + } + } // updateFact will be false when fact is fully dropped private void 
dropStorageFromFact(String factName, String storage, boolean updateFact) throws LensException { - CubeFactTable cft = getFactTable(factName); - dropHiveTable(getFactOrDimtableStorageTableName(factName, storage)); + dropHiveTablesForStorage(factName, storage); if (updateFact) { + CubeFactTable cft = getFactTable(factName); cft.dropStorage(storage); alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft); updateFactCache(factName); @@ -2432,4 +2485,22 @@ public class CubeMetastoreClient { Date now = new Date(); return isStorageTableCandidateForRange(storageTableName, resolveDate(fromDate, now), resolveDate(toDate, now)); } + + private String getStorageTablePrefixFromStorage(String factOrDimTableName, String storage, UpdatePeriod updatePeriod) + throws LensException { + if (updatePeriod == null) { + return storage; + } + if (isFactTable(factOrDimTableName)) { + return getFactTable(factOrDimTableName).getTablePrefix(storage, updatePeriod); + } else { + return storage; + } + } + + public String getStorageTableName(String factOrDimTableName, String storage, UpdatePeriod updatePeriod) + throws LensException { + return MetastoreUtil.getFactOrDimtableStorageTableName(factOrDimTableName, + getStorageTablePrefixFromStorage(factOrDimTableName, storage, updatePeriod)); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java index 53cf8af..57d4502 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java @@ -590,4 +590,10 @@ public class MetastoreUtil { } return copy; } + + public static String getUpdatePeriodStoragePrefixKey(String 
factTableName , String storageName, String updatePeriod) { + return MetastoreUtil.getFactKeyPrefix(factTableName) + "." + storageName + "." + updatePeriod; + } + + } http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java index cd9f705..936add4 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java @@ -124,14 +124,18 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta /** * Get the storage table descriptor for the given parent table. * + * @param storageTableNamePrefix Storage table prefix based on update period * @param client The metastore client * @param parent Is either Fact or Dimension table * @param crtTbl Create table info * @return Table describing the storage table * @throws HiveException */ - public Table getStorageTable(Hive client, Table parent, StorageTableDesc crtTbl) throws HiveException { - String storageTableName = MetastoreUtil.getStorageTableName(parent.getTableName(), this.getPrefix()); + public static Table getStorageTable(String storageTableNamePrefix, Hive client, Table parent, StorageTableDesc crtTbl) + throws HiveException { + // Change it to the appropriate storage table name. + String storageTableName = MetastoreUtil + .getStorageTableName(parent.getTableName(), Storage.getPrefix(storageTableNamePrefix)); Table tbl = client.getTable(storageTableName, false); if (tbl == null) { tbl = client.newTable(storageTableName); @@ -235,21 +239,6 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta } /** - * Add single partition to storage. Just calls #addPartitions. 
- * @param client - * @param addPartitionDesc - * @param latestInfo - * @throws HiveException - */ - public List<Partition> addPartition(Hive client, StoragePartitionDesc addPartitionDesc, LatestInfo latestInfo) - throws HiveException { - Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap(); - latestInfos.put(addPartitionDesc.getNonTimePartSpec(), latestInfo); - return addPartitions(client, addPartitionDesc.getCubeTableName(), addPartitionDesc.getUpdatePeriod(), - Collections.singletonList(addPartitionDesc), latestInfos); - } - - /** * Add given partitions in the underlying hive table and update latest partition links * * @param client hive client instance @@ -262,12 +251,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta */ public List<Partition> addPartitions(Hive client, String factOrDimTable, UpdatePeriod updatePeriod, List<StoragePartitionDesc> storagePartitionDescs, - Map<Map<String, String>, LatestInfo> latestInfos) throws HiveException { + Map<Map<String, String>, LatestInfo> latestInfos, String tableName) throws HiveException { preAddPartitions(storagePartitionDescs); Map<Map<String, String>, Map<String, Integer>> latestPartIndexForPartCols = Maps.newHashMap(); boolean success = false; try { - String tableName = MetastoreUtil.getStorageTableName(factOrDimTable, this.getPrefix()); String dbName = SessionState.get().getCurrentDatabase(); AddPartitionDesc addParts = new AddPartitionDesc(dbName, tableName, true); Table storageTbl = client.getTable(dbName, tableName); @@ -383,11 +371,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta * @throws InvalidOperationException * @throws HiveException */ - public void updatePartitions(Hive client, String fact, List<Partition> partitions) + public void updatePartitions(String storageTable, Hive client, String fact, List<Partition> partitions) throws InvalidOperationException, HiveException { boolean success = false; try { - 
client.alterPartitions(MetastoreUtil.getFactOrDimtableStorageTableName(fact, getName()), partitions, null); + client.alterPartitions(storageTable, partitions, null); success = true; } finally { if (success) { http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java b/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java index e21dc2a..950534c 100644 --- a/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java +++ b/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java @@ -28,6 +28,7 @@ import static org.apache.lens.server.api.util.LensUtil.getHashMap; import static org.testng.Assert.*; import java.text.SimpleDateFormat; + import java.util.*; import org.apache.lens.cube.error.LensCubeErrorCode; @@ -45,7 +46,10 @@ import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; -import org.apache.hadoop.hive.ql.metadata.*; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapred.SequenceFileInputFormat; @@ -965,6 +969,132 @@ public class TestCubeMetastoreClient { assertTrue(client.getAllFacts(altered).isEmpty()); } + @Test(priority = 1) + public void testUpdatePeriodTableDescriptions() throws LensException, HiveException { + List<FieldSchema> factColumns = new 
ArrayList<>(cubeMeasures.size()); + String factName = "testFactWithUpdatePeriodTableDescriptions"; + + for (CubeMeasure measure : cubeMeasures) { + factColumns.add(measure.getColumn()); + } + // add one dimension of the cube + factColumns.add(new FieldSchema("zipcode", "int", "zip")); + FieldSchema itPart = new FieldSchema("it", "string", "date part"); + FieldSchema etPart = new FieldSchema("et", "string", "date part"); + String[] partColNames = new String[] { getDatePartitionKey(), itPart.getName(), etPart.getName() }; + + StorageTableDesc s1 = new StorageTableDesc(TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class, + Lists.newArrayList(getDatePartition(), itPart, etPart), + Lists.newArrayList(getDatePartitionKey(), itPart.getName(), etPart.getName())); + StorageTableDesc s2 = new StorageTableDesc(TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class, + Lists.newArrayList(getDatePartition(), itPart, etPart), + Lists.newArrayList(getDatePartitionKey(), itPart.getName(), etPart.getName())); + + Map<String, Set<UpdatePeriod>> updatePeriods = getHashMap(c1, hourlyAndDaily, c2, hourlyAndDaily); + Map<String, StorageTableDesc> storageTables = getHashMap(HOURLY + "_" + c1, s1, DAILY + "_" + c1, s2, c2, s2); + Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap = getHashMap(c1, + getHashMap(HOURLY, HOURLY + "_" + c1, DAILY, DAILY + "_" + c1), c2, getHashMap(HOURLY, c2, DAILY, c2)); + + CubeFactTable cubeFact = new CubeFactTable(CUBE_NAME, factName, factColumns, updatePeriods, 0L, null, + storageUpdatePeriodMap); + client.createCubeFactTable(CUBE_NAME, factName, factColumns, updatePeriods, 0L, null, storageTables, + storageUpdatePeriodMap); + + assertTrue(client.tableExists(factName)); + Table cubeTbl = client.getHiveTable(factName); + assertTrue(client.isFactTable(cubeTbl)); + assertTrue(client.isFactTableForCube(cubeTbl, CUBE_NAME)); + + // Assert for storage tables + for (String entry : storageTables.keySet()) { + String storageTableName = 
getFactOrDimtableStorageTableName(factName, entry); + assertTrue(client.tableExists(storageTableName)); + } + + String c1TableNameHourly = getFactOrDimtableStorageTableName(cubeFact.getName(), HOURLY + "_" + c1); + String c2TableNameHourly = getFactOrDimtableStorageTableName(cubeFact.getName(), c2); + + Table c1TableHourly = client.getHiveTable(c1TableNameHourly); + c1TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, getDatePartitionKey()), + StoreAllPartitionTimeline.class.getCanonicalName()); + c1TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, itPart.getName()), + StoreAllPartitionTimeline.class.getCanonicalName()); + c1TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, etPart.getName()), + StoreAllPartitionTimeline.class.getCanonicalName()); + client.pushHiveTable(c1TableHourly); + + Table c2TableHourly = client.getHiveTable(c2TableNameHourly); + c2TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, getDatePartitionKey()), + EndsAndHolesPartitionTimeline.class.getCanonicalName()); + c2TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, itPart.getName()), + EndsAndHolesPartitionTimeline.class.getCanonicalName()); + c2TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, etPart.getName()), + EndsAndHolesPartitionTimeline.class.getCanonicalName()); + client.pushHiveTable(c2TableHourly); + + assertSameTimelines(factName, new String[] { c1, c2 }, HOURLY, partColNames); + + StoreAllPartitionTimeline timelineDtC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache + .get(factName, c1, HOURLY, getDatePartitionKey())); + StoreAllPartitionTimeline timelineItC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache + .get(factName, c1, HOURLY, itPart.getName())); + StoreAllPartitionTimeline timelineEtC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache + .get(factName, c1, HOURLY, etPart.getName())); 
+ EndsAndHolesPartitionTimeline timelineDt = ((EndsAndHolesPartitionTimeline) client.partitionTimelineCache + .get(factName, c2, HOURLY, getDatePartitionKey())); + EndsAndHolesPartitionTimeline timelineIt = ((EndsAndHolesPartitionTimeline) client.partitionTimelineCache + .get(factName, c2, HOURLY, itPart.getName())); + EndsAndHolesPartitionTimeline timelineEt = ((EndsAndHolesPartitionTimeline) client.partitionTimelineCache + .get(factName, c2, HOURLY, etPart.getName())); + + StoreAllPartitionTimeline timelineC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache + .get(factName, c1, HOURLY, getDatePartitionKey())); + + Map<String, Date> timeParts1 = getTimePartitionByOffsets(getDatePartitionKey(), 0, itPart.getName(), 0, + etPart.getName(), 0); + StoragePartitionDesc partSpec1 = new StoragePartitionDesc(cubeFact.getName(), timeParts1, null, HOURLY); + + Map<String, Date> timeParts2 = getTimePartitionByOffsets(getDatePartitionKey(), 0, etPart.getName(), 1); + Map<String, String> nonTimeSpec = getHashMap(itPart.getName(), "default"); + final StoragePartitionDesc partSpec2 = new StoragePartitionDesc(cubeFact.getName(), timeParts2, nonTimeSpec, + HOURLY); + + Map<String, Date> timeParts3 = getTimePartitionByOffsets(getDatePartitionKey(), 0, etPart.getName(), 0); + final StoragePartitionDesc partSpec3 = new StoragePartitionDesc(cubeFact.getName(), timeParts3, nonTimeSpec, + HOURLY); + + client.addPartitions(Arrays.asList(partSpec1, partSpec2, partSpec3), c1, CubeTableType.FACT); + client.addPartitions(Arrays.asList(partSpec1, partSpec2, partSpec3), c2, CubeTableType.FACT); + PartitionTimeline timeline1Temp = client.partitionTimelineCache.get(factName, c1, HOURLY, getDatePartitionKey()); + PartitionTimeline timeline2Temp = client.partitionTimelineCache.get(factName, c2, HOURLY, getDatePartitionKey()); + + assertEquals(timeline1Temp.getClass(), StoreAllPartitionTimeline.class); + assertEquals(timeline2Temp.getClass(), EndsAndHolesPartitionTimeline.class); + + 
assertEquals(client.getAllParts(c1TableNameHourly).size(), 3); + assertEquals(client.getAllParts(c2TableNameHourly).size(), 3); + + assertSameTimelines(factName, new String[] { c1, c2 }, HOURLY, partColNames); + + assertTimeline(timelineDt, timelineDtC1, HOURLY, 0, 0); + assertTimeline(timelineEt, timelineEtC1, HOURLY, 0, 1); + assertTimeline(timelineIt, timelineItC1, HOURLY, 0, 0); + + assertTrue(client.latestPartitionExists(cubeFact.getName(), c1, getDatePartitionKey())); + assertTrue(client.latestPartitionExists(cubeFact.getName(), c1, itPart.getName())); + assertTrue(client.latestPartitionExists(cubeFact.getName(), c2, etPart.getName())); + + assertNoPartitionNamedLatest(c1TableNameHourly, partColNames); + assertNoPartitionNamedLatest(c2TableNameHourly, partColNames); + + client.dropFact(factName, true); + assertFalse(client.tableExists(factName)); + for (String entry : storageTables.keySet()) { + String storageTableName = getFactOrDimtableStorageTableName(factName, entry); + assertFalse(client.tableExists(storageTableName)); + } + } + @Test(priority = 2) public void testAlterDerivedCube() throws Exception { String name = "alter_derived_cube"; @@ -1238,7 +1368,10 @@ public class TestCubeMetastoreClient { s1.setFieldDelim(":"); storageTables.put(c1, s1); storageTables.put(c4, s1); - factTable.addStorage(c4, hourlyAndDaily); + Map<UpdatePeriod, String> updatePeriodStoragePrefix = new HashMap<>(); + updatePeriodStoragePrefix.put(HOURLY, c4); + updatePeriodStoragePrefix.put(DAILY, c4); + factTable.addStorage(c4, hourlyAndDaily, updatePeriodStoragePrefix); client.alterCubeFactTable(factName, factTable, storageTables, new HashMap<String, String>()); CubeFactTable altered2 = client.getCubeFact(factName); assertTrue(client.tableExists(c1TableName)); @@ -1261,7 +1394,12 @@ public class TestCubeMetastoreClient { assertTrue(client.tableExists(c4TableName)); // add storage - client.addStorage(altered2, c3, hourlyAndDaily, s1); + updatePeriodStoragePrefix.clear(); + 
updatePeriodStoragePrefix.put(HOURLY, c3); + updatePeriodStoragePrefix.put(DAILY, c3); + Map<String, StorageTableDesc> storageTableDescMap = new HashMap<>(); + storageTableDescMap.put(c3, s1); + client.addStorage(altered2, c3, hourlyAndDaily, storageTableDescMap, updatePeriodStoragePrefix); CubeFactTable altered3 = client.getCubeFact(factName); assertTrue(altered3.getStorages().contains("C3")); assertTrue(altered3.getUpdatePeriods().get("C3").equals(hourlyAndDaily)); @@ -1517,14 +1655,16 @@ public class TestCubeMetastoreClient { for (Partition partition : c1Parts) { partition.setLocation("blah"); partition.setBucketCount(random.nextInt()); - client.updatePartition(factName, c1, partition); + client.updatePartition(factName, c1, partition, HOURLY); } assertSamePartitions(client.getAllParts(c1TableName), c1Parts); for (Partition partition : c2Parts) { partition.setLocation("blah"); partition.setBucketCount(random.nextInt()); } - client.updatePartitions(factName, c2, c2Parts); + Map<UpdatePeriod, List<Partition>> partitionMap = new HashMap<>(); + partitionMap.put(HOURLY, c2Parts); + client.updatePartitions(factName, c2, partitionMap); assertSamePartitions(client.getAllParts(c2TableName), c2Parts); assertSameTimelines(factName, storages, HOURLY, partColNames); @@ -1998,7 +2138,6 @@ public class TestCubeMetastoreClient { timePartCols); Map<String, Set<UpdatePeriod>> updatePeriods = getHashMap(c1, updates); Map<String, StorageTableDesc> storageTables = getHashMap(c1, s1); - CubeFactTable cubeFactWithParts = new CubeFactTable(CUBE_NAME, factNameWithPart, factColumns, updatePeriods); // create cube fact http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java 
b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java index 8b10d1d..24660e1 100644 --- a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java @@ -238,7 +238,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet JAXBUtils.dumpPeriodsFromStorageTables(xDimTable.getStorageTables()); Map<String, String> properties = JAXBUtils.mapFromXProperties(xDimTable.getProperties()); - Map<String, StorageTableDesc> storageDesc = JAXBUtils.storageTableMapFromXStorageTables( + Map<String, StorageTableDesc> storageDesc = JAXBUtils.tableDescPrefixMapFromXStorageTables( xDimTable.getStorageTables()); try (SessionContext ignored = new SessionContext(sessionid)){ @@ -289,7 +289,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet try (SessionContext ignored = new SessionContext(sessionid)){ getClient(sessionid).alterCubeDimensionTable(dimensionTable.getTableName(), JAXBUtils.cubeDimTableFromDimTable(dimensionTable), - JAXBUtils.storageTableMapFromXStorageTables(dimensionTable.getStorageTables())); + JAXBUtils.tableDescPrefixMapFromXStorageTables(dimensionTable.getStorageTables())); log.info("Updated dimension table " + dimensionTable.getTableName()); } catch (HiveException exc) { throw new LensException(exc); @@ -398,15 +398,38 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet CubeMetastoreClient msClient = getClient(sessionid); CubeFactTable cft = msClient.getFactTable(fact); XFactTable factTable = JAXBUtils.factTableFromCubeFactTable(cft); + Map<String, Map<UpdatePeriod, String>> storageMap = cft.getStoragePrefixUpdatePeriodMap(); for (String storageName : cft.getStorages()) { Set<UpdatePeriod> updatePeriods = cft.getUpdatePeriods().get(storageName); - XStorageTableElement tblElement = 
JAXBUtils.getXStorageTableFromHiveTable( - msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName))); - tblElement.setStorageName(storageName); - for (UpdatePeriod p : updatePeriods) { - tblElement.getUpdatePeriods().getUpdatePeriod().add(XUpdatePeriod.valueOf(p.name())); + // This map tells if there are different tables for different update period. + Map<UpdatePeriod, String> updatePeriodToTableMap = storageMap.get(storageName); + Set<String> tableNames = new HashSet<>(); + for (UpdatePeriod updatePeriod : updatePeriods) { + tableNames.add(updatePeriodToTableMap.get(updatePeriod)); + } + if (tableNames.size() <= 1) { + XStorageTableElement tblElement = JAXBUtils.getXStorageTableFromHiveTable( + msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName))); + tblElement.setStorageName(storageName); + for (UpdatePeriod p : updatePeriods) { + tblElement.getUpdatePeriods().getUpdatePeriod().add(XUpdatePeriod.valueOf(p.name())); + } + factTable.getStorageTables().getStorageTable().add(tblElement); + } else { + // Multiple storage tables. 
+ XStorageTableElement tblElement = new XStorageTableElement(); + tblElement.setStorageName(storageName); + XUpdatePeriods xUpdatePeriods = new XUpdatePeriods(); + tblElement.setUpdatePeriods(xUpdatePeriods); + for (Map.Entry entry : updatePeriodToTableMap.entrySet()) { + XUpdatePeriodTableDescriptor updatePeriodTableDescriptor = new XUpdatePeriodTableDescriptor(); + updatePeriodTableDescriptor.setTableDesc(getStorageTableDescFromHiveTable( + msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, (String) entry.getValue())))); + updatePeriodTableDescriptor.setUpdatePeriod(XUpdatePeriod.valueOf(((UpdatePeriod)entry.getKey()).name())); + xUpdatePeriods.getUpdatePeriodTableDescriptor().add(updatePeriodTableDescriptor); + } + factTable.getStorageTables().getStorageTable().add(tblElement); } - factTable.getStorageTables().getStorageTable().add(tblElement); } return factTable; } @@ -431,7 +454,8 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet JAXBUtils.getFactUpdatePeriodsFromStorageTables(fact.getStorageTables()), fact.getWeight(), addFactColStartTimePropertyToFactProperties(fact), - JAXBUtils.storageTableMapFromXStorageTables(fact.getStorageTables())); + JAXBUtils.tableDescPrefixMapFromXStorageTables(fact.getStorageTables()), + JAXBUtils.storageTablePrefixMapOfStorage(fact.getStorageTables())); log.info("Created fact table " + fact.getName()); } } @@ -460,7 +484,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet public void updateFactTable(LensSessionHandle sessionid, XFactTable fact) throws LensException { try (SessionContext ignored = new SessionContext(sessionid)){ getClient(sessionid).alterCubeFactTable(fact.getName(), JAXBUtils.cubeFactFromFactTable(fact), - JAXBUtils.storageTableMapFromXStorageTables(fact.getStorageTables()), + JAXBUtils.tableDescPrefixMapFromXStorageTables(fact.getStorageTables()), JAXBUtils.columnStartAndEndTimeFromXColumns(fact.getColumns())); 
log.info("Updated fact table " + fact.getName()); } catch (HiveException e) { @@ -587,11 +611,13 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet for (XUpdatePeriod sup : storageTable.getUpdatePeriods().getUpdatePeriod()) { updatePeriods.add(UpdatePeriod.valueOf(sup.name())); } - try (SessionContext ignored = new SessionContext(sessionid)){ + try (SessionContext ignored = new SessionContext(sessionid)) { CubeMetastoreClient msClient = getClient(sessionid); - msClient.addStorage(msClient.getFactTable(fact), - storageTable.getStorageName(), updatePeriods, - JAXBUtils.storageTableDescFromXStorageTableElement(storageTable)); + XStorageTables tables = new XStorageTables(); + tables.getStorageTable().add(storageTable); + msClient.addStorage(msClient.getFactTable(fact), storageTable.getStorageName(), updatePeriods, + JAXBUtils.tableDescPrefixMapFromXStorageTables(tables), + JAXBUtils.storageTablePrefixMapOfStorage(tables).get(storageTable.getStorageName())); log.info("Added storage " + storageTable.getStorageName() + ":" + updatePeriods + " for fact " + fact); } } @@ -615,17 +641,34 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet return factTable; } + private Set<String> getAllTablesForStorage(LensSessionHandle sessionHandle, String fact, String storageName) + throws LensException { + Set<String> storageTableNames = new HashSet<>(); + if (getClient(sessionHandle).isFactTable(fact)) { + CubeFactTable cft = getClient(sessionHandle).getCubeFact(fact); + Map<UpdatePeriod, String> storageMap = cft.getStoragePrefixUpdatePeriodMap().get(storageName); + for (Map.Entry entry : storageMap.entrySet()) { + storageTableNames.add(MetastoreUtil.getStorageTableName(fact, Storage.getPrefix((String) entry.getValue()))); + } + } else { + storageTableNames.add(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName)); + } + return storageTableNames; + } + @Override - public XPartitionList 
getAllPartitionsOfFactStorage( - LensSessionHandle sessionid, String fact, String storageName, + public XPartitionList getAllPartitionsOfFactStorage(LensSessionHandle sessionid, String fact, String storageName, String filter) throws LensException { - try (SessionContext ignored = new SessionContext(sessionid)){ + try (SessionContext ignored = new SessionContext(sessionid)) { checkFactStorage(sessionid, fact, storageName); CubeMetastoreClient client = getClient(sessionid); - String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(fact, - storageName); - List<Partition> parts = client.getPartitionsByFilter(storageTableName, filter); - List<String> timePartCols = client.getTimePartColNamesOfTable(storageTableName); + Set<String> storageTableNames = getAllTablesForStorage(sessionid, fact, storageName); + List<Partition> parts = new ArrayList<>(); + List<String> timePartCols = new ArrayList<>(); + for (String storageTableName : storageTableNames) { + parts.addAll(client.getPartitionsByFilter(storageTableName, filter)); + timePartCols.addAll(client.getTimePartColNamesOfTable(storageTableName)); + } return xpartitionListFromPartitionList(fact, parts, timePartCols); } catch (HiveException exc) { throw new LensException(exc); @@ -635,10 +678,10 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet @Override public int addPartitionToFactStorage(LensSessionHandle sessionid, String fact, String storageName, XPartition partition) throws LensException { - try (SessionContext ignored = new SessionContext(sessionid)){ + try (SessionContext ignored = new SessionContext(sessionid)) { checkFactStorage(sessionid, fact, storageName); - return getClient(sessionid).addPartition(storagePartSpecFromXPartition(partition), storageName, - CubeTableType.FACT).size(); + return getClient(sessionid) + .addPartition(storagePartSpecFromXPartition(partition), storageName, CubeTableType.FACT).size(); } catch (HiveException exc) { throw new 
LensException(exc); } @@ -647,10 +690,10 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet @Override public int addPartitionsToFactStorage(LensSessionHandle sessionid, String fact, String storageName, XPartitionList partitions) throws LensException { - try (SessionContext ignored = new SessionContext(sessionid)){ + try (SessionContext ignored = new SessionContext(sessionid)) { checkFactStorage(sessionid, fact, storageName); - return getClient(sessionid).addPartitions(storagePartSpecListFromXPartitionList(partitions), storageName, - CubeTableType.FACT).size(); + return getClient(sessionid) + .addPartitions(storagePartSpecListFromXPartitionList(partitions), storageName, CubeTableType.FACT).size(); } catch (HiveException exc) { throw new LensException(exc); } @@ -693,15 +736,17 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet } @Override - public void updatePartition(LensSessionHandle sessionid, String tblName, String storageName, - XPartition xPartition) throws LensException { - try (SessionContext ignored = new SessionContext(sessionid)){ + public void updatePartition(LensSessionHandle sessionid, String tblName, String storageName, XPartition xPartition) + throws LensException { + try (SessionContext ignored = new SessionContext(sessionid)) { CubeMetastoreClient client = getClient(sessionid); - String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(tblName, storageName); + String storageTableName = client + .getStorageTableName(tblName, storageName, UpdatePeriod.valueOf(xPartition.getUpdatePeriod().name())); Partition existingPartition = client.getPartitionByFilter(storageTableName, StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition))); JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition); - client.updatePartition(tblName, storageName, existingPartition); + client.updatePartition(tblName, storageName, existingPartition, + 
UpdatePeriod.valueOf(xPartition.getUpdatePeriod().value())); } catch (HiveException | ClassNotFoundException | InvalidOperationException | UnsupportedOperationException exc) { throw new LensException(exc); } @@ -710,15 +755,23 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet @Override public void updatePartitions(LensSessionHandle sessionid, String tblName, String storageName, XPartitionList xPartitions) throws LensException { - try (SessionContext ignored = new SessionContext(sessionid)){ + try (SessionContext ignored = new SessionContext(sessionid)) { CubeMetastoreClient client = getClient(sessionid); - String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(tblName, storageName); - List<Partition> partitionsToUpdate = new ArrayList<>(xPartitions.getPartition().size()); - for (XPartition xPartition : xPartitions.getPartition()) { - Partition existingPartition = client.getPartitionByFilter(storageTableName, - StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition))); - JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition); - partitionsToUpdate.add(existingPartition); + Set<String> storageTableNames = getAllTablesForStorage(sessionid, tblName, storageName); + Map<UpdatePeriod, List<Partition>> partitionsToUpdate = new HashMap<>(); + for (String storageTableName : storageTableNames) { + for (XPartition xPartition : xPartitions.getPartition()) { + Partition existingPartition = client.getPartitionByFilter(storageTableName, + StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition))); + JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition); + UpdatePeriod updatePeriod = UpdatePeriod.valueOf(xPartition.getUpdatePeriod().value()); + List<Partition> partitionList = partitionsToUpdate.get(updatePeriod); + if (partitionList == null) { + partitionList = new ArrayList<>(); + partitionsToUpdate.put(updatePeriod, partitionList); + } + 
partitionList.add(existingPartition); + } } client.updatePartitions(tblName, storageName, partitionsToUpdate); } catch (HiveException | ClassNotFoundException | InvalidOperationException exc) { @@ -787,29 +840,35 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet return period; } - public void dropPartitionFromStorageByValues(LensSessionHandle sessionid, - String cubeTableName, String storageName, String values) throws LensException { - try (SessionContext ignored = new SessionContext(sessionid)){ - String tableName = MetastoreUtil.getStorageTableName(cubeTableName, - Storage.getPrefix(storageName)); + public void dropPartitionFromStorageByValues(LensSessionHandle sessionid, String cubeTableName, String storageName, + String values) throws LensException { + try (SessionContext ignored = new SessionContext(sessionid)) { + Set<String> storageTables = getAllTablesForStorage(sessionid, cubeTableName, storageName); + Map<String, List<Partition>> partitions = new HashMap<>(); CubeMetastoreClient msClient = getClient(sessionid); - String filter = getFilter(msClient, tableName, values); - List<Partition> partitions = msClient.getPartitionsByFilter( - tableName, filter); - if (partitions.size() > 1) { - log.error("More than one partition with specified values, correspoding filter:" + filter); - throw new BadRequestException("More than one partition with specified values"); - } else if (partitions.size() == 0) { - log.error("No partition exists with specified values, correspoding filter:" + filter); + int totalPartitions = 0; + Partition part = null; + for (String tableName : storageTables) { + String filter = getFilter(msClient, tableName, values); + partitions.put(filter, msClient.getPartitionsByFilter(tableName, filter)); + if (partitions.get(filter).size() > 1) { + log.error("More than one partition with specified values, corresponding filter:" + filter); + throw new BadRequestException("More than one partition with specified values"); + 
} + if (partitions.get(filter).size() == 1) { + part = partitions.get(filter).get(0); + } + totalPartitions += partitions.get(filter).size(); + } + if (totalPartitions == 0) { + log.error("No partition exists with specified values"); throw new NotFoundException("No partition exists with specified values"); } Map<String, Date> timeSpec = new HashMap<>(); Map<String, String> nonTimeSpec = new HashMap<>(); - UpdatePeriod updatePeriod = populatePartSpec(partitions.get(0), timeSpec, nonTimeSpec); - msClient.dropPartition(cubeTableName, - storageName, timeSpec, nonTimeSpec, updatePeriod); - log.info("Dropped partition for dimension: " + cubeTableName - + " storage: " + storageName + " values:" + values); + UpdatePeriod updatePeriod = populatePartSpec(part, timeSpec, nonTimeSpec); + msClient.dropPartition(cubeTableName, storageName, timeSpec, nonTimeSpec, updatePeriod); + log.info("Dropped partition for dimension: " + cubeTableName + " storage: " + storageName + " values:" + values); } catch (HiveException exc) { throw new LensException(exc); } @@ -818,9 +877,12 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet public void dropPartitionFromStorageByFilter(LensSessionHandle sessionid, String cubeTableName, String storageName, String filter) throws LensException { try (SessionContext ignored = new SessionContext(sessionid)){ - String tableName = MetastoreUtil.getStorageTableName(cubeTableName, Storage.getPrefix(storageName)); + Set<String> storageTables = getAllTablesForStorage(sessionid, cubeTableName, storageName); + List<Partition> partitions = new ArrayList<>(); CubeMetastoreClient msClient = getClient(sessionid); - List<Partition> partitions = msClient.getPartitionsByFilter(tableName, filter); + for (String tableName : storageTables) { + partitions.addAll(msClient.getPartitionsByFilter(tableName, filter)); + } for (Partition part : partitions) { try { Map<String, Date> timeSpec = new HashMap<>(); 
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java b/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java index 51fcb43..0bc8e77 100644 --- a/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java +++ b/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java @@ -45,7 +45,6 @@ import org.apache.hadoop.mapred.InputFormat; import com.google.common.base.Optional; import com.google.common.collect.Maps; - import lombok.extern.slf4j.Slf4j; /** @@ -588,14 +587,22 @@ public final class JAXBUtils { return cols; } - public static Map<String, Set<UpdatePeriod>> getFactUpdatePeriodsFromStorageTables( - XStorageTables storageTables) { + public static Map<String, Set<UpdatePeriod>> getFactUpdatePeriodsFromStorageTables(XStorageTables storageTables) { if (storageTables != null && !storageTables.getStorageTable().isEmpty()) { Map<String, Set<UpdatePeriod>> factUpdatePeriods = new LinkedHashMap<String, Set<UpdatePeriod>>(); for (XStorageTableElement ste : storageTables.getStorageTable()) { - Set<UpdatePeriod> updatePeriods = new TreeSet<UpdatePeriod>(); - for (XUpdatePeriod upd : ste.getUpdatePeriods().getUpdatePeriod()) { + Set<UpdatePeriod> updatePeriods = new TreeSet<>(); + // Check if the update period array is empty. 
+ List<XUpdatePeriod> updatePeriodList = ste.getUpdatePeriods().getUpdatePeriod(); + if (updatePeriodList.isEmpty()) { + List<XUpdatePeriodTableDescriptor> tableDescriptorList = ste.getUpdatePeriods() + .getUpdatePeriodTableDescriptor(); + for (XUpdatePeriodTableDescriptor tableDescriptor : tableDescriptorList) { + updatePeriodList.add(tableDescriptor.getUpdatePeriod()); + } + } + for (XUpdatePeriod upd : updatePeriodList) { updatePeriods.add(UpdatePeriod.valueOf(upd.name())); } factUpdatePeriods.put(ste.getStorageName(), updatePeriods); @@ -706,13 +713,10 @@ public final class JAXBUtils { Map<String, Set<UpdatePeriod>> storageUpdatePeriods = getFactUpdatePeriodsFromStorageTables( fact.getStorageTables()); - - return new CubeFactTable(fact.getCubeName(), - fact.getName(), - columns, - storageUpdatePeriods, - fact.getWeight(), - mapFromXProperties(fact.getProperties())); + Map<String, Map<UpdatePeriod, String>> storageTablePrefixMap = storageTablePrefixMapOfStorage( + fact.getStorageTables()); + return new CubeFactTable(fact.getCubeName(), fact.getName(), columns, storageUpdatePeriods, fact.getWeight(), + mapFromXProperties(fact.getProperties()), storageTablePrefixMap); } public static Segmentation segmentationFromXSegmentation(XSegmentation seg) throws LensException { @@ -849,11 +853,43 @@ public final class JAXBUtils { return tblDesc; } - public static Map<String, StorageTableDesc> storageTableMapFromXStorageTables(XStorageTables storageTables) { - Map<String, StorageTableDesc> storageTableMap = new HashMap<String, StorageTableDesc>(); + public static Map<String, StorageTableDesc> tableDescPrefixMapFromXStorageTables(XStorageTables storageTables) { + Map<String, StorageTableDesc> storageTablePrefixToDescMap = new HashMap<>(); + if (storageTables != null && !storageTables.getStorageTable().isEmpty()) { + for (XStorageTableElement sTbl : storageTables.getStorageTable()) { + if (!sTbl.getUpdatePeriods().getUpdatePeriodTableDescriptor().isEmpty()) { + for 
(XUpdatePeriodTableDescriptor updatePeriodTable : sTbl.getUpdatePeriods() + .getUpdatePeriodTableDescriptor()) { + // Get table name with update period as the prefix. + storageTablePrefixToDescMap.put(updatePeriodTable.getUpdatePeriod() + "_" + sTbl.getStorageName(), + storageTableDescFromXStorageTableDesc(updatePeriodTable.getTableDesc())); + } + } else { + storageTablePrefixToDescMap.put(sTbl.getStorageName(), storageTableDescFromXStorageTableElement(sTbl)); + } + } + } + return storageTablePrefixToDescMap; + } + + public static Map<String, Map<UpdatePeriod, String>> storageTablePrefixMapOfStorage(XStorageTables storageTables) { + Map<String, Map<UpdatePeriod, String>> storageTableMap = new HashMap<>(); if (storageTables != null && !storageTables.getStorageTable().isEmpty()) { for (XStorageTableElement sTbl : storageTables.getStorageTable()) { - storageTableMap.put(sTbl.getStorageName(), storageTableDescFromXStorageTableElement(sTbl)); + Map<UpdatePeriod, String> storageNameMap = new HashMap<>(); + if (!sTbl.getUpdatePeriods().getUpdatePeriodTableDescriptor().isEmpty()) { + for (XUpdatePeriodTableDescriptor updatePeriodTable : sTbl.getUpdatePeriods() + .getUpdatePeriodTableDescriptor()) { + // Get table name with update period as the prefix. + storageNameMap.put(UpdatePeriod.valueOf(updatePeriodTable.getUpdatePeriod().value()), + updatePeriodTable.getUpdatePeriod() + "_" + sTbl.getStorageName()); + } + } else { + for (XUpdatePeriod updatePeriod :sTbl.getUpdatePeriods().getUpdatePeriod()) { + storageNameMap.put(UpdatePeriod.valueOf(updatePeriod.value()), sTbl.getStorageName()); + } + } + storageTableMap.put(sTbl.getStorageName(), storageNameMap); } } return storageTableMap;