http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java new file mode 100644 index 0000000..3b5620c --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java @@ -0,0 +1,975 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.solr; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.thinkaurelius.titan.core.Order; +import com.thinkaurelius.titan.core.TitanElement; +import com.thinkaurelius.titan.core.attribute.Cmp; +import com.thinkaurelius.titan.core.attribute.Geo; +import com.thinkaurelius.titan.core.attribute.Geoshape; +import com.thinkaurelius.titan.core.attribute.Text; +import com.thinkaurelius.titan.core.schema.Mapping; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransaction; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfigurable; +import com.thinkaurelius.titan.diskstorage.PermanentBackendException; +import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.indexing.IndexEntry; +import com.thinkaurelius.titan.diskstorage.indexing.IndexFeatures; +import com.thinkaurelius.titan.diskstorage.indexing.IndexMutation; +import com.thinkaurelius.titan.diskstorage.indexing.IndexProvider; +import com.thinkaurelius.titan.diskstorage.indexing.IndexQuery; +import com.thinkaurelius.titan.diskstorage.indexing.KeyInformation; +import com.thinkaurelius.titan.diskstorage.indexing.RawQuery; +import com.thinkaurelius.titan.diskstorage.util.DefaultTransaction; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; +import com.thinkaurelius.titan.graphdb.database.serialize.AttributeUtil; +import 
com.thinkaurelius.titan.graphdb.database.serialize.attribute.AbstractDecimal; +import com.thinkaurelius.titan.graphdb.query.TitanPredicate; +import com.thinkaurelius.titan.graphdb.query.condition.And; +import com.thinkaurelius.titan.graphdb.query.condition.Condition; +import com.thinkaurelius.titan.graphdb.query.condition.Not; +import com.thinkaurelius.titan.graphdb.query.condition.Or; +import com.thinkaurelius.titan.graphdb.query.condition.PredicateCondition; +import com.thinkaurelius.titan.graphdb.types.ParameterType; +import org.apache.commons.lang.StringUtils; +import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; +import org.apache.solr.client.solrj.impl.LBHttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; 
+import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.UUID; + +import static com.thinkaurelius.titan.core.attribute.Cmp.*; +import static com.thinkaurelius.titan.core.schema.Mapping.*; + +/** + * NOTE: Copied from titan for supporting sol5. Do not change + */ +@PreInitializeConfigOptions +public class Solr5Index implements IndexProvider { + + private static final Logger logger = LoggerFactory.getLogger(Solr5Index.class); + + + private static final String DEFAULT_ID_FIELD = "id"; + + private enum Mode { + HTTP, CLOUD; + + public static Mode parse(String mode) { + for (Mode m : Mode.values()) { + if (m.toString().equalsIgnoreCase(mode)) return m; + } + throw new IllegalArgumentException("Unrecognized mode: "+mode); + } + } + + public static final ConfigNamespace SOLR_NS = + new ConfigNamespace(GraphDatabaseConfiguration.INDEX_NS, "solr", "Solr index configuration"); + + public static final ConfigOption<String> SOLR_MODE = new ConfigOption<String>(SOLR_NS,"mode", + "The operation mode for Solr which is either via HTTP (`http`) or using SolrCloud (`cloud`)", + ConfigOption.Type.GLOBAL_OFFLINE, "cloud"); + + public static final ConfigOption<Boolean> DYNAMIC_FIELDS = new ConfigOption<Boolean>(SOLR_NS,"dyn-fields", + "Whether to use dynamic fields (which appends the data type to the field name). If dynamic fields is disabled" + + "the user must map field names and define them explicitly in the schema.", + ConfigOption.Type.GLOBAL_OFFLINE, true); + + public static final ConfigOption<String[]> KEY_FIELD_NAMES = new ConfigOption<String[]>(SOLR_NS,"key-field-names", + "Field name that uniquely identifies each document in Solr. 
Must be specified as a list of `collection=field`.", + ConfigOption.Type.GLOBAL, String[].class); + + public static final ConfigOption<String> TTL_FIELD = new ConfigOption<String>(SOLR_NS,"ttl_field", + "Name of the TTL field for Solr collections.", + ConfigOption.Type.GLOBAL_OFFLINE, "ttl"); + + /** SolrCloud Configuration */ + + public static final ConfigOption<String> ZOOKEEPER_URL = new ConfigOption<String>(SOLR_NS,"zookeeper-url", + "URL of the Zookeeper instance coordinating the SolrCloud cluster", + ConfigOption.Type.MASKABLE, "localhost:2181"); + + public static final ConfigOption<Integer> NUM_SHARDS = new ConfigOption<Integer>(SOLR_NS,"num-shards", + "Number of shards for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + public static final ConfigOption<Integer> MAX_SHARDS_PER_NODE = new ConfigOption<Integer>(SOLR_NS,"max-shards-per-node", + "Maximum number of shards per node. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + public static final ConfigOption<Integer> REPLICATION_FACTOR = new ConfigOption<Integer>(SOLR_NS,"replication-factor", + "Replication factor for a collection. 
This applies when creating a new collection which is only supported under the SolrCloud operation mode.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + + /** HTTP Configuration */ + + public static final ConfigOption<String[]> HTTP_URLS = new ConfigOption<String[]>(SOLR_NS,"http-urls", + "List of URLs to use to connect to Solr Servers (LBHttpSolrClient is used), don't add core or collection name to the URL.", + ConfigOption.Type.MASKABLE, new String[] { "http://localhost:8983/solr" }); + + public static final ConfigOption<Integer> HTTP_CONNECTION_TIMEOUT = new ConfigOption<Integer>(SOLR_NS,"http-connection-timeout", + "Solr HTTP connection timeout.", + ConfigOption.Type.MASKABLE, 5000); + + public static final ConfigOption<Boolean> HTTP_ALLOW_COMPRESSION = new ConfigOption<Boolean>(SOLR_NS,"http-compression", + "Enable/disable compression on the HTTP connections made to Solr.", + ConfigOption.Type.MASKABLE, false); + + public static final ConfigOption<Integer> HTTP_MAX_CONNECTIONS_PER_HOST = new ConfigOption<Integer>(SOLR_NS,"http-max-per-host", + "Maximum number of HTTP connections per Solr host.", + ConfigOption.Type.MASKABLE, 20); + + public static final ConfigOption<Integer> HTTP_GLOBAL_MAX_CONNECTIONS = new ConfigOption<Integer>(SOLR_NS,"http-max", + "Maximum number of HTTP connections in total to all Solr servers.", + ConfigOption.Type.MASKABLE, 100); + + public static final ConfigOption<Boolean> WAIT_SEARCHER = new ConfigOption<Boolean>(SOLR_NS, "wait-searcher", + "When mutating - wait for the index to reflect new mutations before returning. 
This can have a negative impact on performance.", + ConfigOption.Type.LOCAL, false); + + + + private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL() + .setDefaultStringMapping(TEXT).supportedStringMappings(TEXT, STRING).build(); + + private final SolrClient solrClient; + private final Configuration configuration; + private final Mode mode; + private final boolean dynFields; + private final Map<String, String> keyFieldIds; + private final String ttlField; + private final int maxResults; + private final boolean waitSearcher; + + public Solr5Index(final Configuration config) throws BackendException { + Preconditions.checkArgument(config!=null); + configuration = config; + + mode = Mode.parse(config.get(SOLR_MODE)); + dynFields = config.get(DYNAMIC_FIELDS); + keyFieldIds = parseKeyFieldsForCollections(config); + maxResults = config.get(GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE); + ttlField = config.get(TTL_FIELD); + waitSearcher = config.get(WAIT_SEARCHER); + + if (mode==Mode.CLOUD) { + HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); + String zookeeperUrl = config.get(Solr5Index.ZOOKEEPER_URL); + CloudSolrClient cloudServer = new CloudSolrClient(zookeeperUrl, true); + cloudServer.connect(); + solrClient = cloudServer; + } else if (mode==Mode.HTTP) { + HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); + HttpClient clientParams = HttpClientUtil.createClient(new ModifiableSolrParams() {{ + add(HttpClientUtil.PROP_ALLOW_COMPRESSION, config.get(HTTP_ALLOW_COMPRESSION).toString()); + add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, config.get(HTTP_CONNECTION_TIMEOUT).toString()); + add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString()); + add(HttpClientUtil.PROP_MAX_CONNECTIONS, config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString()); + }}); + + solrClient = new LBHttpSolrClient(clientParams, config.get(HTTP_URLS)); + + + } else { + throw new 
IllegalArgumentException("Unsupported Solr operation mode: " + mode); + } + } + + private Map<String, String> parseKeyFieldsForCollections(Configuration config) throws BackendException { + Map<String, String> keyFieldNames = new HashMap<String, String>(); + String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES)?config.get(KEY_FIELD_NAMES):new String[0]; + for (String collectionFieldStatement : collectionFieldStatements) { + String[] parts = collectionFieldStatement.trim().split("="); + if (parts.length != 2) { + throw new PermanentBackendException("Unable to parse the collection name / key field name pair. It should be of the format collection=field"); + } + String collectionName = parts[0]; + String keyFieldName = parts[1]; + keyFieldNames.put(collectionName, keyFieldName); + } + return keyFieldNames; + } + + private String getKeyFieldId(String collection) { + String field = keyFieldIds.get(collection); + if (field==null) field = DEFAULT_ID_FIELD; + return field; + } + + /** + * Unlike the ElasticSearch Index, which is schema free, Solr requires a schema to + * support searching. This means that you will need to modify the solr schema with the + * appropriate field definitions in order to work properly. If you have a running instance + * of Solr and you modify its schema with new fields, don't forget to re-index! 
+ * @param store Index store + * @param key New key to register + * @param information Datatype to register for the key + * @param tx enclosing transaction + * @throws com.thinkaurelius.titan.diskstorage.BackendException (a PermanentBackendException) if createCollectionIfNotExists fails in cloud mode + */ + @Override + public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException { + if (mode==Mode.CLOUD) { + CloudSolrClient client = (CloudSolrClient) solrClient; + try { + createCollectionIfNotExists(client, configuration, store); + } catch (IOException e) { + throw new PermanentBackendException(e); + } catch (SolrServerException e) { + throw new PermanentBackendException(e); + } catch (InterruptedException e) { + /* restore the interrupt flag so code further up the stack can still observe the interruption */ + Thread.currentThread().interrupt(); + throw new PermanentBackendException(e); + } catch (KeeperException e) { + throw new PermanentBackendException(e); + } + } + //Since all data types must be defined in the schema.xml, pre-registering a type does not work + } + + @Override + public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + logger.debug("Mutating SOLR"); + try { + for (Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) { + String collectionName = stores.getKey(); + String keyIdField = getKeyFieldId(collectionName); + + List<String> deleteIds = new ArrayList<String>(); + Collection<SolrInputDocument> changes = new ArrayList<SolrInputDocument>(); + + for (Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) { + String docId = entry.getKey(); + IndexMutation mutation = entry.getValue(); + Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted())); + Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions()); + Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions()); + + //Handle any deletions + if (mutation.hasDeletions()) { + if (mutation.isDeleted()) { + logger.trace("Deleting entire document {}", docId); +
deleteIds.add(docId); + } else { + HashSet<IndexEntry> fieldDeletions = Sets.newHashSet(mutation.getDeletions()); + if (mutation.hasAdditions()) { + for (IndexEntry indexEntry : mutation.getAdditions()) { + fieldDeletions.remove(indexEntry); + } + } + deleteIndividualFieldsFromIndex(collectionName, keyIdField, docId, fieldDeletions); + } + } + + if (mutation.hasAdditions()) { + int ttl = mutation.determineTTL(); + + SolrInputDocument doc = new SolrInputDocument(); + doc.setField(keyIdField, docId); + + boolean isNewDoc = mutation.isNew(); + + if (isNewDoc) + logger.trace("Adding new document {}", docId); + + for (IndexEntry e : mutation.getAdditions()) { + final Object fieldValue = convertValue(e.value); + doc.setField(e.field, isNewDoc + ? fieldValue : new HashMap<String, Object>(1) {{ put("set", fieldValue); }}); + } + if (ttl>0) { + Preconditions.checkArgument(isNewDoc,"Solr only supports TTL on new documents [%s]",docId); + doc.setField(ttlField, String.format("+%dSECONDS", ttl)); + } + changes.add(doc); + } + } + + commitDeletes(collectionName, deleteIds); + commitDocumentChanges(collectionName, changes); + } + } catch (Exception e) { + throw storageException(e); + } + } + + private Object convertValue(Object value) throws BackendException { + if (value instanceof Geoshape) + return GeoToWktConverter.convertToWktString((Geoshape) value); + // in order to serialize/deserialize properly Solr will have to have an + // access to Titan source which has Decimal type, so for now we simply convert to + // double and let Solr do the same thing or fail. 
+ if (value instanceof AbstractDecimal) + return ((AbstractDecimal) value).doubleValue(); + if (value instanceof UUID) + return value.toString(); + return value; + } + + @Override + public void restore(Map<String, Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + try { + for (Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) { + final String collectionName = stores.getKey(); + + List<String> deleteIds = new ArrayList<String>(); + List<SolrInputDocument> newDocuments = new ArrayList<SolrInputDocument>(); + + for (Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) { + final String docID = entry.getKey(); + final List<IndexEntry> content = entry.getValue(); + + if (content == null || content.isEmpty()) { + if (logger.isTraceEnabled()) + logger.trace("Deleting document [{}]", docID); + + deleteIds.add(docID); + continue; + } + + newDocuments.add(new SolrInputDocument() {{ + setField(getKeyFieldId(collectionName), docID); + + for (IndexEntry addition : content) { + Object fieldValue = addition.value; + setField(addition.field, convertValue(fieldValue)); + } + }}); + } + + commitDeletes(collectionName, deleteIds); + commitDocumentChanges(collectionName, newDocuments); + } + } catch (Exception e) { + throw new TemporaryBackendException("Could not restore Solr index", e); + } + } + + private void deleteIndividualFieldsFromIndex(String collectionName, String keyIdField, String docId, HashSet<IndexEntry> fieldDeletions) throws SolrServerException, IOException { + if (fieldDeletions.isEmpty()) return; + + Map<String, String> fieldDeletes = new HashMap<String, String>(1) {{ put("set", null); }}; + + SolrInputDocument doc = new SolrInputDocument(); + doc.addField(keyIdField, docId); + StringBuilder sb = new StringBuilder(); + for (IndexEntry fieldToDelete : fieldDeletions) { + doc.addField(fieldToDelete.field, fieldDeletes); + 
sb.append(fieldToDelete).append(","); + } + + if (logger.isTraceEnabled()) + logger.trace("Deleting individual fields [{}] for document {}", sb.toString(), docId); + + UpdateRequest singleDocument = newUpdateRequest(); + singleDocument.add(doc); + solrClient.request(singleDocument, collectionName); + } + + private void commitDocumentChanges(String collectionName, Collection<SolrInputDocument> documents) throws SolrServerException, IOException { + if (documents.size() == 0) return; + + try { + solrClient.request(newUpdateRequest().add(documents), collectionName); + } catch (HttpSolrClient.RemoteSolrException rse) { + logger.error("Unable to save documents to Solr as one of the shape objects stored were not compatible with Solr.", rse); + logger.error("Details in failed document batch: "); + for (SolrInputDocument d : documents) { + Collection<String> fieldNames = d.getFieldNames(); + for (String name : fieldNames) { + logger.error(name + ":" + d.getFieldValue(name).toString()); + } + } + + throw rse; + } + } + + private void commitDeletes(String collectionName, List<String> deleteIds) throws SolrServerException, IOException { + if (deleteIds.size() == 0) return; + solrClient.request(newUpdateRequest().deleteById(deleteIds), collectionName); + } + + @Override + public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + List<String> result; + String collection = query.getStore(); + String keyIdField = getKeyFieldId(collection); + SolrQuery solrQuery = new SolrQuery("*:*"); + String queryFilter = buildQueryFilter(query.getCondition(), informations.get(collection)); + solrQuery.addFilterQuery(queryFilter); + if (!query.getOrder().isEmpty()) { + List<IndexQuery.OrderEntry> orders = query.getOrder(); + for (IndexQuery.OrderEntry order1 : orders) { + String item = order1.getKey(); + SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? 
SolrQuery.ORDER.asc : SolrQuery.ORDER.desc; + solrQuery.addSort(new SolrQuery.SortClause(item, order)); + } + } + solrQuery.setStart(0); + if (query.hasLimit()) { + solrQuery.setRows(query.getLimit()); + } else { + solrQuery.setRows(maxResults); + } + try { + QueryResponse response = solrClient.query(collection, solrQuery); + + if (logger.isDebugEnabled()) + logger.debug("Executed query [{}] in {} ms", query.getCondition(), response.getElapsedTime()); + + int totalHits = response.getResults().size(); + + if (!query.hasLimit() && totalHits >= maxResults) + logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); + + result = new ArrayList<String>(totalHits); + for (SolrDocument hit : response.getResults()) { + result.add(hit.getFieldValue(keyIdField).toString()); + } + } catch (IOException e) { + logger.error("Query did not complete : ", e); + throw new PermanentBackendException(e); + } catch (SolrServerException e) { + logger.error("Unable to query Solr index.", e); + throw new PermanentBackendException(e); + } + return result; + } + + @Override + public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + List<RawQuery.Result<String>> result; + String collection = query.getStore(); + String keyIdField = getKeyFieldId(collection); + SolrQuery solrQuery = new SolrQuery(query.getQuery()) + .addField(keyIdField) + .setIncludeScore(true) + .setStart(query.getOffset()) + .setRows(query.hasLimit() ? 
query.getLimit() : maxResults); + + try { + QueryResponse response = solrClient.query(collection, solrQuery); + if (logger.isDebugEnabled()) + logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime()); + + int totalHits = response.getResults().size(); + if (!query.hasLimit() && totalHits >= maxResults) { + logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); + } + result = new ArrayList<RawQuery.Result<String>>(totalHits); + + for (SolrDocument hit : response.getResults()) { + double score = Double.parseDouble(hit.getFieldValue("score").toString()); + result.add(new RawQuery.Result<String>(hit.getFieldValue(keyIdField).toString(), score)); + } + } catch (IOException e) { + logger.error("Query did not complete : ", e); + throw new PermanentBackendException(e); + } catch (SolrServerException e) { + logger.error("Unable to query Solr index.", e); + throw new PermanentBackendException(e); + } + return result; + } + + private static String escapeValue(Object value) { + return ClientUtils.escapeQueryChars(value.toString()); + } + + public String buildQueryFilter(Condition<TitanElement> condition, KeyInformation.StoreRetriever informations) { + if (condition instanceof PredicateCondition) { + PredicateCondition<String, TitanElement> atom = (PredicateCondition<String, TitanElement>) condition; + Object value = atom.getValue(); + String key = atom.getKey(); + TitanPredicate titanPredicate = atom.getPredicate(); + + if (value instanceof Number) { + String queryValue = escapeValue(value); + Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on numeric types: " + titanPredicate); + Cmp numRel = (Cmp) titanPredicate; + switch (numRel) { + case EQUAL: + return (key + ":" + queryValue); + case NOT_EQUAL: + return ("-" + key + ":" + queryValue); + case LESS_THAN: + //use right curly to mean up to but not including value + return (key + ":[* TO " + queryValue + "}"); + case 
LESS_THAN_EQUAL: + return (key + ":[* TO " + queryValue + "]"); + case GREATER_THAN: + //use left curly to mean greater than but not including value + return (key + ":{" + queryValue + " TO *]"); + case GREATER_THAN_EQUAL: + return (key + ":[" + queryValue + " TO *]"); + default: throw new IllegalArgumentException("Unexpected relation: " + numRel); + } + } else if (value instanceof String) { + Mapping map = getStringMapping(informations.get(key)); + assert map== TEXT || map== STRING; + if (map== TEXT && !titanPredicate.toString().startsWith("CONTAINS")) + throw new IllegalArgumentException("Text mapped string values only support CONTAINS queries and not: " + titanPredicate); + if (map== STRING && titanPredicate.toString().startsWith("CONTAINS")) + throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + titanPredicate); + + //Special case + if (titanPredicate == Text.CONTAINS) { + //e.g. - if terms tomorrow and world were supplied, and fq=text:(tomorrow world) + //sample data set would return 2 documents: one where text = Tomorrow is the World, + //and the second where text = Hello World. 
Hence, we are decomposing the query string + //and building an AND query explicitly because we need AND semantics + value = ((String) value).toLowerCase(); + List<String> terms = Text.tokenize((String) value); + + if (terms.isEmpty()) { + return ""; + } else if (terms.size() == 1) { + return (key + ":(" + escapeValue(terms.get(0)) + ")"); + } else { + And<TitanElement> andTerms = new And<TitanElement>(); + for (String term : terms) { + andTerms.add(new PredicateCondition<String, TitanElement>(key, titanPredicate, term)); + } + return buildQueryFilter(andTerms, informations); + } + } + if (titanPredicate == Text.PREFIX || titanPredicate == Text.CONTAINS_PREFIX) { + return (key + ":" + escapeValue(value) + "*"); + } else if (titanPredicate == Text.REGEX || titanPredicate == Text.CONTAINS_REGEX) { + return (key + ":/" + value + "/"); + } else if (titanPredicate == EQUAL) { + return (key + ":\"" + escapeValue(value) + "\""); + } else if (titanPredicate == NOT_EQUAL) { + return ("-" + key + ":\"" + escapeValue(value) + "\""); + } else { + throw new IllegalArgumentException("Relation is not supported for string value: " + titanPredicate); + } + } else if (value instanceof Geoshape) { + Geoshape geo = (Geoshape)value; + if (geo.getType() == Geoshape.Type.CIRCLE) { + Geoshape.Point center = geo.getPoint(); + return ("{!geofilt sfield=" + key + + " pt=" + center.getLatitude() + "," + center.getLongitude() + + " d=" + geo.getRadius() + "} distErrPct=0"); //distance in kilometers + } else if (geo.getType() == Geoshape.Type.BOX) { + Geoshape.Point southwest = geo.getPoint(0); + Geoshape.Point northeast = geo.getPoint(1); + return (key + ":[" + southwest.getLatitude() + "," + southwest.getLongitude() + + " TO " + northeast.getLatitude() + "," + northeast.getLongitude() + "]"); + } else if (geo.getType() == Geoshape.Type.POLYGON) { + List<Geoshape.Point> coordinates = getPolygonPoints(geo); + StringBuilder poly = new StringBuilder(key + ":\"IsWithin(POLYGON(("); + for 
(Geoshape.Point coordinate : coordinates) { + poly.append(coordinate.getLongitude()).append(" ").append(coordinate.getLatitude()).append(", "); + } + //close the polygon with the first coordinate + poly.append(coordinates.get(0).getLongitude()).append(" ").append(coordinates.get(0).getLatitude()); + poly.append(")))\" distErrPct=0"); + return (poly.toString()); + } + } else if (value instanceof Date) { + String queryValue = escapeValue(toIsoDate((Date)value)); + Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on date types: " + titanPredicate); + Cmp numRel = (Cmp) titanPredicate; + + switch (numRel) { + case EQUAL: + return (key + ":" + queryValue); + case NOT_EQUAL: + return ("-" + key + ":" + queryValue); + case LESS_THAN: + //use right curly to mean up to but not including value + return (key + ":[* TO " + queryValue + "}"); + case LESS_THAN_EQUAL: + return (key + ":[* TO " + queryValue + "]"); + case GREATER_THAN: + //use left curly to mean greater than but not including value + return (key + ":{" + queryValue + " TO *]"); + case GREATER_THAN_EQUAL: + return (key + ":[" + queryValue + " TO *]"); + default: throw new IllegalArgumentException("Unexpected relation: " + numRel); + } + } else if (value instanceof Boolean) { + Cmp numRel = (Cmp) titanPredicate; + String queryValue = escapeValue(value); + switch (numRel) { + case EQUAL: + return (key + ":" + queryValue); + case NOT_EQUAL: + return ("-" + key + ":" + queryValue); + default: + throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL"); + } + } else if (value instanceof UUID) { + if (titanPredicate == EQUAL) { + return (key + ":\"" + escapeValue(value) + "\""); + } else if (titanPredicate == NOT_EQUAL) { + return ("-" + key + ":\"" + escapeValue(value) + "\""); + } else { + throw new IllegalArgumentException("Relation is not supported for uuid value: " + titanPredicate); + } + } else throw new IllegalArgumentException("Unsupported type: " + 
value); + } else if (condition instanceof Not) { + String sub = buildQueryFilter(((Not)condition).getChild(),informations); + if (StringUtils.isNotBlank(sub)) return "-("+sub+")"; + else return ""; + } else if (condition instanceof And) { + int numChildren = ((And) condition).size(); + StringBuilder sb = new StringBuilder(); + for (Condition<TitanElement> c : condition.getChildren()) { + String sub = buildQueryFilter(c, informations); + + if (StringUtils.isBlank(sub)) + continue; + + // we don't have to add "+" which means AND iff + // a. it's a NOT query, + // b. expression is a single statement in the AND. + if (!sub.startsWith("-") && numChildren > 1) + sb.append("+"); + + sb.append(sub).append(" "); + } + return sb.toString(); + } else if (condition instanceof Or) { + StringBuilder sb = new StringBuilder(); + int element=0; + for (Condition<TitanElement> c : condition.getChildren()) { + String sub = buildQueryFilter(c,informations); + if (StringUtils.isBlank(sub)) continue; + if (element==0) sb.append("("); + else sb.append(" OR "); + sb.append(sub); + element++; + } + if (element>0) sb.append(")"); + return sb.toString(); + } else { + throw new IllegalArgumentException("Invalid condition: " + condition); + } + return null; + } + + private String toIsoDate(Date value) { + TimeZone tz = TimeZone.getTimeZone("UTC"); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + df.setTimeZone(tz); + return df.format(value); + } + + /** + * Collects the points of the given polygon in index order, starting at index 0. + * Points are requested one index at a time until getPoint throws an + * ArrayIndexOutOfBoundsException, which is taken as the end of the coordinate list. + * + * @param polygon shape to read the points from + * @return the points read, in index order + */ + private List<Geoshape.Point> getPolygonPoints(Geoshape polygon) { + List<Geoshape.Point> locations = new ArrayList<Geoshape.Point>(); + + int index = 0; + boolean hasCoordinates = true; + while (hasCoordinates) { + try { + locations.add(polygon.getPoint(index)); + /* advance to the next point; without this the loop re-reads point 0 forever */ + index++; + } catch (ArrayIndexOutOfBoundsException ignore) { + //just means we asked for a point past the size of the list + //of known coordinates + hasCoordinates = false; + } + } + + return locations; + } + + /** + * Solr handles all transactions on the server-side.
That means all + * commit, optimize, or rollback applies since the last commit/optimize/rollback. + * Solr documentation recommends best way to update Solr is in one process to avoid + * race conditions. + * + * @return New Transaction Handle + * @throws com.thinkaurelius.titan.diskstorage.BackendException + */ + @Override + public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException { + return new DefaultTransaction(config); + } + + @Override + public void close() throws BackendException { + logger.trace("Shutting down connection to Solr", solrClient); + try { + solrClient.close(); + } catch (IOException e) { + throw new TemporaryBackendException(e); + } + } + + @Override + public void clearStorage() throws BackendException { + try { + if (mode!=Mode.CLOUD) throw new UnsupportedOperationException("Operation only supported for SolrCloud"); + logger.debug("Clearing storage from Solr: {}", solrClient); + ZkStateReader zkStateReader = ((CloudSolrClient) solrClient).getZkStateReader(); + zkStateReader.updateClusterState(); + ClusterState clusterState = zkStateReader.getClusterState(); + for (String collection : clusterState.getCollections()) { + logger.debug("Clearing collection [{}] in Solr",collection); + UpdateRequest deleteAll = newUpdateRequest(); + deleteAll.deleteByQuery("*:*"); + solrClient.request(deleteAll, collection); + } + + } catch (SolrServerException e) { + logger.error("Unable to clear storage from index due to server error on Solr.", e); + throw new PermanentBackendException(e); + } catch (IOException e) { + logger.error("Unable to clear storage from index due to low-level I/O error.", e); + throw new PermanentBackendException(e); + } catch (Exception e) { + logger.error("Unable to clear storage from index due to general error.", e); + throw new PermanentBackendException(e); + } + } + + @Override + public boolean supports(KeyInformation information, TitanPredicate titanPredicate) { + Class<?> dataType = 
information.getDataType(); + Mapping mapping = getMapping(information); + // a non-DEFAULT mapping is rejected for every non-string data type + if (mapping!= DEFAULT && !AttributeUtil.isString(dataType)) return false; + + if (Number.class.isAssignableFrom(dataType)) { + return titanPredicate instanceof Cmp; + } else if (dataType == Geoshape.class) { + return titanPredicate == Geo.WITHIN; + } else if (AttributeUtil.isString(dataType)) { + // no default branch: any other mapping falls through to the final return false + switch(mapping) { + case DEFAULT: + case TEXT: + return titanPredicate == Text.CONTAINS || titanPredicate == Text.CONTAINS_PREFIX || titanPredicate == Text.CONTAINS_REGEX; + case STRING: + return titanPredicate == EQUAL || titanPredicate== NOT_EQUAL || titanPredicate==Text.REGEX || titanPredicate==Text.PREFIX; + // case TEXTSTRING: + // return (titanPredicate instanceof Text) || titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL; + } + } else if (dataType == Date.class) { + if (titanPredicate instanceof Cmp) return true; + } else if (dataType == Boolean.class) { + return titanPredicate == EQUAL || titanPredicate == NOT_EQUAL; + } else if (dataType == UUID.class) { + return titanPredicate == EQUAL || titanPredicate== NOT_EQUAL; + } + return false; + } + + /** + * Reports whether a key with the given data type and mapping is accepted by this index. + * Number, Geoshape, Date, Boolean and UUID keys are accepted only with the DEFAULT mapping; + * string keys are accepted with the DEFAULT, TEXT or STRING mapping. + * + * @param information data type and parameters of the key + * @return true if the combination is accepted, false otherwise + */ + @Override + public boolean supports(KeyInformation information) { + Class<?> dataType = information.getDataType(); + Mapping mapping = getMapping(information); + if (Number.class.isAssignableFrom(dataType) || dataType == Geoshape.class || dataType == Date.class || dataType == Boolean.class || dataType == UUID.class) { + if (mapping== DEFAULT) return true; + } else if (AttributeUtil.isString(dataType)) { + if (mapping== DEFAULT || mapping== TEXT || mapping== STRING) return true; + } + return false; + } + + /** + * Maps a property key to the name of the Solr field that stores it. + * Keys containing a space are rejected. The key is returned unchanged when dynamic fields are + * disabled or when the key carries a MAPPED_NAME parameter; otherwise a postfix chosen from the + * data type is appended. + * + * @param key the property key name + * @param keyInfo data type and parameters of the key + * @return the field name + * @throws IllegalArgumentException if the key contains a space, or its data type or string mapping is unsupported + */ + @Override + public String mapKey2Field(String key, KeyInformation keyInfo) { + Preconditions.checkArgument(!StringUtils.containsAny(key, new char[]{' '}),"Invalid key name provided: %s",key); + if (!dynFields) return key; + if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) return key; + String postfix; + Class datatype =
keyInfo.getDataType(); + if (AttributeUtil.isString(datatype)) { + Mapping map = getStringMapping(keyInfo); + switch (map) { + case TEXT: postfix = "_t"; break; + case STRING: postfix = "_s"; break; + default: throw new IllegalArgumentException("Unsupported string mapping: " + map); + } + } else if (AttributeUtil.isWholeNumber(datatype)) { + if (datatype.equals(Long.class)) postfix = "_l"; + else postfix = "_i"; + } else if (AttributeUtil.isDecimal(datatype)) { + if (datatype.equals(Float.class)) postfix = "_f"; + else postfix = "_d"; + } else if (datatype.equals(Geoshape.class)) { + postfix = "_g"; + } else if (datatype.equals(Date.class)) { + postfix = "_dt"; + } else if (datatype.equals(Boolean.class)) { + postfix = "_b"; + } else if (datatype.equals(UUID.class)) { + postfix = "_uuid"; + } else throw new IllegalArgumentException("Unsupported data type ["+datatype+"] for field: " + key); + return key+postfix; + } + + @Override + public IndexFeatures getFeatures() { + return SOLR_FEATURES; + } + + /* + ################# UTILITY METHODS ####################### + */ + + /** + * Returns the mapping configured for a string key, substituting TEXT when the mapping is DEFAULT. + * + * @param information key information; its data type is asserted to be a string type + * @return the effective mapping + */ + private static Mapping getStringMapping(KeyInformation information) { + assert AttributeUtil.isString(information.getDataType()); + Mapping map = getMapping(information); + if (map== DEFAULT) map = TEXT; + return map; + } + + /** + * Creates an update request whose action is COMMIT with both boolean flags set to true. + * NOTE(review): the waitSearcher setting is not consulted here. The removed conditional + * repeated the identical setAction call and therefore had no effect; confirm whether + * waitSearcher was meant to be passed as the last argument instead. + * + * @return a new update request configured to commit + */ + private UpdateRequest newUpdateRequest() { + UpdateRequest req = new UpdateRequest(); + req.setAction(UpdateRequest.ACTION.COMMIT, true, true); + return req; + } + + /** + * Wraps a Solr failure in a TemporaryBackendException. + * + * @param solrException the underlying failure, kept as the cause + * @return the exception for the caller to throw + */ + private BackendException storageException(Exception solrException) { + return new TemporaryBackendException("Unable to complete query on Solr.", solrException); + } + + private static void createCollectionIfNotExists(CloudSolrClient client, Configuration config, String collection) + throws IOException, SolrServerException, KeeperException, InterruptedException { + if (!checkIfCollectionExists(client, collection)) { + Integer
numShards = config.get(NUM_SHARDS); + Integer maxShardsPerNode = config.get(MAX_SHARDS_PER_NODE); + Integer replicationFactor = config.get(REPLICATION_FACTOR); + + CollectionAdminRequest.Create createRequest = new CollectionAdminRequest.Create(); + + // the collection name is used as the config name as well + createRequest.setConfigName(collection); + createRequest.setCollectionName(collection); + createRequest.setNumShards(numShards); + createRequest.setMaxShardsPerNode(maxShardsPerNode); + createRequest.setReplicationFactor(replicationFactor); + + CollectionAdminResponse createResponse = createRequest.process(client); + if (createResponse.isSuccess()) { + logger.trace("Collection {} successfully created.", collection); + } else { + // report all error messages of the response, one per line + throw new SolrServerException(Joiner.on("\n").join(createResponse.getErrorMessages())); + } + } + + // called whether or not the collection was just created + waitForRecoveriesToFinish(client, collection); + } + + /** + * Checks if the collection has already been created in Solr. + * Updates the cluster state of the client's ZkStateReader and looks the collection up in it. + * + * @param server the SolrCloud client whose ZkStateReader is queried + * @param collection the name of the collection to look up + * @return true if the cluster state contains the collection, false otherwise + */ + private static boolean checkIfCollectionExists(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { + ZkStateReader zkStateReader = server.getZkStateReader(); + zkStateReader.updateClusterState(); + ClusterState clusterState = zkStateReader.getClusterState(); + return clusterState.getCollectionOrNull(collection) != null; + } + + /** + * Wait for all the collection shards to be ready.
+ * Re-reads the cluster state once per second until no replica hosted on a live node is + * in state RECOVERING or DOWN. There is no timeout. + * + * @param server the SolrCloud client whose ZkStateReader is polled + * @param collection the name of the collection to wait for + * @throws NullPointerException if the cluster state has no slices for the collection + * @throws InterruptedException if the thread is interrupted while sleeping between polls + */ + private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { + ZkStateReader zkStateReader = server.getZkStateReader(); + try { + boolean cont = true; + + while (cont) { + boolean sawLiveRecovering = false; + zkStateReader.updateClusterState(); + ClusterState clusterState = zkStateReader.getClusterState(); + Map<String, Slice> slices = clusterState.getSlicesMap(collection); + // reference first, message second; the swapped order null-checked the message instead of the map + Preconditions.checkNotNull(slices, "Could not find collection:" + collection); + + for (Map.Entry<String, Slice> entry : slices.entrySet()) { + Map<String, Replica> shards = entry.getValue().getReplicasMap(); + for (Map.Entry<String, Replica> shard : shards.entrySet()) { + // compare enum with enum; String.equals(Replica.State) is always false, so the loop never waited + Replica.State state = shard.getValue().getState(); + if ((state == Replica.State.RECOVERING + || state == Replica.State.DOWN) + && clusterState.liveNodesContain(shard.getValue().getStr( + ZkStateReader.NODE_NAME_PROP))) { + sawLiveRecovering = true; + } + } + } + if (!sawLiveRecovering) { + cont = false; + } else { + Thread.sleep(1000); + } + } + } finally { + logger.info("Exiting solr wait"); + } + } + + /** + * Converts Geoshape values to the Well-Known Text form sent to Solr. Only points are supported. + */ + private static class GeoToWktConverter { + /** + * {@link com.thinkaurelius.titan.core.attribute.Geoshape} stores Points in the String format: point[X.0,Y.0]. + * Solr needs it to be in Well-Known Text format: POINT(X.0 Y.0) + * + * @param fieldValue the shape to convert + * @return the WKT point, longitude first and latitude second + * @throws BackendException a PermanentBackendException if the shape is not of type POINT + */ + static String convertToWktString(Geoshape fieldValue) throws BackendException { + if (fieldValue.getType() == Geoshape.Type.POINT) { + Geoshape.Point point = fieldValue.getPoint(); + return "POINT(" + point.getLongitude() + " " + point.getLatitude() + ")"; + } else { + throw new PermanentBackendException("Cannot index " + fieldValue.getType()); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java new file mode 100644 index 0000000..c1a983b --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java @@ -0,0 +1,457 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.graphdb.query.graph; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.*; +import com.thinkaurelius.titan.core.*; +import com.thinkaurelius.titan.core.attribute.Cmp; +import com.thinkaurelius.titan.core.Cardinality; +import com.thinkaurelius.titan.core.schema.SchemaStatus; +import com.thinkaurelius.titan.core.schema.TitanSchemaType; +import com.thinkaurelius.titan.graphdb.database.IndexSerializer; +import com.thinkaurelius.titan.graphdb.internal.ElementCategory; +import com.thinkaurelius.titan.graphdb.internal.InternalRelationType; +import com.thinkaurelius.titan.graphdb.internal.OrderList; +import com.thinkaurelius.titan.graphdb.query.*; +import com.thinkaurelius.titan.graphdb.query.condition.*; +import com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx; +import com.thinkaurelius.titan.graphdb.types.*; +import com.thinkaurelius.titan.graphdb.types.system.ImplicitKey; +import com.thinkaurelius.titan.util.datastructures.Interval; +import com.thinkaurelius.titan.util.datastructures.PointInterval; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.*; + +/** + * + * Builds a {@link TitanGraphQuery}, optimizes the query and compiles the result into a {@link GraphCentricQuery} which + * is then executed through a {@link QueryProcessor}. + * This class from titan-0.5.4 has no major changes except adding a few logs for debugging index usage + * + * @author Matthias Broecheler ([email protected]) + */ +public class GraphCentricQueryBuilder implements TitanGraphQuery<GraphCentricQueryBuilder> { + + private static final Logger log = LoggerFactory.getLogger(GraphCentricQueryBuilder.class); + + /** + * Transaction in which this query is executed. 
+ */ + private final StandardTitanTx tx; + /** + * Serializer used to serialize the query conditions into backend queries. + */ + private final IndexSerializer serializer; + /** + * The constraints added to this query. None by default. + */ + private List<PredicateCondition<String, TitanElement>> constraints; + /** + * The order in which the elements should be returned. None by default. + */ + private OrderList orders = new OrderList(); + /** + * The limit of this query. No limit by default. + */ + private int limit = Query.NO_LIMIT; + + public GraphCentricQueryBuilder(StandardTitanTx tx, IndexSerializer serializer) { + log.debug("Loaded shaded version of class GraphCentricQueryBuilder"); + Preconditions.checkNotNull(tx); + Preconditions.checkNotNull(serializer); + this.tx = tx; + this.serializer = serializer; + this.constraints = new ArrayList<PredicateCondition<String, TitanElement>>(5); + } + + /* --------------------------------------------------------------- + * Query Construction + * --------------------------------------------------------------- + */ + + private GraphCentricQueryBuilder has(String key, TitanPredicate predicate, Object condition) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(predicate); + Preconditions.checkArgument(predicate.isValidCondition(condition), "Invalid condition: %s", condition); + constraints.add(new PredicateCondition<String, TitanElement>(key, predicate, condition)); + return this; + } + + @Override + public GraphCentricQueryBuilder has(String key, com.tinkerpop.blueprints.Predicate predicate, Object condition) { + Preconditions.checkNotNull(key); + TitanPredicate titanPredicate = TitanPredicate.Converter.convert(predicate); + return has(key, titanPredicate, condition); + } + + @Override + public GraphCentricQueryBuilder has(PropertyKey key, TitanPredicate predicate, Object condition) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(predicate); + return has(key.getName(), predicate, 
condition); + } + + @Override + public GraphCentricQueryBuilder has(String key) { + return has(key, Cmp.NOT_EQUAL, (Object) null); + } + + @Override + public GraphCentricQueryBuilder hasNot(String key) { + return has(key, Cmp.EQUAL, (Object) null); + } + + @Override + @Deprecated + public <T extends Comparable<T>> GraphCentricQueryBuilder has(String s, T t, Compare compare) { + return has(s, compare, t); + } + + @Override + public GraphCentricQueryBuilder has(String key, Object value) { + return has(key, Cmp.EQUAL, value); + } + + @Override + public GraphCentricQueryBuilder hasNot(String key, Object value) { + return has(key, Cmp.NOT_EQUAL, value); + } + + @Override + public <T extends Comparable<?>> GraphCentricQueryBuilder interval(String s, T t1, T t2) { + has(s, Cmp.GREATER_THAN_EQUAL, t1); + return has(s, Cmp.LESS_THAN, t2); + } + + @Override + public GraphCentricQueryBuilder limit(final int limit) { + Preconditions.checkArgument(limit >= 0, "Non-negative limit expected: %s", limit); + this.limit = limit; + return this; + } + + @Override + public GraphCentricQueryBuilder orderBy(String key, Order order) { + Preconditions.checkArgument(tx.containsPropertyKey(key),"Provided key does not exist: %s",key); + return orderBy(tx.getPropertyKey(key), order); + } + + @Override + public GraphCentricQueryBuilder orderBy(PropertyKey key, Order order) { + Preconditions.checkArgument(key!=null && order!=null,"Need to specify and key and an order"); + Preconditions.checkArgument(Comparable.class.isAssignableFrom(key.getDataType()), + "Can only order on keys with comparable data type. 
[%s] has datatype [%s]", key.getName(), key.getDataType()); + Preconditions.checkArgument(key.getCardinality()== Cardinality.SINGLE, "Ordering is undefined on multi-valued key [%s]", key.getName()); + Preconditions.checkArgument(!orders.containsKey(key)); + orders.add(key, order); + return this; + } + + /* --------------------------------------------------------------- + * Query Execution + * --------------------------------------------------------------- + */ + + @Override + public Iterable<Vertex> vertices() { + GraphCentricQuery query = constructQuery(ElementCategory.VERTEX); + return Iterables.filter(new QueryProcessor<GraphCentricQuery, TitanElement, JointIndexQuery>(query, tx.elementProcessor), Vertex.class); + } + + @Override + public Iterable<Edge> edges() { + GraphCentricQuery query = constructQuery(ElementCategory.EDGE); + return Iterables.filter(new QueryProcessor<GraphCentricQuery, TitanElement, JointIndexQuery>(query, tx.elementProcessor), Edge.class); + } + + @Override + public Iterable<TitanProperty> properties() { + GraphCentricQuery query = constructQuery(ElementCategory.PROPERTY); + return Iterables.filter(new QueryProcessor<GraphCentricQuery, TitanElement, JointIndexQuery>(query, tx.elementProcessor), TitanProperty.class); + } + + private QueryDescription describe(ElementCategory category) { + return new StandardQueryDescription(1,constructQuery(category)); + } + + public QueryDescription describeForVertices() { + return describe(ElementCategory.VERTEX); + } + + public QueryDescription describeForEdges() { + return describe(ElementCategory.EDGE); + } + + public QueryDescription describeForProperties() { + return describe(ElementCategory.PROPERTY); + } + + /* --------------------------------------------------------------- + * Query Construction + * --------------------------------------------------------------- + */ + + private static final int DEFAULT_NO_LIMIT = 1000; + private static final int MAX_BASE_LIMIT = 20000; + private static final int 
HARD_MAX_LIMIT = 100000; + + private static final double EQUAL_CONDITION_SCORE = 4; + private static final double OTHER_CONDITION_SCORE = 1; + private static final double ORDER_MATCH = 2; + private static final double ALREADY_MATCHED_ADJUSTOR = 0.1; + private static final double CARDINALITY_SINGE_SCORE = 1000; + private static final double CARDINALITY_OTHER_SCORE = 1000; + + public GraphCentricQuery constructQuery(final ElementCategory resultType) { + Preconditions.checkNotNull(resultType); + if (limit == 0) return GraphCentricQuery.emptyQuery(resultType); + + //Prepare constraints + And<TitanElement> conditions = QueryUtil.constraints2QNF(tx, constraints); + if (conditions == null) return GraphCentricQuery.emptyQuery(resultType); + + //Prepare orders + orders.makeImmutable(); + if (orders.isEmpty()) orders = OrderList.NO_ORDER; + + //Compile all indexes that cover at least one of the query conditions + final Set<IndexType> indexCandidates = new HashSet<IndexType>(); + ConditionUtil.traversal(conditions,new Predicate<Condition<TitanElement>>() { + @Override + public boolean apply(@Nullable Condition<TitanElement> condition) { + if (condition instanceof PredicateCondition) { + RelationType type = ((PredicateCondition<RelationType,TitanElement>)condition).getKey(); + Preconditions.checkArgument(type!=null && type.isPropertyKey()); + Iterables.addAll(indexCandidates,Iterables.filter(((InternalRelationType) type).getKeyIndexes(), new Predicate<IndexType>() { + @Override + public boolean apply(@Nullable IndexType indexType) { + return indexType.getElement()==resultType; + } + })); + } + return true; + } + }); + + /* + Determine the best join index query to answer this query: + Iterate over all potential indexes (as compiled above) and compute a score based on how many clauses + this index covers. The index with the highest score (as long as it covers at least one additional clause) + is picked and added to the joint query for as long as such exist. 
+ */ + JointIndexQuery jointQuery = new JointIndexQuery(); + boolean isSorted = orders.isEmpty(); + Set<Condition> coveredClauses = Sets.newHashSet(); + while (true) { + IndexType bestCandidate = null; + double candidateScore = 0.0; + Set<Condition> candidateSubcover = null; + boolean candidateSupportsSort = false; + Object candidateSubcondition = null; + + for (IndexType index : indexCandidates) { + Set<Condition> subcover = Sets.newHashSet(); + Object subcondition; + boolean supportsSort = orders.isEmpty(); + //Check that this index actually applies in case of a schema constraint + if (index.hasSchemaTypeConstraint()) { + TitanSchemaType type = index.getSchemaTypeConstraint(); + Map.Entry<Condition,Collection<Object>> equalCon = getEqualityConditionValues(conditions,ImplicitKey.LABEL); + if (equalCon==null) continue; + Collection<Object> labels = equalCon.getValue(); + assert labels.size()>=1; + if (labels.size()>1) { + log.warn("The query optimizer currently does not support multiple label constraints in query: {}",this); + continue; + } + if (!type.getName().equals((String)Iterables.getOnlyElement(labels))) continue; + subcover.add(equalCon.getKey()); + } + + if (index.isCompositeIndex()) { + subcondition = indexCover((CompositeIndexType) index,conditions,subcover); + } else { + subcondition = indexCover((MixedIndexType) index,conditions,serializer,subcover); + if (coveredClauses.isEmpty() && !supportsSort + && indexCoversOrder((MixedIndexType)index,orders)) supportsSort=true; + } + if (subcondition==null) continue; + assert !subcover.isEmpty(); + double score = 0.0; + boolean coversAdditionalClause = false; + for (Condition c : subcover) { + double s = (c instanceof PredicateCondition && ((PredicateCondition)c).getPredicate()==Cmp.EQUAL)? 
+ EQUAL_CONDITION_SCORE:OTHER_CONDITION_SCORE; + if (coveredClauses.contains(c)) s=s*ALREADY_MATCHED_ADJUSTOR; + else coversAdditionalClause = true; + score+=s; + if (index.isCompositeIndex()) + score+=((CompositeIndexType)index).getCardinality()==Cardinality.SINGLE? + CARDINALITY_SINGE_SCORE:CARDINALITY_OTHER_SCORE; + } + if (supportsSort) score+=ORDER_MATCH; + if (coversAdditionalClause && score>candidateScore) { + candidateScore=score; + bestCandidate=index; + candidateSubcover = subcover; + candidateSubcondition = subcondition; + candidateSupportsSort = supportsSort; + } + } + if (bestCandidate!=null) { + if (coveredClauses.isEmpty()) isSorted=candidateSupportsSort; + coveredClauses.addAll(candidateSubcover); + + log.debug("Index chosen for query {} {} " , bestCandidate.isCompositeIndex() ? "COMPOSITE" : "MIXED", coveredClauses); + if (bestCandidate.isCompositeIndex()) { + jointQuery.add((CompositeIndexType)bestCandidate, + serializer.getQuery((CompositeIndexType)bestCandidate,(List<Object[]>)candidateSubcondition)); + } else { + jointQuery.add((MixedIndexType)bestCandidate, + serializer.getQuery((MixedIndexType)bestCandidate,(Condition)candidateSubcondition,orders)); + } + } else { + break; + } + /* TODO: smarter optimization: + - use in-memory histograms to estimate selectivity of PredicateConditions and filter out low-selectivity ones + if they would result in an individual index call (better to filter afterwards in memory) + - move OR's up and extend GraphCentricQuery to allow multiple JointIndexQuery for proper or'ing of queries + */ + } + + BackendQueryHolder<JointIndexQuery> query; + if (!coveredClauses.isEmpty()) { + int indexLimit = limit == Query.NO_LIMIT ? HARD_MAX_LIMIT : limit; + if (tx.getGraph().getConfiguration().adjustQueryLimit()) { + indexLimit = limit == Query.NO_LIMIT ? 
DEFAULT_NO_LIMIT : Math.min(MAX_BASE_LIMIT, limit); + } + indexLimit = Math.min(HARD_MAX_LIMIT, QueryUtil.adjustLimitForTxModifications(tx, coveredClauses.size(), indexLimit)); + jointQuery.setLimit(indexLimit); + query = new BackendQueryHolder<JointIndexQuery>(jointQuery, coveredClauses.size()==conditions.numChildren(), isSorted, null); + } else { + query = new BackendQueryHolder<JointIndexQuery>(new JointIndexQuery(), false, isSorted, null); + } + + return new GraphCentricQuery(resultType, conditions, orders, query, limit); + } + + public static final boolean indexCoversOrder(MixedIndexType index, OrderList orders) { + for (int i = 0; i < orders.size(); i++) { + if (!index.indexesKey(orders.getKey(i))) return false; + } + return true; + } + + public static List<Object[]> indexCover(final CompositeIndexType index, Condition<TitanElement> condition, Set<Condition> covered) { + assert QueryUtil.isQueryNormalForm(condition); + assert condition instanceof And; + if (index.getStatus()!= SchemaStatus.ENABLED) return null; + IndexField[] fields = index.getFieldKeys(); + Object[] indexValues = new Object[fields.length]; + Set<Condition> coveredClauses = new HashSet<Condition>(fields.length); + List<Object[]> indexCovers = new ArrayList<Object[]>(4); + + constructIndexCover(indexValues,0,fields,condition,indexCovers,coveredClauses); + if (!indexCovers.isEmpty()) { + covered.addAll(coveredClauses); + return indexCovers; + } else return null; + } + + private static void constructIndexCover(Object[] indexValues, int position, IndexField[] fields, + Condition<TitanElement> condition, + List<Object[]> indexCovers, Set<Condition> coveredClauses) { + if (position>=fields.length) { + indexCovers.add(indexValues); + } else { + IndexField field = fields[position]; + Map.Entry<Condition,Collection<Object>> equalCon = getEqualityConditionValues(condition,field.getFieldKey()); + if (equalCon!=null) { + coveredClauses.add(equalCon.getKey()); + assert equalCon.getValue().size()>0; + for 
(Object value : equalCon.getValue()) { + Object[] newValues = Arrays.copyOf(indexValues,fields.length); + newValues[position]=value; + constructIndexCover(newValues,position+1,fields,condition,indexCovers,coveredClauses); + } + } else return; + } + + } + + private static final Map.Entry<Condition,Collection<Object>> getEqualityConditionValues(Condition<TitanElement> condition, RelationType type) { + for (Condition c : condition.getChildren()) { + if (c instanceof Or) { + Map.Entry<RelationType,Collection> orEqual = QueryUtil.extractOrCondition((Or)c); + if (orEqual!=null && orEqual.getKey().equals(type) && !orEqual.getValue().isEmpty()) { + return new AbstractMap.SimpleImmutableEntry(c,orEqual.getValue()); + } + } else if (c instanceof PredicateCondition) { + PredicateCondition<RelationType, TitanRelation> atom = (PredicateCondition)c; + if (atom.getKey().equals(type) && atom.getPredicate()==Cmp.EQUAL && atom.getValue()!=null) { + return new AbstractMap.SimpleImmutableEntry(c,ImmutableList.of(atom.getValue())); + } + } + + } + return null; + } + + public static final Condition<TitanElement> indexCover(final MixedIndexType index, Condition<TitanElement> condition, + final IndexSerializer indexInfo, final Set<Condition> covered) { + assert QueryUtil.isQueryNormalForm(condition); + assert condition instanceof And; + And<TitanElement> subcondition = new And<TitanElement>(condition.numChildren()); + for (Condition<TitanElement> subclause : condition.getChildren()) { + if (coversAll(index,subclause,indexInfo)) { + subcondition.add(subclause); + covered.add(subclause); + } + } + return subcondition.isEmpty()?null:subcondition; + } + + private static final boolean coversAll(final MixedIndexType index, Condition<TitanElement> condition, IndexSerializer indexInfo) { + if (condition.getType()==Condition.Type.LITERAL) { + if (!(condition instanceof PredicateCondition)) return false; + PredicateCondition<RelationType, TitanElement> atom = (PredicateCondition) condition; + if 
(atom.getValue()==null) return false; + + Preconditions.checkArgument(atom.getKey().isPropertyKey()); + PropertyKey key = (PropertyKey) atom.getKey(); + ParameterIndexField[] fields = index.getFieldKeys(); + ParameterIndexField match = null; + for (int i = 0; i < fields.length; i++) { + if (fields[i].getStatus()!= SchemaStatus.ENABLED) continue; + if (fields[i].getFieldKey().equals(key)) match = fields[i]; + } + if (match==null) return false; + return indexInfo.supports(index,match,atom.getPredicate()); + } else { + for (Condition<TitanElement> child : condition.getChildren()) { + if (!coversAll(index,child,indexInfo)) return false; + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/GraphDbObjectFactory.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/GraphDbObjectFactory.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/GraphDbObjectFactory.java new file mode 100644 index 0000000..89de23d --- /dev/null +++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/GraphDbObjectFactory.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.graphdb.titan0; + +import org.apache.atlas.repository.graphdb.AtlasGraphIndex; +import org.apache.atlas.repository.graphdb.titan0.query.Titan0GraphQuery; + +import com.thinkaurelius.titan.core.PropertyKey; +import com.thinkaurelius.titan.core.schema.TitanGraphIndex; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; + +/** + * Factory that serves up instances of graph database abstraction layer classes + * that correspond to Titan/Tinkerpop classes. + */ +public final class GraphDbObjectFactory { + + // static factory methods only; the private constructor prevents instantiation + private GraphDbObjectFactory() { + + } + + /** + * Creates a Titan0Edge that corresponds to the given Gremlin Edge. + * + * @param graph the graph handed to the new Titan0Edge + * @param source the Gremlin edge to wrap; may be null + * @return a new Titan0Edge wrapping the source, or null if the source is null + */ + public static Titan0Edge createEdge(Titan0Graph graph, Edge source) { + + if (source == null) { + return null; + } + return new Titan0Edge(graph, source); + } + + /** + * Creates a new Titan0GraphQuery for the given graph. + * + * @param graph the graph handed to the new Titan0GraphQuery + * @return a new Titan0GraphQuery + */ + public static Titan0GraphQuery createQuery(Titan0Graph graph) { + + return new Titan0GraphQuery(graph); + } + + /** + * Creates a Titan0Vertex that corresponds to the given Gremlin Vertex.
+ * + * @param source + * @return + */ + public static Titan0Vertex createVertex(Titan0Graph graph, Vertex source) { + + if (source == null) { + return null; + } + return new Titan0Vertex(graph, source); + } + + /** + * @param propertyKey + * @return + */ + public static Titan0PropertyKey createPropertyKey(PropertyKey propertyKey) { + if (propertyKey == null) { + return null; + } + return new Titan0PropertyKey(propertyKey); + } + + /** + * @param index + * @return + */ + public static AtlasGraphIndex createGraphIndex(TitanGraphIndex index) { + if (index == null) { + return null; + } + return new Titan0GraphIndex(index); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Database.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Database.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Database.java new file mode 100644 index 0000000..56b1664 --- /dev/null +++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Database.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.graphdb.titan0; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.GraphDatabase; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; +import com.thinkaurelius.titan.core.TitanFactory; +import com.thinkaurelius.titan.core.TitanGraph; +import com.thinkaurelius.titan.core.schema.TitanManagement; +import com.thinkaurelius.titan.core.util.TitanCleanup; +import com.thinkaurelius.titan.diskstorage.StandardIndexProvider; +import com.thinkaurelius.titan.diskstorage.solr.Solr5Index; + +/** + * Titan 0.5.4 implementation of GraphDatabase. + */ +public class Titan0Database implements GraphDatabase<Titan0Vertex, Titan0Edge> { + + private static final Logger LOG = LoggerFactory.getLogger(Titan0Database.class); + + /** + * Constant for the configuration property that indicates the prefix. 
+ */ + public static final String GRAPH_PREFIX = "atlas.graph"; + + public static final String INDEX_BACKEND_CONF = "index.search.backend"; + + public static final String INDEX_BACKEND_LUCENE = "lucene"; + + public static final String INDEX_BACKEND_ES = "elasticsearch"; + + private static volatile TitanGraph graphInstance; + + public static Configuration getConfiguration() throws AtlasException { + Configuration configProperties = ApplicationProperties.get(); + return ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX); + } + + static { + addSolr5Index(); + } + + /** + * Titan loads index backend name to implementation using + * StandardIndexProvider.ALL_MANAGER_CLASSES But + * StandardIndexProvider.ALL_MANAGER_CLASSES is a private static final + * ImmutableMap Only way to inject Solr5Index is to modify this field. So, + * using hacky reflection to add Sol5Index + */ + private static void addSolr5Index() { + try { + Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES"); + field.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses()); + customMap.put("solr", Solr5Index.class.getName()); // for + // consistency + // with Titan + // 1.0.0 + customMap.put("solr5", Solr5Index.class.getName()); // for backward + // compatibility + ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap); + field.set(null, immap); + + LOG.debug("Injected solr5 index - {}", Solr5Index.class.getName()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static TitanGraph getGraphInstance() { + if (graphInstance == null) { + synchronized (Titan0Database.class) { + if (graphInstance == null) { + Configuration config; + try { + config = getConfiguration(); + } catch 
(AtlasException e) { + throw new RuntimeException(e); + } + + graphInstance = TitanFactory.open(config); + validateIndexBackend(config); + } + } + } + return graphInstance; + } + + public static void unload() { + + synchronized (Titan0Database.class) { + if (graphInstance == null) { + return; + } + + graphInstance.commit(); + //shutdown invalidates the graph instance + graphInstance.shutdown(); + graphInstance = null; + } + } + + static void validateIndexBackend(Configuration config) { + String configuredIndexBackend = config.getString(INDEX_BACKEND_CONF); + TitanManagement managementSystem = null; + + try { + managementSystem = getGraphInstance().getManagementSystem(); + String currentIndexBackend = managementSystem.get(INDEX_BACKEND_CONF); + + if (!equals(configuredIndexBackend, currentIndexBackend)) { + throw new RuntimeException("Configured Index Backend " + configuredIndexBackend + + " differs from earlier configured Index Backend " + currentIndexBackend + ". Aborting!"); + } + + } finally { + if (managementSystem != null) { + managementSystem.commit(); + } + } + + + } + + private static boolean equals(Object o1, Object o2) { + if (o1 == null) { + return o2 == null; + } + return o1.equals(o2); + } + + @Override + public AtlasGraph<Titan0Vertex, Titan0Edge> getGraph() { + // force graph loading up front to avoid bootstrapping + // issues + getGraphInstance(); + return new Titan0Graph(); + } + + @Override + public boolean isGraphLoaded() { + return graphInstance != null; + } + + + @Override + public void initializeTestGraph() { + + //nothing to do + } + + @Override + public void removeTestGraph() { + try { + getGraphInstance().shutdown(); + } + catch(Throwable t) { + LOG.warn("Could not shutdown test TitanGraph", t); + t.printStackTrace(); + } + + try { + TitanCleanup.clear(getGraphInstance()); + } + catch(Throwable t) { + LOG.warn("Could not clear test TitanGraph", t); + t.printStackTrace(); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0DatabaseManager.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0DatabaseManager.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0DatabaseManager.java new file mode 100644 index 0000000..b4234d7 --- /dev/null +++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0DatabaseManager.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.graphdb.titan0; + +import org.apache.atlas.repository.graphdb.AtlasGraphIndex; +import org.apache.atlas.repository.graphdb.AtlasGraphManagement; +import org.apache.atlas.repository.graphdb.AtlasPropertyKey; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.thinkaurelius.titan.core.Cardinality; +import com.thinkaurelius.titan.core.PropertyKey; +import com.thinkaurelius.titan.core.schema.Mapping; +import com.thinkaurelius.titan.core.schema.PropertyKeyMaker; +import com.thinkaurelius.titan.core.schema.TitanGraphIndex; +import com.thinkaurelius.titan.core.schema.TitanManagement; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Element; +import com.tinkerpop.blueprints.Vertex; + +/** + * Titan 0.5.4 implementation of AtlasGraphManagement. + */ +public class Titan0DatabaseManager implements AtlasGraphManagement { + + private static final Logger LOG = LoggerFactory.getLogger(Titan0DatabaseManager.class); + + private TitanManagement management; + + public Titan0DatabaseManager(TitanManagement managementSystem) { + + management = managementSystem; + } + + @Override + public void buildMixedVertexIndex(String index, String backingIndex) { + buildMixedIndex(index, Vertex.class, backingIndex); + } + + @Override + public void buildMixedEdgeIndex(String index, String backingIndex) { + buildMixedIndex(index, Edge.class, backingIndex); + } + + private void buildMixedIndex(String index, Class<? 
extends Element> titanClass, String backingIndex) { + + management.buildIndex(index, titanClass).buildMixedIndex(backingIndex); + } + + @Override + public void createFullTextIndex(String indexName, AtlasPropertyKey propertyKey, String backingIndex) { + + PropertyKey fullText = TitanObjectFactory.createPropertyKey(propertyKey); + + management.buildIndex(indexName, Vertex.class) + .addKey(fullText, com.thinkaurelius.titan.core.schema.Parameter.of("mapping", Mapping.TEXT)) + .buildMixedIndex(backingIndex); + } + + @Override + public boolean containsPropertyKey(String propertyKey) { + return management.containsPropertyKey(propertyKey); + } + + @Override + public void rollback() { + management.rollback(); + + } + + @Override + public void commit() { + management.commit(); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.atlas.repository.graphdb.AtlasGraphManagement#makePropertyKey( + * java.lang.String, java.lang.Class, + * org.apache.atlas.typesystem.types.Multiplicity) + */ + @Override + public AtlasPropertyKey makePropertyKey(String propertyName, Class propertyClass, Multiplicity multiplicity) { + + PropertyKeyMaker propertyKeyBuilder = management.makePropertyKey(propertyName).dataType(propertyClass); + + if (multiplicity != null) { + Cardinality cardinality = TitanObjectFactory.createCardinality(multiplicity); + propertyKeyBuilder.cardinality(cardinality); + } + PropertyKey propertyKey = propertyKeyBuilder.make(); + return GraphDbObjectFactory.createPropertyKey(propertyKey); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.atlas.repository.graphdb.AtlasGraphManagement#getPropertyKey( + * java.lang.String) + */ + @Override + public AtlasPropertyKey getPropertyKey(String propertyName) { + + return GraphDbObjectFactory.createPropertyKey(management.getPropertyKey(propertyName)); + } + + /* + * (non-Javadoc) + * + * @see org.apache.atlas.repository.graphdb.AtlasGraphManagement# + * createCompositeIndex(java.lang.String, + * 
org.apache.atlas.repository.graphdb.AtlasPropertyKey, boolean) + */ + @Override + public void createCompositeIndex(String propertyName, AtlasPropertyKey propertyKey, boolean enforceUniqueness) { + PropertyKey titanKey = TitanObjectFactory.createPropertyKey(propertyKey); + TitanManagement.IndexBuilder indexBuilder = management.buildIndex(propertyName, Vertex.class).addKey(titanKey); + if (enforceUniqueness) { + indexBuilder.unique(); + } + indexBuilder.buildCompositeIndex(); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.atlas.repository.graphdb.AtlasGraphManagement#addIndexKey(java + * .lang.String, org.apache.atlas.repository.graphdb.AtlasPropertyKey) + */ + @Override + public void addIndexKey(String indexName, AtlasPropertyKey propertyKey) { + PropertyKey titanKey = TitanObjectFactory.createPropertyKey(propertyKey); + TitanGraphIndex vertexIndex = management.getGraphIndex(indexName); + management.addIndexKey(vertexIndex, titanKey); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.atlas.repository.graphdb.AtlasGraphManagement#getGraphIndex( + * java.lang.String) + */ + @Override + public AtlasGraphIndex getGraphIndex(String indexName) { + TitanGraphIndex index = management.getGraphIndex(indexName); + return GraphDbObjectFactory.createGraphIndex(index); + } + +}
