Repository: kylin Updated Branches: refs/heads/master 8f5442bff -> 2b929c209
minor refactor, code format Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2b929c20 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2b929c20 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2b929c20 Branch: refs/heads/master Commit: 2b929c20978d74e0416543e1b943920476f48a8b Parents: 8f5442b Author: Li Yang <liy...@apache.org> Authored: Thu Aug 18 14:51:53 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Aug 18 14:52:34 2016 +0800 ---------------------------------------------------------------------- .../kylin/common/restclient/Broadcaster.java | 532 ++++++------ .../org/apache/kylin/cube/model/CubeDesc.java | 5 +- .../apache/kylin/metadata/MetadataManager.java | 4 +- .../kylin/metadata/model/DataModelDesc.java | 837 ++++++++++--------- 4 files changed, 697 insertions(+), 681 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2b929c20/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java index 0176ad7..230888f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java @@ -1,260 +1,272 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.common.restclient; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.DaemonThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; - -/** - * Broadcast kylin event out - */ -public class Broadcaster { - - private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class); - - // static cached instances - private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>(); - - public static Broadcaster getInstance(KylinConfig config) { - Broadcaster r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (Broadcaster.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - - r = new Broadcaster(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - return r; - } - } - - public static void clearCache() { - CACHE.clear(); - } - - // ============================================================================ - - private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>(); - - private AtomicLong counter = new AtomicLong(); - - private Broadcaster(final KylinConfig config) { - final String[] nodes = config.getRestServers(); - if (nodes == null || nodes.length < 1) { - logger.warn("There is no available rest server; check the 'kylin.rest.servers' config"); - broadcastEvents = null; // disable the broadcaster - return; - } - logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); - - Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { - @Override - public void run() { - final List<RestClient> restClients = Lists.newArrayList(); - for (String node : nodes) { - restClients.add(new RestClient(node)); - } - final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size()); - while (true) { - try { - final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); - logger.info("new broadcast event:" + broadcastEvent); - for (final RestClient restClient : restClients) { - wipingCachePool.execute(new Runnable() { - @Override - public void run() { - try { - restClient.wipeCache(broadcastEvent.getType(), broadcastEvent.getAction(), broadcastEvent.getName()); - } catch (IOException e) { - logger.warn("Thread failed during wipe cache at " + broadcastEvent); - } - } - }); - } - } catch (Exception e) { - logger.error("error running wiping", e); - } - } - } - }); - } - - /** - * Broadcast the cubedesc event out - * - * @param action - * event action - */ - public void queue(String type, String action, String key) { - if (broadcastEvents == null) - return; - - try { - counter.incrementAndGet(); - broadcastEvents.putFirst(new BroadcastEvent(type, action, key)); - } catch (Exception e) { - counter.decrementAndGet(); - logger.error("error putting BroadcastEvent", e); - } - } - - public long getCounterAndClear() { - return counter.getAndSet(0); - } - - public enum EVENT { - - CREATE("create"), UPDATE("update"), DROP("drop"); - private String text; - - EVENT(String text) { - this.text = text; - } - - public String getType() { - return text; - } - - public static EVENT getEvent(String event) { - for (EVENT one : values()) { - if (one.getType().equalsIgnoreCase(event)) { - return one; - } - } - - return null; - } - } - - public enum TYPE { - ALL("all"), CUBE("cube"), STREAMING("streaming"), KAFKA("kafka"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), EXTERNAL_FILTER("external_filter"), HYBRID("hybrid"); - private String text; - - TYPE(String text) { - this.text = text; - } - - public String getType() { - return text; - } - - /** - * @param type - * @return - */ - public static TYPE getType(String type) { - for (TYPE one : values()) { - if (one.getType().equalsIgnoreCase(type)) { - return one; - } - } - - return null; - } - } - - public static class BroadcastEvent { - private String type; - private String action; - private String name; - - public BroadcastEvent(String type, String action, String name) { - super(); - this.type = type; - this.action = action; - this.name = name; - } - - public String getType() { - return type; - } - - public String getAction() { - return action; - } - - public String getName() { - return name; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((action == null) ? 0 : action.hashCode()); - result = prime * result + ((name == null) ? 0 : name.hashCode()); - result = prime * result + ((type == null) ? 0 : type.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (this == obj) { - return true; - } - if (getClass() != obj.getClass()) { - return false; - } - BroadcastEvent other = (BroadcastEvent) obj; - if (!StringUtils.equals(action, other.action)) { - return false; - } - if (!StringUtils.equals(name, other.name)) { - return false; - } - if (!StringUtils.equals(type, other.type)) { - return false; - } - return true; - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("type", type).add("name", name).add("action", action).toString(); - } - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.restclient; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.DaemonThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; +import com.google.common.collect.Lists; + +/** + * Broadcast kylin event out + */ +public class Broadcaster { + + private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class); + + // static cached instances + private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>(); + + public static Broadcaster getInstance(KylinConfig config) { + Broadcaster r = CACHE.get(config); + if (r != null) { + return r; + } + + synchronized (Broadcaster.class) { + r = CACHE.get(config); + if (r != null) { + return r; + } + + r = new Broadcaster(config); + CACHE.put(config, r); + if (CACHE.size() > 1) { + logger.warn("More than one singleton exist"); + } + return r; + } + } + + public static void clearCache() { + CACHE.clear(); + } + + // ============================================================================ + + private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>(); + + private AtomicLong counter = new AtomicLong(); + + private Broadcaster(final KylinConfig config) { + final String[] nodes = config.getRestServers(); + if (nodes == null || nodes.length < 1) { + logger.warn("There is no available rest server; check the 'kylin.rest.servers' config"); + broadcastEvents = null; // disable the broadcaster + return; + } + logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); + + Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { + @Override + public void run() { + final List<RestClient> restClients = Lists.newArrayList(); + for (String node : nodes) { + restClients.add(new RestClient(node)); + } + final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size()); + while (true) { + try { + final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); + logger.info("new broadcast event:" + broadcastEvent); + for (final RestClient restClient : restClients) { + wipingCachePool.execute(new Runnable() { + @Override + public void run() { + try { + restClient.wipeCache(broadcastEvent.getType(), broadcastEvent.getAction(), broadcastEvent.getName()); + } catch (IOException e) { + logger.warn("Thread failed during wipe cache at " + broadcastEvent); + } + } + }); + } + } catch (Exception e) { + logger.error("error running wiping", e); + } + } + } + }); + } + + /** + * Broadcast the cubedesc event out + * + * @param action + * event action + */ + public void queue(String type, String action, String key) { + if (broadcastEvents == null) + return; + + try { + counter.incrementAndGet(); + broadcastEvents.putFirst(new BroadcastEvent(type, action, key)); + } catch (Exception e) { + counter.decrementAndGet(); + logger.error("error putting BroadcastEvent", e); + } + } + + public long getCounterAndClear() { + return counter.getAndSet(0); + } + + public enum EVENT { + + CREATE("create"), UPDATE("update"), DROP("drop"); + private String text; + + EVENT(String text) { + this.text = text; + } + + public String getType() { + return text; + } + + public static EVENT getEvent(String event) { + for (EVENT one : values()) { + if (one.getType().equalsIgnoreCase(event)) { + return one; + } + } + + return null; + } + } + + public enum TYPE { + ALL("all"), // + PROJECT("project"), // + CUBE("cube"), // + CUBE_DESC("cube_desc"), // + STREAMING("streaming"), // + KAFKA("kafka"), // + INVERTED_INDEX("inverted_index"), // + INVERTED_INDEX_DESC("ii_desc"), // + TABLE("table"), // + DATA_MODEL("data_model"), // + EXTERNAL_FILTER("external_filter"), // + HYBRID("hybrid"); + + private String text; + + TYPE(String text) { + this.text = text; + } + + public String getType() { + return text; + } + + /** + * @param type + * @return + */ + public static TYPE getType(String type) { + for (TYPE one : values()) { + if (one.getType().equalsIgnoreCase(type)) { + return one; + } + } + + return null; + } + } + + public static class BroadcastEvent { + private String type; + private String action; + private String name; + + public BroadcastEvent(String type, String action, String name) { + super(); + this.type = type; + this.action = action; + this.name = name; + } + + public String getType() { + return type; + } + + public String getAction() { + return action; + } + + public String getName() { + return name; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((action == null) ? 0 : action.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + if (getClass() != obj.getClass()) { + return false; + } + BroadcastEvent other = (BroadcastEvent) obj; + if (!StringUtils.equals(action, other.action)) { + return false; + } + if (!StringUtils.equals(name, other.name)) { + return false; + } + if (!StringUtils.equals(type, other.type)) { + return false; + } + return true; + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("type", type).add("name", name).add("action", action).toString(); + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b929c20/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index c0f3ed8..2c83972 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -271,10 +271,7 @@ public class CubeDesc extends RootPersistentEntity { List<DeriveInfo> wantedInfo = new ArrayList<DeriveInfo>(); for (DeriveInfo info : entry.getValue()) { - if (wantedCols == null || Collections.disjoint(wantedCols, Arrays.asList(info.columns)) == false) // has - // any - // wanted - // columns? + if (wantedCols == null || Collections.disjoint(wantedCols, Arrays.asList(info.columns)) == false) // has any wanted columns? wantedInfo.add(info); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2b929c20/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java index e46fcec..7d45710 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java @@ -447,7 +447,7 @@ public class MetadataManager { ResourceStore store = getStore(); try { DataModelDesc dataModelDesc = store.getResource(path, DataModelDesc.class, MODELDESC_SERIALIZER); - dataModelDesc.init(this.getAllTablesMap()); + dataModelDesc.init(config, this.getAllTablesMap()); dataModelDescMap.putLocal(dataModelDesc.getName(), dataModelDesc); return dataModelDesc; } catch (IOException e) { @@ -495,7 +495,7 @@ public class MetadataManager { } private DataModelDesc saveDataModelDesc(DataModelDesc dataModelDesc) throws IOException { - dataModelDesc.init(this.getAllTablesMap()); + dataModelDesc.init(config, this.getAllTablesMap()); String path = dataModelDesc.getResourcePath(); getStore().putResource(path, dataModelDesc, MODELDESC_SERIALIZER); http://git-wip-us.apache.org/repos/asf/kylin/blob/2b929c20/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java index ebdfa99..5f39049 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java @@ -1,415 +1,422 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metadata.model; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.util.StringUtil; -import org.apache.kylin.metadata.MetadataConstants; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("serial") -@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class DataModelDesc extends RootPersistentEntity { - private static final Logger logger = LoggerFactory.getLogger(DataModelDesc.class); - public static enum RealizationCapacity { - SMALL, MEDIUM, LARGE - } - - @JsonProperty("name") - private String name; - - @JsonProperty("owner") - private String owner; - - @JsonProperty("description") - private String description; - - @JsonProperty("fact_table") - private String factTable; - - @JsonProperty("lookups") - private LookupDesc[] lookups; - - @JsonProperty("dimensions") - private List<ModelDimensionDesc> dimensions; - - @JsonProperty("metrics") - private String[] metrics; - - @JsonProperty("filter_condition") - private String filterCondition; - - @JsonProperty("partition_desc") - PartitionDesc partitionDesc; - - @JsonProperty("capacity") - private RealizationCapacity capacity = RealizationCapacity.MEDIUM; - - private TableDesc factTableDesc; - - private List<TableDesc> lookupTableDescs = Lists.newArrayList(); - - /** - * Error messages during resolving json metadata - */ - private List<String> errors = new ArrayList<String>(); - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getOwner() { - return owner; - } - - public void setOwner(String owner) { - this.owner = owner; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public Collection<String> getAllTables() { - HashSet<String> ret = Sets.newHashSet(); - ret.add(factTable); - for (LookupDesc lookupDesc : lookups) - ret.add(lookupDesc.getTable()); - return ret; - } - - public String getFactTable() { - return factTable; - } - - public TableDesc getFactTableDesc() { - return factTableDesc; - } - - public List<TableDesc> getLookupTableDescs() { - return lookupTableDescs; - } - - public void setFactTable(String factTable) { - this.factTable = factTable.toUpperCase(); - } - - public LookupDesc[] getLookups() { - return lookups; - } - - public void setLookups(LookupDesc[] lookups) { - this.lookups = lookups; - } - - public boolean isFactTable(String factTable) { - return this.factTable.equalsIgnoreCase(factTable); - } - - public String getFilterCondition() { - return filterCondition; - } - - public void setFilterCondition(String filterCondition) { - this.filterCondition = filterCondition; - } - - public PartitionDesc getPartitionDesc() { - return partitionDesc; - } - - public void setPartitionDesc(PartitionDesc partitionDesc) { - this.partitionDesc = partitionDesc; - } - - public RealizationCapacity getCapacity() { - return capacity; - } - - public void setCapacity(RealizationCapacity capacity) { - this.capacity = capacity; - } - - public TblColRef findPKByFK(TblColRef fk, String joinType) { - assert isFactTable(fk.getTable()); - - TblColRef candidate = null; - - for (LookupDesc dim : lookups) { - JoinDesc join = dim.getJoin(); - if (join == null) - continue; - - if (joinType != null && !joinType.equals(join.getType())) - continue; - - int find = ArrayUtils.indexOf(join.getForeignKeyColumns(), fk); - if (find >= 0) { - candidate = join.getPrimaryKeyColumns()[find]; - if (join.getForeignKeyColumns().length == 1) { // is single - // column join? - break; - } - } - } - return candidate; - } - - // TODO let this replace CubeDesc.buildColumnNameAbbreviation() - public ColumnDesc findColumn(String column) { - ColumnDesc colDesc = null; - - int cut = column.lastIndexOf('.'); - if (cut > 0) { - // table specified - String table = column.substring(0, cut); - TableDesc tableDesc = findTable(table); - colDesc = tableDesc.findColumnByName(column.substring(cut + 1)); - } else { - // table not specified, try each table - colDesc = factTableDesc.findColumnByName(column); - if (colDesc == null) { - for (TableDesc tableDesc : lookupTableDescs) { - colDesc = tableDesc.findColumnByName(column); - if (colDesc != null) - break; - } - } - } - - if (colDesc == null) - throw new IllegalArgumentException("Column not found by " + column); - - return colDesc; - } - - public TableDesc findTable(String table) { - if (factTableDesc.getName().equalsIgnoreCase(table) || factTableDesc.getIdentity().equalsIgnoreCase(table)) - return factTableDesc; - - for (TableDesc desc : lookupTableDescs) { - if (desc.getName().equalsIgnoreCase(table) || desc.getIdentity().equalsIgnoreCase(table)) - return desc; - } - - throw new IllegalArgumentException("Table not found by " + table); - } - - public void init(Map<String, TableDesc> tables) { - this.factTable = this.factTable.toUpperCase(); - this.factTableDesc = tables.get(this.factTable.toUpperCase()); - if (factTableDesc == null) { - throw new IllegalStateException("Fact table does not exist:" + this.factTable); - } - - initJoinColumns(tables); - ModelDimensionDesc.capicalizeStrings(dimensions); - initPartitionDesc(tables); - } - - private void initPartitionDesc(Map<String, TableDesc> tables) { - if (this.partitionDesc != null) - this.partitionDesc.init(tables); - } - - private void initJoinColumns(Map<String, TableDesc> tables) { - // join columns may or may not present in cube; - // here we don't modify 'allColumns' and 'dimensionColumns'; - // initDimensionColumns() will do the update - for (LookupDesc lookup : this.lookups) { - lookup.setTable(lookup.getTable().toUpperCase()); - TableDesc dimTable = tables.get(lookup.getTable()); - if (dimTable == null) { - throw new IllegalStateException("Table " + lookup.getTable() + " does not exist for " + this); - } - this.lookupTableDescs.add(dimTable); - - JoinDesc join = lookup.getJoin(); - if (join == null) - continue; - - StringUtil.toUpperCaseArray(join.getForeignKey(), join.getForeignKey()); - StringUtil.toUpperCaseArray(join.getPrimaryKey(), join.getPrimaryKey()); - - // primary key - String[] pks = join.getPrimaryKey(); - TblColRef[] pkCols = new TblColRef[pks.length]; - for (int i = 0; i < pks.length; i++) { - ColumnDesc col = dimTable.findColumnByName(pks[i]); - if (col == null) { - throw new IllegalStateException("Can't find column " + pks[i] + " in table " + dimTable.getIdentity()); - } - TblColRef colRef = new TblColRef(col); - pks[i] = colRef.getName(); - pkCols[i] = colRef; - } - join.setPrimaryKeyColumns(pkCols); - - // foreign key - String[] fks = join.getForeignKey(); - TblColRef[] fkCols = new TblColRef[fks.length]; - for (int i = 0; i < fks.length; i++) { - ColumnDesc col = factTableDesc.findColumnByName(fks[i]); - if (col == null) { - throw new IllegalStateException("Can't find column " + fks[i] + " in table " + this.getFactTable()); - } - TblColRef colRef = new TblColRef(col); - fks[i] = colRef.getName(); - fkCols[i] = colRef; - } - join.setForeignKeyColumns(fkCols); - - // Validate join in dimension - if (pkCols.length != fkCols.length) { - throw new IllegalStateException("Primary keys(" + lookup.getTable() + ")" + Arrays.toString(pks) + " are not consistent with Foreign keys(" + this.getFactTable() + ") " + Arrays.toString(fks)); - } - for (int i = 0; i < fkCols.length; i++) { - if (!fkCols[i].getDatatype().equals(pkCols[i].getDatatype())) { - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final String msg = "Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype(); - if (kylinConfig.getTableJoinTypeCheck() == true) { - throw new IllegalStateException(msg); - } else { - logger.warn(msg); - } - } - } - - } - } - - /** * Add error info and thrown exception out - * - * @param message - */ - public void addError(String message) { - addError(message, false); - } - - /** - * @param message - * error message - * @param silent - * if throw exception - */ - public void addError(String message, boolean silent) { - if (!silent) { - throw new IllegalStateException(message); - } else { - this.errors.add(message); - } - } - - public List<String> getError() { - return this.errors; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - DataModelDesc modelDesc = (DataModelDesc) o; - - if (!name.equals(modelDesc.name)) - return false; - if (!getFactTable().equals(modelDesc.getFactTable())) - return false; - - return true; - } - - @Override - public int hashCode() { - int result = 0; - result = 31 * result + name.hashCode(); - result = 31 * result + getFactTable().hashCode(); - return result; - } - - @Override - public String toString() { - return "DataModelDesc [name=" + name + "]"; - } - - public String getResourcePath() { - return concatResourcePath(name); - } - - public static String concatResourcePath(String descName) { - return ResourceStore.DATA_MODEL_DESC_RESOURCE_ROOT + "/" + descName + MetadataConstants.FILE_SURFIX; - } - - public List<ModelDimensionDesc> getDimensions() { - return dimensions; - } - - public String[] getMetrics() { - return metrics; - } - - public void setDimensions(List<ModelDimensionDesc> dimensions) { - this.dimensions = dimensions; - } - - public void setMetrics(String[] metrics) { - this.metrics = metrics; - } - - public static DataModelDesc getCopyOf(DataModelDesc dataModelDesc) { - DataModelDesc newDataModelDesc = new DataModelDesc(); - newDataModelDesc.setName(dataModelDesc.getName()); - newDataModelDesc.setDescription(dataModelDesc.getDescription()); - newDataModelDesc.setDimensions(dataModelDesc.getDimensions()); - newDataModelDesc.setFilterCondition(dataModelDesc.getFilterCondition()); - newDataModelDesc.setFactTable(dataModelDesc.getFactTable()); - newDataModelDesc.setLookups(dataModelDesc.getLookups()); - newDataModelDesc.setMetrics(dataModelDesc.getMetrics()); - newDataModelDesc.setPartitionDesc(PartitionDesc.getCopyOf(dataModelDesc.getPartitionDesc())); - newDataModelDesc.updateRandomUuid(); - return newDataModelDesc; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.model; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.metadata.MetadataConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +@SuppressWarnings("serial") +@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) +public class DataModelDesc extends RootPersistentEntity { + private static final Logger logger = LoggerFactory.getLogger(DataModelDesc.class); + public static enum RealizationCapacity { + SMALL, MEDIUM, LARGE + } + + private KylinConfig config; + + @JsonProperty("name") + private String name; + + @JsonProperty("owner") + private String owner; + + @JsonProperty("description") + private String description; + + @JsonProperty("fact_table") + private String factTable; + + @JsonProperty("lookups") + private LookupDesc[] lookups; + + @JsonProperty("dimensions") + private List<ModelDimensionDesc> dimensions; + + @JsonProperty("metrics") + private String[] metrics; + + @JsonProperty("filter_condition") + private String filterCondition; + + @JsonProperty("partition_desc") + PartitionDesc partitionDesc; + + @JsonProperty("capacity") + private RealizationCapacity capacity = RealizationCapacity.MEDIUM; + + private TableDesc factTableDesc; + + private List<TableDesc> lookupTableDescs = Lists.newArrayList(); + + /** + * Error messages during resolving json metadata + */ + private List<String> errors = new ArrayList<String>(); + + public KylinConfig getConfig() { + return config; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getOwner() { + return owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Collection<String> getAllTables() { + HashSet<String> ret = Sets.newHashSet(); + ret.add(factTable); + for (LookupDesc lookupDesc : lookups) + ret.add(lookupDesc.getTable()); + return ret; + } + + public String getFactTable() { + return factTable; + } + + public TableDesc getFactTableDesc() { + return factTableDesc; + } + + public List<TableDesc> getLookupTableDescs() { + return lookupTableDescs; + } + + public void setFactTable(String factTable) { + this.factTable = factTable.toUpperCase(); + } + + public LookupDesc[] getLookups() { + return lookups; + } + + public void setLookups(LookupDesc[] lookups) { + this.lookups = lookups; + } + + public boolean isFactTable(String factTable) { + return this.factTable.equalsIgnoreCase(factTable); + } + + public String getFilterCondition() { + return filterCondition; + } + + public void setFilterCondition(String filterCondition) { + this.filterCondition = filterCondition; + } + + public PartitionDesc getPartitionDesc() { + return partitionDesc; + } + + public void setPartitionDesc(PartitionDesc partitionDesc) { + this.partitionDesc = partitionDesc; + } + + public RealizationCapacity getCapacity() { + return capacity; + } + + public void setCapacity(RealizationCapacity capacity) { + this.capacity = capacity; + } + + public TblColRef findPKByFK(TblColRef fk, String joinType) { + assert isFactTable(fk.getTable()); + + TblColRef candidate = null; + + for (LookupDesc dim : lookups) { + JoinDesc join = dim.getJoin(); + if (join == null) + continue; + + if (joinType != null && !joinType.equals(join.getType())) + continue; + + int find = ArrayUtils.indexOf(join.getForeignKeyColumns(), fk); + if (find >= 0) { + candidate = join.getPrimaryKeyColumns()[find]; + if (join.getForeignKeyColumns().length == 1) { // is single + // column join? + break; + } + } + } + return candidate; + } + + // TODO let this replace CubeDesc.buildColumnNameAbbreviation() + public ColumnDesc findColumn(String column) { + ColumnDesc colDesc = null; + + int cut = column.lastIndexOf('.'); + if (cut > 0) { + // table specified + String table = column.substring(0, cut); + TableDesc tableDesc = findTable(table); + colDesc = tableDesc.findColumnByName(column.substring(cut + 1)); + } else { + // table not specified, try each table + colDesc = factTableDesc.findColumnByName(column); + if (colDesc == null) { + for (TableDesc tableDesc : lookupTableDescs) { + colDesc = tableDesc.findColumnByName(column); + if (colDesc != null) + break; + } + } + } + + if (colDesc == null) + throw new IllegalArgumentException("Column not found by " + column); + + return colDesc; + } + + public TableDesc findTable(String table) { + if (factTableDesc.getName().equalsIgnoreCase(table) || factTableDesc.getIdentity().equalsIgnoreCase(table)) + return factTableDesc; + + for (TableDesc desc : lookupTableDescs) { + if (desc.getName().equalsIgnoreCase(table) || desc.getIdentity().equalsIgnoreCase(table)) + return desc; + } + + throw new IllegalArgumentException("Table not found by " + table); + } + + public void init(KylinConfig config, Map<String, TableDesc> tables) { + this.config = config; + this.factTable = this.factTable.toUpperCase(); + this.factTableDesc = tables.get(this.factTable.toUpperCase()); + if (factTableDesc == null) { + throw new IllegalStateException("Fact table does not exist:" + this.factTable); + } + + initJoinColumns(tables); + ModelDimensionDesc.capicalizeStrings(dimensions); + initPartitionDesc(tables); + } + + private void initPartitionDesc(Map<String, TableDesc> tables) { + if (this.partitionDesc != null) + this.partitionDesc.init(tables); + } + + private void initJoinColumns(Map<String, TableDesc> tables) { + // join columns may or may not present in cube; + // here we don't modify 'allColumns' and 'dimensionColumns'; + // initDimensionColumns() will do the update + for (LookupDesc lookup : this.lookups) { + lookup.setTable(lookup.getTable().toUpperCase()); + TableDesc dimTable = tables.get(lookup.getTable()); + if (dimTable == null) { + throw new IllegalStateException("Table " + lookup.getTable() + " does not exist for " + this); + } + this.lookupTableDescs.add(dimTable); + + JoinDesc join = lookup.getJoin(); + if (join == null) + continue; + + StringUtil.toUpperCaseArray(join.getForeignKey(), join.getForeignKey()); + StringUtil.toUpperCaseArray(join.getPrimaryKey(), join.getPrimaryKey()); + + // primary key + String[] pks = join.getPrimaryKey(); + TblColRef[] pkCols = new TblColRef[pks.length]; + for (int i = 0; i < pks.length; i++) { + ColumnDesc col = dimTable.findColumnByName(pks[i]); + if (col == null) { + throw new IllegalStateException("Can't find column " + pks[i] + " in table " + dimTable.getIdentity()); + } + TblColRef colRef = new TblColRef(col); + pks[i] = colRef.getName(); + pkCols[i] = colRef; + } + join.setPrimaryKeyColumns(pkCols); + + // foreign key + String[] fks = join.getForeignKey(); + TblColRef[] fkCols = new TblColRef[fks.length]; + for (int i = 0; i < fks.length; i++) { + ColumnDesc col = factTableDesc.findColumnByName(fks[i]); + if (col == null) { + throw new IllegalStateException("Can't find column " + fks[i] + " in table " + this.getFactTable()); + } + TblColRef colRef = new TblColRef(col); + fks[i] = colRef.getName(); + fkCols[i] = colRef; + } + join.setForeignKeyColumns(fkCols); + + // Validate join in dimension + if (pkCols.length != fkCols.length) { + throw new IllegalStateException("Primary keys(" + lookup.getTable() + ")" + Arrays.toString(pks) + " are not consistent with Foreign keys(" + this.getFactTable() + ") " + Arrays.toString(fks)); + } + for (int i = 0; i < fkCols.length; i++) { + if (!fkCols[i].getDatatype().equals(pkCols[i].getDatatype())) { + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final String msg = "Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype(); + if (kylinConfig.getTableJoinTypeCheck() == true) { + throw new IllegalStateException(msg); + } else { + logger.warn(msg); + } + } + } + + } + } + + /** * Add error info and thrown exception out + * + * @param message + */ + public void addError(String message) { + addError(message, false); + } + + /** + * @param message + * error message + * @param silent + * if throw exception + */ + public void addError(String message, boolean silent) { + if (!silent) { + throw new IllegalStateException(message); + } else { + this.errors.add(message); + } + } + + public List<String> getError() { + return this.errors; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + DataModelDesc modelDesc = (DataModelDesc) o; + + if (!name.equals(modelDesc.name)) + return false; + if (!getFactTable().equals(modelDesc.getFactTable())) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = 0; + result = 31 * result + name.hashCode(); + result = 31 * result + getFactTable().hashCode(); + return result; + } + + @Override + public String toString() { + return "DataModelDesc [name=" + name + "]"; + } + + public String getResourcePath() { + return concatResourcePath(name); + } + + public static String concatResourcePath(String descName) { + return ResourceStore.DATA_MODEL_DESC_RESOURCE_ROOT + "/" + descName + MetadataConstants.FILE_SURFIX; + } + + public List<ModelDimensionDesc> getDimensions() { + return dimensions; + } + + public String[] getMetrics() { + return metrics; + } + + public void setDimensions(List<ModelDimensionDesc> dimensions) { + this.dimensions = dimensions; + } + + public void setMetrics(String[] metrics) { + this.metrics = metrics; + } + + public static DataModelDesc getCopyOf(DataModelDesc dataModelDesc) { + DataModelDesc newDataModelDesc = new DataModelDesc(); + newDataModelDesc.setName(dataModelDesc.getName()); + newDataModelDesc.setDescription(dataModelDesc.getDescription()); + newDataModelDesc.setDimensions(dataModelDesc.getDimensions()); + newDataModelDesc.setFilterCondition(dataModelDesc.getFilterCondition()); + newDataModelDesc.setFactTable(dataModelDesc.getFactTable()); + newDataModelDesc.setLookups(dataModelDesc.getLookups()); + newDataModelDesc.setMetrics(dataModelDesc.getMetrics()); + newDataModelDesc.setPartitionDesc(PartitionDesc.getCopyOf(dataModelDesc.getPartitionDesc())); + newDataModelDesc.updateRandomUuid(); + return newDataModelDesc; + } +}