http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java deleted file mode 100644 index 6d248b2..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; -import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto; -import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; -import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; - -import com.google.protobuf.TextFormat; - -public class ContainerStartDataPBImpl extends ContainerStartData { - - ContainerStartDataProto proto = ContainerStartDataProto.getDefaultInstance(); - ContainerStartDataProto.Builder builder = null; - boolean viaProto = false; - - private ContainerId containerId; - private Resource resource; - private NodeId nodeId; - private Priority priority; - - public ContainerStartDataPBImpl() { - builder = ContainerStartDataProto.newBuilder(); - } - - public ContainerStartDataPBImpl(ContainerStartDataProto proto) { - this.proto = proto; - viaProto = true; - } - - @Override - public ContainerId getContainerId() { - if (this.containerId != null) { - return this.containerId; - } - ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasContainerId()) { - return null; - } - this.containerId = convertFromProtoFormat(p.getContainerId()); - return this.containerId; - } - - @Override - public void setContainerId(ContainerId containerId) { - maybeInitBuilder(); - if (containerId == null) { - builder.clearContainerId(); - } - this.containerId = containerId; - } - - @Override - public Resource getAllocatedResource() { - if (this.resource != null) { - return this.resource; - } - ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasAllocatedResource()) { - return null; - } - this.resource = convertFromProtoFormat(p.getAllocatedResource()); - return this.resource; - } - - @Override - public void setAllocatedResource(Resource resource) { - maybeInitBuilder(); - if (resource == null) { - builder.clearAllocatedResource(); - } - this.resource = resource; - } - - @Override - public NodeId getAssignedNode() { - if (this.nodeId != null) { - return this.nodeId; - } - ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasAssignedNodeId()) { - return null; - } - this.nodeId = convertFromProtoFormat(p.getAssignedNodeId()); - return this.nodeId; - } - - @Override - public void setAssignedNode(NodeId nodeId) { - maybeInitBuilder(); - if (nodeId == null) { - builder.clearAssignedNodeId(); - } - this.nodeId = nodeId; - } - - @Override - public Priority getPriority() { - if (this.priority != null) { - return this.priority; - } - ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasPriority()) { - return null; - } - this.priority = convertFromProtoFormat(p.getPriority()); - return this.priority; - } - - @Override - public void setPriority(Priority priority) { - maybeInitBuilder(); - if (priority == null) { - builder.clearPriority(); - } - this.priority = priority; - } - - @Override - public long getStartTime() { - ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder; - return p.getStartTime(); - } - - @Override - public void setStartTime(long startTime) { - maybeInitBuilder(); - builder.setStartTime(startTime); - } - - public ContainerStartDataProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - @Override - public int hashCode() { - return getProto().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == null) - return false; - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); - } - return false; - } - - @Override - public String toString() { - return TextFormat.shortDebugString(getProto()); - } - - private void mergeLocalToBuilder() { - if (this.containerId != null - && !((ContainerIdPBImpl) this.containerId).getProto().equals( - builder.getContainerId())) { - builder.setContainerId(convertToProtoFormat(this.containerId)); - } - if (this.resource != null - && !((ResourcePBImpl) this.resource).getProto().equals( - builder.getAllocatedResource())) { - builder.setAllocatedResource(convertToProtoFormat(this.resource)); - } - if (this.nodeId != null - && !((NodeIdPBImpl) this.nodeId).getProto().equals( - builder.getAssignedNodeId())) { - builder.setAssignedNodeId(convertToProtoFormat(this.nodeId)); - } - if (this.priority != null - && !((PriorityPBImpl) this.priority).getProto().equals( - builder.getPriority())) { - builder.setPriority(convertToProtoFormat(this.priority)); - } - } - - private void mergeLocalToProto() { - if (viaProto) { - maybeInitBuilder(); - } - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = ContainerStartDataProto.newBuilder(proto); - } - viaProto = false; - } - - private ContainerIdProto convertToProtoFormat(ContainerId containerId) { - return ((ContainerIdPBImpl) containerId).getProto(); - } - - private ContainerIdPBImpl - convertFromProtoFormat(ContainerIdProto containerId) { - return new ContainerIdPBImpl(containerId); - } - - private ResourceProto convertToProtoFormat(Resource resource) { - return ((ResourcePBImpl) resource).getProto(); - } - - private ResourcePBImpl convertFromProtoFormat(ResourceProto resource) { - return new ResourcePBImpl(resource); - } - - private NodeIdProto convertToProtoFormat(NodeId nodeId) { - return ((NodeIdPBImpl) nodeId).getProto(); - } - - private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) { - return new NodeIdPBImpl(nodeId); - } - - private PriorityProto convertToProtoFormat(Priority priority) { - return ((PriorityPBImpl) priority).getProto(); - } - - private PriorityPBImpl convertFromProtoFormat(PriorityProto priority) { - return new PriorityPBImpl(priority); - } - -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java deleted file mode 100644 index 4b202d8..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; - -/** - * The unique identifier for an entity - */ -@Private -@Unstable -public class EntityIdentifier implements Comparable<EntityIdentifier> { - - private String id; - private String type; - - public EntityIdentifier(String id, String type) { - this.id = id; - this.type = type; - } - - /** - * Get the entity Id. - * @return The entity Id. - */ - public String getId() { - return id; - } - - /** - * Get the entity type. - * @return The entity type. - */ - public String getType() { - return type; - } - - @Override - public int compareTo(EntityIdentifier other) { - int c = type.compareTo(other.type); - if (c != 0) return c; - return id.compareTo(other.id); - } - - @Override - public int hashCode() { - // generated by eclipse - final int prime = 31; - int result = 1; - result = prime * result + ((id == null) ? 0 : id.hashCode()); - result = prime * result + ((type == null) ? 0 : type.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - // generated by eclipse - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - EntityIdentifier other = (EntityIdentifier) obj; - if (id == null) { - if (other.id != null) - return false; - } else if (!id.equals(other.id)) - return false; - if (type == null) { - if (other.type != null) - return false; - } else if (!type.equals(other.type)) - return false; - return true; - } - - @Override - public String toString() { - return "{ id: " + id + ", type: "+ type + " }"; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java deleted file mode 100644 index b1846a3..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; -import org.codehaus.jackson.map.ObjectWriter; - -/** - * A utility class providing methods for serializing and deserializing - * objects. The {@link #write(Object)} and {@link #read(byte[])} methods are - * used by the {@link LeveldbTimelineStore} to store and retrieve arbitrary - * JSON, while the {@link #writeReverseOrderedLong} and {@link - * #readReverseOrderedLong} methods are used to sort entities in descending - * start time order. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class GenericObjectMapper { - private static final byte[] EMPTY_BYTES = new byte[0]; - - public static final ObjectReader OBJECT_READER; - public static final ObjectWriter OBJECT_WRITER; - - static { - ObjectMapper mapper = new ObjectMapper(); - OBJECT_READER = mapper.reader(Object.class); - OBJECT_WRITER = mapper.writer(); - } - - /** - * Serializes an Object into a byte array. Along with {@link #read(byte[])}, - * can be used to serialize an Object and deserialize it into an Object of - * the same type without needing to specify the Object's type, - * as long as it is one of the JSON-compatible objects understood by - * ObjectMapper. - * - * @param o An Object - * @return A byte array representation of the Object - * @throws IOException if there is a write error - */ - public static byte[] write(Object o) throws IOException { - if (o == null) { - return EMPTY_BYTES; - } - return OBJECT_WRITER.writeValueAsBytes(o); - } - - /** - * Deserializes an Object from a byte array created with - * {@link #write(Object)}. - * - * @param b A byte array - * @return An Object - * @throws IOException if there is a read error - */ - public static Object read(byte[] b) throws IOException { - return read(b, 0); - } - - /** - * Deserializes an Object from a byte array at a specified offset, assuming - * the bytes were created with {@link #write(Object)}. - * - * @param b A byte array - * @param offset Offset into the array - * @return An Object - * @throws IOException if there is a read error - */ - public static Object read(byte[] b, int offset) throws IOException { - if (b == null || b.length == 0) { - return null; - } - return OBJECT_READER.readValue(b, offset, b.length - offset); - } - - /** - * Converts a long to a 8-byte array so that lexicographic ordering of the - * produced byte arrays sort the longs in descending order. - * - * @param l A long - * @return A byte array - */ - public static byte[] writeReverseOrderedLong(long l) { - byte[] b = new byte[8]; - return writeReverseOrderedLong(l, b, 0); - } - - public static byte[] writeReverseOrderedLong(long l, byte[] b, int offset) { - b[offset] = (byte)(0x7f ^ ((l >> 56) & 0xff)); - for (int i = offset+1; i < offset+7; i++) { - b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff)); - } - b[offset+7] = (byte)(0xff ^ (l & 0xff)); - return b; - } - - /** - * Reads 8 bytes from an array starting at the specified offset and - * converts them to a long. The bytes are assumed to have been created - * with {@link #writeReverseOrderedLong}. - * - * @param b A byte array - * @param offset An offset into the byte array - * @return A long - */ - public static long readReverseOrderedLong(byte[] b, int offset) { - long l = b[offset] & 0xff; - for (int i = 1; i < 8; i++) { - l = l << 8; - l = l | (b[offset+i]&0xff); - } - return l ^ 0x7fffffffffffffffl; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java deleted file mode 100644 index edd4842..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java +++ /dev/null @@ -1,1473 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.collections.map.LRUMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.fusesource.leveldbjni.JniDBFactory; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Options; -import org.iq80.leveldb.ReadOptions; -import org.iq80.leveldb.WriteBatch; -import org.iq80.leveldb.WriteOptions; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong; - -/** - * <p>An implementation of an application timeline store backed by leveldb.</p> - * - * <p>There are three sections of the db, the start time section, - * the entity section, and the indexed entity section.</p> - * - * <p>The start time section is used to retrieve the unique start time for - * a given entity. Its values each contain a start time while its keys are of - * the form:</p> - * <pre> - * START_TIME_LOOKUP_PREFIX + entity type + entity id</pre> - * - * <p>The entity section is ordered by entity type, then entity start time - * descending, then entity ID. There are four sub-sections of the entity - * section: events, primary filters, related entities, - * and other info. The event entries have event info serialized into their - * values. The other info entries have values corresponding to the values of - * the other info name/value map for the entry (note the names are contained - * in the key). All other entries have empty values. The key structure is as - * follows:</p> - * <pre> - * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id - * - * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + - * EVENTS_COLUMN + reveventtimestamp + eventtype - * - * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + - * PRIMARY_FILTERS_COLUMN + name + value - * - * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + - * OTHER_INFO_COLUMN + name - * - * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + - * RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id - * - * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + - * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type + - * relatedentity id</pre> - * - * <p>The indexed entity section contains a primary filter name and primary - * filter value as the prefix. Within a given name/value, entire entity - * entries are stored in the same format as described in the entity section - * above (below, "key" represents any one of the possible entity entry keys - * described above).</p> - * <pre> - * INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value + - * key</pre> - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class LeveldbTimelineStore extends AbstractService - implements TimelineStore { - private static final Log LOG = LogFactory - .getLog(LeveldbTimelineStore.class); - - private static final String FILENAME = "leveldb-timeline-store.ldb"; - - private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(); - private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(); - private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(); - - private static final byte[] EVENTS_COLUMN = "e".getBytes(); - private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(); - private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(); - private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(); - private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN = - "z".getBytes(); - - private static final byte[] EMPTY_BYTES = new byte[0]; - - private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache; - private Map<EntityIdentifier, Long> startTimeReadCache; - - /** - * Per-entity locks are obtained when writing. - */ - private final LockMap<EntityIdentifier> writeLocks = - new LockMap<EntityIdentifier>(); - - private final ReentrantReadWriteLock deleteLock = - new ReentrantReadWriteLock(); - - private DB db; - - private Thread deletionThread; - - public LeveldbTimelineStore() { - super(LeveldbTimelineStore.class.getName()); - } - - @Override - @SuppressWarnings("unchecked") - protected void serviceInit(Configuration conf) throws Exception { - Options options = new Options(); - options.createIfMissing(true); - options.cacheSize(conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); - JniDBFactory factory = new JniDBFactory(); - String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH); - File p = new File(path); - if (!p.exists()) { - if (!p.mkdirs()) { - throw new IOException("Couldn't create directory for leveldb " + - "timeline store " + path); - } - } - LOG.info("Using leveldb path " + path); - db = factory.open(new File(path, FILENAME), options); - startTimeWriteCache = - Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( - conf))); - startTimeReadCache = - Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize( - conf))); - - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) { - deletionThread = new EntityDeletionThread(conf); - deletionThread.start(); - } - - super.serviceInit(conf); - } - - @Override - protected void serviceStop() throws Exception { - if (deletionThread != null) { - deletionThread.interrupt(); - LOG.info("Waiting for deletion thread to complete its current action"); - try { - deletionThread.join(); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for deletion thread to complete," + - " closing db now", e); - } - } - IOUtils.cleanup(LOG, db); - super.serviceStop(); - } - - private static class StartAndInsertTime { - final long startTime; - final long insertTime; - - public StartAndInsertTime(long startTime, long insertTime) { - this.startTime = startTime; - this.insertTime = insertTime; - } - } - - private class EntityDeletionThread extends Thread { - private final long ttl; - private final long ttlInterval; - - public EntityDeletionThread(Configuration conf) { - ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS); - ttlInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); - LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " + - "interval " + ttlInterval); - } - - @Override - public void run() { - while (true) { - long timestamp = System.currentTimeMillis() - ttl; - try { - discardOldEntities(timestamp); - Thread.sleep(ttlInterval); - } catch (IOException e) { - LOG.error(e); - } catch (InterruptedException e) { - LOG.info("Deletion thread received interrupt, exiting"); - break; - } - } - } - } - - private static class LockMap<K> { - private static class CountingReentrantLock<K> extends ReentrantLock { - private static final long serialVersionUID = 1L; - private int count; - private K key; - - CountingReentrantLock(K key) { - super(); - this.count = 0; - this.key = key; - } - } - - private Map<K, CountingReentrantLock<K>> locks = - new HashMap<K, CountingReentrantLock<K>>(); - - synchronized CountingReentrantLock<K> getLock(K key) { - CountingReentrantLock<K> lock = locks.get(key); - if (lock == null) { - lock = new CountingReentrantLock<K>(key); - locks.put(key, lock); - } - - lock.count++; - return lock; - } - - synchronized void returnLock(CountingReentrantLock<K> lock) { - if (lock.count == 0) { - throw new IllegalStateException("Returned lock more times than it " + - "was retrieved"); - } - lock.count--; - - if (lock.count == 0) { - locks.remove(lock.key); - } - } - } - - private static class KeyBuilder { - private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10; - private byte[][] b; - private boolean[] useSeparator; - private int index; - private int length; - - public KeyBuilder(int size) { - b = new byte[size][]; - useSeparator = new boolean[size]; - index = 0; - length = 0; - } - - public static KeyBuilder newInstance() { - return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS); - } - - public KeyBuilder add(String s) { - return add(s.getBytes(), true); - } - - public KeyBuilder add(byte[] t) { - return add(t, false); - } - - public KeyBuilder add(byte[] t, boolean sep) { - b[index] = t; - useSeparator[index] = sep; - length += t.length; - if (sep) { - length++; - } - index++; - return this; - } - - public byte[] getBytes() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(length); - for (int i = 0; i < index; i++) { - baos.write(b[i]); - if (i < index-1 && useSeparator[i]) { - baos.write(0x0); - } - } - return baos.toByteArray(); - } - - public byte[] getBytesForLookup() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(length); - for (int i = 0; i < index; i++) { - baos.write(b[i]); - if (useSeparator[i]) { - baos.write(0x0); - } - } - return baos.toByteArray(); - } - } - - private static class KeyParser { - private final byte[] b; - private int offset; - - public KeyParser(byte[] b, int offset) { - this.b = b; - this.offset = offset; - } - - public String getNextString() throws IOException { - if (offset >= b.length) { - throw new IOException( - "tried to read nonexistent string from byte array"); - } - int i = 0; - while (offset+i < b.length && b[offset+i] != 0x0) { - i++; - } - String s = new String(b, offset, i); - offset = offset + i + 1; - return s; - } - - public long getNextLong() throws IOException { - if (offset+8 >= b.length) { - throw new IOException("byte array ran out when trying to read long"); - } - long l = readReverseOrderedLong(b, offset); - offset += 8; - return l; - } - - public int getOffset() { - return offset; - } - } - - @Override - public TimelineEntity getEntity(String entityId, String entityType, - EnumSet<Field> fields) throws IOException { - Long revStartTime = getStartTimeLong(entityId, entityType); - if (revStartTime == null) { - return null; - } - byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) - .add(entityType).add(writeReverseOrderedLong(revStartTime)) - .add(entityId).getBytesForLookup(); - - DBIterator iterator = null; - try { - iterator = db.iterator(); - iterator.seek(prefix); - - return getEntity(entityId, entityType, revStartTime, fields, iterator, - prefix, prefix.length); - } finally { - IOUtils.cleanup(LOG, iterator); - } - } - - /** - * Read entity from a db iterator. If no information is found in the - * specified fields for this entity, return null. - */ - private static TimelineEntity getEntity(String entityId, String entityType, - Long startTime, EnumSet<Field> fields, DBIterator iterator, - byte[] prefix, int prefixlen) throws IOException { - if (fields == null) { - fields = EnumSet.allOf(Field.class); - } - - TimelineEntity entity = new TimelineEntity(); - boolean events = false; - boolean lastEvent = false; - if (fields.contains(Field.EVENTS)) { - events = true; - } else if (fields.contains(Field.LAST_EVENT_ONLY)) { - lastEvent = true; - } else { - entity.setEvents(null); - } - boolean relatedEntities = false; - if (fields.contains(Field.RELATED_ENTITIES)) { - relatedEntities = true; - } else { - entity.setRelatedEntities(null); - } - boolean primaryFilters = false; - if (fields.contains(Field.PRIMARY_FILTERS)) { - primaryFilters = true; - } else { - entity.setPrimaryFilters(null); - } - boolean otherInfo = false; - if (fields.contains(Field.OTHER_INFO)) { - otherInfo = true; - } else { - entity.setOtherInfo(null); - } - - // iterate through the entity's entry, parsing information if it is part - // of a requested field - for (; iterator.hasNext(); iterator.next()) { - byte[] key = iterator.peekNext().getKey(); - if (!prefixMatches(prefix, prefixlen, key)) { - break; - } - if (key.length == prefixlen) { - continue; - } - if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) { - if (primaryFilters) { - addPrimaryFilter(entity, key, - prefixlen + PRIMARY_FILTERS_COLUMN.length); - } - } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) { - if (otherInfo) { - entity.addOtherInfo(parseRemainingKey(key, - prefixlen + OTHER_INFO_COLUMN.length), - GenericObjectMapper.read(iterator.peekNext().getValue())); - } - } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) { - if (relatedEntities) { - addRelatedEntity(entity, key, - prefixlen + RELATED_ENTITIES_COLUMN.length); - } - } else if (key[prefixlen] == EVENTS_COLUMN[0]) { - if (events || (lastEvent && - entity.getEvents().size() == 0)) { - TimelineEvent event = getEntityEvent(null, key, prefixlen + - EVENTS_COLUMN.length, iterator.peekNext().getValue()); - if (event != null) { - entity.addEvent(event); - } - } - } else { - if (key[prefixlen] != - INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) { - LOG.warn(String.format("Found unexpected column for entity %s of " + - "type %s (0x%02x)", entityId, entityType, key[prefixlen])); - } - } - } - - entity.setEntityId(entityId); - entity.setEntityType(entityType); - entity.setStartTime(startTime); - - return entity; - } - - @Override - public TimelineEvents getEntityTimelines(String entityType, - SortedSet<String> entityIds, Long limit, Long windowStart, - Long windowEnd, Set<String> eventType) throws IOException { - TimelineEvents events = new TimelineEvents(); - if (entityIds == null || entityIds.isEmpty()) { - return events; - } - // create a lexicographically-ordered map from start time to entities - Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[], - List<EntityIdentifier>>(new Comparator<byte[]>() { - @Override - public int compare(byte[] o1, byte[] o2) { - return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, - o2.length); - } - }); - DBIterator iterator = null; - try { - // look up start times for the specified entities - // skip entities with no start time - for (String entityId : entityIds) { - byte[] startTime = getStartTime(entityId, entityType); - if (startTime != null) { - List<EntityIdentifier> entities = startTimeMap.get(startTime); - if (entities == null) { - entities = new ArrayList<EntityIdentifier>(); - startTimeMap.put(startTime, entities); - } - entities.add(new EntityIdentifier(entityId, entityType)); - } - } - for (Entry<byte[], List<EntityIdentifier>> entry : - startTimeMap.entrySet()) { - // look up the events matching the given parameters (limit, - // start time, end time, event types) for entities whose start times - // were found and add the entities to the return list - byte[] revStartTime = entry.getKey(); - for (EntityIdentifier entityIdentifier : entry.getValue()) { - EventsOfOneEntity entity = new EventsOfOneEntity(); - entity.setEntityId(entityIdentifier.getId()); - entity.setEntityType(entityType); - events.addEvent(entity); - KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) - .add(entityType).add(revStartTime).add(entityIdentifier.getId()) - .add(EVENTS_COLUMN); - byte[] prefix = kb.getBytesForLookup(); - if (windowEnd == null) { - windowEnd = Long.MAX_VALUE; - } - byte[] revts = writeReverseOrderedLong(windowEnd); - kb.add(revts); - byte[] first = kb.getBytesForLookup(); - byte[] last = null; - if (windowStart != null) { - last = KeyBuilder.newInstance().add(prefix) - .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); - } - if (limit == null) { - limit = DEFAULT_LIMIT; - } - iterator = db.iterator(); - for (iterator.seek(first); entity.getEvents().size() < limit && - iterator.hasNext(); iterator.next()) { - byte[] key = iterator.peekNext().getKey(); - if (!prefixMatches(prefix, prefix.length, key) || (last != null && - WritableComparator.compareBytes(key, 0, key.length, last, 0, - last.length) > 0)) { - break; - } - TimelineEvent event = getEntityEvent(eventType, key, prefix.length, - iterator.peekNext().getValue()); - if (event != null) { - entity.addEvent(event); - } - } - } - } - } finally { - IOUtils.cleanup(LOG, iterator); - } - return events; - } - - /** - * Returns true if the byte array begins with the specified prefix. - */ - private static boolean prefixMatches(byte[] prefix, int prefixlen, - byte[] b) { - if (b.length < prefixlen) { - return false; - } - return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0, - prefixlen) == 0; - } - - @Override - public TimelineEntities getEntities(String entityType, - Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, - NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, - EnumSet<Field> fields) throws IOException { - if (primaryFilter == null) { - // if no primary filter is specified, prefix the lookup with - // ENTITY_ENTRY_PREFIX - return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit, - windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields); - } else { - // if a primary filter is specified, prefix the lookup with - // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + - // ENTITY_ENTRY_PREFIX - byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) - .add(primaryFilter.getName()) - .add(GenericObjectMapper.write(primaryFilter.getValue()), true) - .add(ENTITY_ENTRY_PREFIX).getBytesForLookup(); - return getEntityByTime(base, entityType, limit, windowStart, windowEnd, - fromId, fromTs, secondaryFilters, fields); - } - } - - /** - * Retrieves a list of entities satisfying given parameters. - * - * @param base A byte array prefix for the lookup - * @param entityType The type of the entity - * @param limit A limit on the number of entities to return - * @param starttime The earliest entity start time to retrieve (exclusive) - * @param endtime The latest entity start time to retrieve (inclusive) - * @param fromId Retrieve entities starting with this entity - * @param fromTs Ignore entities with insert timestamp later than this ts - * @param secondaryFilters Filter pairs that the entities should match - * @param fields The set of fields to retrieve - * @return A list of entities - * @throws IOException - */ - private TimelineEntities getEntityByTime(byte[] base, - String entityType, Long limit, Long starttime, Long endtime, - String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters, - EnumSet<Field> fields) throws IOException { - DBIterator iterator = null; - try { - KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); - // only db keys matching the prefix (base + entity type) will be parsed - byte[] prefix = kb.getBytesForLookup(); - if (endtime == null) { - // if end time is null, place no restriction on end time - endtime = Long.MAX_VALUE; - } - // construct a first key that will be seeked to using end time or fromId - byte[] first = null; - if (fromId != null) { - Long fromIdStartTime = getStartTimeLong(fromId, entityType); - if (fromIdStartTime == null) { - // no start time for provided id, so return empty entities - return new TimelineEntities(); - } - if (fromIdStartTime <= endtime) { - // if provided id's start time falls before the end of the window, - // use it to construct the seek key - first = kb.add(writeReverseOrderedLong(fromIdStartTime)) - .add(fromId).getBytesForLookup(); - } - } - // if seek key wasn't constructed using fromId, construct it using end ts - if (first == null) { - first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); - } - byte[] last = null; - if (starttime != null) { - // if start time is not null, set a last key that will not be - // iterated past - last = KeyBuilder.newInstance().add(base).add(entityType) - .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); - } - if (limit == null) { - // if limit is not specified, use the default - limit = DEFAULT_LIMIT; - } - - TimelineEntities entities = new TimelineEntities(); - iterator = db.iterator(); - iterator.seek(first); - // iterate until one of the following conditions is met: limit is - // reached, there are no more keys, the key prefix no longer matches, - // or a start time has been specified and reached/exceeded - while (entities.getEntities().size() < limit && iterator.hasNext()) { - byte[] key = iterator.peekNext().getKey(); - if (!prefixMatches(prefix, prefix.length, key) || (last != null && - WritableComparator.compareBytes(key, 0, key.length, last, 0, - last.length) > 0)) { - break; - } - // read the start time and entity id from the current key - KeyParser kp = new KeyParser(key, prefix.length); - Long startTime = kp.getNextLong(); - String entityId = kp.getNextString(); - - if (fromTs != null) { - long insertTime = readReverseOrderedLong(iterator.peekNext() - .getValue(), 0); - if (insertTime > fromTs) { - byte[] firstKey = key; - while (iterator.hasNext() && prefixMatches(firstKey, - kp.getOffset(), key)) { - iterator.next(); - key = iterator.peekNext().getKey(); - } - continue; - } - } - - // parse the entity that owns this key, iterating over all keys for - // the entity - TimelineEntity entity = getEntity(entityId, entityType, startTime, - fields, iterator, key, kp.getOffset()); - // determine if the retrieved entity matches the provided secondary - // filters, and if so add it to the list of entities to return - boolean filterPassed = true; - if (secondaryFilters != null) { - for (NameValuePair filter : secondaryFilters) { - Object v = entity.getOtherInfo().get(filter.getName()); - if (v == null) { - Set<Object> vs = entity.getPrimaryFilters() - .get(filter.getName()); - if (vs != null && !vs.contains(filter.getValue())) { - filterPassed = false; - break; - } - } else if (!v.equals(filter.getValue())) { - filterPassed = false; - break; - } - } - } - if (filterPassed) { - entities.addEntity(entity); - } - } - return entities; - } finally { - IOUtils.cleanup(LOG, iterator); - } - } - - /** - * Put a single entity. If there is an error, add a TimelinePutError to the - * given response. - */ - private void put(TimelineEntity entity, TimelinePutResponse response) { - LockMap.CountingReentrantLock<EntityIdentifier> lock = - writeLocks.getLock(new EntityIdentifier(entity.getEntityId(), - entity.getEntityType())); - lock.lock(); - WriteBatch writeBatch = null; - List<EntityIdentifier> relatedEntitiesWithoutStartTimes = - new ArrayList<EntityIdentifier>(); - byte[] revStartTime = null; - try { - writeBatch = db.createWriteBatch(); - List<TimelineEvent> events = entity.getEvents(); - // look up the start time for the entity - StartAndInsertTime startAndInsertTime = getAndSetStartTime( - entity.getEntityId(), entity.getEntityType(), - entity.getStartTime(), events); - if (startAndInsertTime == null) { - // if no start time is found, add an error and return - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.NO_START_TIME); - response.addError(error); - return; - } - revStartTime = writeReverseOrderedLong(startAndInsertTime - .startTime); - - Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters(); - - // write entity marker - byte[] markerKey = createEntityMarkerKey(entity.getEntityId(), - entity.getEntityType(), revStartTime); - byte[] markerValue = writeReverseOrderedLong(startAndInsertTime - .insertTime); - writeBatch.put(markerKey, markerValue); - writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey, - markerValue); - - // write event entries - if (events != null && !events.isEmpty()) { - for (TimelineEvent event : events) { - byte[] revts = writeReverseOrderedLong(event.getTimestamp()); - byte[] key = createEntityEventKey(entity.getEntityId(), - entity.getEntityType(), revStartTime, revts, - event.getEventType()); - byte[] value = GenericObjectMapper.write(event.getEventInfo()); - writeBatch.put(key, value); - writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); - } - } - - // write related entity entries - Map<String, Set<String>> relatedEntities = - entity.getRelatedEntities(); - if (relatedEntities != null && !relatedEntities.isEmpty()) { - for (Entry<String, Set<String>> relatedEntityList : - relatedEntities.entrySet()) { - String relatedEntityType = relatedEntityList.getKey(); - for (String relatedEntityId : relatedEntityList.getValue()) { - // invisible "reverse" entries (entity -> related entity) - byte[] key = createReverseRelatedEntityKey(entity.getEntityId(), - entity.getEntityType(), revStartTime, relatedEntityId, - relatedEntityType); - writeBatch.put(key, EMPTY_BYTES); - // look up start time of related entity - byte[] relatedEntityStartTime = getStartTime(relatedEntityId, - relatedEntityType); - // delay writing the related entity if no start time is found - if (relatedEntityStartTime == null) { - relatedEntitiesWithoutStartTimes.add( - new EntityIdentifier(relatedEntityId, relatedEntityType)); - continue; - } - // write "forward" entry (related entity -> entity) - key = createRelatedEntityKey(relatedEntityId, - relatedEntityType, relatedEntityStartTime, - entity.getEntityId(), entity.getEntityType()); - writeBatch.put(key, EMPTY_BYTES); - } - } - } - - // write primary filter entries - if (primaryFilters != null && !primaryFilters.isEmpty()) { - for (Entry<String, Set<Object>> primaryFilter : - primaryFilters.entrySet()) { - for (Object primaryFilterValue : primaryFilter.getValue()) { - byte[] key = createPrimaryFilterKey(entity.getEntityId(), - entity.getEntityType(), revStartTime, - primaryFilter.getKey(), primaryFilterValue); - writeBatch.put(key, EMPTY_BYTES); - writePrimaryFilterEntries(writeBatch, primaryFilters, key, - EMPTY_BYTES); - } - } - } - - // write other info entries - Map<String, Object> otherInfo = entity.getOtherInfo(); - if (otherInfo != null && !otherInfo.isEmpty()) { - for (Entry<String, Object> i : otherInfo.entrySet()) { - byte[] key = createOtherInfoKey(entity.getEntityId(), - entity.getEntityType(), revStartTime, i.getKey()); - byte[] value = GenericObjectMapper.write(i.getValue()); - writeBatch.put(key, value); - writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); - } - } - db.write(writeBatch); - } catch (IOException e) { - LOG.error("Error putting entity " + entity.getEntityId() + - " of type " + entity.getEntityType(), e); - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.IO_EXCEPTION); - response.addError(error); - } finally { - lock.unlock(); - writeLocks.returnLock(lock); - IOUtils.cleanup(LOG, writeBatch); - } - - for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) { - lock = writeLocks.getLock(relatedEntity); - lock.lock(); - try { - StartAndInsertTime relatedEntityStartAndInsertTime = - getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(), - readReverseOrderedLong(revStartTime, 0), null); - if (relatedEntityStartAndInsertTime == null) { - throw new IOException("Error setting start time for related entity"); - } - byte[] relatedEntityStartTime = writeReverseOrderedLong( - relatedEntityStartAndInsertTime.startTime); - db.put(createRelatedEntityKey(relatedEntity.getId(), - relatedEntity.getType(), relatedEntityStartTime, - entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES); - db.put(createEntityMarkerKey(relatedEntity.getId(), - relatedEntity.getType(), relatedEntityStartTime), - writeReverseOrderedLong(relatedEntityStartAndInsertTime - .insertTime)); - } catch (IOException e) { - LOG.error("Error putting related entity " + relatedEntity.getId() + - " of type " + relatedEntity.getType() + " for entity " + - entity.getEntityId() + " of type " + entity.getEntityType(), e); - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entity.getEntityId()); - error.setEntityType(entity.getEntityType()); - error.setErrorCode(TimelinePutError.IO_EXCEPTION); - response.addError(error); - } finally { - lock.unlock(); - writeLocks.returnLock(lock); - } - } - } - - /** - * For a given key / value pair that has been written to the db, - * write additional entries to the db for each primary filter. - */ - private static void writePrimaryFilterEntries(WriteBatch writeBatch, - Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value) - throws IOException { - if (primaryFilters != null && !primaryFilters.isEmpty()) { - for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) { - for (Object pfval : pf.getValue()) { - writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, - key), value); - } - } - } - } - - @Override - public TimelinePutResponse put(TimelineEntities entities) { - try { - deleteLock.readLock().lock(); - TimelinePutResponse response = new TimelinePutResponse(); - for (TimelineEntity entity : entities.getEntities()) { - put(entity, response); - } - return response; - } finally { - deleteLock.readLock().unlock(); - } - } - - /** - * Get the unique start time for a given entity as a byte array that sorts - * the timestamps in reverse order (see {@link - * GenericObjectMapper#writeReverseOrderedLong(long)}). - * - * @param entityId The id of the entity - * @param entityType The type of the entity - * @return A byte array, null if not found - * @throws IOException - */ - private byte[] getStartTime(String entityId, String entityType) - throws IOException { - Long l = getStartTimeLong(entityId, entityType); - return l == null ? null : writeReverseOrderedLong(l); - } - - /** - * Get the unique start time for a given entity as a Long. - * - * @param entityId The id of the entity - * @param entityType The type of the entity - * @return A Long, null if not found - * @throws IOException - */ - private Long getStartTimeLong(String entityId, String entityType) - throws IOException { - EntityIdentifier entity = new EntityIdentifier(entityId, entityType); - // start time is not provided, so try to look it up - if (startTimeReadCache.containsKey(entity)) { - // found the start time in the cache - return startTimeReadCache.get(entity); - } else { - // try to look up the start time in the db - byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); - byte[] v = db.get(b); - if (v == null) { - // did not find the start time in the db - return null; - } else { - // found the start time in the db - Long l = readReverseOrderedLong(v, 0); - startTimeReadCache.put(entity, l); - return l; - } - } - } - - /** - * Get the unique start time for a given entity as a byte array that sorts - * the timestamps in reverse order (see {@link - * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time - * doesn't exist, set it based on the information provided. Should only be - * called when a lock has been obtained on the entity. - * - * @param entityId The id of the entity - * @param entityType The type of the entity - * @param startTime The start time of the entity, or null - * @param events A list of events for the entity, or null - * @return A StartAndInsertTime - * @throws IOException - */ - private StartAndInsertTime getAndSetStartTime(String entityId, - String entityType, Long startTime, List<TimelineEvent> events) - throws IOException { - EntityIdentifier entity = new EntityIdentifier(entityId, entityType); - if (startTime == null) { - // start time is not provided, so try to look it up - if (startTimeWriteCache.containsKey(entity)) { - // found the start time in the cache - return startTimeWriteCache.get(entity); - } else { - if (events != null) { - // prepare a start time from events in case it is needed - Long min = Long.MAX_VALUE; - for (TimelineEvent e : events) { - if (min > e.getTimestamp()) { - min = e.getTimestamp(); - } - } - startTime = min; - } - return checkStartTimeInDb(entity, startTime); - } - } else { - // start time is provided - if (startTimeWriteCache.containsKey(entity)) { - // always use start time from cache if it exists - return startTimeWriteCache.get(entity); - } else { - // check the provided start time matches the db - return checkStartTimeInDb(entity, startTime); - } - } - } - - /** - * Checks db for start time and returns it if it exists. If it doesn't - * exist, writes the suggested start time (if it is not null). This is - * only called when the start time is not found in the cache, - * so it adds it back into the cache if it is found. Should only be called - * when a lock has been obtained on the entity. - */ - private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity, - Long suggestedStartTime) throws IOException { - StartAndInsertTime startAndInsertTime = null; - // create lookup key for start time - byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); - // retrieve value for key - byte[] v = db.get(b); - if (v == null) { - // start time doesn't exist in db - if (suggestedStartTime == null) { - return null; - } - startAndInsertTime = new StartAndInsertTime(suggestedStartTime, - System.currentTimeMillis()); - - // write suggested start time - v = new byte[16]; - writeReverseOrderedLong(suggestedStartTime, v, 0); - writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8); - WriteOptions writeOptions = new WriteOptions(); - writeOptions.sync(true); - db.put(b, v, writeOptions); - } else { - // found start time in db, so ignore suggested start time - startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0), - readReverseOrderedLong(v, 8)); - } - startTimeWriteCache.put(entity, startAndInsertTime); - startTimeReadCache.put(entity, startAndInsertTime.startTime); - return startAndInsertTime; - } - - /** - * Creates a key for looking up the start time of a given entity, - * of the form START_TIME_LOOKUP_PREFIX + entity type + entity id. - */ - private static byte[] createStartTimeLookupKey(String entityId, - String entityType) throws IOException { - return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX) - .add(entityType).add(entityId).getBytes(); - } - - /** - * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type + - * revstarttime + entity id. - */ - private static byte[] createEntityMarkerKey(String entityId, - String entityType, byte[] revStartTime) throws IOException { - return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) - .add(entityType).add(revStartTime).add(entityId).getBytesForLookup(); - } - - /** - * Creates an index entry for the given key of the form - * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key. - */ - private static byte[] addPrimaryFilterToKey(String primaryFilterName, - Object primaryFilterValue, byte[] key) throws IOException { - return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) - .add(primaryFilterName) - .add(GenericObjectMapper.write(primaryFilterValue), true).add(key) - .getBytes(); - } - - /** - * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type + - * revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type. - */ - private static byte[] createEntityEventKey(String entityId, - String entityType, byte[] revStartTime, byte[] revEventTimestamp, - String eventType) throws IOException { - return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) - .add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN) - .add(revEventTimestamp).add(eventType).getBytes(); - } - - /** - * Creates an event object from the given key, offset, and value. If the - * event type is not contained in the specified set of event types, - * returns null. - */ - private static TimelineEvent getEntityEvent(Set<String> eventTypes, - byte[] key, int offset, byte[] value) throws IOException { - KeyParser kp = new KeyParser(key, offset); - long ts = kp.getNextLong(); - String tstype = kp.getNextString(); - if (eventTypes == null || eventTypes.contains(tstype)) { - TimelineEvent event = new TimelineEvent(); - event.setTimestamp(ts); - event.setEventType(tstype); - Object o = GenericObjectMapper.read(value); - if (o == null) { - event.setEventInfo(null); - } else if (o instanceof Map) { - @SuppressWarnings("unchecked") - Map<String, Object> m = (Map<String, Object>) o; - event.setEventInfo(m); - } else { - throw new IOException("Couldn't deserialize event info map"); - } - return event; - } - return null; - } - - /** - * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX + - * entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name + - * value. - */ - private static byte[] createPrimaryFilterKey(String entityId, - String entityType, byte[] revStartTime, String name, Object value) - throws IOException { - return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) - .add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name) - .add(GenericObjectMapper.write(value)).getBytes(); - } - - /** - * Parses the primary filter from the given key at the given offset and - * adds it to the given entity. - */ - private static void addPrimaryFilter(TimelineEntity entity, byte[] key, - int offset) throws IOException { - KeyParser kp = new KeyParser(key, offset); - String name = kp.getNextString(); - Object value = GenericObjectMapper.read(key, kp.getOffset()); - entity.addPrimaryFilter(name, value); - } - - /** - * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type + - * revstarttime + entity id + OTHER_INFO_COLUMN + name. - */ - private static byte[] createOtherInfoKey(String entityId, String entityType, - byte[] revStartTime, String name) throws IOException { - return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) - .add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name) - .getBytes(); - } - - /** - * Creates a string representation of the byte array from the given offset - * to the end of the array (for parsing other info keys). - */ - private static String parseRemainingKey(byte[] b, int offset) { - return new String(b, offset, b.length - offset); - } - - /** - * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + - * entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN + - * relatedentity type + relatedentity id. - */ - private static byte[] createRelatedEntityKey(String entityId, - String entityType, byte[] revStartTime, String relatedEntityId, - String relatedEntityType) throws IOException { - return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) - .add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN) - .add(relatedEntityType).add(relatedEntityId).getBytes(); - } - - /** - * Parses the related entity from the given key at the given offset and - * adds it to the given entity. - */ - private static void addRelatedEntity(TimelineEntity entity, byte[] key, - int offset) throws IOException { - KeyParser kp = new KeyParser(key, offset); - String type = kp.getNextString(); - String id = kp.getNextString(); - entity.addRelatedEntity(type, id); - } - - /** - * Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX + - * entity type + revstarttime + entity id + - * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + - * relatedentity type + relatedentity id. - */ - private static byte[] createReverseRelatedEntityKey(String entityId, - String entityType, byte[] revStartTime, String relatedEntityId, - String relatedEntityType) throws IOException { - return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) - .add(revStartTime).add(entityId) - .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN) - .add(relatedEntityType).add(relatedEntityId).getBytes(); - } - - /** - * Clears the cache to test reloading start times from leveldb (only for - * testing). - */ - @VisibleForTesting - void clearStartTimeCache() { - startTimeWriteCache.clear(); - startTimeReadCache.clear(); - } - - @VisibleForTesting - static int getStartTimeReadCacheSize(Configuration conf) { - return conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, - YarnConfiguration. - DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); - } - - @VisibleForTesting - static int getStartTimeWriteCacheSize(Configuration conf) { - return conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, - YarnConfiguration. - DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); - } - - // warning is suppressed to prevent eclipse from noting unclosed resource - @SuppressWarnings("resource") - @VisibleForTesting - List<String> getEntityTypes() throws IOException { - DBIterator iterator = null; - try { - iterator = getDbIterator(false); - List<String> entityTypes = new ArrayList<String>(); - iterator.seek(ENTITY_ENTRY_PREFIX); - while (iterator.hasNext()) { - byte[] key = iterator.peekNext().getKey(); - if (key[0] != ENTITY_ENTRY_PREFIX[0]) { - break; - } - KeyParser kp = new KeyParser(key, - ENTITY_ENTRY_PREFIX.length); - String entityType = kp.getNextString(); - entityTypes.add(entityType); - byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) - .add(entityType).getBytesForLookup(); - if (lookupKey[lookupKey.length - 1] != 0x0) { - throw new IOException("Found unexpected end byte in lookup key"); - } - lookupKey[lookupKey.length - 1] = 0x1; - iterator.seek(lookupKey); - } - return entityTypes; - } finally { - IOUtils.cleanup(LOG, iterator); - } - } - - /** - * Finds all keys in the db that have a given prefix and deletes them on - * the given write batch. - */ - private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix, - DBIterator iterator) { - for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) { - byte[] key = iterator.peekNext().getKey(); - if (!prefixMatches(prefix, prefix.length, key)) { - break; - } - writeBatch.delete(key); - } - } - - @VisibleForTesting - boolean deleteNextEntity(String entityType, byte[] reverseTimestamp, - DBIterator iterator, DBIterator pfIterator, boolean seeked) - throws IOException { - WriteBatch writeBatch = null; - try { - KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) - .add(entityType); - byte[] typePrefix = kb.getBytesForLookup(); - kb.add(reverseTimestamp); - if (!seeked) { - iterator.seek(kb.getBytesForLookup()); - } - if (!iterator.hasNext()) { - return false; - } - byte[] entityKey = iterator.peekNext().getKey(); - if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) { - return false; - } - - // read the start time and entity id from the current key - KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8); - String entityId = kp.getNextString(); - int prefixlen = kp.getOffset(); - byte[] deletePrefix = new byte[prefixlen]; - System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen); - - writeBatch = db.createWriteBatch(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting entity type:" + entityType + " id:" + entityId); - } - // remove start time from cache and db - writeBatch.delete(createStartTimeLookupKey(entityId, entityType)); - EntityIdentifier entityIdentifier = - new EntityIdentifier(entityId, entityType); - startTimeReadCache.remove(entityIdentifier); - startTimeWriteCache.remove(entityIdentifier); - - // delete current entity - for (; iterator.hasNext(); iterator.next()) { - byte[] key = iterator.peekNext().getKey(); - if (!prefixMatches(entityKey, prefixlen, key)) { - break; - } - writeBatch.delete(key); - - if (key.length == prefixlen) { - continue; - } - if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) { - kp = new KeyParser(key, - prefixlen + PRIMARY_FILTERS_COLUMN.length); - String name = kp.getNextString(); - Object value = GenericObjectMapper.read(key, kp.getOffset()); - deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value, - deletePrefix), pfIterator); - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting entity type:" + entityType + " id:" + - entityId + " primary filter entry " + name + " " + - value); - } - } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) { - kp = new KeyParser(key, - prefixlen + RELATED_ENTITIES_COLUMN.length); - String type = kp.getNextString(); - String id = kp.getNextString(); - byte[] relatedEntityStartTime = getStartTime(id, type); - if (relatedEntityStartTime == null) { - LOG.warn("Found no start time for " + - "related entity " + id + " of type " + type + " while " + - "deleting " + entityId + " of type " + entityType); - continue; - } - writeBatch.delete(createReverseRelatedEntityKey(id, type, - relatedEntityStartTime, entityId, entityType)); - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting entity type:" + entityType + " id:" + - entityId + " from invisible reverse related entity " + - "entry of type:" + type + " id:" + id); - } - } else if (key[prefixlen] == - INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) { - kp = new KeyParser(key, prefixlen + - INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length); - String type = kp.getNextString(); - String id = kp.getNextString(); - byte[] relatedEntityStartTime = getStartTime(id, type); - if (relatedEntityStartTime == null) { - LOG.warn("Found no start time for reverse " + - "related entity " + id + " of type " + type + " while " + - "deleting " + entityId + " of type " + entityType); - continue; - } - writeBatch.delete(createRelatedEntityKey(id, type, - relatedEntityStartTime, entityId, entityType)); - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting entity type:" + entityType + " id:" + - entityId + " from related entity entry of type:" + - type + " id:" + id); - } - } - } - WriteOptions writeOptions = new WriteOptions(); - writeOptions.sync(true); - db.write(writeBatch, writeOptions); - return true; - } finally { - IOUtils.cleanup(LOG, writeBatch); - } - } - - /** - * Discards entities with start timestamp less than or equal to the given - * timestamp. - */ - @VisibleForTesting - void discardOldEntities(long timestamp) - throws IOException, InterruptedException { - byte[] reverseTimestamp = writeReverseOrderedLong(timestamp); - long totalCount = 0; - long t1 = System.currentTimeMillis(); - try { - List<String> entityTypes = getEntityTypes(); - for (String entityType : entityTypes) { - DBIterator iterator = null; - DBIterator pfIterator = null; - long typeCount = 0; - try { - deleteLock.writeLock().lock(); - iterator = getDbIterator(false); - pfIterator = getDbIterator(false); - - if (deletionThread != null && deletionThread.isInterrupted()) { - throw new InterruptedException(); - } - boolean seeked = false; - while (deleteNextEntity(entityType, reverseTimestamp, iterator, - pfIterator, seeked)) { - typeCount++; - totalCount++; - seeked = true; - if (deletionThread != null && deletionThread.isInterrupted()) { - throw new InterruptedException(); - } - } - } catch (IOException e) { - LOG.error("Got IOException while deleting entities for type " + - entityType + ", continuing to next type", e); - } finally { - IOUtils.cleanup(LOG, iterator, pfIterator); - deleteLock.writeLock().unlock(); - if (typeCount > 0) { - LOG.info("Deleted " + typeCount + " entities of type " + - entityType); - } - } - } - } finally { - long t2 = System.currentTimeMillis(); - LOG.info("Discarded " + totalCount + " entities for timestamp " + - timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds"); - } - } - - @VisibleForTesting - DBIterator getDbIterator(boolean fillCache) { - ReadOptions readOptions = new ReadOptions(); - readOptions.fillCache(fillCache); - return db.iterator(readOptions); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java deleted file mode 100644 index 86ac1f8..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java +++ /dev/null @@ -1,360 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; - -/** - * In-memory implementation of {@link TimelineStore}. This - * implementation is for test purpose only. If users improperly instantiate it, - * they may encounter reading and writing history data in different memory - * store. - * - */ -@Private -@Unstable -public class MemoryTimelineStore - extends AbstractService implements TimelineStore { - - private Map<EntityIdentifier, TimelineEntity> entities = - new HashMap<EntityIdentifier, TimelineEntity>(); - private Map<EntityIdentifier, Long> entityInsertTimes = - new HashMap<EntityIdentifier, Long>(); - - public MemoryTimelineStore() { - super(MemoryTimelineStore.class.getName()); - } - - @Override - public TimelineEntities getEntities(String entityType, Long limit, - Long windowStart, Long windowEnd, String fromId, Long fromTs, - NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, - EnumSet<Field> fields) { - if (limit == null) { - limit = DEFAULT_LIMIT; - } - if (windowStart == null) { - windowStart = Long.MIN_VALUE; - } - if (windowEnd == null) { - windowEnd = Long.MAX_VALUE; - } - if (fields == null) { - fields = EnumSet.allOf(Field.class); - } - - Iterator<TimelineEntity> entityIterator = null; - if (fromId != null) { - TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId, - entityType)); - if (firstEntity == null) { - return new TimelineEntities(); - } else { - entityIterator = new TreeSet<TimelineEntity>(entities.values()) - .tailSet(firstEntity, true).iterator(); - } - } - if (entityIterator == null) { - entityIterator = new PriorityQueue<TimelineEntity>(entities.values()) - .iterator(); - } - - List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>(); - while (entityIterator.hasNext()) { - TimelineEntity entity = entityIterator.next(); - if (entitiesSelected.size() >= limit) { - break; - } - if (!entity.getEntityType().equals(entityType)) { - continue; - } - if (entity.getStartTime() <= windowStart) { - continue; - } - if (entity.getStartTime() > windowEnd) { - continue; - } - if (fromTs != null && entityInsertTimes.get(new EntityIdentifier( - entity.getEntityId(), entity.getEntityType())) > fromTs) { - continue; - } - if (primaryFilter != null && - !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) { - continue; - } - if (secondaryFilters != null) { // AND logic - boolean flag = true; - for (NameValuePair secondaryFilter : secondaryFilters) { - if (secondaryFilter != null && !matchPrimaryFilter( - entity.getPrimaryFilters(), secondaryFilter) && - !matchFilter(entity.getOtherInfo(), secondaryFilter)) { - flag = false; - break; - } - } - if (!flag) { - continue; - } - } - entitiesSelected.add(entity); - } - List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>(); - for (TimelineEntity entitySelected : entitiesSelected) { - entitiesToReturn.add(maskFields(entitySelected, fields)); - } - Collections.sort(entitiesToReturn); - TimelineEntities entitiesWrapper = new TimelineEntities(); - entitiesWrapper.setEntities(entitiesToReturn); - return entitiesWrapper; - } - - @Override - public TimelineEntity getEntity(String entityId, String entityType, - EnumSet<Field> fieldsToRetrieve) { - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.allOf(Field.class); - } - TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType)); - if (entity == null) { - return null; - } else { - return maskFields(entity, fieldsToRetrieve); - } - } - - @Override - public TimelineEvents getEntityTimelines(String entityType, - SortedSet<String> entityIds, Long limit, Long windowStart, - Long windowEnd, - Set<String> eventTypes) { - TimelineEvents allEvents = new TimelineEvents(); - if (entityIds == null) { - return allEvents; - } - if (limit == null) { - limit = DEFAULT_LIMIT; - } - if (windowStart == null) { - windowStart = Long.MIN_VALUE; - } - if (windowEnd == null) { - windowEnd = Long.MAX_VALUE; - } - for (String entityId : entityIds) { - EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); - TimelineEntity entity = entities.get(entityID); - if (entity == null) { - continue; - } - EventsOfOneEntity events = new EventsOfOneEntity(); - events.setEntityId(entityId); - events.setEntityType(entityType); - for (TimelineEvent event : entity.getEvents()) { - if (events.getEvents().size() >= limit) { - break; - } - if (event.getTimestamp() <= windowStart) { - continue; - } - if (event.getTimestamp() > windowEnd) { - continue; - } - if (eventTypes != null && !eventTypes.contains(event.getEventType())) { - continue; - } - events.addEvent(event); - } - allEvents.addEvent(events); - } - return allEvents; - } - - @Override - public TimelinePutResponse put(TimelineEntities data) { - TimelinePutResponse response = new TimelinePutResponse(); - for (TimelineEntity entity : data.getEntities()) { - EntityIdentifier entityId = - new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); - // store entity info in memory - TimelineEntity existingEntity = entities.get(entityId); - if (existingEntity == null) { - existingEntity = new TimelineEntity(); - existingEntity.setEntityId(entity.getEntityId()); - existingEntity.setEntityType(entity.getEntityType()); - existingEntity.setStartTime(entity.getStartTime()); - entities.put(entityId, existingEntity); - entityInsertTimes.put(entityId, System.currentTimeMillis()); - } - if (entity.getEvents() != null) { - if (existingEntity.getEvents() == null) { - existingEntity.setEvents(entity.getEvents()); - } else { - existingEntity.addEvents(entity.getEvents()); - } - Collections.sort(existingEntity.getEvents()); - } - // check startTime - if (existingEntity.getStartTime() == null) { - if (existingEntity.getEvents() == null - || existingEntity.getEvents().isEmpty()) { - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entityId.getId()); - error.setEntityType(entityId.getType()); - error.setErrorCode(TimelinePutError.NO_START_TIME); - response.addError(error); - entities.remove(entityId); - entityInsertTimes.remove(entityId); - continue; - } else { - Long min = Long.MAX_VALUE; - for (TimelineEvent e : entity.getEvents()) { - if (min > e.getTimestamp()) { - min = e.getTimestamp(); - } - } - existingEntity.setStartTime(min); - } - } - if (entity.getPrimaryFilters() != null) { - if (existingEntity.getPrimaryFilters() == null) { - existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>()); - } - for (Entry<String, Set<Object>> pf : - entity.getPrimaryFilters().entrySet()) { - for (Object pfo : pf.getValue()) { - existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo)); - } - } - } - if (entity.getOtherInfo() != null) { - if (existingEntity.getOtherInfo() == null) { - existingEntity.setOtherInfo(new HashMap<String, Object>()); - } - for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) { - existingEntity.addOtherInfo(info.getKey(), - maybeConvert(info.getValue())); - } - } - // relate it to other entities - if (entity.getRelatedEntities() == null) { - continue; - } - for (Map.Entry<String, Set<String>> partRelatedEntities : entity - .getRelatedEntities().entrySet()) { - if (partRelatedEntities == null) { - continue; - } - for (String idStr : partRelatedEntities.getValue()) { - EntityIdentifier relatedEntityId = - new EntityIdentifier(idStr, partRelatedEntities.getKey()); - TimelineEntity relatedEntity = entities.get(relatedEntityId); - if (relatedEntity != null) { - relatedEntity.addRelatedEntity( - existingEntity.getEntityType(), existingEntity.getEntityId()); - } else { - relatedEntity = new TimelineEntity(); - relatedEntity.setEntityId(relatedEntityId.getId()); - relatedEntity.setEntityType(relatedEntityId.getType()); - relatedEntity.setStartTime(existingEntity.getStartTime()); - relatedEntity.addRelatedEntity(existingEntity.getEntityType(), - existingEntity.getEntityId()); - entities.put(relatedEntityId, relatedEntity); - entityInsertTimes.put(relatedEntityId, System.currentTimeMillis()); - } - } - } - } - return response; - } - - private static TimelineEntity maskFields( - TimelineEntity entity, EnumSet<Field> fields) { - // Conceal the fields that are not going to be exposed - TimelineEntity entityToReturn = new TimelineEntity(); - entityToReturn.setEntityId(entity.getEntityId()); - entityToReturn.setEntityType(entity.getEntityType()); - entityToReturn.setStartTime(entity.getStartTime()); - entityToReturn.setEvents(fields.contains(Field.EVENTS) ? - entity.getEvents() : fields.contains(Field.LAST_EVENT_ONLY) ? - Arrays.asList(entity.getEvents().get(0)) : null); - entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ? - entity.getRelatedEntities() : null); - entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ? - entity.getPrimaryFilters() : null); - entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ? - entity.getOtherInfo() : null); - return entityToReturn; - } - - private static boolean matchFilter(Map<String, Object> tags, - NameValuePair filter) { - Object value = tags.get(filter.getName()); - if (value == null) { // doesn't have the filter - return false; - } else if (!value.equals(filter.getValue())) { // doesn't match the filter - return false; - } - return true; - } - - private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags, - NameValuePair filter) { - Set<Object> value = tags.get(filter.getName()); - if (value == null) { // doesn't have the filter - return false; - } else { - return value.contains(filter.getValue()); - } - } - - private static Object maybeConvert(Object o) { - if (o instanceof Long) { - Long l = (Long)o; - if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) { - return l.intValue(); - } - } - return o; - } - -}