http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java deleted file mode 100644 index 20c59e1..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.thinkaurelius.titan.diskstorage.locking; - -import com.google.common.base.Preconditions; -import com.thinkaurelius.titan.diskstorage.util.time.Timepoint; -import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider; -import com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction; -import com.thinkaurelius.titan.diskstorage.util.KeyColumn; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * This class resolves lock contention between two transactions on the same JVM. - * <p/> - * This is not just an optimization to reduce network traffic. Locks written by - * Titan to a distributed key-value store contain an identifier, the "Rid", - * which is unique only to the process level. The Rid can't tell which - * transaction in a process holds any given lock. This class prevents two - * transactions in a single process from concurrently writing the same lock to a - * distributed key-value store. 
- * - * @author Dan LaRocque <[email protected]> - */ - -public class LocalLockMediator<T> { - - private static final Logger log = LoggerFactory - .getLogger(LocalLockMediator.class); - - /** - * Namespace for which this mediator is responsible - * - * @see LocalLockMediatorProvider - */ - private final String name; - - private final TimestampProvider times; - - private DelayQueue<ExpirableKeyColumn> expiryQueue = new DelayQueue<>(); - - private ExecutorService lockCleanerService = Executors.newFixedThreadPool(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { - Thread thread = Executors.defaultThreadFactory().newThread(runnable); - thread.setDaemon(true); - return thread; - } - }); - - - - /** - * Maps a ({@code key}, {@code column}) pair to the local transaction - * holding a lock on that pair. Values in this map may have already expired - * according to {@link AuditRecord#expires}, in which case the lock should - * be considered invalid. - */ - private final ConcurrentHashMap<KeyColumn, AuditRecord<T>> locks = new ConcurrentHashMap<KeyColumn, AuditRecord<T>>(); - - public LocalLockMediator(String name, TimestampProvider times) { - this.name = name; - this.times = times; - - Preconditions.checkNotNull(name); - Preconditions.checkNotNull(times); - lockCleanerService.submit(new LockCleaner()); - } - - /** - * Acquire the lock specified by {@code kc}. - * <p/> - * <p/> - * For any particular key-column, whatever value of {@code requestor} is - * passed to this method must also be passed to the associated later call to - * {@link #unlock(KeyColumn, ExpectedValueCheckingTransaction)}. - * <p/> - * If some requestor {@code r} calls this method on a KeyColumn {@code k} - * and this method returns true, then subsequent calls to this method by - * {@code r} on {@code l} merely attempt to update the {@code expiresAt} - * timestamp. 
This differs from typical lock reentrance: multiple successful - * calls to this method do not require an equal number of calls to - * {@code #unlock()}. One {@code #unlock()} call is enough, no matter how - * many times a {@code requestor} called {@code lock} beforehand. Note that - * updating the timestamp may fail, in which case the lock is considered to - * have expired and the calling context should assume it no longer holds the - * lock specified by {@code kc}. - * <p/> - * The number of nanoseconds elapsed since the UNIX Epoch is not readily - * available within the JVM. When reckoning expiration times, this method - * uses the approximation implemented by - * {@link com.thinkaurelius.titan.diskstorage.util.NanoTime#getApproxNSSinceEpoch(false)}. - * <p/> - * The current implementation of this method returns true when given an - * {@code expiresAt} argument in the past. Future implementations may return - * false instead. - * - * @param kc lock identifier - * @param requestor the object locking {@code kc} - * @param expires instant at which this lock will automatically expire - * @return true if the lock is acquired, false if it was not acquired - */ - public boolean lock(KeyColumn kc, T requestor, Timepoint expires) { - assert null != kc; - assert null != requestor; - - AuditRecord<T> audit = new AuditRecord<T>(requestor, expires); - AuditRecord<T> inmap = locks.putIfAbsent(kc, audit); - - boolean success = false; - - if (null == inmap) { - // Uncontended lock succeeded - if (log.isTraceEnabled()) { - log.trace("New local lock created: {} namespace={} txn={}", - new Object[]{kc, name, requestor}); - } - success = true; - } else if (inmap.equals(audit)) { - // requestor has already locked kc; update expiresAt - success = locks.replace(kc, inmap, audit); - if (log.isTraceEnabled()) { - if (success) { - log.trace( - "Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}", - new Object[]{kc, name, requestor, inmap.expires, - audit.expires}); 
- } else { - log.trace( - "Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}", - new Object[]{kc, name, requestor, inmap.expires, - audit.expires}); - } - } - } else if (0 > inmap.expires.compareTo(times.getTime())) { - // the recorded lock has expired; replace it - success = locks.replace(kc, inmap, audit); - if (log.isTraceEnabled()) { - log.trace( - "Discarding expired lock: {} namespace={} txn={} expired={}", - new Object[]{kc, name, inmap.holder, inmap.expires}); - } - } else { - // we lost to a valid lock - if (log.isTraceEnabled()) { - log.trace( - "Local lock failed: {} namespace={} txn={} (already owned by {})", - new Object[]{kc, name, requestor, inmap}); - } - } - - if (success) { - expiryQueue.add(new ExpirableKeyColumn(kc, expires)); - } - return success; - } - - /** - * Release the lock specified by {@code kc} and which was previously - * locked by {@code requestor}, if it is possible to release it. - * - * @param kc lock identifier - * @param requestor the object which previously locked {@code kc} - */ - public boolean unlock(KeyColumn kc, T requestor) { - - if (!locks.containsKey(kc)) { - log.info("Local unlock failed: no locks found for {}", kc); - return false; - } - - AuditRecord<T> unlocker = new AuditRecord<T>(requestor, null); - - AuditRecord<T> holder = locks.get(kc); - - if (!holder.equals(unlocker)) { - log.error("Local unlock of {} by {} failed: it is held by {}", - new Object[]{kc, unlocker, holder}); - return false; - } - - boolean removed = locks.remove(kc, unlocker); - - if (removed) { - expiryQueue.remove(kc); - if (log.isTraceEnabled()) { - log.trace("Local unlock succeeded: {} namespace={} txn={}", - new Object[]{kc, name, requestor}); - } - } else { - log.warn("Local unlock warning: lock record for {} disappeared " - + "during removal; this suggests the lock either expired " - + "while we were removing it, or that it was erroneously " - + "unlocked multiple times.", kc); - } - - // Even if !removed, 
we're finished unlocking, so return true - return true; - } - - public String toString() { - return "LocalLockMediator [" + name + ", ~" + locks.size() - + " current locks]"; - } - - /** - * A record containing the local transaction that holds a lock and the - * lock's expiration time. - */ - private static class AuditRecord<T> { - - /** - * The local transaction that holds/held the lock. - */ - private final T holder; - /** - * The expiration time of a the lock. - */ - private final Timepoint expires; - /** - * Cached hashCode. - */ - private int hashCode; - - private AuditRecord(T holder, Timepoint expires) { - this.holder = holder; - this.expires = expires; - } - - /** - * This implementation depends only on the lock holder and not on the - * lock expiration time. - */ - @Override - public int hashCode() { - if (0 == hashCode) - hashCode = holder.hashCode(); - - return hashCode; - } - - /** - * This implementation depends only on the lock holder and not on the - * lock expiration time. - */ - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - /* - * This warning suppression is harmless because we are only going to - * call other.holder.equals(...), and since equals(...) is part of - * Object, it is guaranteed to be defined no matter the concrete - * type of parameter T. 
- */ - @SuppressWarnings("rawtypes") - AuditRecord other = (AuditRecord) obj; - if (holder == null) { - if (other.holder != null) - return false; - } else if (!holder.equals(other.holder)) - return false; - return true; - } - - @Override - public String toString() { - return "AuditRecord [txn=" + holder + ", expires=" + expires + "]"; - } - - } - - private class LockCleaner implements Runnable { - - @Override - public void run() { - try { - while (true) { - log.debug("Lock Cleaner service started"); - ExpirableKeyColumn lock = expiryQueue.take(); - log.debug("Expiring key column " + lock.getKeyColumn()); - locks.remove(lock.getKeyColumn()); - } - } catch (InterruptedException e) { - log.debug("Received interrupt. Exiting"); - } - } - } - - private static class ExpirableKeyColumn implements Delayed { - - private Timepoint expiryTime; - private KeyColumn kc; - - public ExpirableKeyColumn(KeyColumn keyColumn, Timepoint expiryTime) { - this.kc = keyColumn; - this.expiryTime = expiryTime; - } - - @Override - public long getDelay(TimeUnit unit) { - return expiryTime.getTimestamp(TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) < ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) { - return -1; - } - if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) > ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) { - return 1; - } - return 0; - } - - public KeyColumn getKeyColumn() { - return kc; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java deleted file mode 100644 index 3b5620c..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java +++ /dev/null @@ -1,975 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.thinkaurelius.titan.diskstorage.solr; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.thinkaurelius.titan.core.Order; -import com.thinkaurelius.titan.core.TitanElement; -import com.thinkaurelius.titan.core.attribute.Cmp; -import com.thinkaurelius.titan.core.attribute.Geo; -import com.thinkaurelius.titan.core.attribute.Geoshape; -import com.thinkaurelius.titan.core.attribute.Text; -import com.thinkaurelius.titan.core.schema.Mapping; -import com.thinkaurelius.titan.diskstorage.BackendException; -import com.thinkaurelius.titan.diskstorage.BaseTransaction; -import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; -import com.thinkaurelius.titan.diskstorage.BaseTransactionConfigurable; -import com.thinkaurelius.titan.diskstorage.PermanentBackendException; -import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; -import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; -import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; -import com.thinkaurelius.titan.diskstorage.configuration.Configuration; -import com.thinkaurelius.titan.diskstorage.indexing.IndexEntry; -import com.thinkaurelius.titan.diskstorage.indexing.IndexFeatures; -import com.thinkaurelius.titan.diskstorage.indexing.IndexMutation; -import com.thinkaurelius.titan.diskstorage.indexing.IndexProvider; -import com.thinkaurelius.titan.diskstorage.indexing.IndexQuery; -import com.thinkaurelius.titan.diskstorage.indexing.KeyInformation; -import com.thinkaurelius.titan.diskstorage.indexing.RawQuery; -import com.thinkaurelius.titan.diskstorage.util.DefaultTransaction; -import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; -import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; -import com.thinkaurelius.titan.graphdb.database.serialize.AttributeUtil; -import 
com.thinkaurelius.titan.graphdb.database.serialize.attribute.AbstractDecimal; -import com.thinkaurelius.titan.graphdb.query.TitanPredicate; -import com.thinkaurelius.titan.graphdb.query.condition.And; -import com.thinkaurelius.titan.graphdb.query.condition.Condition; -import com.thinkaurelius.titan.graphdb.query.condition.Not; -import com.thinkaurelius.titan.graphdb.query.condition.Or; -import com.thinkaurelius.titan.graphdb.query.condition.PredicateCondition; -import com.thinkaurelius.titan.graphdb.types.ParameterType; -import org.apache.commons.lang.StringUtils; -import org.apache.http.client.HttpClient; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.HttpClientUtil; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; -import org.apache.solr.client.solrj.impl.LBHttpSolrClient; -import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.UpdateRequest; -import org.apache.solr.client.solrj.response.CollectionAdminResponse; -import org.apache.solr.client.solrj.response.QueryResponse; -import org.apache.solr.client.solrj.util.ClientUtils; -import org.apache.solr.common.SolrDocument; -import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; 
-import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import java.util.UUID; - -import static com.thinkaurelius.titan.core.attribute.Cmp.*; -import static com.thinkaurelius.titan.core.schema.Mapping.*; - -/** - * NOTE: Copied from titan for supporting sol5. Do not change - */ -@PreInitializeConfigOptions -public class Solr5Index implements IndexProvider { - - private static final Logger logger = LoggerFactory.getLogger(Solr5Index.class); - - - private static final String DEFAULT_ID_FIELD = "id"; - - private enum Mode { - HTTP, CLOUD; - - public static Mode parse(String mode) { - for (Mode m : Mode.values()) { - if (m.toString().equalsIgnoreCase(mode)) return m; - } - throw new IllegalArgumentException("Unrecognized mode: "+mode); - } - } - - public static final ConfigNamespace SOLR_NS = - new ConfigNamespace(GraphDatabaseConfiguration.INDEX_NS, "solr", "Solr index configuration"); - - public static final ConfigOption<String> SOLR_MODE = new ConfigOption<String>(SOLR_NS,"mode", - "The operation mode for Solr which is either via HTTP (`http`) or using SolrCloud (`cloud`)", - ConfigOption.Type.GLOBAL_OFFLINE, "cloud"); - - public static final ConfigOption<Boolean> DYNAMIC_FIELDS = new ConfigOption<Boolean>(SOLR_NS,"dyn-fields", - "Whether to use dynamic fields (which appends the data type to the field name). If dynamic fields is disabled" + - "the user must map field names and define them explicitly in the schema.", - ConfigOption.Type.GLOBAL_OFFLINE, true); - - public static final ConfigOption<String[]> KEY_FIELD_NAMES = new ConfigOption<String[]>(SOLR_NS,"key-field-names", - "Field name that uniquely identifies each document in Solr. 
Must be specified as a list of `collection=field`.", - ConfigOption.Type.GLOBAL, String[].class); - - public static final ConfigOption<String> TTL_FIELD = new ConfigOption<String>(SOLR_NS,"ttl_field", - "Name of the TTL field for Solr collections.", - ConfigOption.Type.GLOBAL_OFFLINE, "ttl"); - - /** SolrCloud Configuration */ - - public static final ConfigOption<String> ZOOKEEPER_URL = new ConfigOption<String>(SOLR_NS,"zookeeper-url", - "URL of the Zookeeper instance coordinating the SolrCloud cluster", - ConfigOption.Type.MASKABLE, "localhost:2181"); - - public static final ConfigOption<Integer> NUM_SHARDS = new ConfigOption<Integer>(SOLR_NS,"num-shards", - "Number of shards for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", - ConfigOption.Type.GLOBAL_OFFLINE, 1); - - public static final ConfigOption<Integer> MAX_SHARDS_PER_NODE = new ConfigOption<Integer>(SOLR_NS,"max-shards-per-node", - "Maximum number of shards per node. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", - ConfigOption.Type.GLOBAL_OFFLINE, 1); - - public static final ConfigOption<Integer> REPLICATION_FACTOR = new ConfigOption<Integer>(SOLR_NS,"replication-factor", - "Replication factor for a collection. 
This applies when creating a new collection which is only supported under the SolrCloud operation mode.", - ConfigOption.Type.GLOBAL_OFFLINE, 1); - - - /** HTTP Configuration */ - - public static final ConfigOption<String[]> HTTP_URLS = new ConfigOption<String[]>(SOLR_NS,"http-urls", - "List of URLs to use to connect to Solr Servers (LBHttpSolrClient is used), don't add core or collection name to the URL.", - ConfigOption.Type.MASKABLE, new String[] { "http://localhost:8983/solr" }); - - public static final ConfigOption<Integer> HTTP_CONNECTION_TIMEOUT = new ConfigOption<Integer>(SOLR_NS,"http-connection-timeout", - "Solr HTTP connection timeout.", - ConfigOption.Type.MASKABLE, 5000); - - public static final ConfigOption<Boolean> HTTP_ALLOW_COMPRESSION = new ConfigOption<Boolean>(SOLR_NS,"http-compression", - "Enable/disable compression on the HTTP connections made to Solr.", - ConfigOption.Type.MASKABLE, false); - - public static final ConfigOption<Integer> HTTP_MAX_CONNECTIONS_PER_HOST = new ConfigOption<Integer>(SOLR_NS,"http-max-per-host", - "Maximum number of HTTP connections per Solr host.", - ConfigOption.Type.MASKABLE, 20); - - public static final ConfigOption<Integer> HTTP_GLOBAL_MAX_CONNECTIONS = new ConfigOption<Integer>(SOLR_NS,"http-max", - "Maximum number of HTTP connections in total to all Solr servers.", - ConfigOption.Type.MASKABLE, 100); - - public static final ConfigOption<Boolean> WAIT_SEARCHER = new ConfigOption<Boolean>(SOLR_NS, "wait-searcher", - "When mutating - wait for the index to reflect new mutations before returning. 
This can have a negative impact on performance.", - ConfigOption.Type.LOCAL, false); - - - - private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL() - .setDefaultStringMapping(TEXT).supportedStringMappings(TEXT, STRING).build(); - - private final SolrClient solrClient; - private final Configuration configuration; - private final Mode mode; - private final boolean dynFields; - private final Map<String, String> keyFieldIds; - private final String ttlField; - private final int maxResults; - private final boolean waitSearcher; - - public Solr5Index(final Configuration config) throws BackendException { - Preconditions.checkArgument(config!=null); - configuration = config; - - mode = Mode.parse(config.get(SOLR_MODE)); - dynFields = config.get(DYNAMIC_FIELDS); - keyFieldIds = parseKeyFieldsForCollections(config); - maxResults = config.get(GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE); - ttlField = config.get(TTL_FIELD); - waitSearcher = config.get(WAIT_SEARCHER); - - if (mode==Mode.CLOUD) { - HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); - String zookeeperUrl = config.get(Solr5Index.ZOOKEEPER_URL); - CloudSolrClient cloudServer = new CloudSolrClient(zookeeperUrl, true); - cloudServer.connect(); - solrClient = cloudServer; - } else if (mode==Mode.HTTP) { - HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); - HttpClient clientParams = HttpClientUtil.createClient(new ModifiableSolrParams() {{ - add(HttpClientUtil.PROP_ALLOW_COMPRESSION, config.get(HTTP_ALLOW_COMPRESSION).toString()); - add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, config.get(HTTP_CONNECTION_TIMEOUT).toString()); - add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString()); - add(HttpClientUtil.PROP_MAX_CONNECTIONS, config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString()); - }}); - - solrClient = new LBHttpSolrClient(clientParams, config.get(HTTP_URLS)); - - - } else { - throw new 
IllegalArgumentException("Unsupported Solr operation mode: " + mode); - } - } - - private Map<String, String> parseKeyFieldsForCollections(Configuration config) throws BackendException { - Map<String, String> keyFieldNames = new HashMap<String, String>(); - String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES)?config.get(KEY_FIELD_NAMES):new String[0]; - for (String collectionFieldStatement : collectionFieldStatements) { - String[] parts = collectionFieldStatement.trim().split("="); - if (parts.length != 2) { - throw new PermanentBackendException("Unable to parse the collection name / key field name pair. It should be of the format collection=field"); - } - String collectionName = parts[0]; - String keyFieldName = parts[1]; - keyFieldNames.put(collectionName, keyFieldName); - } - return keyFieldNames; - } - - private String getKeyFieldId(String collection) { - String field = keyFieldIds.get(collection); - if (field==null) field = DEFAULT_ID_FIELD; - return field; - } - - /** - * Unlike the ElasticSearch Index, which is schema free, Solr requires a schema to - * support searching. This means that you will need to modify the solr schema with the - * appropriate field definitions in order to work properly. If you have a running instance - * of Solr and you modify its schema with new fields, don't forget to re-index! 
- * @param store Index store - * @param key New key to register - * @param information Datatype to register for the key - * @param tx enclosing transaction - * @throws com.thinkaurelius.titan.diskstorage.BackendException - */ - @Override - public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException { - if (mode==Mode.CLOUD) { - CloudSolrClient client = (CloudSolrClient) solrClient; - try { - createCollectionIfNotExists(client, configuration, store); - } catch (IOException e) { - throw new PermanentBackendException(e); - } catch (SolrServerException e) { - throw new PermanentBackendException(e); - } catch (InterruptedException e) { - throw new PermanentBackendException(e); - } catch (KeeperException e) { - throw new PermanentBackendException(e); - } - } - //Since all data types must be defined in the schema.xml, pre-registering a type does not work - } - - @Override - public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - logger.debug("Mutating SOLR"); - try { - for (Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) { - String collectionName = stores.getKey(); - String keyIdField = getKeyFieldId(collectionName); - - List<String> deleteIds = new ArrayList<String>(); - Collection<SolrInputDocument> changes = new ArrayList<SolrInputDocument>(); - - for (Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) { - String docId = entry.getKey(); - IndexMutation mutation = entry.getValue(); - Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted())); - Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions()); - Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions()); - - //Handle any deletions - if (mutation.hasDeletions()) { - if (mutation.isDeleted()) { - logger.trace("Deleting entire document {}", docId); - 
deleteIds.add(docId); - } else { - HashSet<IndexEntry> fieldDeletions = Sets.newHashSet(mutation.getDeletions()); - if (mutation.hasAdditions()) { - for (IndexEntry indexEntry : mutation.getAdditions()) { - fieldDeletions.remove(indexEntry); - } - } - deleteIndividualFieldsFromIndex(collectionName, keyIdField, docId, fieldDeletions); - } - } - - if (mutation.hasAdditions()) { - int ttl = mutation.determineTTL(); - - SolrInputDocument doc = new SolrInputDocument(); - doc.setField(keyIdField, docId); - - boolean isNewDoc = mutation.isNew(); - - if (isNewDoc) - logger.trace("Adding new document {}", docId); - - for (IndexEntry e : mutation.getAdditions()) { - final Object fieldValue = convertValue(e.value); - doc.setField(e.field, isNewDoc - ? fieldValue : new HashMap<String, Object>(1) {{ put("set", fieldValue); }}); - } - if (ttl>0) { - Preconditions.checkArgument(isNewDoc,"Solr only supports TTL on new documents [%s]",docId); - doc.setField(ttlField, String.format("+%dSECONDS", ttl)); - } - changes.add(doc); - } - } - - commitDeletes(collectionName, deleteIds); - commitDocumentChanges(collectionName, changes); - } - } catch (Exception e) { - throw storageException(e); - } - } - - private Object convertValue(Object value) throws BackendException { - if (value instanceof Geoshape) - return GeoToWktConverter.convertToWktString((Geoshape) value); - // in order to serialize/deserialize properly Solr will have to have an - // access to Titan source which has Decimal type, so for now we simply convert to - // double and let Solr do the same thing or fail. 
- if (value instanceof AbstractDecimal) - return ((AbstractDecimal) value).doubleValue(); - if (value instanceof UUID) - return value.toString(); - return value; - } - - @Override - public void restore(Map<String, Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - try { - for (Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) { - final String collectionName = stores.getKey(); - - List<String> deleteIds = new ArrayList<String>(); - List<SolrInputDocument> newDocuments = new ArrayList<SolrInputDocument>(); - - for (Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) { - final String docID = entry.getKey(); - final List<IndexEntry> content = entry.getValue(); - - if (content == null || content.isEmpty()) { - if (logger.isTraceEnabled()) - logger.trace("Deleting document [{}]", docID); - - deleteIds.add(docID); - continue; - } - - newDocuments.add(new SolrInputDocument() {{ - setField(getKeyFieldId(collectionName), docID); - - for (IndexEntry addition : content) { - Object fieldValue = addition.value; - setField(addition.field, convertValue(fieldValue)); - } - }}); - } - - commitDeletes(collectionName, deleteIds); - commitDocumentChanges(collectionName, newDocuments); - } - } catch (Exception e) { - throw new TemporaryBackendException("Could not restore Solr index", e); - } - } - - private void deleteIndividualFieldsFromIndex(String collectionName, String keyIdField, String docId, HashSet<IndexEntry> fieldDeletions) throws SolrServerException, IOException { - if (fieldDeletions.isEmpty()) return; - - Map<String, String> fieldDeletes = new HashMap<String, String>(1) {{ put("set", null); }}; - - SolrInputDocument doc = new SolrInputDocument(); - doc.addField(keyIdField, docId); - StringBuilder sb = new StringBuilder(); - for (IndexEntry fieldToDelete : fieldDeletions) { - doc.addField(fieldToDelete.field, fieldDeletes); - 
sb.append(fieldToDelete).append(","); - } - - if (logger.isTraceEnabled()) - logger.trace("Deleting individual fields [{}] for document {}", sb.toString(), docId); - - UpdateRequest singleDocument = newUpdateRequest(); - singleDocument.add(doc); - solrClient.request(singleDocument, collectionName); - } - - private void commitDocumentChanges(String collectionName, Collection<SolrInputDocument> documents) throws SolrServerException, IOException { - if (documents.size() == 0) return; - - try { - solrClient.request(newUpdateRequest().add(documents), collectionName); - } catch (HttpSolrClient.RemoteSolrException rse) { - logger.error("Unable to save documents to Solr as one of the shape objects stored were not compatible with Solr.", rse); - logger.error("Details in failed document batch: "); - for (SolrInputDocument d : documents) { - Collection<String> fieldNames = d.getFieldNames(); - for (String name : fieldNames) { - logger.error(name + ":" + d.getFieldValue(name).toString()); - } - } - - throw rse; - } - } - - private void commitDeletes(String collectionName, List<String> deleteIds) throws SolrServerException, IOException { - if (deleteIds.size() == 0) return; - solrClient.request(newUpdateRequest().deleteById(deleteIds), collectionName); - } - - @Override - public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - List<String> result; - String collection = query.getStore(); - String keyIdField = getKeyFieldId(collection); - SolrQuery solrQuery = new SolrQuery("*:*"); - String queryFilter = buildQueryFilter(query.getCondition(), informations.get(collection)); - solrQuery.addFilterQuery(queryFilter); - if (!query.getOrder().isEmpty()) { - List<IndexQuery.OrderEntry> orders = query.getOrder(); - for (IndexQuery.OrderEntry order1 : orders) { - String item = order1.getKey(); - SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? 
SolrQuery.ORDER.asc : SolrQuery.ORDER.desc; - solrQuery.addSort(new SolrQuery.SortClause(item, order)); - } - } - solrQuery.setStart(0); - if (query.hasLimit()) { - solrQuery.setRows(query.getLimit()); - } else { - solrQuery.setRows(maxResults); - } - try { - QueryResponse response = solrClient.query(collection, solrQuery); - - if (logger.isDebugEnabled()) - logger.debug("Executed query [{}] in {} ms", query.getCondition(), response.getElapsedTime()); - - int totalHits = response.getResults().size(); - - if (!query.hasLimit() && totalHits >= maxResults) - logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); - - result = new ArrayList<String>(totalHits); - for (SolrDocument hit : response.getResults()) { - result.add(hit.getFieldValue(keyIdField).toString()); - } - } catch (IOException e) { - logger.error("Query did not complete : ", e); - throw new PermanentBackendException(e); - } catch (SolrServerException e) { - logger.error("Unable to query Solr index.", e); - throw new PermanentBackendException(e); - } - return result; - } - - @Override - public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - List<RawQuery.Result<String>> result; - String collection = query.getStore(); - String keyIdField = getKeyFieldId(collection); - SolrQuery solrQuery = new SolrQuery(query.getQuery()) - .addField(keyIdField) - .setIncludeScore(true) - .setStart(query.getOffset()) - .setRows(query.hasLimit() ? 
query.getLimit() : maxResults); - - try { - QueryResponse response = solrClient.query(collection, solrQuery); - if (logger.isDebugEnabled()) - logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime()); - - int totalHits = response.getResults().size(); - if (!query.hasLimit() && totalHits >= maxResults) { - logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); - } - result = new ArrayList<RawQuery.Result<String>>(totalHits); - - for (SolrDocument hit : response.getResults()) { - double score = Double.parseDouble(hit.getFieldValue("score").toString()); - result.add(new RawQuery.Result<String>(hit.getFieldValue(keyIdField).toString(), score)); - } - } catch (IOException e) { - logger.error("Query did not complete : ", e); - throw new PermanentBackendException(e); - } catch (SolrServerException e) { - logger.error("Unable to query Solr index.", e); - throw new PermanentBackendException(e); - } - return result; - } - - private static String escapeValue(Object value) { - return ClientUtils.escapeQueryChars(value.toString()); - } - - public String buildQueryFilter(Condition<TitanElement> condition, KeyInformation.StoreRetriever informations) { - if (condition instanceof PredicateCondition) { - PredicateCondition<String, TitanElement> atom = (PredicateCondition<String, TitanElement>) condition; - Object value = atom.getValue(); - String key = atom.getKey(); - TitanPredicate titanPredicate = atom.getPredicate(); - - if (value instanceof Number) { - String queryValue = escapeValue(value); - Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on numeric types: " + titanPredicate); - Cmp numRel = (Cmp) titanPredicate; - switch (numRel) { - case EQUAL: - return (key + ":" + queryValue); - case NOT_EQUAL: - return ("-" + key + ":" + queryValue); - case LESS_THAN: - //use right curly to mean up to but not including value - return (key + ":[* TO " + queryValue + "}"); - case 
LESS_THAN_EQUAL: - return (key + ":[* TO " + queryValue + "]"); - case GREATER_THAN: - //use left curly to mean greater than but not including value - return (key + ":{" + queryValue + " TO *]"); - case GREATER_THAN_EQUAL: - return (key + ":[" + queryValue + " TO *]"); - default: throw new IllegalArgumentException("Unexpected relation: " + numRel); - } - } else if (value instanceof String) { - Mapping map = getStringMapping(informations.get(key)); - assert map== TEXT || map== STRING; - if (map== TEXT && !titanPredicate.toString().startsWith("CONTAINS")) - throw new IllegalArgumentException("Text mapped string values only support CONTAINS queries and not: " + titanPredicate); - if (map== STRING && titanPredicate.toString().startsWith("CONTAINS")) - throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + titanPredicate); - - //Special case - if (titanPredicate == Text.CONTAINS) { - //e.g. - if terms tomorrow and world were supplied, and fq=text:(tomorrow world) - //sample data set would return 2 documents: one where text = Tomorrow is the World, - //and the second where text = Hello World. 
Hence, we are decomposing the query string - //and building an AND query explicitly because we need AND semantics - value = ((String) value).toLowerCase(); - List<String> terms = Text.tokenize((String) value); - - if (terms.isEmpty()) { - return ""; - } else if (terms.size() == 1) { - return (key + ":(" + escapeValue(terms.get(0)) + ")"); - } else { - And<TitanElement> andTerms = new And<TitanElement>(); - for (String term : terms) { - andTerms.add(new PredicateCondition<String, TitanElement>(key, titanPredicate, term)); - } - return buildQueryFilter(andTerms, informations); - } - } - if (titanPredicate == Text.PREFIX || titanPredicate == Text.CONTAINS_PREFIX) { - return (key + ":" + escapeValue(value) + "*"); - } else if (titanPredicate == Text.REGEX || titanPredicate == Text.CONTAINS_REGEX) { - return (key + ":/" + value + "/"); - } else if (titanPredicate == EQUAL) { - return (key + ":\"" + escapeValue(value) + "\""); - } else if (titanPredicate == NOT_EQUAL) { - return ("-" + key + ":\"" + escapeValue(value) + "\""); - } else { - throw new IllegalArgumentException("Relation is not supported for string value: " + titanPredicate); - } - } else if (value instanceof Geoshape) { - Geoshape geo = (Geoshape)value; - if (geo.getType() == Geoshape.Type.CIRCLE) { - Geoshape.Point center = geo.getPoint(); - return ("{!geofilt sfield=" + key + - " pt=" + center.getLatitude() + "," + center.getLongitude() + - " d=" + geo.getRadius() + "} distErrPct=0"); //distance in kilometers - } else if (geo.getType() == Geoshape.Type.BOX) { - Geoshape.Point southwest = geo.getPoint(0); - Geoshape.Point northeast = geo.getPoint(1); - return (key + ":[" + southwest.getLatitude() + "," + southwest.getLongitude() + - " TO " + northeast.getLatitude() + "," + northeast.getLongitude() + "]"); - } else if (geo.getType() == Geoshape.Type.POLYGON) { - List<Geoshape.Point> coordinates = getPolygonPoints(geo); - StringBuilder poly = new StringBuilder(key + ":\"IsWithin(POLYGON(("); - for 
(Geoshape.Point coordinate : coordinates) { - poly.append(coordinate.getLongitude()).append(" ").append(coordinate.getLatitude()).append(", "); - } - //close the polygon with the first coordinate - poly.append(coordinates.get(0).getLongitude()).append(" ").append(coordinates.get(0).getLatitude()); - poly.append(")))\" distErrPct=0"); - return (poly.toString()); - } - } else if (value instanceof Date) { - String queryValue = escapeValue(toIsoDate((Date)value)); - Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on date types: " + titanPredicate); - Cmp numRel = (Cmp) titanPredicate; - - switch (numRel) { - case EQUAL: - return (key + ":" + queryValue); - case NOT_EQUAL: - return ("-" + key + ":" + queryValue); - case LESS_THAN: - //use right curly to mean up to but not including value - return (key + ":[* TO " + queryValue + "}"); - case LESS_THAN_EQUAL: - return (key + ":[* TO " + queryValue + "]"); - case GREATER_THAN: - //use left curly to mean greater than but not including value - return (key + ":{" + queryValue + " TO *]"); - case GREATER_THAN_EQUAL: - return (key + ":[" + queryValue + " TO *]"); - default: throw new IllegalArgumentException("Unexpected relation: " + numRel); - } - } else if (value instanceof Boolean) { - Cmp numRel = (Cmp) titanPredicate; - String queryValue = escapeValue(value); - switch (numRel) { - case EQUAL: - return (key + ":" + queryValue); - case NOT_EQUAL: - return ("-" + key + ":" + queryValue); - default: - throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL"); - } - } else if (value instanceof UUID) { - if (titanPredicate == EQUAL) { - return (key + ":\"" + escapeValue(value) + "\""); - } else if (titanPredicate == NOT_EQUAL) { - return ("-" + key + ":\"" + escapeValue(value) + "\""); - } else { - throw new IllegalArgumentException("Relation is not supported for uuid value: " + titanPredicate); - } - } else throw new IllegalArgumentException("Unsupported type: " + 
value); - } else if (condition instanceof Not) { - String sub = buildQueryFilter(((Not)condition).getChild(),informations); - if (StringUtils.isNotBlank(sub)) return "-("+sub+")"; - else return ""; - } else if (condition instanceof And) { - int numChildren = ((And) condition).size(); - StringBuilder sb = new StringBuilder(); - for (Condition<TitanElement> c : condition.getChildren()) { - String sub = buildQueryFilter(c, informations); - - if (StringUtils.isBlank(sub)) - continue; - - // we don't have to add "+" which means AND iff - // a. it's a NOT query, - // b. expression is a single statement in the AND. - if (!sub.startsWith("-") && numChildren > 1) - sb.append("+"); - - sb.append(sub).append(" "); - } - return sb.toString(); - } else if (condition instanceof Or) { - StringBuilder sb = new StringBuilder(); - int element=0; - for (Condition<TitanElement> c : condition.getChildren()) { - String sub = buildQueryFilter(c,informations); - if (StringUtils.isBlank(sub)) continue; - if (element==0) sb.append("("); - else sb.append(" OR "); - sb.append(sub); - element++; - } - if (element>0) sb.append(")"); - return sb.toString(); - } else { - throw new IllegalArgumentException("Invalid condition: " + condition); - } - return null; - } - - private String toIsoDate(Date value) { - TimeZone tz = TimeZone.getTimeZone("UTC"); - DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); - df.setTimeZone(tz); - return df.format(value); - } - - private List<Geoshape.Point> getPolygonPoints(Geoshape polygon) { - List<Geoshape.Point> locations = new ArrayList<Geoshape.Point>(); - - int index = 0; - boolean hasCoordinates = true; - while (hasCoordinates) { - try { - locations.add(polygon.getPoint(index)); - } catch (ArrayIndexOutOfBoundsException ignore) { - //just means we asked for a point past the size of the list - //of known coordinates - hasCoordinates = false; - } - } - - return locations; - } - - /** - * Solr handles all transactions on the server-side. 
That means all - * commit, optimize, or rollback applies since the last commit/optimize/rollback. - * Solr documentation recommends best way to update Solr is in one process to avoid - * race conditions. - * - * @return New Transaction Handle - * @throws com.thinkaurelius.titan.diskstorage.BackendException - */ - @Override - public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException { - return new DefaultTransaction(config); - } - - @Override - public void close() throws BackendException { - logger.trace("Shutting down connection to Solr", solrClient); - try { - solrClient.close(); - } catch (IOException e) { - throw new TemporaryBackendException(e); - } - } - - @Override - public void clearStorage() throws BackendException { - try { - if (mode!=Mode.CLOUD) throw new UnsupportedOperationException("Operation only supported for SolrCloud"); - logger.debug("Clearing storage from Solr: {}", solrClient); - ZkStateReader zkStateReader = ((CloudSolrClient) solrClient).getZkStateReader(); - zkStateReader.updateClusterState(); - ClusterState clusterState = zkStateReader.getClusterState(); - for (String collection : clusterState.getCollections()) { - logger.debug("Clearing collection [{}] in Solr",collection); - UpdateRequest deleteAll = newUpdateRequest(); - deleteAll.deleteByQuery("*:*"); - solrClient.request(deleteAll, collection); - } - - } catch (SolrServerException e) { - logger.error("Unable to clear storage from index due to server error on Solr.", e); - throw new PermanentBackendException(e); - } catch (IOException e) { - logger.error("Unable to clear storage from index due to low-level I/O error.", e); - throw new PermanentBackendException(e); - } catch (Exception e) { - logger.error("Unable to clear storage from index due to general error.", e); - throw new PermanentBackendException(e); - } - } - - @Override - public boolean supports(KeyInformation information, TitanPredicate titanPredicate) { - Class<?> dataType = 
information.getDataType(); - Mapping mapping = getMapping(information); - if (mapping!= DEFAULT && !AttributeUtil.isString(dataType)) return false; - - if (Number.class.isAssignableFrom(dataType)) { - return titanPredicate instanceof Cmp; - } else if (dataType == Geoshape.class) { - return titanPredicate == Geo.WITHIN; - } else if (AttributeUtil.isString(dataType)) { - switch(mapping) { - case DEFAULT: - case TEXT: - return titanPredicate == Text.CONTAINS || titanPredicate == Text.CONTAINS_PREFIX || titanPredicate == Text.CONTAINS_REGEX; - case STRING: - return titanPredicate == EQUAL || titanPredicate== NOT_EQUAL || titanPredicate==Text.REGEX || titanPredicate==Text.PREFIX; - // case TEXTSTRING: - // return (titanPredicate instanceof Text) || titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL; - } - } else if (dataType == Date.class) { - if (titanPredicate instanceof Cmp) return true; - } else if (dataType == Boolean.class) { - return titanPredicate == EQUAL || titanPredicate == NOT_EQUAL; - } else if (dataType == UUID.class) { - return titanPredicate == EQUAL || titanPredicate== NOT_EQUAL; - } - return false; - } - - @Override - public boolean supports(KeyInformation information) { - Class<?> dataType = information.getDataType(); - Mapping mapping = getMapping(information); - if (Number.class.isAssignableFrom(dataType) || dataType == Geoshape.class || dataType == Date.class || dataType == Boolean.class || dataType == UUID.class) { - if (mapping== DEFAULT) return true; - } else if (AttributeUtil.isString(dataType)) { - if (mapping== DEFAULT || mapping== TEXT || mapping== STRING) return true; - } - return false; - } - - @Override - public String mapKey2Field(String key, KeyInformation keyInfo) { - Preconditions.checkArgument(!StringUtils.containsAny(key, new char[]{' '}),"Invalid key name provided: %s",key); - if (!dynFields) return key; - if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) return key; - String postfix; - Class datatype = 
keyInfo.getDataType(); - if (AttributeUtil.isString(datatype)) { - Mapping map = getStringMapping(keyInfo); - switch (map) { - case TEXT: postfix = "_t"; break; - case STRING: postfix = "_s"; break; - default: throw new IllegalArgumentException("Unsupported string mapping: " + map); - } - } else if (AttributeUtil.isWholeNumber(datatype)) { - if (datatype.equals(Long.class)) postfix = "_l"; - else postfix = "_i"; - } else if (AttributeUtil.isDecimal(datatype)) { - if (datatype.equals(Float.class)) postfix = "_f"; - else postfix = "_d"; - } else if (datatype.equals(Geoshape.class)) { - postfix = "_g"; - } else if (datatype.equals(Date.class)) { - postfix = "_dt"; - } else if (datatype.equals(Boolean.class)) { - postfix = "_b"; - } else if (datatype.equals(UUID.class)) { - postfix = "_uuid"; - } else throw new IllegalArgumentException("Unsupported data type ["+datatype+"] for field: " + key); - return key+postfix; - } - - @Override - public IndexFeatures getFeatures() { - return SOLR_FEATURES; - } - - /* - ################# UTILITY METHODS ####################### - */ - - private static Mapping getStringMapping(KeyInformation information) { - assert AttributeUtil.isString(information.getDataType()); - Mapping map = getMapping(information); - if (map== DEFAULT) map = TEXT; - return map; - } - - private UpdateRequest newUpdateRequest() { - UpdateRequest req = new UpdateRequest(); - req.setAction(UpdateRequest.ACTION.COMMIT, true, true); - if (waitSearcher) { - req.setAction(UpdateRequest.ACTION.COMMIT, true, true); - } - return req; - } - - private BackendException storageException(Exception solrException) { - return new TemporaryBackendException("Unable to complete query on Solr.", solrException); - } - - private static void createCollectionIfNotExists(CloudSolrClient client, Configuration config, String collection) - throws IOException, SolrServerException, KeeperException, InterruptedException { - if (!checkIfCollectionExists(client, collection)) { - Integer 
numShards = config.get(NUM_SHARDS); - Integer maxShardsPerNode = config.get(MAX_SHARDS_PER_NODE); - Integer replicationFactor = config.get(REPLICATION_FACTOR); - - CollectionAdminRequest.Create createRequest = new CollectionAdminRequest.Create(); - - createRequest.setConfigName(collection); - createRequest.setCollectionName(collection); - createRequest.setNumShards(numShards); - createRequest.setMaxShardsPerNode(maxShardsPerNode); - createRequest.setReplicationFactor(replicationFactor); - - CollectionAdminResponse createResponse = createRequest.process(client); - if (createResponse.isSuccess()) { - logger.trace("Collection {} successfully created.", collection); - } else { - throw new SolrServerException(Joiner.on("\n").join(createResponse.getErrorMessages())); - } - } - - waitForRecoveriesToFinish(client, collection); - } - - /** - * Checks if the collection has already been created in Solr. - */ - private static boolean checkIfCollectionExists(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { - ZkStateReader zkStateReader = server.getZkStateReader(); - zkStateReader.updateClusterState(); - ClusterState clusterState = zkStateReader.getClusterState(); - return clusterState.getCollectionOrNull(collection) != null; - } - - /** - * Wait for all the collection shards to be ready. 
- */ - private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { - ZkStateReader zkStateReader = server.getZkStateReader(); - try { - boolean cont = true; - - while (cont) { - boolean sawLiveRecovering = false; - zkStateReader.updateClusterState(); - ClusterState clusterState = zkStateReader.getClusterState(); - Map<String, Slice> slices = clusterState.getSlicesMap(collection); - Preconditions.checkNotNull("Could not find collection:" + collection, slices); - - for (Map.Entry<String, Slice> entry : slices.entrySet()) { - Map<String, Replica> shards = entry.getValue().getReplicasMap(); - for (Map.Entry<String, Replica> shard : shards.entrySet()) { - String state = shard.getValue().getStr(ZkStateReader.STATE_PROP); - if ((state.equals(Replica.State.RECOVERING) - || state.equals(Replica.State.DOWN)) - && clusterState.liveNodesContain(shard.getValue().getStr( - ZkStateReader.NODE_NAME_PROP))) { - sawLiveRecovering = true; - } - } - } - if (!sawLiveRecovering) { - cont = false; - } else { - Thread.sleep(1000); - } - } - } finally { - logger.info("Exiting solr wait"); - } - } - - private static class GeoToWktConverter { - /** - * {@link com.thinkaurelius.titan.core.attribute.Geoshape} stores Points in the String format: point[X.0,Y.0]. 
- * Solr needs it to be in Well-Known Text format: POINT(X.0 Y.0) - */ - static String convertToWktString(Geoshape fieldValue) throws BackendException { - if (fieldValue.getType() == Geoshape.Type.POINT) { - Geoshape.Point point = fieldValue.getPoint(); - return "POINT(" + point.getLongitude() + " " + point.getLatitude() + ")"; - } else { - throw new PermanentBackendException("Cannot index " + fieldValue.getType()); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/titan/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java b/titan/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java deleted file mode 100644 index c1a983b..0000000 --- a/titan/src/main/java/com/thinkaurelius/titan/graphdb/query/graph/GraphCentricQueryBuilder.java +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.thinkaurelius.titan.graphdb.query.graph; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.*; -import com.thinkaurelius.titan.core.*; -import com.thinkaurelius.titan.core.attribute.Cmp; -import com.thinkaurelius.titan.core.Cardinality; -import com.thinkaurelius.titan.core.schema.SchemaStatus; -import com.thinkaurelius.titan.core.schema.TitanSchemaType; -import com.thinkaurelius.titan.graphdb.database.IndexSerializer; -import com.thinkaurelius.titan.graphdb.internal.ElementCategory; -import com.thinkaurelius.titan.graphdb.internal.InternalRelationType; -import com.thinkaurelius.titan.graphdb.internal.OrderList; -import com.thinkaurelius.titan.graphdb.query.*; -import com.thinkaurelius.titan.graphdb.query.condition.*; -import com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx; -import com.thinkaurelius.titan.graphdb.types.*; -import com.thinkaurelius.titan.graphdb.types.system.ImplicitKey; -import com.thinkaurelius.titan.util.datastructures.Interval; -import com.thinkaurelius.titan.util.datastructures.PointInterval; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Vertex; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.util.*; - -/** - * - * Builds a {@link TitanGraphQuery}, optimizes the query and compiles the result into a {@link GraphCentricQuery} which - * is then executed through a {@link QueryProcessor}. - * This class from titan-0.5.4 has no major changes except adding a few logs for debugging index usage - * - * @author Matthias Broecheler ([email protected]) - */ -public class GraphCentricQueryBuilder implements TitanGraphQuery<GraphCentricQueryBuilder> { - - private static final Logger log = LoggerFactory.getLogger(GraphCentricQueryBuilder.class); - - /** - * Transaction in which this query is executed. 
- */ - private final StandardTitanTx tx; - /** - * Serializer used to serialize the query conditions into backend queries. - */ - private final IndexSerializer serializer; - /** - * The constraints added to this query. None by default. - */ - private List<PredicateCondition<String, TitanElement>> constraints; - /** - * The order in which the elements should be returned. None by default. - */ - private OrderList orders = new OrderList(); - /** - * The limit of this query. No limit by default. - */ - private int limit = Query.NO_LIMIT; - - public GraphCentricQueryBuilder(StandardTitanTx tx, IndexSerializer serializer) { - log.debug("Loaded shaded version of class GraphCentricQueryBuilder"); - Preconditions.checkNotNull(tx); - Preconditions.checkNotNull(serializer); - this.tx = tx; - this.serializer = serializer; - this.constraints = new ArrayList<PredicateCondition<String, TitanElement>>(5); - } - - /* --------------------------------------------------------------- - * Query Construction - * --------------------------------------------------------------- - */ - - private GraphCentricQueryBuilder has(String key, TitanPredicate predicate, Object condition) { - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(predicate); - Preconditions.checkArgument(predicate.isValidCondition(condition), "Invalid condition: %s", condition); - constraints.add(new PredicateCondition<String, TitanElement>(key, predicate, condition)); - return this; - } - - @Override - public GraphCentricQueryBuilder has(String key, com.tinkerpop.blueprints.Predicate predicate, Object condition) { - Preconditions.checkNotNull(key); - TitanPredicate titanPredicate = TitanPredicate.Converter.convert(predicate); - return has(key, titanPredicate, condition); - } - - @Override - public GraphCentricQueryBuilder has(PropertyKey key, TitanPredicate predicate, Object condition) { - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(predicate); - return has(key.getName(), predicate, 
condition); - } - - @Override - public GraphCentricQueryBuilder has(String key) { - return has(key, Cmp.NOT_EQUAL, (Object) null); - } - - @Override - public GraphCentricQueryBuilder hasNot(String key) { - return has(key, Cmp.EQUAL, (Object) null); - } - - @Override - @Deprecated - public <T extends Comparable<T>> GraphCentricQueryBuilder has(String s, T t, Compare compare) { - return has(s, compare, t); - } - - @Override - public GraphCentricQueryBuilder has(String key, Object value) { - return has(key, Cmp.EQUAL, value); - } - - @Override - public GraphCentricQueryBuilder hasNot(String key, Object value) { - return has(key, Cmp.NOT_EQUAL, value); - } - - @Override - public <T extends Comparable<?>> GraphCentricQueryBuilder interval(String s, T t1, T t2) { - has(s, Cmp.GREATER_THAN_EQUAL, t1); - return has(s, Cmp.LESS_THAN, t2); - } - - @Override - public GraphCentricQueryBuilder limit(final int limit) { - Preconditions.checkArgument(limit >= 0, "Non-negative limit expected: %s", limit); - this.limit = limit; - return this; - } - - @Override - public GraphCentricQueryBuilder orderBy(String key, Order order) { - Preconditions.checkArgument(tx.containsPropertyKey(key),"Provided key does not exist: %s",key); - return orderBy(tx.getPropertyKey(key), order); - } - - @Override - public GraphCentricQueryBuilder orderBy(PropertyKey key, Order order) { - Preconditions.checkArgument(key!=null && order!=null,"Need to specify and key and an order"); - Preconditions.checkArgument(Comparable.class.isAssignableFrom(key.getDataType()), - "Can only order on keys with comparable data type. 
[%s] has datatype [%s]", key.getName(), key.getDataType()); - Preconditions.checkArgument(key.getCardinality()== Cardinality.SINGLE, "Ordering is undefined on multi-valued key [%s]", key.getName()); - Preconditions.checkArgument(!orders.containsKey(key)); - orders.add(key, order); - return this; - } - - /* --------------------------------------------------------------- - * Query Execution - * --------------------------------------------------------------- - */ - - @Override - public Iterable<Vertex> vertices() { - GraphCentricQuery query = constructQuery(ElementCategory.VERTEX); - return Iterables.filter(new QueryProcessor<GraphCentricQuery, TitanElement, JointIndexQuery>(query, tx.elementProcessor), Vertex.class); - } - - @Override - public Iterable<Edge> edges() { - GraphCentricQuery query = constructQuery(ElementCategory.EDGE); - return Iterables.filter(new QueryProcessor<GraphCentricQuery, TitanElement, JointIndexQuery>(query, tx.elementProcessor), Edge.class); - } - - @Override - public Iterable<TitanProperty> properties() { - GraphCentricQuery query = constructQuery(ElementCategory.PROPERTY); - return Iterables.filter(new QueryProcessor<GraphCentricQuery, TitanElement, JointIndexQuery>(query, tx.elementProcessor), TitanProperty.class); - } - - private QueryDescription describe(ElementCategory category) { - return new StandardQueryDescription(1,constructQuery(category)); - } - - public QueryDescription describeForVertices() { - return describe(ElementCategory.VERTEX); - } - - public QueryDescription describeForEdges() { - return describe(ElementCategory.EDGE); - } - - public QueryDescription describeForProperties() { - return describe(ElementCategory.PROPERTY); - } - - /* --------------------------------------------------------------- - * Query Construction - * --------------------------------------------------------------- - */ - - private static final int DEFAULT_NO_LIMIT = 1000; - private static final int MAX_BASE_LIMIT = 20000; - private static final int 
HARD_MAX_LIMIT = 100000; - - private static final double EQUAL_CONDITION_SCORE = 4; - private static final double OTHER_CONDITION_SCORE = 1; - private static final double ORDER_MATCH = 2; - private static final double ALREADY_MATCHED_ADJUSTOR = 0.1; - private static final double CARDINALITY_SINGE_SCORE = 1000; - private static final double CARDINALITY_OTHER_SCORE = 1000; - - public GraphCentricQuery constructQuery(final ElementCategory resultType) { - Preconditions.checkNotNull(resultType); - if (limit == 0) return GraphCentricQuery.emptyQuery(resultType); - - //Prepare constraints - And<TitanElement> conditions = QueryUtil.constraints2QNF(tx, constraints); - if (conditions == null) return GraphCentricQuery.emptyQuery(resultType); - - //Prepare orders - orders.makeImmutable(); - if (orders.isEmpty()) orders = OrderList.NO_ORDER; - - //Compile all indexes that cover at least one of the query conditions - final Set<IndexType> indexCandidates = new HashSet<IndexType>(); - ConditionUtil.traversal(conditions,new Predicate<Condition<TitanElement>>() { - @Override - public boolean apply(@Nullable Condition<TitanElement> condition) { - if (condition instanceof PredicateCondition) { - RelationType type = ((PredicateCondition<RelationType,TitanElement>)condition).getKey(); - Preconditions.checkArgument(type!=null && type.isPropertyKey()); - Iterables.addAll(indexCandidates,Iterables.filter(((InternalRelationType) type).getKeyIndexes(), new Predicate<IndexType>() { - @Override - public boolean apply(@Nullable IndexType indexType) { - return indexType.getElement()==resultType; - } - })); - } - return true; - } - }); - - /* - Determine the best join index query to answer this query: - Iterate over all potential indexes (as compiled above) and compute a score based on how many clauses - this index covers. The index with the highest score (as long as it covers at least one additional clause) - is picked and added to the joint query for as long as such exist. 
- */ - JointIndexQuery jointQuery = new JointIndexQuery(); - boolean isSorted = orders.isEmpty(); - Set<Condition> coveredClauses = Sets.newHashSet(); - while (true) { - IndexType bestCandidate = null; - double candidateScore = 0.0; - Set<Condition> candidateSubcover = null; - boolean candidateSupportsSort = false; - Object candidateSubcondition = null; - - for (IndexType index : indexCandidates) { - Set<Condition> subcover = Sets.newHashSet(); - Object subcondition; - boolean supportsSort = orders.isEmpty(); - //Check that this index actually applies in case of a schema constraint - if (index.hasSchemaTypeConstraint()) { - TitanSchemaType type = index.getSchemaTypeConstraint(); - Map.Entry<Condition,Collection<Object>> equalCon = getEqualityConditionValues(conditions,ImplicitKey.LABEL); - if (equalCon==null) continue; - Collection<Object> labels = equalCon.getValue(); - assert labels.size()>=1; - if (labels.size()>1) { - log.warn("The query optimizer currently does not support multiple label constraints in query: {}",this); - continue; - } - if (!type.getName().equals((String)Iterables.getOnlyElement(labels))) continue; - subcover.add(equalCon.getKey()); - } - - if (index.isCompositeIndex()) { - subcondition = indexCover((CompositeIndexType) index,conditions,subcover); - } else { - subcondition = indexCover((MixedIndexType) index,conditions,serializer,subcover); - if (coveredClauses.isEmpty() && !supportsSort - && indexCoversOrder((MixedIndexType)index,orders)) supportsSort=true; - } - if (subcondition==null) continue; - assert !subcover.isEmpty(); - double score = 0.0; - boolean coversAdditionalClause = false; - for (Condition c : subcover) { - double s = (c instanceof PredicateCondition && ((PredicateCondition)c).getPredicate()==Cmp.EQUAL)? 
- EQUAL_CONDITION_SCORE:OTHER_CONDITION_SCORE; - if (coveredClauses.contains(c)) s=s*ALREADY_MATCHED_ADJUSTOR; - else coversAdditionalClause = true; - score+=s; - if (index.isCompositeIndex()) - score+=((CompositeIndexType)index).getCardinality()==Cardinality.SINGLE? - CARDINALITY_SINGE_SCORE:CARDINALITY_OTHER_SCORE; - } - if (supportsSort) score+=ORDER_MATCH; - if (coversAdditionalClause && score>candidateScore) { - candidateScore=score; - bestCandidate=index; - candidateSubcover = subcover; - candidateSubcondition = subcondition; - candidateSupportsSort = supportsSort; - } - } - if (bestCandidate!=null) { - if (coveredClauses.isEmpty()) isSorted=candidateSupportsSort; - coveredClauses.addAll(candidateSubcover); - - log.debug("Index chosen for query {} {} " , bestCandidate.isCompositeIndex() ? "COMPOSITE" : "MIXED", coveredClauses); - if (bestCandidate.isCompositeIndex()) { - jointQuery.add((CompositeIndexType)bestCandidate, - serializer.getQuery((CompositeIndexType)bestCandidate,(List<Object[]>)candidateSubcondition)); - } else { - jointQuery.add((MixedIndexType)bestCandidate, - serializer.getQuery((MixedIndexType)bestCandidate,(Condition)candidateSubcondition,orders)); - } - } else { - break; - } - /* TODO: smarter optimization: - - use in-memory histograms to estimate selectivity of PredicateConditions and filter out low-selectivity ones - if they would result in an individual index call (better to filter afterwards in memory) - - move OR's up and extend GraphCentricQuery to allow multiple JointIndexQuery for proper or'ing of queries - */ - } - - BackendQueryHolder<JointIndexQuery> query; - if (!coveredClauses.isEmpty()) { - int indexLimit = limit == Query.NO_LIMIT ? HARD_MAX_LIMIT : limit; - if (tx.getGraph().getConfiguration().adjustQueryLimit()) { - indexLimit = limit == Query.NO_LIMIT ? 
DEFAULT_NO_LIMIT : Math.min(MAX_BASE_LIMIT, limit);
-            }
-            indexLimit = Math.min(HARD_MAX_LIMIT, QueryUtil.adjustLimitForTxModifications(tx, coveredClauses.size(), indexLimit));
-            jointQuery.setLimit(indexLimit);
-            query = new BackendQueryHolder<JointIndexQuery>(jointQuery, coveredClauses.size()==conditions.numChildren(), isSorted, null);
-        } else {
-            query = new BackendQueryHolder<JointIndexQuery>(new JointIndexQuery(), false, isSorted, null);
-        }
-
-        return new GraphCentricQuery(resultType, conditions, orders, query, limit);
-    }
-    /**
-     * Reports whether a mixed index indexes every key of an ordering.
-     *
-     * @param index  mixed index to test
-     * @param orders ordering whose keys are read through {@code orders.getKey(i)}
-     * @return {@code false} as soon as {@code index.indexesKey} rejects one of the order
-     *         keys; {@code true} otherwise, including when {@code orders} is empty
-     */
-    public static final boolean indexCoversOrder(MixedIndexType index, OrderList orders) {
-        for (int i = 0; i < orders.size(); i++) {
-            if (!index.indexesKey(orders.getKey(i))) return false;
-        }
-        return true;
-    }
-    /**
-     * Computes the value tuples with which a composite index can answer part of a condition.
-     * <p>
-     * Every field of the index must be matched by an equality condition among the direct
-     * children of {@code condition} (see {@code getEqualityConditionValues}). A field that
-     * matches several values yields one tuple per value, so the result is the cross product
-     * of the per-field values, each tuple ordered like {@code index.getFieldKeys()}.
-     *
-     * @param index     composite index to match; an index whose status is not
-     *                  {@code SchemaStatus.ENABLED} is never used
-     * @param condition query condition; asserted (only when assertions are enabled) to be
-     *                  in query normal form and to be an {@code And}
-     * @param covered   output set: receives the matched child conditions, but only when a
-     *                  non-null result is returned
-     * @return the list of value tuples, or {@code null} if the index is not enabled or
-     *         some field has no matching equality condition
-     */
-    public static List<Object[]> indexCover(final CompositeIndexType index, Condition<TitanElement> condition, Set<Condition> covered) {
-        assert QueryUtil.isQueryNormalForm(condition);
-        assert condition instanceof And;
-        if (index.getStatus()!= SchemaStatus.ENABLED) return null;
-        IndexField[] fields = index.getFieldKeys();
-        Object[] indexValues = new Object[fields.length];
-        Set<Condition> coveredClauses = new HashSet<Condition>(fields.length); // scratch set, copied to 'covered' only on success
-        List<Object[]> indexCovers = new ArrayList<Object[]>(4);
-
-        constructIndexCover(indexValues,0,fields,condition,indexCovers,coveredClauses);
-        if (!indexCovers.isEmpty()) {
-            covered.addAll(coveredClauses);
-            return indexCovers;
-        } else return null;
-    }
-    /**
-     * Recursive helper of the composite {@code indexCover}: assigns a value to the field at
-     * {@code position} and recurses for the remaining fields.
-     *
-     * @param indexValues    values chosen so far for the fields before {@code position};
-     *                       not modified here, every recursion step works on a copy
-     * @param position       index into {@code fields} of the field to match next
-     * @param fields         the fields of the index
-     * @param condition      condition whose children are searched for equality conditions
-     * @param indexCovers    output list: receives one array per complete assignment
-     * @param coveredClauses output set: receives every matched condition, even when a
-     *                       later field fails to match and no array is produced
-     */
-    private static void constructIndexCover(Object[] indexValues, int position, IndexField[] fields,
-                                            Condition<TitanElement> condition,
-                                            List<Object[]> indexCovers, Set<Condition> coveredClauses) {
-        if (position>=fields.length) {
-            indexCovers.add(indexValues); // all fields assigned: the tuple is complete
-        } else {
-            IndexField field = fields[position];
-            Map.Entry<Condition,Collection<Object>> equalCon = getEqualityConditionValues(condition,field.getFieldKey());
-            if (equalCon!=null) {
-                coveredClauses.add(equalCon.getKey());
-                assert equalCon.getValue().size()>0;
-                for (Object value : equalCon.getValue()) {
-                    Object[] newValues = Arrays.copyOf(indexValues,fields.length); // copy so sibling branches do not share state
-                    newValues[position]=value;
-                    constructIndexCover(newValues,position+1,fields,condition,indexCovers,coveredClauses);
-                }
-            } else return; // no equality condition for this field: this branch yields no tuple
-        }
-
-    }
-    /**
-     * Finds the first direct child of {@code condition} that constrains {@code type} to one
-     * or more concrete values.
-     * <p>
-     * Two kinds of children qualify: an {@code Or} for which
-     * {@code QueryUtil.extractOrCondition} returns an entry whose key equals {@code type}
-     * and whose value collection is non-empty, and a {@code PredicateCondition} on
-     * {@code type} with predicate {@code Cmp.EQUAL} and a non-null value.
-     * <p>
-     * NOTE(review): the entries are built with the raw type
-     * {@code AbstractMap.SimpleImmutableEntry}, so the generic return type is unchecked.
-     *
-     * @param condition condition whose children are scanned in iteration order
-     * @param type      relation type to look for
-     * @return an immutable entry of (matching child, its values), or {@code null} if no
-     *         child qualifies
-     */
-    private static final Map.Entry<Condition,Collection<Object>> getEqualityConditionValues(Condition<TitanElement> condition, RelationType type) {
-        for (Condition c : condition.getChildren()) {
-            if (c instanceof Or) {
-                Map.Entry<RelationType,Collection> orEqual = QueryUtil.extractOrCondition((Or)c);
-                if (orEqual!=null && orEqual.getKey().equals(type) && !orEqual.getValue().isEmpty()) {
-                    return new AbstractMap.SimpleImmutableEntry(c,orEqual.getValue());
-                }
-            } else if (c instanceof PredicateCondition) {
-                PredicateCondition<RelationType, TitanRelation> atom = (PredicateCondition)c;
-                if (atom.getKey().equals(type) && atom.getPredicate()==Cmp.EQUAL && atom.getValue()!=null) {
-                    return new AbstractMap.SimpleImmutableEntry(c,ImmutableList.of(atom.getValue()));
-                }
-            }
-
-        }
-        return null;
-    }
-    /**
-     * Collects the direct children of {@code condition} that a mixed index can answer.
-     *
-     * @param index     mixed index to match
-     * @param condition query condition; asserted (only when assertions are enabled) to be
-     *                  in query normal form and to be an {@code And}
-     * @param indexInfo serializer asked, through {@code coversAll}, whether the index
-     *                  supports a predicate
-     * @param covered   output set: receives every child accepted by {@code coversAll}
-     * @return an {@code And} of the accepted children, or {@code null} if there are none
-     */
-    public static final Condition<TitanElement> indexCover(final MixedIndexType index, Condition<TitanElement> condition,
-                                                           final IndexSerializer indexInfo, final Set<Condition> covered) {
-        assert QueryUtil.isQueryNormalForm(condition);
-        assert condition instanceof And;
-        And<TitanElement> subcondition = new And<TitanElement>(condition.numChildren());
-        for (Condition<TitanElement> subclause : condition.getChildren()) {
-            if (coversAll(index,subclause,indexInfo)) {
-                subcondition.add(subclause);
-                covered.add(subclause);
-            }
-        }
-        return subcondition.isEmpty()?null:subcondition;
-    }
-    /**
-     * Tests whether a mixed index can evaluate a condition completely.
-     * <p>
-     * A condition of type {@code LITERAL} is covered when it is a {@code PredicateCondition}
-     * with a non-null value, the index has an enabled field for its key, and
-     * {@code indexInfo.supports} accepts the predicate for that field. Any other condition
-     * is covered when all of its children are covered.
-     *
-     * @param index     mixed index to test
-     * @param condition condition to check
-     * @param indexInfo serializer that decides predicate support
-     * @return {@code true} if the whole condition is covered
-     */
-    private static final boolean coversAll(final MixedIndexType index, Condition<TitanElement> condition, IndexSerializer indexInfo) {
-        if (condition.getType()==Condition.Type.LITERAL) {
-            if (!(condition instanceof PredicateCondition)) return false;
-            PredicateCondition<RelationType, TitanElement> atom = (PredicateCondition) condition;
-            if (atom.getValue()==null) return false;
-
-            Preconditions.checkArgument(atom.getKey().isPropertyKey()); // throws rather than returning false when the key is not a property key
-            PropertyKey key = (PropertyKey) atom.getKey();
-            ParameterIndexField[] fields = index.getFieldKeys();
-            ParameterIndexField match = null;
-            for (int i = 0; i < fields.length; i++) {
-                if (fields[i].getStatus()!= SchemaStatus.ENABLED) continue;
-                if (fields[i].getFieldKey().equals(key)) match = fields[i]; // no break: the last enabled field with this key wins
-            }
-            if (match==null) return false;
-            return indexInfo.supports(index,match,atom.getPredicate());
-        } else {
-            for (Condition<TitanElement> child : condition.getChildren()) {
-                if (!coversAll(index,child,indexInfo)) return false;
-            }
-            return true;
-        }
-    }
-}
