http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java new file mode 100644 index 0000000..8b6a51b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java @@ -0,0 +1,1807 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timeline; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; + +import org.apache.commons.collections.map.LRUMap; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.server.records.Version; +import 
org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; +import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder; +import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser; + +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.ReadOptions; +import org.iq80.leveldb.WriteBatch; +import org.nustaq.serialization.FSTConfiguration; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; +import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE; +import static 
org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS; + +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +/** + * <p> + * An implementation of an application timeline store backed by leveldb. + * </p> + * + * <p> + * There are three sections of the db, the start time section, the entity + * section, and the indexed entity section. + * </p> + * + * <p> + * The start time section is used to retrieve the unique start time for a given + * entity. Its values each contain a start time while its keys are of the form: + * </p> + * + * <pre> + * START_TIME_LOOKUP_PREFIX + entity type + entity id + * </pre> + * + * <p> + * The entity section is ordered by entity type, then entity start time + * descending, then entity ID. There are four sub-sections of the entity + * section: events, primary filters, related entities, and other info. The event + * entries have event info serialized into their values. 
The other info entries + * have values corresponding to the values of the other info name/value map for + * the entry (note the names are contained in the key). All other entries have + * empty values. The key structure is as follows: + * </p> + * + * <pre> + * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + * + * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + + * DOMAIN_ID_COLUMN + * + * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + + * EVENTS_COLUMN + reveventtimestamp + eventtype + * + * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + + * PRIMARY_FILTERS_COLUMN + name + value + * + * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + + * OTHER_INFO_COLUMN + name + * + * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + + * RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id + * </pre> + * + * <p> + * The indexed entity section contains a primary filter name and primary filter + * value as the prefix. Within a given name/value, entire entity entries are + * stored in the same format as described in the entity section above (below, + * "key" represents any one of the possible entity entry keys described above). 
+ * </p> + * + * <pre> + * INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value + + * key + * </pre> + */ [email protected] [email protected] +public class RollingLevelDBTimelineStore extends AbstractService implements + TimelineStore { + private static final Log LOG = LogFactory + .getLog(RollingLevelDBTimelineStore.class); + private static FSTConfiguration fstConf = + FSTConfiguration.createDefaultConfiguration(); + + static { + fstConf.setShareReferences(false); + } + + @Private + @VisibleForTesting + static final String FILENAME = "leveldb-timeline-store"; + static final String DOMAIN = "domain-ldb"; + static final String ENTITY = "entity-ldb"; + static final String INDEX = "indexes-ldb"; + static final String STARTTIME = "starttime-ldb"; + static final String OWNER = "owner-ldb"; + + private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8); + private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8); + private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8); + private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8); + private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8); + + private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8); + private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8); + private static final byte[] READER_COLUMN = "r".getBytes(UTF_8); + private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8); + private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8); + + private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final String TIMELINE_STORE_VERSION_KEY = + "timeline-store-version"; + + private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0); + + private static long writeBatchSize = 10000; + + @Private + @VisibleForTesting + static final FsPermission LEVELDB_DIR_UMASK = FsPermission + .createImmutable((short) 0700); + + private Map<EntityIdentifier, Long> startTimeWriteCache; + private 
Map<EntityIdentifier, Long> startTimeReadCache; + + private DB domaindb; + private RollingLevelDB entitydb; + private RollingLevelDB indexdb; + private DB starttimedb; + private DB ownerdb; + + private Thread deletionThread; + + public RollingLevelDBTimelineStore() { + super(RollingLevelDBTimelineStore.class.getName()); + } + + @Override + @SuppressWarnings("unchecked") + protected void serviceInit(Configuration conf) throws Exception { + Preconditions + .checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS, + DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_TTL_MS); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, + DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0, + "%s property value should be greater than or equal to zero", + TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0, + " %s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES); + Preconditions.checkArgument(conf.getLong( + 
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE); + + Options options = new Options(); + options.createIfMissing(true); + options.cacheSize(conf.getLong( + TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + JniDBFactory factory = new JniDBFactory(); + Path dbPath = new Path( + conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME); + Path domainDBPath = new Path(dbPath, DOMAIN); + Path starttimeDBPath = new Path(dbPath, STARTTIME); + Path ownerDBPath = new Path(dbPath, OWNER); + FileSystem localFS = null; + try { + localFS = FileSystem.getLocal(conf); + if (!localFS.exists(dbPath)) { + if (!localFS.mkdirs(dbPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + dbPath); + } + localFS.setPermission(dbPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(domainDBPath)) { + if (!localFS.mkdirs(domainDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + domainDBPath); + } + localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(starttimeDBPath)) { + if (!localFS.mkdirs(starttimeDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + starttimeDBPath); + } + localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(ownerDBPath)) { + if (!localFS.mkdirs(ownerDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + ownerDBPath); + } + localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK); + } + } finally { + IOUtils.cleanup(LOG, localFS); + } + options.maxOpenFiles(conf.getInt( + TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES)); + options.writeBufferSize(conf.getInt( + TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + 
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE)); + LOG.info("Using leveldb path " + dbPath); + domaindb = factory.open(new File(domainDBPath.toString()), options); + entitydb = new RollingLevelDB(ENTITY); + entitydb.init(conf); + indexdb = new RollingLevelDB(INDEX); + indexdb.init(conf); + starttimedb = factory.open(new File(starttimeDBPath.toString()), options); + ownerdb = factory.open(new File(ownerDBPath.toString()), options); + checkVersion(); + startTimeWriteCache = Collections.synchronizedMap(new LRUMap( + getStartTimeWriteCacheSize(conf))); + startTimeReadCache = Collections.synchronizedMap(new LRUMap( + getStartTimeReadCacheSize(conf))); + + writeBatchSize = conf.getInt( + TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) { + deletionThread = new EntityDeletionThread(getConfig()); + deletionThread.start(); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (deletionThread != null) { + deletionThread.interrupt(); + LOG.info("Waiting for deletion thread to complete its current action"); + try { + deletionThread.join(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for deletion thread to complete," + + " closing db now", e); + } + } + IOUtils.cleanup(LOG, domaindb); + IOUtils.cleanup(LOG, starttimedb); + IOUtils.cleanup(LOG, ownerdb); + entitydb.stop(); + indexdb.stop(); + super.serviceStop(); + } + + private class EntityDeletionThread extends Thread { + private final long ttl; + private final long ttlInterval; + + public EntityDeletionThread(Configuration conf) { + ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS, + DEFAULT_TIMELINE_SERVICE_TTL_MS); + ttlInterval = conf.getLong( + TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, + DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); + 
LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " + + "interval " + ttlInterval); + } + + @Override + public void run() { + Thread.currentThread().setName("Leveldb Timeline Store Retention"); + while (true) { + long timestamp = System.currentTimeMillis() - ttl; + try { + discardOldEntities(timestamp); + Thread.sleep(ttlInterval); + } catch (IOException e) { + LOG.error(e); + } catch (InterruptedException e) { + LOG.info("Deletion thread received interrupt, exiting"); + break; + } + } + } + } + + @Override + public TimelineEntity getEntity(String entityId, String entityType, + EnumSet<Field> fields) throws IOException { + Long revStartTime = getStartTimeLong(entityId, entityType); + if (revStartTime == null) { + return null; + } + byte[] prefix = KeyBuilder.newInstance().add(entityType) + .add(writeReverseOrderedLong(revStartTime)).add(entityId) + .getBytesForLookup(); + + DBIterator iterator = null; + try { + DB db = entitydb.getDBForStartTime(revStartTime); + if (db == null) { + return null; + } + iterator = db.iterator(); + iterator.seek(prefix); + + return getEntity(entityId, entityType, revStartTime, fields, iterator, + prefix, prefix.length); + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Read entity from a db iterator. If no information is found in the specified + * fields for this entity, return null. 
+ */ + private static TimelineEntity getEntity(String entityId, String entityType, + Long startTime, EnumSet<Field> fields, DBIterator iterator, + byte[] prefix, int prefixlen) throws IOException { + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + + TimelineEntity entity = new TimelineEntity(); + boolean events = false; + boolean lastEvent = false; + if (fields.contains(Field.EVENTS)) { + events = true; + } else if (fields.contains(Field.LAST_EVENT_ONLY)) { + lastEvent = true; + } else { + entity.setEvents(null); + } + boolean relatedEntities = false; + if (fields.contains(Field.RELATED_ENTITIES)) { + relatedEntities = true; + } else { + entity.setRelatedEntities(null); + } + boolean primaryFilters = false; + if (fields.contains(Field.PRIMARY_FILTERS)) { + primaryFilters = true; + } else { + entity.setPrimaryFilters(null); + } + boolean otherInfo = false; + if (fields.contains(Field.OTHER_INFO)) { + otherInfo = true; + } else { + entity.setOtherInfo(null); + } + + // iterate through the entity's entry, parsing information if it is part + // of a requested field + for (; iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefixlen, key)) { + break; + } + if (key.length == prefixlen) { + continue; + } + if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) { + if (primaryFilters) { + addPrimaryFilter(entity, key, prefixlen + + PRIMARY_FILTERS_COLUMN.length); + } + } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) { + if (otherInfo) { + entity.addOtherInfo( + parseRemainingKey(key, prefixlen + OTHER_INFO_COLUMN.length), + fstConf.asObject(iterator.peekNext().getValue())); + } + } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) { + if (relatedEntities) { + addRelatedEntity(entity, key, prefixlen + + RELATED_ENTITIES_COLUMN.length); + } + } else if (key[prefixlen] == EVENTS_COLUMN[0]) { + if (events || (lastEvent && entity.getEvents().size() == 0)) { + TimelineEvent event = 
getEntityEvent(null, key, prefixlen + + EVENTS_COLUMN.length, iterator.peekNext().getValue()); + if (event != null) { + entity.addEvent(event); + } + } + } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) { + byte[] v = iterator.peekNext().getValue(); + String domainId = new String(v, UTF_8); + entity.setDomainId(domainId); + } else { + LOG.warn(String.format("Found unexpected column for entity %s of " + + "type %s (0x%02x)", entityId, entityType, key[prefixlen])); + } + } + + entity.setEntityId(entityId); + entity.setEntityType(entityType); + entity.setStartTime(startTime); + + return entity; + } + + @Override + public TimelineEvents getEntityTimelines(String entityType, + SortedSet<String> entityIds, Long limit, Long windowStart, + Long windowEnd, Set<String> eventType) throws IOException { + TimelineEvents events = new TimelineEvents(); + if (entityIds == null || entityIds.isEmpty()) { + return events; + } + // create a lexicographically-ordered map from start time to entities + Map<byte[], List<EntityIdentifier>> startTimeMap = + new TreeMap<byte[], List<EntityIdentifier>>( + new Comparator<byte[]>() { + @Override + public int compare(byte[] o1, byte[] o2) { + return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, + o2.length); + } + }); + DBIterator iterator = null; + try { + // look up start times for the specified entities + // skip entities with no start time + for (String entityId : entityIds) { + byte[] startTime = getStartTime(entityId, entityType); + if (startTime != null) { + List<EntityIdentifier> entities = startTimeMap.get(startTime); + if (entities == null) { + entities = new ArrayList<EntityIdentifier>(); + startTimeMap.put(startTime, entities); + } + entities.add(new EntityIdentifier(entityId, entityType)); + } + } + for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap + .entrySet()) { + // look up the events matching the given parameters (limit, + // start time, end time, event types) for entities whose start times + // were 
found and add the entities to the return list + byte[] revStartTime = entry.getKey(); + for (EntityIdentifier entityIdentifier : entry.getValue()) { + EventsOfOneEntity entity = new EventsOfOneEntity(); + entity.setEntityId(entityIdentifier.getId()); + entity.setEntityType(entityType); + events.addEvent(entity); + KeyBuilder kb = KeyBuilder.newInstance().add(entityType) + .add(revStartTime).add(entityIdentifier.getId()) + .add(EVENTS_COLUMN); + byte[] prefix = kb.getBytesForLookup(); + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + byte[] revts = writeReverseOrderedLong(windowEnd); + kb.add(revts); + byte[] first = kb.getBytesForLookup(); + byte[] last = null; + if (windowStart != null) { + last = KeyBuilder.newInstance().add(prefix) + .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + DB db = entitydb.getDBForStartTime(readReverseOrderedLong( + revStartTime, 0)); + if (db == null) { + continue; + } + iterator = db.iterator(); + for (iterator.seek(first); entity.getEvents().size() < limit + && iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) + || (last != null && WritableComparator.compareBytes(key, 0, + key.length, last, 0, last.length) > 0)) { + break; + } + TimelineEvent event = getEntityEvent(eventType, key, prefix.length, + iterator.peekNext().getValue()); + if (event != null) { + entity.addEvent(event); + } + } + } + } + } finally { + IOUtils.cleanup(LOG, iterator); + } + return events; + } + + @Override + public TimelineEntities getEntities(String entityType, Long limit, + Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, + EnumSet<Field> fields, CheckAcl checkAcl) throws IOException { + if (primaryFilter == null) { + // if no primary filter is specified, prefix the lookup with + // ENTITY_ENTRY_PREFIX + 
return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart, + windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false); + } else { + // if a primary filter is specified, prefix the lookup with + // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + + // ENTITY_ENTRY_PREFIX + byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName()) + .add(fstConf.asByteArray(primaryFilter.getValue()), true) + .getBytesForLookup(); + return getEntityByTime(base, entityType, limit, windowStart, windowEnd, + fromId, fromTs, secondaryFilters, fields, checkAcl, true); + } + } + + /** + * Retrieves a list of entities satisfying given parameters. + * + * @param base + * A byte array prefix for the lookup + * @param entityType + * The type of the entity + * @param limit + * A limit on the number of entities to return + * @param starttime + * The earliest entity start time to retrieve (exclusive) + * @param endtime + * The latest entity start time to retrieve (inclusive) + * @param fromId + * Retrieve entities starting with this entity + * @param fromTs + * Ignore entities with insert timestamp later than this ts + * @param secondaryFilters + * Filter pairs that the entities should match + * @param fields + * The set of fields to retrieve + * @param usingPrimaryFilter + * true if this query is using a primary filter + * @return A list of entities + * @throws IOException + */ + private TimelineEntities getEntityByTime(byte[] base, String entityType, + Long limit, Long starttime, Long endtime, String fromId, Long fromTs, + Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields, + CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException { + DBIterator iterator = null; + try { + KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); + // only db keys matching the prefix (base + entity type) will be parsed + byte[] prefix = kb.getBytesForLookup(); + if (endtime == null) { + // if end time is null, place no restriction 
on end time + endtime = Long.MAX_VALUE; + } + + // Sanitize the fields parameter + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + + // construct a first key that will be seeked to using end time or fromId + long firstStartTime = Long.MAX_VALUE; + byte[] first = null; + if (fromId != null) { + Long fromIdStartTime = getStartTimeLong(fromId, entityType); + if (fromIdStartTime == null) { + // no start time for provided id, so return empty entities + return new TimelineEntities(); + } + if (fromIdStartTime <= endtime) { + // if provided id's start time falls before the end of the window, + // use it to construct the seek key + firstStartTime = fromIdStartTime; + first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId) + .getBytesForLookup(); + } + } + // if seek key wasn't constructed using fromId, construct it using end ts + if (first == null) { + firstStartTime = endtime; + first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); + } + byte[] last = null; + if (starttime != null) { + // if start time is not null, set a last key that will not be + // iterated past + last = KeyBuilder.newInstance().add(base).add(entityType) + .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); + } + if (limit == null) { + // if limit is not specified, use the default + limit = DEFAULT_LIMIT; + } + + TimelineEntities entities = new TimelineEntities(); + RollingLevelDB rollingdb = null; + if (usingPrimaryFilter) { + rollingdb = indexdb; + } else { + rollingdb = entitydb; + } + + DB db = rollingdb.getDBForStartTime(firstStartTime); + while (entities.getEntities().size() < limit && db != null) { + iterator = db.iterator(); + iterator.seek(first); + + // iterate until one of the following conditions is met: limit is + // reached, there are no more keys, the key prefix no longer matches, + // or a start time has been specified and reached/exceeded + while (entities.getEntities().size() < limit && iterator.hasNext()) { + byte[] key = 
iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) + || (last != null && WritableComparator.compareBytes(key, 0, + key.length, last, 0, last.length) > 0)) { + break; + } + // read the start time and entity id from the current key + KeyParser kp = new KeyParser(key, prefix.length); + Long startTime = kp.getNextLong(); + String entityId = kp.getNextString(); + + if (fromTs != null) { + long insertTime = readReverseOrderedLong(iterator.peekNext() + .getValue(), 0); + if (insertTime > fromTs) { + byte[] firstKey = key; + while (iterator.hasNext()) { + key = iterator.peekNext().getKey(); + iterator.next(); + if (!prefixMatches(firstKey, kp.getOffset(), key)) { + break; + } + } + continue; + } + } + // Even if other info and primary filter fields are not included, we + // still need to load them to match secondary filters when they are + // non-empty + EnumSet<Field> queryFields = EnumSet.copyOf(fields); + boolean addPrimaryFilters = false; + boolean addOtherInfo = false; + if (secondaryFilters != null && secondaryFilters.size() > 0) { + if (!queryFields.contains(Field.PRIMARY_FILTERS)) { + queryFields.add(Field.PRIMARY_FILTERS); + addPrimaryFilters = true; + } + if (!queryFields.contains(Field.OTHER_INFO)) { + queryFields.add(Field.OTHER_INFO); + addOtherInfo = true; + } + } + + // parse the entity that owns this key, iterating over all keys for + // the entity + TimelineEntity entity = null; + if (usingPrimaryFilter) { + entity = getEntity(entityId, entityType, queryFields); + iterator.next(); + } else { + entity = getEntity(entityId, entityType, startTime, queryFields, + iterator, key, kp.getOffset()); + } + // determine if the retrieved entity matches the provided secondary + // filters, and if so add it to the list of entities to return + boolean filterPassed = true; + if (secondaryFilters != null) { + for (NameValuePair filter : secondaryFilters) { + Object v = entity.getOtherInfo().get(filter.getName()); + if (v == null) { + Set<Object> 
vs = entity.getPrimaryFilters() + .get(filter.getName()); + if (vs == null || !vs.contains(filter.getValue())) { + filterPassed = false; + break; + } + } else if (!v.equals(filter.getValue())) { + filterPassed = false; + break; + } + } + } + if (filterPassed) { + if (entity.getDomainId() == null) { + entity.setDomainId(DEFAULT_DOMAIN_ID); + } + if (checkAcl == null || checkAcl.check(entity)) { + // Remove primary filter and other info if they are added for + // matching secondary filters + if (addPrimaryFilters) { + entity.setPrimaryFilters(null); + } + if (addOtherInfo) { + entity.setOtherInfo(null); + } + entities.addEntity(entity); + } + } + } + db = rollingdb.getPreviousDB(db); + } + return entities; + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Put a single entity. If there is an error, add a TimelinePutError to the + * given response. + * + * @param entityUpdates + * a map containing all the scheduled writes for this put to the + * entity db + * @param indexUpdates + * a map containing all the scheduled writes for this put to the + * index db + */ + private long putEntities(TreeMap<Long, RollingWriteBatch> entityUpdates, + TreeMap<Long, RollingWriteBatch> indexUpdates, TimelineEntity entity, + TimelinePutResponse response) { + + long putCount = 0; + List<EntityIdentifier> relatedEntitiesWithoutStartTimes = + new ArrayList<EntityIdentifier>(); + byte[] revStartTime = null; + Map<String, Set<Object>> primaryFilters = null; + try { + List<TimelineEvent> events = entity.getEvents(); + // look up the start time for the entity + Long startTime = getAndSetStartTime(entity.getEntityId(), + entity.getEntityType(), entity.getStartTime(), events); + if (startTime == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.NO_START_TIME); + response.addError(error); 
+ return putCount; + } + + // Must have a domain + if (StringUtils.isEmpty(entity.getDomainId())) { + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.NO_DOMAIN); + response.addError(error); + return putCount; + } + + revStartTime = writeReverseOrderedLong(startTime); + long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime); + RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime); + if (rollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(startTime); + if (db != null) { + WriteBatch writeBatch = db.createWriteBatch(); + rollingWriteBatch = new RollingWriteBatch(db, writeBatch); + entityUpdates.put(roundedStartTime, rollingWriteBatch); + } + } + if (rollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + return putCount; + } + WriteBatch writeBatch = rollingWriteBatch.getWriteBatch(); + + // Save off the getBytes conversion to avoid unnecessary cost + byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8); + byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8); + byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8); + + // write entity marker + byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true).getBytesForLookup(); + writeBatch.put(markerKey, EMPTY_BYTES); + ++putCount; + + // write domain id entry + byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN) + .getBytes(); + writeBatch.put(domainkey, domainIdBytes); + ++putCount; + + // write event entries + if (events != 
null) { + for (TimelineEvent event : events) { + byte[] revts = writeReverseOrderedLong(event.getTimestamp()); + byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN) + .add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes(); + byte[] value = fstConf.asByteArray(event.getEventInfo()); + writeBatch.put(key, value); + ++putCount; + } + } + + // write primary filter entries + primaryFilters = entity.getPrimaryFilters(); + if (primaryFilters != null) { + for (Entry<String, Set<Object>> primaryFilter : primaryFilters + .entrySet()) { + for (Object primaryFilterValue : primaryFilter.getValue()) { + byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true) + .add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey()) + .add(fstConf.asByteArray(primaryFilterValue)).getBytes(); + writeBatch.put(key, EMPTY_BYTES); + ++putCount; + } + } + } + + // write other info entries + Map<String, Object> otherInfo = entity.getOtherInfo(); + if (otherInfo != null) { + for (Entry<String, Object> info : otherInfo.entrySet()) { + byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true) + .add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes(); + byte[] value = fstConf.asByteArray(info.getValue()); + writeBatch.put(key, value); + ++putCount; + } + } + + // write related entity entries + Map<String, Set<String>> relatedEntities = entity.getRelatedEntities(); + if (relatedEntities != null) { + for (Entry<String, Set<String>> relatedEntityList : relatedEntities + .entrySet()) { + String relatedEntityType = relatedEntityList.getKey(); + for (String relatedEntityId : relatedEntityList.getValue()) { + // look up start time of related entity + Long relatedStartTimeLong = getStartTimeLong(relatedEntityId, + relatedEntityType); + // delay writing the related entity if no start time is found + if 
(relatedStartTimeLong == null) { + relatedEntitiesWithoutStartTimes.add(new EntityIdentifier( + relatedEntityId, relatedEntityType)); + continue; + } + + byte[] relatedEntityStartTime = + writeReverseOrderedLong(relatedStartTimeLong); + long relatedRoundedStartTime = entitydb + .computeCurrentCheckMillis(relatedStartTimeLong); + RollingWriteBatch relatedRollingWriteBatch = entityUpdates + .get(relatedRoundedStartTime); + if (relatedRollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(relatedStartTimeLong); + if (db != null) { + WriteBatch relatedWriteBatch = db.createWriteBatch(); + relatedRollingWriteBatch = new RollingWriteBatch(db, + relatedWriteBatch); + entityUpdates.put(relatedRoundedStartTime, + relatedRollingWriteBatch); + } + } + if (relatedRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + continue; + } + // This is the existing entity + byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get( + createDomainIdKey(relatedEntityId, relatedEntityType, + relatedEntityStartTime)); + // The timeline data created by the server before 2.6 won't have + // the domain field. We assume this timeline data is in the + // default timeline domain. 
+ String domainId = null; + if (relatedDomainIdBytes == null) { + domainId = TimelineDataManager.DEFAULT_DOMAIN_ID; + } else { + domainId = new String(relatedDomainIdBytes, UTF_8); + } + if (!domainId.equals(entity.getDomainId())) { + // in this case the entity will be put, but the relation will be + // ignored + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION); + response.addError(error); + continue; + } + // write "forward" entry (related entity -> entity) + byte[] key = createRelatedEntityKey(relatedEntityId, + relatedEntityType, relatedEntityStartTime, + entity.getEntityId(), entity.getEntityType()); + WriteBatch relatedWriteBatch = relatedRollingWriteBatch + .getWriteBatch(); + relatedWriteBatch.put(key, EMPTY_BYTES); + ++putCount; + } + } + } + + // write index entities + RollingWriteBatch indexRollingWriteBatch = indexUpdates + .get(roundedStartTime); + if (indexRollingWriteBatch == null) { + DB db = indexdb.getDBForStartTime(startTime); + if (db != null) { + WriteBatch indexWriteBatch = db.createWriteBatch(); + indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch); + indexUpdates.put(roundedStartTime, indexRollingWriteBatch); + } + } + if (indexRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + return putCount; + } + WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch(); + putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters, + markerKey, EMPTY_BYTES); + } catch (IOException e) { + LOG.error("Error putting entity " + entity.getEntityId() + " of type " + + entity.getEntityType(), e); + TimelinePutError error = new 
TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + } + + for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) { + try { + Long relatedEntityStartAndInsertTime = getAndSetStartTime( + relatedEntity.getId(), relatedEntity.getType(), + readReverseOrderedLong(revStartTime, 0), null); + if (relatedEntityStartAndInsertTime == null) { + throw new IOException("Error setting start time for related entity"); + } + long relatedStartTimeLong = relatedEntityStartAndInsertTime; + long relatedRoundedStartTime = entitydb + .computeCurrentCheckMillis(relatedStartTimeLong); + RollingWriteBatch relatedRollingWriteBatch = entityUpdates + .get(relatedRoundedStartTime); + if (relatedRollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(relatedStartTimeLong); + if (db != null) { + WriteBatch relatedWriteBatch = db.createWriteBatch(); + relatedRollingWriteBatch = new RollingWriteBatch(db, + relatedWriteBatch); + entityUpdates + .put(relatedRoundedStartTime, relatedRollingWriteBatch); + } + } + if (relatedRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + continue; + } + WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch(); + byte[] relatedEntityStartTime = + writeReverseOrderedLong(relatedEntityStartAndInsertTime); + // This is the new entity, the domain should be the same + byte[] key = createDomainIdKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime); + relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8)); + ++putCount; + relatedWriteBatch.put( + createRelatedEntityKey(relatedEntity.getId(), + 
relatedEntity.getType(), relatedEntityStartTime, + entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES); + ++putCount; + relatedWriteBatch.put( + createEntityMarkerKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES); + ++putCount; + } catch (IOException e) { + LOG.error( + "Error putting related entity " + relatedEntity.getId() + + " of type " + relatedEntity.getType() + " for entity " + + entity.getEntityId() + " of type " + entity.getEntityType(), + e); + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + } + } + + return putCount; + } + + /** + * For a given key / value pair that has been written to the db, write + * additional entries to the db for each primary filter. + */ + private static long writePrimaryFilterEntries(WriteBatch writeBatch, + Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value) + throws IOException { + long putCount = 0; + if (primaryFilters != null) { + for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) { + for (Object pfval : pf.getValue()) { + writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value); + ++putCount; + } + } + } + return putCount; + } + + @Override + public TimelinePutResponse put(TimelineEntities entities) { + if (LOG.isDebugEnabled()) { + LOG.debug("Starting put"); + } + TimelinePutResponse response = new TimelinePutResponse(); + TreeMap<Long, RollingWriteBatch> entityUpdates = + new TreeMap<Long, RollingWriteBatch>(); + TreeMap<Long, RollingWriteBatch> indexUpdates = + new TreeMap<Long, RollingWriteBatch>(); + + long entityCount = 0; + long indexCount = 0; + + try { + + for (TimelineEntity entity : entities.getEntities()) { + entityCount += putEntities(entityUpdates, indexUpdates, entity, + response); + } + + for (RollingWriteBatch entityUpdate : 
entityUpdates.values()) { + entityUpdate.write(); + } + + for (RollingWriteBatch indexUpdate : indexUpdates.values()) { + indexUpdate.write(); + } + + } finally { + + for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) { + entityRollingWriteBatch.close(); + } + for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) { + indexRollingWriteBatch.close(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Put " + entityCount + " new leveldb entity entries and " + + indexCount + " new leveldb index entries from " + + entities.getEntities().size() + " timeline entities"); + } + return response; + } + + /** + * Get the unique start time for a given entity as a byte array that sorts the + * timestamps in reverse order (see + * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). + * + * @param entityId + * The id of the entity + * @param entityType + * The type of the entity + * @return A byte array, null if not found + * @throws IOException + */ + private byte[] getStartTime(String entityId, String entityType) + throws IOException { + Long l = getStartTimeLong(entityId, entityType); + return l == null ? null : writeReverseOrderedLong(l); + } + + /** + * Get the unique start time for a given entity as a Long. 
+ * + * @param entityId + * The id of the entity + * @param entityType + * The type of the entity + * @return A Long, null if not found + * @throws IOException + */ + private Long getStartTimeLong(String entityId, String entityType) + throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + // start time is not provided, so try to look it up + if (startTimeReadCache.containsKey(entity)) { + // found the start time in the cache + return startTimeReadCache.get(entity); + } else { + // try to look up the start time in the db + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + byte[] v = starttimedb.get(b); + if (v == null) { + // did not find the start time in the db + return null; + } else { + // found the start time in the db + Long l = readReverseOrderedLong(v, 0); + startTimeReadCache.put(entity, l); + return l; + } + } + } + + /** + * Get the unique start time for a given entity as a byte array that sorts the + * timestamps in reverse order (see + * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start + * time doesn't exist, set it based on the information provided. Should only + * be called when a lock has been obtained on the entity. 
+ * + * @param entityId + * The id of the entity + * @param entityType + * The type of the entity + * @param startTime + * The start time of the entity, or null + * @param events + * A list of events for the entity, or null + * @return A StartAndInsertTime + * @throws IOException + */ + private Long getAndSetStartTime(String entityId, String entityType, + Long startTime, List<TimelineEvent> events) throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + Long time = startTimeWriteCache.get(entity); + if (time != null) { + // return the value in the cache + return time; + } + if (startTime == null && events != null) { + // calculate best guess start time based on lowest event time + startTime = Long.MAX_VALUE; + for (TimelineEvent e : events) { + if (e.getTimestamp() < startTime) { + startTime = e.getTimestamp(); + } + } + } + // check the provided start time matches the db + return checkStartTimeInDb(entity, startTime); + } + + /** + * Checks db for start time and returns it if it exists. If it doesn't exist, + * writes the suggested start time (if it is not null). This is only called + * when the start time is not found in the cache, so it adds it back into the + * cache if it is found. Should only be called when a lock has been obtained + * on the entity. 
+ */ + private Long checkStartTimeInDb(EntityIdentifier entity, + Long suggestedStartTime) throws IOException { + Long startAndInsertTime = null; + // create lookup key for start time + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + // retrieve value for key + byte[] v = starttimedb.get(b); + if (v == null) { + // start time doesn't exist in db + if (suggestedStartTime == null) { + return null; + } + startAndInsertTime = suggestedStartTime; + + // write suggested start time + starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime)); + } else { + // found start time in db, so ignore suggested start time + startAndInsertTime = readReverseOrderedLong(v, 0); + } + startTimeWriteCache.put(entity, startAndInsertTime); + startTimeReadCache.put(entity, startAndInsertTime); + return startAndInsertTime; + } + + /** + * Creates a key for looking up the start time of a given entity, of the form + * START_TIME_LOOKUP_PREFIX + entity type + entity id. + */ + private static byte[] createStartTimeLookupKey(String entityId, + String entityType) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(entityId).getBytes(); + } + + /** + * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type + + * revstarttime + entity id. + */ + private static byte[] createEntityMarkerKey(String entityId, + String entityType, byte[] revStartTime) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(revStartTime) + .add(entityId).getBytesForLookup(); + } + + /** + * Creates an index entry for the given key of the form INDEXED_ENTRY_PREFIX + + * primaryfiltername + primaryfiltervalue + key. 
+ */ + private static byte[] addPrimaryFilterToKey(String primaryFilterName, + Object primaryFilterValue, byte[] key) throws IOException { + return KeyBuilder.newInstance().add(primaryFilterName) + .add(fstConf.asByteArray(primaryFilterValue), true).add(key).getBytes(); + } + + /** + * Creates an event object from the given key, offset, and value. If the event + * type is not contained in the specified set of event types, returns null. + */ + private static TimelineEvent getEntityEvent(Set<String> eventTypes, + byte[] key, int offset, byte[] value) throws IOException { + KeyParser kp = new KeyParser(key, offset); + long ts = kp.getNextLong(); + String tstype = kp.getNextString(); + if (eventTypes == null || eventTypes.contains(tstype)) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(ts); + event.setEventType(tstype); + Object o = fstConf.asObject(value); + if (o == null) { + event.setEventInfo(null); + } else if (o instanceof Map) { + @SuppressWarnings("unchecked") + Map<String, Object> m = (Map<String, Object>) o; + event.setEventInfo(m); + } else { + throw new IOException("Couldn't deserialize event info map"); + } + return event; + } + return null; + } + + /** + * Parses the primary filter from the given key at the given offset and adds + * it to the given entity. + */ + private static void addPrimaryFilter(TimelineEntity entity, byte[] key, + int offset) throws IOException { + KeyParser kp = new KeyParser(key, offset); + String name = kp.getNextString(); + byte[] bytes = kp.getRemainingBytes(); + Object value = fstConf.asObject(bytes); + entity.addPrimaryFilter(name, value); + } + + /** + * Creates a string representation of the byte array from the given offset to + * the end of the array (for parsing other info keys). 
+ */ + private static String parseRemainingKey(byte[] b, int offset) { + return new String(b, offset, b.length - offset, UTF_8); + } + + /** + * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + entity type + * + revstarttime + entity id + RELATED_ENTITIES_COLUMN + relatedentity type + + * relatedentity id. + */ + private static byte[] createRelatedEntityKey(String entityId, + String entityType, byte[] revStartTime, String relatedEntityId, + String relatedEntityType) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(revStartTime) + .add(entityId).add(RELATED_ENTITIES_COLUMN).add(relatedEntityType) + .add(relatedEntityId).getBytes(); + } + + /** + * Parses the related entity from the given key at the given offset and adds + * it to the given entity. + */ + private static void addRelatedEntity(TimelineEntity entity, byte[] key, + int offset) throws IOException { + KeyParser kp = new KeyParser(key, offset); + String type = kp.getNextString(); + String id = kp.getNextString(); + entity.addRelatedEntity(type, id); + } + + /** + * Creates a domain id key, serializing ENTITY_ENTRY_PREFIX + entity type + + * revstarttime + entity id + DOMAIN_ID_COLUMN. + */ + private static byte[] createDomainIdKey(String entityId, String entityType, + byte[] revStartTime) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(revStartTime) + .add(entityId).add(DOMAIN_ID_COLUMN).getBytes(); + } + + /** + * Clears the cache to test reloading start times from leveldb (only for + * testing). 
+ */ + @VisibleForTesting + void clearStartTimeCache() { + startTimeWriteCache.clear(); + startTimeReadCache.clear(); + } + + @VisibleForTesting + static int getStartTimeReadCacheSize(Configuration conf) { + return conf + .getInt( + TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); + } + + @VisibleForTesting + static int getStartTimeWriteCacheSize(Configuration conf) { + return conf + .getInt( + TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); + } + + @VisibleForTesting + long evictOldStartTimes(long minStartTime) throws IOException { + LOG.info("Searching for start times to evict earlier than " + minStartTime); + + long batchSize = 0; + long totalCount = 0; + long startTimesCount = 0; + + WriteBatch writeBatch = null; + DBIterator iterator = null; + + try { + writeBatch = starttimedb.createWriteBatch(); + ReadOptions readOptions = new ReadOptions(); + readOptions.fillCache(false); + iterator = starttimedb.iterator(readOptions); + // seek to the first start time entry + iterator.seekToFirst(); + + // evaluate each start time entry to see if it needs to be evicted or not + while (iterator.hasNext()) { + Map.Entry<byte[], byte[]> current = iterator.next(); + byte[] entityKey = current.getKey(); + byte[] entityValue = current.getValue(); + long startTime = readReverseOrderedLong(entityValue, 0); + if (startTime < minStartTime) { + ++batchSize; + ++startTimesCount; + writeBatch.delete(entityKey); + + // a large delete will hold the lock for too long + if (batchSize >= writeBatchSize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Preparing to delete a batch of " + batchSize + + " old start times"); + } + starttimedb.write(writeBatch); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleted batch of " + batchSize + + ". 
Total start times deleted so far this cycle: " + + startTimesCount); + } + IOUtils.cleanup(LOG, writeBatch); + writeBatch = starttimedb.createWriteBatch(); + batchSize = 0; + } + } + ++totalCount; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Preparing to delete a batch of " + batchSize + + " old start times"); + } + starttimedb.write(writeBatch); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleted batch of " + batchSize + + ". Total start times deleted so far this cycle: " + + startTimesCount); + } + LOG.info("Deleted " + startTimesCount + "/" + totalCount + + " start time entities earlier than " + minStartTime); + } finally { + IOUtils.cleanup(LOG, writeBatch); + IOUtils.cleanup(LOG, iterator); + } + return startTimesCount; + } + + /** + * Discards entities with start timestamp less than or equal to the given + * timestamp. + */ + @VisibleForTesting + void discardOldEntities(long timestamp) throws IOException, + InterruptedException { + long totalCount = 0; + long t1 = System.currentTimeMillis(); + try { + totalCount += evictOldStartTimes(timestamp); + indexdb.evictOldDBs(); + entitydb.evictOldDBs(); + } finally { + long t2 = System.currentTimeMillis(); + LOG.info("Discarded " + totalCount + " entities for timestamp " + + timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds"); + } + } + + Version loadVersion() throws IOException { + byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. 
+ if (data == null || data.length == 0) { + return Version.newInstance(1, 0); + } + Version version = new VersionPBImpl(VersionProto.parseFrom(data)); + return version; + } + + // Only used for test + @VisibleForTesting + void storeVersion(Version state) throws IOException { + dbStoreVersion(state); + } + + private void dbStoreVersion(Version state) throws IOException { + String key = TIMELINE_STORE_VERSION_KEY; + byte[] data = ((VersionPBImpl) state).getProto().toByteArray(); + try { + starttimedb.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, + * 2.0 etc. 2) Any incompatible change of TS-store is a major upgrade, and any + * compatible change of TS-store is a minor upgrade. 3) Within a minor + * upgrade, say 1.1 to 1.2: overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: throw exception and indicate + * user to use a separate upgrade tool to upgrade timeline store or remove + * incompatible old state. 
+ */ + private void checkVersion() throws IOException { + Version loadedVersion = loadVersion(); + LOG.info("Loaded timeline store version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing timeline store version info " + getCurrentVersion()); + dbStoreVersion(CURRENT_VERSION_INFO); + } else { + String incompatibleMessage = "Incompatible version for timeline store: " + + "expecting version " + getCurrentVersion() + + ", but loading version " + loadedVersion; + LOG.fatal(incompatibleMessage); + throw new IOException(incompatibleMessage); + } + } + + // TODO: make data retention work with the domain data as well + @Override + public void put(TimelineDomain domain) throws IOException { + WriteBatch domainWriteBatch = null; + WriteBatch ownerWriteBatch = null; + try { + domainWriteBatch = domaindb.createWriteBatch(); + ownerWriteBatch = ownerdb.createWriteBatch(); + if (domain.getId() == null || domain.getId().length() == 0) { + throw new IllegalArgumentException("Domain doesn't have an ID"); + } + if (domain.getOwner() == null || domain.getOwner().length() == 0) { + throw new IllegalArgumentException("Domain doesn't have an owner."); + } + + // Write description + byte[] domainEntryKey = createDomainEntryKey(domain.getId(), + DESCRIPTION_COLUMN); + byte[] ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), DESCRIPTION_COLUMN); + if (domain.getDescription() != null) { + domainWriteBatch.put(domainEntryKey, + domain.getDescription().getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription() + .getBytes(UTF_8)); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write owner + domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + 
domain.getId(), OWNER_COLUMN); + // Null check for owner is done before + domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner() + .getBytes(UTF_8)); + + // Write readers + domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), READER_COLUMN); + if (domain.getReaders() != null && domain.getReaders().length() > 0) { + domainWriteBatch.put(domainEntryKey, domain.getReaders() + .getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, + domain.getReaders().getBytes(UTF_8)); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write writers + domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), WRITER_COLUMN); + if (domain.getWriters() != null && domain.getWriters().length() > 0) { + domainWriteBatch.put(domainEntryKey, domain.getWriters() + .getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, + domain.getWriters().getBytes(UTF_8)); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write creation time and modification time + // We put both timestamps together because they are always retrieved + // together, and store them in the same way as we did for the entity's + // start time and insert time. 
+ domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), TIMESTAMP_COLUMN); + long currentTimestamp = System.currentTimeMillis(); + byte[] timestamps = domaindb.get(domainEntryKey); + if (timestamps == null) { + timestamps = new byte[16]; + writeReverseOrderedLong(currentTimestamp, timestamps, 0); + writeReverseOrderedLong(currentTimestamp, timestamps, 8); + } else { + writeReverseOrderedLong(currentTimestamp, timestamps, 8); + } + domainWriteBatch.put(domainEntryKey, timestamps); + ownerWriteBatch.put(ownerLookupEntryKey, timestamps); + domaindb.write(domainWriteBatch); + ownerdb.write(ownerWriteBatch); + } finally { + IOUtils.cleanup(LOG, domainWriteBatch); + IOUtils.cleanup(LOG, ownerWriteBatch); + } + } + + /** + * Creates a domain entity key with column name suffix, of the form + * DOMAIN_ENTRY_PREFIX + domain id + column name. + */ + private static byte[] createDomainEntryKey(String domainId, byte[] columnName) + throws IOException { + return KeyBuilder.newInstance().add(domainId).add(columnName).getBytes(); + } + + /** + * Creates an owner lookup key with column name suffix, of the form + * OWNER_LOOKUP_PREFIX + owner + domain id + column name. 
+ */ + private static byte[] createOwnerLookupKey(String owner, String domainId, + byte[] columnName) throws IOException { + return KeyBuilder.newInstance().add(owner).add(domainId).add(columnName) + .getBytes(); + } + + @Override + public TimelineDomain getDomain(String domainId) throws IOException { + DBIterator iterator = null; + try { + byte[] prefix = KeyBuilder.newInstance().add(domainId) + .getBytesForLookup(); + iterator = domaindb.iterator(); + iterator.seek(prefix); + return getTimelineDomain(iterator, domainId, prefix); + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + @Override + public TimelineDomains getDomains(String owner) throws IOException { + DBIterator iterator = null; + try { + byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup(); + List<TimelineDomain> domains = new ArrayList<TimelineDomain>(); + for (iterator = ownerdb.iterator(), iterator.seek(prefix); iterator + .hasNext();) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key)) { + break; + } + // Iterator to parse the rows of an individual domain + KeyParser kp = new KeyParser(key, prefix.length); + String domainId = kp.getNextString(); + byte[] prefixExt = KeyBuilder.newInstance().add(owner).add(domainId) + .getBytesForLookup(); + TimelineDomain domainToReturn = getTimelineDomain(iterator, domainId, + prefixExt); + if (domainToReturn != null) { + domains.add(domainToReturn); + } + } + // Sort the domains to return + Collections.sort(domains, new Comparator<TimelineDomain>() { + @Override + public int compare(TimelineDomain domain1, TimelineDomain domain2) { + int result = domain2.getCreatedTime().compareTo( + domain1.getCreatedTime()); + if (result == 0) { + return domain2.getModifiedTime().compareTo( + domain1.getModifiedTime()); + } else { + return result; + } + } + }); + TimelineDomains domainsToReturn = new TimelineDomains(); + domainsToReturn.addDomains(domains); + return domainsToReturn; + } finally { + 
IOUtils.cleanup(LOG, iterator); + } + } + + private static TimelineDomain getTimelineDomain(DBIterator iterator, + String domainId, byte[] prefix) throws IOException { + // Iterate over all the rows whose key starts with prefix to retrieve the + // domain information. + TimelineDomain domain = new TimelineDomain(); + domain.setId(domainId); + boolean noRows = true; + for (; iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key)) { + break; + } + if (noRows) { + noRows = false; + } + byte[] value = iterator.peekNext().getValue(); + if (value != null && value.length > 0) { + if (key[prefix.length] == DESCRIPTION_COLUMN[0]) { + domain.setDescription(new String(value, UTF_8)); + } else if (key[prefix.length] == OWNER_COLUMN[0]) { + domain.setOwner(new String(value, UTF_8)); + } else if (key[prefix.length] == READER_COLUMN[0]) { + domain.setReaders(new String(value, UTF_8)); + } else if (key[prefix.length] == WRITER_COLUMN[0]) { + domain.setWriters(new String(value, UTF_8)); + } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) { + domain.setCreatedTime(readReverseOrderedLong(value, 0)); + domain.setModifiedTime(readReverseOrderedLong(value, 8)); + } else { + LOG.error("Unrecognized domain column: " + key[prefix.length]); + } + } + } + if (noRows) { + return null; + } else { + return domain; + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java index 8c6b83a..86aae77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.timeline; -import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -51,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting; * The class wrap over the timeline store and the ACLs manager. It does some non * trivial manipulation of the timeline data before putting or after getting it * from the timeline store, and checks the user's access to it. 
- * + * */ public class TimelineDataManager extends AbstractService { @@ -119,7 +116,7 @@ public class TimelineDataManager extends AbstractService { * Get the timeline entities that the given user have access to. The meaning * of each argument has been documented with * {@link TimelineReader#getEntities}. - * + * * @see TimelineReader#getEntities */ public TimelineEntities getEntities( @@ -156,7 +153,7 @@ public class TimelineDataManager extends AbstractService { * Get the single timeline entity that the given user has access to. The * meaning of each argument has been documented with * {@link TimelineReader#getEntity}. - * + * * @see TimelineReader#getEntity */ public TimelineEntity getEntity( @@ -182,7 +179,7 @@ public class TimelineDataManager extends AbstractService { * Get the events whose entities the given user has access to. The meaning of * each argument has been documented with * {@link TimelineReader#getEntityTimelines}. - * + * * @see TimelineReader#getEntityTimelines */ public TimelineEvents getEvents( @@ -218,7 +215,7 @@ public class TimelineDataManager extends AbstractService { eventsItr.remove(); } } catch (Exception e) { - LOG.error("Error when verifying access for user " + callerUGI + LOG.warn("Error when verifying access for user " + callerUGI + " on the events of the timeline entity " + new EntityIdentifier(eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityType()), e); @@ -242,13 +239,10 @@ public class TimelineDataManager extends AbstractService { if (entities == null) { return new TimelinePutResponse(); } - List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>(); TimelineEntities entitiesToPut = new TimelineEntities(); List<TimelinePutResponse.TimelinePutError> errors = new ArrayList<TimelinePutResponse.TimelinePutError>(); for (TimelineEntity entity : entities.getEntities()) { - EntityIdentifier entityID = - new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); // if the domain id is not specified, the 
entity will be put into // the default domain @@ -261,44 +255,42 @@ public class TimelineDataManager extends AbstractService { TimelineEntity existingEntity = null; try { existingEntity = - store.getEntity(entityID.getId(), entityID.getType(), + store.getEntity(entity.getEntityId(), entity.getEntityType(), EnumSet.of(Field.PRIMARY_FILTERS)); if (existingEntity != null) { addDefaultDomainIdIfAbsent(existingEntity); if (!existingEntity.getDomainId().equals(entity.getDomainId())) { throw new YarnException("The domain of the timeline entity " - + entityID + " is not allowed to be changed."); + + "{ id: " + entity.getEntityId() + ", type: " + + entity.getEntityType() + " } is not allowed to be changed from " + + existingEntity.getDomainId() + " to " + entity.getDomainId()); } } if (!timelineACLsManager.checkAccess( callerUGI, ApplicationAccessType.MODIFY_APP, entity)) { throw new YarnException(callerUGI - + " is not allowed to put the timeline entity " + entityID - + " into the domain " + entity.getDomainId() + "."); + + " is not allowed to put the timeline entity " + + "{ id: " + entity.getEntityId() + ", type: " + + entity.getEntityType() + " } into the domain " + + entity.getDomainId() + "."); } } catch (Exception e) { // Skip the entity which already exists and was put by others - LOG.error("Skip the timeline entity: " + entityID, e); + LOG.warn("Skip the timeline entity: { id: " + entity.getEntityId() + + ", type: "+ entity.getEntityType() + " }", e); TimelinePutResponse.TimelinePutError error = new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); error.setErrorCode( TimelinePutResponse.TimelinePutError.ACCESS_DENIED); errors.add(error); continue; } - entityIDs.add(entityID); entitiesToPut.addEntity(entity); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing the entity " + entityID + ", JSON-style 
content: " - + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs)); } + TimelinePutResponse response = store.put(entitiesToPut); // add the errors of timeline system filter key conflict response.addErrors(errors); http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java index fc6cc7d..5638581 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java @@ -21,15 +21,16 @@ package org.apache.hadoop.yarn.server.timeline.util; import org.apache.hadoop.io.WritableComparator; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.charset.Charset; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; public class LeveldbUtils { + /** A string builder utility for building timeline server leveldb keys. */ public static class KeyBuilder { + /** Maximum subkeys that can be added to construct a key. 
*/ private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10; private byte[][] b; private boolean[] useSeparator; @@ -47,8 +48,15 @@ public class LeveldbUtils { return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS); } + /** Instantiate a new key builder with the given maximum number of subkeys. + * @param size maximum subkeys that can be added to this key builder + * @return a newly constructed key builder */ + public static KeyBuilder newInstance(final int size) { + return new KeyBuilder(size); + } + public KeyBuilder add(String s) { - return add(s.getBytes(Charset.forName("UTF-8")), true); + return add(s.getBytes(UTF_8), true); } public KeyBuilder add(byte[] t) { @@ -66,26 +74,37 @@ public class LeveldbUtils { return this; } - public byte[] getBytes() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(length); + /** Builds a byte array without the final string delimiter. */ + public byte[] getBytes() { + // check the last valid entry to see the final length + int bytesLength = length; + if (useSeparator[index - 1]) { + bytesLength = length - 1; + } + byte[] bytes = new byte[bytesLength]; + int curPos = 0; for (int i = 0; i < index; i++) { - baos.write(b[i]); + System.arraycopy(b[i], 0, bytes, curPos, b[i].length); + curPos += b[i].length; if (i < index - 1 && useSeparator[i]) { - baos.write(0x0); + bytes[curPos++] = 0x0; } } - return baos.toByteArray(); + return bytes; } - public byte[] getBytesForLookup() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(length); + /** Builds a byte array including the final string delimiter. 
*/ + public byte[] getBytesForLookup() { + byte[] bytes = new byte[length]; + int curPos = 0; for (int i = 0; i < index; i++) { - baos.write(b[i]); + System.arraycopy(b[i], 0, bytes, curPos, b[i].length); + curPos += b[i].length; if (useSeparator[i]) { - baos.write(0x0); + bytes[curPos++] = 0x0; } } - return baos.toByteArray(); + return bytes; } } @@ -93,11 +112,12 @@ public class LeveldbUtils { private final byte[] b; private int offset; - public KeyParser(byte[] b, int offset) { + public KeyParser(final byte[] b, final int offset) { this.b = b; this.offset = offset; } + /** Returns a string from the offset until the next string delimiter. */ public String getNextString() throws IOException { if (offset >= b.length) { throw new IOException( @@ -107,23 +127,42 @@ public class LeveldbUtils { while (offset + i < b.length && b[offset + i] != 0x0) { i++; } - String s = new String(b, offset, i, Charset.forName("UTF-8")); + String s = new String(b, offset, i, UTF_8); offset = offset + i + 1; return s; } + /** Moves current position until after the next end of string marker. */ + public void skipNextString() throws IOException { + if (offset >= b.length) { + throw new IOException("tried to read nonexistent string from byte array"); + } + while (offset < b.length && b[offset] != 0x0) { + ++offset; + } + ++offset; + } + + /** Read the next 8 bytes in the byte buffer as a long. */ public long getNextLong() throws IOException { if (offset + 8 >= b.length) { throw new IOException("byte array ran out when trying to read long"); } - long l = readReverseOrderedLong(b, offset); + long value = readReverseOrderedLong(b, offset); offset += 8; - return l; + return value; } public int getOffset() { return offset; } + + /** Returns a copy of the remaining bytes. */ + public byte[] getRemainingBytes() { + byte[] bytes = new byte[b.length - offset]; + System.arraycopy(b, offset, bytes, 0, b.length - offset); + return bytes; + } } /**
