http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java new file mode 100644 index 0000000..10f349a --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.mgmt.persist; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.api.catalog.CatalogItem; +import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler; +import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoManifest; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData; +import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento; +import org.apache.brooklyn.api.mgmt.rebind.mementos.Memento; +import org.apache.brooklyn.api.objs.BrooklynObject; +import org.apache.brooklyn.api.objs.BrooklynObjectType; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.config.StringConfigMap; +import org.apache.brooklyn.core.catalog.internal.CatalogUtils; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.mgmt.classloading.ClassLoaderFromBrooklynClassLoadingContext; +import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore.StoreObjectAccessor; +import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore.StoreObjectAccessorWithLock; +import 
org.apache.brooklyn.core.mgmt.rebind.PeriodicDeltaChangeListener; +import org.apache.brooklyn.core.mgmt.rebind.PersisterDeltaImpl; +import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoImpl; +import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoManifestImpl; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.xstream.XmlUtil; +import org.apache.brooklyn.util.exceptions.CompoundRuntimeException; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable + * {@link PersistenceObjectStore} such as a file system or a jclouds object store */ +public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPersister { + + // TODO Crazy amount of duplication between handling entity, location, policy, enricher + feed; + // Need to remove that duplication. + + // TODO Should stop() take a timeout, and shutdown the executor gracefully? 
+ + private static final Logger LOG = LoggerFactory.getLogger(BrooklynMementoPersisterToObjectStore.class); + + public static final ConfigKey<Integer> PERSISTER_MAX_THREAD_POOL_SIZE = ConfigKeys.newIntegerConfigKey( + "persister.threadpool.maxSize", + "Maximum number of concurrent operations for persistence (reads/writes/deletes of *different* objects)", + 10); + + public static final ConfigKey<Integer> PERSISTER_MAX_SERIALIZATION_ATTEMPTS = ConfigKeys.newIntegerConfigKey( + "persister.maxSerializationAttempts", + "Maximum number of attempts to serialize a memento (e.g. if first attempts fail because of concurrent modifications of an entity)", + 5); + + private final PersistenceObjectStore objectStore; + private final MementoSerializer<Object> serializerWithStandardClassLoader; + + private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>(); + + private final ListeningExecutorService executor; + + private volatile boolean writesAllowed = false; + private volatile boolean writesShuttingDown = false; + private StringConfigMap brooklynProperties; + + private List<Delta> queuedDeltas = new CopyOnWriteArrayList<BrooklynMementoPersister.Delta>(); + + /** + * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block + * for any concurrent call to complete. 
+ */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); + + public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, StringConfigMap brooklynProperties, ClassLoader classLoader) { + this.objectStore = checkNotNull(objectStore, "objectStore"); + this.brooklynProperties = brooklynProperties; + + int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS); + MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader); + this.serializerWithStandardClassLoader = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts); + + int maxThreadPoolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE); + + objectStore.createSubPath("entities"); + objectStore.createSubPath("locations"); + objectStore.createSubPath("policies"); + objectStore.createSubPath("enrichers"); + objectStore.createSubPath("feeds"); + objectStore.createSubPath("catalog"); + + // FIXME does it belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ? 
+ objectStore.createSubPath("plane"); + + executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize, new ThreadFactory() { + @Override public Thread newThread(Runnable r) { + // Note: Thread name referenced in logback-includes' ThreadNameDiscriminator + return new Thread(r, "brooklyn-persister"); + }})); + } + + public MementoSerializer<Object> getMementoSerializer() { + return getSerializerWithStandardClassLoader(); + } + + protected MementoSerializer<Object> getSerializerWithStandardClassLoader() { + return serializerWithStandardClassLoader; + } + + protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, BrooklynObjectType type, String objectId) { + ClassLoader cl = getCustomClassLoaderForBrooklynObject(lookupContext, type, objectId); + if (cl==null) return serializerWithStandardClassLoader; + return getSerializerWithCustomClassLoader(lookupContext, cl); + } + + protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, ClassLoader classLoader) { + int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS); + MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader); + MementoSerializer<Object> result = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts); + result.setLookupContext(lookupContext); + return result; + } + + @Nullable protected ClassLoader getCustomClassLoaderForBrooklynObject(LookupContext lookupContext, BrooklynObjectType type, String objectId) { + BrooklynObject item = lookupContext.peek(type, objectId); + String catalogItemId = (item == null) ? 
null : item.getCatalogItemId(); + // TODO enrichers etc aren't yet known -- would need to backtrack to the entity to get them from bundles + if (catalogItemId == null) { + return null; + } + // See RebindIteration.BrooklynObjectInstantiator.load(), for handling where catalog item is missing; + // similar logic here. + CatalogItem<?, ?> catalogItem = CatalogUtils.getCatalogItemOptionalVersion(lookupContext.lookupManagementContext(), catalogItemId); + if (catalogItem == null) { + // TODO do we need to only log once, rather than risk log.warn too often? I think this only happens on rebind, so ok. + LOG.warn("Unable to load catalog item "+catalogItemId+" for custom class loader of "+type+" "+objectId+"; will use default class loader"); + return null; + } else { + return ClassLoaderFromBrooklynClassLoadingContext.of(CatalogUtils.newClassLoadingContext(lookupContext.lookupManagementContext(), catalogItem)); + } + } + + @Override public void enableWriteAccess() { + writesAllowed = true; + } + + @Override + public void disableWriteAccess(boolean graceful) { + writesShuttingDown = true; + try { + writesAllowed = false; + // a very long timeout to ensure we don't lose state. + // If persisting thousands of entities over slow network to Object Store, could take minutes. 
+ waitForWritesCompleted(Duration.ONE_HOUR); + + } catch (Exception e) { + throw Exceptions.propagate(e); + } finally { + writesShuttingDown = false; + } + } + + @Override + public void stop(boolean graceful) { + disableWriteAccess(graceful); + + if (executor != null) { + if (graceful) { + executor.shutdown(); + try { + // should be quick because we've just turned off writes, waiting for their completion + executor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } else { + executor.shutdownNow(); + } + } + } + + public PersistenceObjectStore getObjectStore() { + return objectStore; + } + + protected StoreObjectAccessorWithLock getWriter(String path) { + String id = path.substring(path.lastIndexOf('/')+1); + synchronized (writers) { + StoreObjectAccessorWithLock writer = writers.get(id); + if (writer == null) { + writer = new StoreObjectAccessorLocking( objectStore.newAccessor(path) ); + writers.put(id, writer); + } + return writer; + } + } + + private Map<String,String> makeIdSubPathMap(Iterable<String> subPathLists) { + Map<String,String> result = MutableMap.of(); + for (String subpath: subPathLists) { + String id = subpath; + id = id.substring(id.lastIndexOf('/')+1); + id = id.substring(id.lastIndexOf('\\')+1); + // assumes id is the filename; should work even if not, as id is later read from xpath + // but you'll get warnings (and possibility of loss if there is a collision) + result.put(id, subpath); + } + return result; + } + + protected BrooklynMementoRawData listMementoSubPathsAsData(final RebindExceptionHandler exceptionHandler) { + final BrooklynMementoRawData.Builder subPathDataBuilder = BrooklynMementoRawData.builder(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) + subPathDataBuilder.putAll(type, 
makeIdSubPathMap(objectStore.listContentsWithSubPath(type.getSubPathName()))); + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + exceptionHandler.onLoadMementoFailed(BrooklynObjectType.UNKNOWN, "Failed to list files", e); + throw new IllegalStateException("Failed to list memento files in "+objectStore, e); + } + + BrooklynMementoRawData subPathData = subPathDataBuilder.build(); + LOG.debug("Loaded rebind lists; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", new Object[]{ + Time.makeTimeStringRounded(stopwatch), + subPathData.getEntities().size(), subPathData.getLocations().size(), subPathData.getPolicies().size(), subPathData.getEnrichers().size(), + subPathData.getFeeds().size(), subPathData.getCatalogItems().size(), + objectStore.getSummaryName() }); + + return subPathData; + } + + public BrooklynMementoRawData loadMementoRawData(final RebindExceptionHandler exceptionHandler) { + BrooklynMementoRawData subPathData = listMementoSubPathsAsData(exceptionHandler); + + final BrooklynMementoRawData.Builder builder = BrooklynMementoRawData.builder(); + + Visitor loaderVisitor = new Visitor() { + @Override + public void visit(BrooklynObjectType type, String id, String contentsSubpath) throws Exception { + String contents = null; + try { + contents = read(contentsSubpath); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + exceptionHandler.onLoadMementoFailed(type, "memento "+id+" read error", e); + } + + String xmlId = (String) XmlUtil.xpath(contents, "/"+type.toCamelCase()+"/id"); + String safeXmlId = Strings.makeValidFilename(xmlId); + if (!Objects.equal(id, safeXmlId)) + LOG.warn("ID mismatch on "+type.toCamelCase()+", "+id+" from path, "+safeXmlId+" from xml"); + + builder.put(type, xmlId, contents); + } + }; + + Stopwatch stopwatch = Stopwatch.createStarted(); + + visitMemento("loading raw", subPathData, loaderVisitor, exceptionHandler); + + BrooklynMementoRawData result = 
builder.build(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded rebind raw data; took {}; {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", new Object[]{ + Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntities().size(), + result.getLocations().size(), result.getPolicies().size(), result.getEnrichers().size(), + result.getFeeds().size(), result.getCatalogItems().size(), + objectStore.getSummaryName() }); + } + + return result; + } + + @Override + public BrooklynMementoManifest loadMementoManifest(BrooklynMementoRawData mementoData, final RebindExceptionHandler exceptionHandler) throws IOException { + if (mementoData==null) + mementoData = loadMementoRawData(exceptionHandler); + + final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder(); + + Visitor visitor = new Visitor() { + @Override + public void visit(BrooklynObjectType type, String objectId, final String contents) throws Exception { + final String prefix = "/"+type.toCamelCase()+"/"; + + class XPathHelper { + private String get(String innerPath) { + return (String) XmlUtil.xpath(contents, prefix+innerPath); + } + } + XPathHelper x = new XPathHelper(); + + switch (type) { + case ENTITY: + builder.entity(x.get("id"), x.get("type"), + Strings.emptyToNull(x.get("parent")), Strings.emptyToNull(x.get("catalogItemId"))); + break; + case LOCATION: + case POLICY: + case ENRICHER: + case FEED: + builder.putType(type, x.get("id"), x.get("type")); + break; + case CATALOG_ITEM: + try { + CatalogItemMemento memento = (CatalogItemMemento) getSerializerWithStandardClassLoader().fromString(contents); + if (memento == null) { + LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing"); + } else { + builder.catalogItem(memento); + } + } catch (Exception e) { + exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" early catalog deserialization error", e); + } 
+ break; + default: + throw new IllegalStateException("Unexpected brooklyn type: "+type); + } + } + }; + + Stopwatch stopwatch = Stopwatch.createStarted(); + + visitMemento("manifests", mementoData, visitor, exceptionHandler); + + BrooklynMementoManifest result = builder.build(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded rebind manifests; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", new Object[]{ + Time.makeTimeStringRounded(stopwatch), + result.getEntityIdToManifest().size(), result.getLocationIdToType().size(), + result.getPolicyIdToType().size(), result.getEnricherIdToType().size(), result.getFeedIdToType().size(), + result.getCatalogItemMementos().size(), + objectStore.getSummaryName() }); + } + + return result; + } + + @Override + public BrooklynMemento loadMemento(BrooklynMementoRawData mementoData, final LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException { + if (mementoData==null) + mementoData = loadMementoRawData(exceptionHandler); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder(); + + Visitor visitor = new Visitor() { + @Override + public void visit(BrooklynObjectType type, String objectId, String contents) throws Exception { + try { + Memento memento = (Memento) getSerializerWithCustomClassLoader(lookupContext, type, objectId).fromString(contents); + if (memento == null) { + LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing"); + } else { + builder.memento(memento); + } + } catch (Exception e) { + exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" deserialization error", e); + } + } + + }; + + // TODO not convinced this is single threaded on reads; maybe should get a new one each time? 
+ getSerializerWithStandardClassLoader().setLookupContext(lookupContext); + try { + visitMemento("deserialization", mementoData, visitor, exceptionHandler); + } finally { + getSerializerWithStandardClassLoader().unsetLookupContext(); + } + + BrooklynMemento result = builder.build(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded rebind mementos; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", new Object[]{ + Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIds().size(), + result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(), + result.getFeedIds().size(), result.getCatalogItemIds().size(), + objectStore.getSummaryName() }); + } + + return result; + } + + protected interface Visitor { + public void visit(BrooklynObjectType type, String id, String contents) throws Exception; + } + + protected void visitMemento(final String phase, final BrooklynMementoRawData rawData, final Visitor visitor, final RebindExceptionHandler exceptionHandler) { + List<ListenableFuture<?>> futures = Lists.newArrayList(); + + class VisitorWrapper implements Runnable { + private final BrooklynObjectType type; + private final Map.Entry<String,String> objectIdAndData; + public VisitorWrapper(BrooklynObjectType type, Map.Entry<String,String> objectIdAndData) { + this.type = type; + this.objectIdAndData = objectIdAndData; + } + public void run() { + try { + visitor.visit(type, objectIdAndData.getKey(), objectIdAndData.getValue()); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + exceptionHandler.onLoadMementoFailed(type, "memento "+objectIdAndData.getKey()+" "+phase+" error", e); + } + } + } + + for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { + for (final Map.Entry<String,String> entry : rawData.getObjectsOfType(type).entrySet()) { + futures.add(executor.submit(new VisitorWrapper(type, entry))); 
+ } + } + + try { + // Wait for all, failing fast if any exceptions. + Futures.allAsList(futures).get(); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + + List<Exception> exceptions = Lists.newArrayList(); + + for (ListenableFuture<?> future : futures) { + if (future.isDone()) { + try { + future.get(); + } catch (InterruptedException e2) { + throw Exceptions.propagate(e2); + } catch (ExecutionException e2) { + LOG.warn("Problem loading memento ("+phase+"): "+e2, e2); + exceptions.add(e2); + } + future.cancel(true); + } + } + if (exceptions.isEmpty()) { + throw Exceptions.propagate(e); + } else { + // Normally there should be at lesat one failure; otherwise all.get() would not have failed. + throw new CompoundRuntimeException("Problem loading mementos ("+phase+")", exceptions); + } + } + } + + protected void checkWritesAllowed() { + if (!writesAllowed && !writesShuttingDown) { + throw new IllegalStateException("Writes not allowed in "+this); + } + } + + /** See {@link BrooklynPersistenceUtils} for conveniences for using this method. */ + @Override + @Beta + public void checkpoint(BrooklynMementoRawData newMemento, PersistenceExceptionHandler exceptionHandler) { + checkWritesAllowed(); + try { + lock.writeLock().lockInterruptibly(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + + try { + objectStore.prepareForMasterUse(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + List<ListenableFuture<?>> futures = Lists.newArrayList(); + + for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { + for (Map.Entry<String, String> entry : newMemento.getObjectsOfType(type).entrySet()) { + futures.add(asyncPersist(type.getSubPathName(), type, entry.getKey(), entry.getValue(), exceptionHandler)); + } + } + + try { + // Wait for all the tasks to complete or fail, rather than aborting on the first failure. + // But then propagate failure if any fail. (hence the two calls). 
+ Futures.successfulAsList(futures).get(); + Futures.allAsList(futures).get(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + if (LOG.isDebugEnabled()) LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(stopwatch)); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) { + checkWritesAllowed(); + + while (!queuedDeltas.isEmpty()) { + Delta extraDelta = queuedDeltas.remove(0); + doDelta(extraDelta, exceptionHandler, true); + } + + doDelta(delta, exceptionHandler, false); + } + + protected void doDelta(Delta delta, PersistenceExceptionHandler exceptionHandler, boolean previouslyQueued) { + Stopwatch stopwatch = deltaImpl(delta, exceptionHandler); + + if (LOG.isDebugEnabled()) LOG.debug("Checkpointed "+(previouslyQueued ? "previously queued " : "")+"delta of memento in {}: " + + "updated {} entities, {} locations, {} policies, {} enrichers, {} catalog items; " + + "removed {} entities, {} locations, {} policies, {} enrichers, {} catalog items", + new Object[] {Time.makeTimeStringRounded(stopwatch), + delta.entities().size(), delta.locations().size(), delta.policies().size(), delta.enrichers().size(), delta.catalogItems().size(), + delta.removedEntityIds().size(), delta.removedLocationIds().size(), delta.removedPolicyIds().size(), delta.removedEnricherIds().size(), delta.removedCatalogItemIds().size()}); + } + + @Override + public void queueDelta(Delta delta) { + queuedDeltas.add(delta); + } + + /** + * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy"). + * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time. + * + * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we + * don't do unnecessary repeated writes of an entity. 
+ */ + private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) { + try { + lock.writeLock().lockInterruptibly(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + try { + objectStore.prepareForMasterUse(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + List<ListenableFuture<?>> futures = Lists.newArrayList(); + + for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { + for (Memento entity : delta.getObjectsOfType(type)) { + futures.add(asyncPersist(type.getSubPathName(), entity, exceptionHandler)); + } + } + for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { + for (String id : delta.getRemovedIdsOfType(type)) { + futures.add(asyncDelete(type.getSubPathName(), id, exceptionHandler)); + } + } + + try { + // Wait for all the tasks to complete or fail, rather than aborting on the first failure. + // But then propagate failure if any fail. (hence the two calls). + Futures.successfulAsList(futures).get(); + Futures.allAsList(futures).get(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + + return stopwatch; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException { + boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS); + if (locked) { + ImmutableSet<StoreObjectAccessorWithLock> wc; + synchronized (writers) { + wc = ImmutableSet.copyOf(writers.values()); + } + lock.readLock().unlock(); + + // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because + // each writer is now synchronous. 
+ for (StoreObjectAccessorWithLock writer : wc) { + writer.waitForCurrentWrites(timeout); + } + } else { + throw new TimeoutException("Timeout waiting for writes to "+objectStore); + } + } + + private String read(String subPath) { + StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath); + return objectAccessor.get(); + } + + private void persist(String subPath, Memento memento, PersistenceExceptionHandler exceptionHandler) { + try { + getWriter(getPath(subPath, memento.getId())).put(getSerializerWithStandardClassLoader().toString(memento)); + } catch (Exception e) { + exceptionHandler.onPersistMementoFailed(memento, e); + } + } + + private void persist(String subPath, BrooklynObjectType type, String id, String content, PersistenceExceptionHandler exceptionHandler) { + try { + if (content==null) { + LOG.warn("Null content for "+type+" "+id); + } + getWriter(getPath(subPath, id)).put(content); + } catch (Exception e) { + exceptionHandler.onPersistRawMementoFailed(type, id, e); + } + } + + private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler) { + try { + StoreObjectAccessorWithLock w = getWriter(getPath(subPath, id)); + w.delete(); + synchronized (writers) { + writers.remove(id); + } + } catch (Exception e) { + exceptionHandler.onDeleteMementoFailed(id, e); + } + } + + private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) { + return executor.submit(new Runnable() { + public void run() { + persist(subPath, memento, exceptionHandler); + }}); + } + + private ListenableFuture<?> asyncPersist(final String subPath, final BrooklynObjectType type, final String id, final String content, final PersistenceExceptionHandler exceptionHandler) { + return executor.submit(new Runnable() { + public void run() { + persist(subPath, type, id, content, exceptionHandler); + }}); + } + + private ListenableFuture<?> asyncDelete(final String subPath, final 
String id, final PersistenceExceptionHandler exceptionHandler) { + return executor.submit(new Runnable() { + public void run() { + delete(subPath, id, exceptionHandler); + }}); + } + + private String getPath(String subPath, String id) { + return subPath+"/"+Strings.makeValidFilename(id); + } + + @Override + public String getBackingStoreDescription() { + return getObjectStore().getSummaryName(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java new file mode 100644 index 0000000..b3db32c --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.brooklyn.core.mgmt.persist;

import java.util.List;
import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.catalog.CatalogItem;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.LocationSpec;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecord;
import org.apache.brooklyn.api.mgmt.ha.MementoCopyMode;
import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler;
import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
import org.apache.brooklyn.api.mgmt.rebind.mementos.Memento;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.BrooklynObjectType;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.sensor.Enricher;
import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.core.mgmt.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
import org.apache.brooklyn.core.mgmt.internal.LocalLocationManager;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.core.mgmt.rebind.PersistenceExceptionHandlerImpl;
import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer;
import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformerLoader;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
import org.apache.brooklyn.core.server.BrooklynServerConfig;
import org.apache.brooklyn.core.server.BrooklynServerPaths;
import org.apache.brooklyn.entity.core.Entities;
import org.apache.brooklyn.entity.core.EntityInternal;
import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
import org.apache.brooklyn.util.core.ResourceUtils;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;

import com.google.common.annotations.Beta;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;

/**
 * Static helpers for creating {@link PersistenceObjectStore}s and for generating,
 * writing and backing up mementos of a {@link ManagementContext}.
 */
public class BrooklynPersistenceUtils {

    private static final Logger log = LoggerFactory.getLogger(BrooklynPersistenceUtils.class);

    /** Brooklyn object types listed in the order: entity, location, policy, enricher, feed, catalog item. */
    @Beta
    public static final List<BrooklynObjectType> STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER = ImmutableList.of(
        BrooklynObjectType.ENTITY, BrooklynObjectType.LOCATION, BrooklynObjectType.POLICY,
        BrooklynObjectType.ENRICHER, BrooklynObjectType.FEED, BrooklynObjectType.CATALOG_ITEM);

    /**
     * Creates a {@link PersistenceObjectStore} for general-purpose use.
     * Delegates to the five-argument overload with {@link PersistMode#AUTO} and
     * {@link HighAvailabilityMode#STANDBY}.
     */
    public static PersistenceObjectStore newPersistenceObjectStore(ManagementContext managementContext,
            String locationSpec, String locationContainer) {

        return newPersistenceObjectStore(managementContext, locationSpec, locationContainer,
            PersistMode.AUTO, HighAvailabilityMode.STANDBY);
    }

    /**
     * Creates a {@link PersistenceObjectStore} for use with a specified set of modes.
     * <p>
     * A blank {@code locationSpec} yields an unmanaged localhost location; otherwise the spec is
     * resolved through the location registry. The returned store has had the management context
     * injected and {@code prepareForSharedUse} called on it.
     *
     * @throws IllegalArgumentException if the resolved location is not a {@link LocationWithObjectStore}
     */
    public static PersistenceObjectStore newPersistenceObjectStore(ManagementContext managementContext,
            String locationSpec, String locationContainer, PersistMode persistMode, HighAvailabilityMode highAvailabilityMode) {
        PersistenceObjectStore destinationObjectStore;
        locationContainer = BrooklynServerPaths.newMainPersistencePathResolver(managementContext).location(locationSpec).dir(locationContainer).resolve();

        Location location = null;
        if (Strings.isBlank(locationSpec)) {
            location = managementContext.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)
                .configure(LocalLocationManager.CREATE_UNMANAGED, true));
        } else {
            location = managementContext.getLocationRegistry().resolve(locationSpec, false, null).get();
            if (!(location instanceof LocationWithObjectStore)) {
                throw new IllegalArgumentException("Destination location "+location+" does not offer a persistent store");
            }
        }
        destinationObjectStore = ((LocationWithObjectStore)location).newPersistenceObjectStore(locationContainer);

        destinationObjectStore.injectManagementContext(managementContext);
        destinationObjectStore.prepareForSharedUse(persistMode, highAvailabilityMode);
        return destinationObjectStore;
    }

    /** Checkpoints the given raw memento to the given store, using a freshly created persister with write access enabled. */
    public static void writeMemento(ManagementContext managementContext, BrooklynMementoRawData memento,
            PersistenceObjectStore destinationObjectStore) {
        BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
            destinationObjectStore,
            ((ManagementContextInternal)managementContext).getBrooklynProperties(),
            managementContext.getCatalogClassLoader());
        PersistenceExceptionHandler exceptionHandler = PersistenceExceptionHandlerImpl.builder().build();
        persister.enableWriteAccess();
        persister.checkpoint(memento, exceptionHandler);
    }

    /** Checkpoints the given management-plane record to the given store; does nothing if the record is null. */
    public static void writeManagerMemento(ManagementContext managementContext, ManagementPlaneSyncRecord optionalPlaneRecord,
            PersistenceObjectStore destinationObjectStore) {
        if (optionalPlaneRecord != null) {
            ManagementPlaneSyncRecordPersisterToObjectStore managementPersister = new ManagementPlaneSyncRecordPersisterToObjectStore(
                    managementContext, destinationObjectStore, managementContext.getCatalogClassLoader());
            managementPersister.checkpoint(optionalPlaneRecord);
        }
    }

    /** Loads a transformer from the given URL, or returns {@link CompoundTransformer#NOOP} if the URL is blank. */
    public static CompoundTransformer loadTransformer(ResourceUtils resources, String transformationsFileUrl) {
        if (Strings.isBlank(transformationsFileUrl)) {
            return CompoundTransformer.NOOP;
        } else {
            String contents = resources.getResourceAsString(transformationsFileUrl);
            return CompoundTransformerLoader.load(contents);
        }
    }

    /** Returns the memento obtained from the rebind support of the given object (which must be a {@link BrooklynObjectInternal}). */
    public static Memento newObjectMemento(BrooklynObject instance) {
        return ((BrooklynObjectInternal)instance).getRebindSupport().getMemento();
    }

    /**
     * Returns raw state for the given management context, either built from the live objects
     * ({@code LOCAL}) or retrieved through the rebind manager ({@code REMOTE}).
     *
     * @throws IllegalStateException if {@code source} is {@code AUTO}
     */
    public static BrooklynMementoRawData newStateMemento(ManagementContext mgmt, MementoCopyMode source) {
        switch (source) {
        case LOCAL:
            return newStateMementoFromLocal(mgmt);
        case REMOTE:
            return mgmt.getRebindManager().retrieveMementoRawData();
        case AUTO:
            throw new IllegalStateException("Copy mode AUTO not supported here");
        }
        throw new IllegalStateException("Should not come here, unknown mode "+source);
    }

    /**
     * Returns the management-plane record for the given management context, either the last record
     * held by the HA manager ({@code LOCAL}) or one loaded by it ({@code REMOTE}).
     *
     * @throws IllegalStateException if {@code source} is {@code AUTO}
     */
    public static ManagementPlaneSyncRecord newManagerMemento(ManagementContext mgmt, MementoCopyMode source) {
        switch (source) {
        case LOCAL:
            return mgmt.getHighAvailabilityManager().getLastManagementPlaneSyncRecord();
        case REMOTE:
            return mgmt.getHighAvailabilityManager().loadManagementPlaneSyncRecord(true);
        case AUTO:
            throw new IllegalStateException("Copy mode AUTO not supported here");
        }
        throw new IllegalStateException("Should not come here, unknown mode "+source);
    }

    /** Serializes mementos of all locations, entities (with their feeds, enrichers and policies) and catalog items known to the context. */
    private static BrooklynMementoRawData newStateMementoFromLocal(ManagementContext mgmt) {
        BrooklynMementoRawData.Builder result = BrooklynMementoRawData.builder();
        MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(mgmt.getClass().getClassLoader());
        RetryingMementoSerializer<Object> serializer = new RetryingMementoSerializer<Object>(rawSerializer, 1);

        for (Location instance: mgmt.getLocationManager().getLocations()) {
            result.location(instance.getId(), serializer.toString(newObjectMemento(instance)));
        }
        for (Entity instance: mgmt.getEntityManager().getEntities()) {
            instance = Entities.deproxy(instance);
            result.entity(instance.getId(), serializer.toString(newObjectMemento(instance)));
            for (Feed instanceAdjunct: ((EntityInternal)instance).feeds().getFeeds()) {
                result.feed(instanceAdjunct.getId(), serializer.toString(newObjectMemento(instanceAdjunct)));
            }
            for (Enricher instanceAdjunct: instance.getEnrichers()) {
                result.enricher(instanceAdjunct.getId(), serializer.toString(newObjectMemento(instanceAdjunct)));
            }
            for (Policy instanceAdjunct: instance.getPolicies()) {
                result.policy(instanceAdjunct.getId(), serializer.toString(newObjectMemento(instanceAdjunct)));
            }
        }
        for (CatalogItem<?,?> instance: mgmt.getCatalog().getCatalogItems()) {
            result.catalogItem(instance.getId(), serializer.toString(newObjectMemento(instance)));
        }

        return result.build();
    }

    /** generates and writes mementos for the given mgmt context to the given targetStore;
     * this may be taken from {@link MementoCopyMode#LOCAL} current state
     * or {@link MementoCopyMode#REMOTE} persisted state, or the default {@link MementoCopyMode#AUTO} detected
     * (LOCAL when this node's state is MASTER, REMOTE otherwise)
     */
    public static void writeMemento(ManagementContext mgmt, PersistenceObjectStore targetStore, MementoCopyMode source) {
        if (source==null || source==MementoCopyMode.AUTO)
            source = (mgmt.getHighAvailabilityManager().getNodeState()==ManagementNodeState.MASTER ? MementoCopyMode.LOCAL : MementoCopyMode.REMOTE);

        Stopwatch timer = Stopwatch.createStarted();

        BrooklynMementoRawData dataRecord = newStateMemento(mgmt, source);
        ManagementPlaneSyncRecord mgmtRecord = newManagerMemento(mgmt, source);

        writeMemento(mgmt, dataRecord, targetStore);
        writeManagerMemento(mgmt, mgmtRecord, targetStore);

        log.debug("Wrote full memento to "+targetStore+" in "+Time.makeTimeStringRounded(Duration.of(timer)));
    }

    /** Reason a backup is being taken; {@link #toString()} gives the lower-case name, which is used as a sub-path when resolving the backup container. */
    public static enum CreateBackupMode { PROMOTION, DEMOTION, CUSTOM;
        // fixed locale: the default-locale toLowerCase() maps 'I' to a dotless i under e.g. Turkish,
        // which would change the generated backup path
        @Override public String toString() { return super.toString().toLowerCase(Locale.ENGLISH); }
    }

    /**
     * Writes a backup of the state and management-plane record to the configured backup location.
     * <p>
     * If {@code source} is null or AUTO it is derived from {@code mode} (PROMOTION reads REMOTE,
     * DEMOTION reads LOCAL). If writing to a non-localhost backup location fails, the backup is
     * retried against "localhost". Non-fatal failures are logged and not propagated.
     *
     * @throws IllegalArgumentException if the copy mode cannot be derived from {@code mode}
     */
    public static void createBackup(ManagementContext managementContext, CreateBackupMode mode, MementoCopyMode source) {
        if (source==null || source==MementoCopyMode.AUTO) {
            switch (mode) {
            case PROMOTION: source = MementoCopyMode.REMOTE; break;
            case DEMOTION: source = MementoCopyMode.LOCAL; break;
            default:
                throw new IllegalArgumentException("Cannot detect copy mode for "+mode+"/"+source);
            }
        }
        BrooklynMementoRawData memento = null;
        ManagementPlaneSyncRecord planeState = null;

        try {
            log.debug("Loading persisted state on "+mode+" for backup purposes");
            memento = newStateMemento(managementContext, source);
            try {
                planeState = newManagerMemento(managementContext, source);
            } catch (Exception e) {
                Exceptions.propagateIfFatal(e);
                log.warn("Unable to access management plane sync state on "+mode+" (ignoring): "+e, e);
            }

            PersistenceObjectStore destinationObjectStore = null;
            String backupSpec = managementContext.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_LOCATION_SPEC);
            String nonBackupSpec = managementContext.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_LOCATION_SPEC);
            try {
                String backupContainer = BrooklynServerPaths.newBackupPersistencePathResolver(managementContext)
                    .location(backupSpec).nonBackupLocation(nonBackupSpec).resolveWithSubpathFor(managementContext, mode.toString());
                destinationObjectStore = BrooklynPersistenceUtils.newPersistenceObjectStore(managementContext, backupSpec, backupContainer);
                log.debug("Backing up persisted state on "+mode+", to "+destinationObjectStore.getSummaryName());
                BrooklynPersistenceUtils.writeMemento(managementContext, memento, destinationObjectStore);
                BrooklynPersistenceUtils.writeManagerMemento(managementContext, planeState, destinationObjectStore);
                if (!memento.isEmpty()) {
                    log.info("Back-up of persisted state created on "+mode+", in "+destinationObjectStore.getSummaryName());
                } else {
                    log.debug("Back-up of (empty) persisted state created on "+mode+", in "+destinationObjectStore.getSummaryName());
                }

            } catch (Exception e) {
                Exceptions.propagateIfFatal(e);
                PersistenceObjectStore failedStore = destinationObjectStore;
                if (!Strings.isBlank(backupSpec) && !"localhost".equals(backupSpec)) {
                    String failedSpec = backupSpec;
                    // report the original failure first, so it is not lost if the local fallback also fails
                    log.warn("Persisted state back-up to "+(failedStore!=null ? failedStore.getSummaryName() : failedSpec)
                        +" failed with "+e, e);

                    backupSpec = "localhost";
                    String backupContainer = BrooklynServerPaths.newBackupPersistencePathResolver(managementContext)
                        .location(backupSpec).nonBackupLocation(nonBackupSpec).resolveWithSubpathFor(managementContext, mode.toString());
                    destinationObjectStore = BrooklynPersistenceUtils.newPersistenceObjectStore(managementContext, backupSpec, backupContainer);

                    log.debug("Backing up persisted state on "+mode+", locally because remote failed, to "+destinationObjectStore.getSummaryName());
                    BrooklynPersistenceUtils.writeMemento(managementContext, memento, destinationObjectStore);
                    BrooklynPersistenceUtils.writeManagerMemento(managementContext, planeState, destinationObjectStore);
                    log.info("Back-up of persisted state created on "+mode+", locally because remote failed, in "+destinationObjectStore.getSummaryName());
                } else {
                    // no alternative location to fall back to: hand the failure to the outer
                    // handler so it is logged instead of being silently dropped
                    throw e;
                }
            }
        } catch (Exception e) {
            Exceptions.propagateIfFatal(e);
            log.warn("Unable to backup management plane sync state on "+mode+" (ignoring): "+e, e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java new file mode 100644 index 0000000..ce6731b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.core.mgmt.persist;

import java.util.ArrayList;
import java.util.Collection;

import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
import org.apache.brooklyn.api.catalog.CatalogItem.CatalogItemLibraries;
import org.apache.brooklyn.core.catalog.internal.CatalogBundleDto;

import com.thoughtworks.xstream.converters.Converter;
import com.thoughtworks.xstream.converters.MarshallingContext;
import com.thoughtworks.xstream.converters.UnmarshallingContext;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;

/**
 * Converts old-style rebind file formats to the latest version.
 * This class is only needed while transitioning to the new format and can be removed after a while.
 */
@Deprecated
public class CatalogItemLibrariesConverter implements Converter {

    /** Accepts {@link CatalogItemLibraries} and any {@link Collection} type. */
    @Override
    public boolean canConvert(@SuppressWarnings("rawtypes") Class type) {
        if (CatalogItemLibraries.class.isAssignableFrom(type)) {
            return true;
        }
        return Collection.class.isAssignableFrom(type);
    }

    /** Writing is delegated unchanged to the marshalling context. */
    @Override
    public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) {
        context.convertAnother(source);
    }

    /**
     * Reads the value via the context; when the required type is {@link CatalogItemLibraries},
     * its bundle URLs are returned as a collection of {@link CatalogBundle}s (one per URL,
     * with the other two constructor arguments null). Any other value is returned as read.
     */
    @Override
    public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) {
        Object converted = context.convertAnother(context.currentObject(), context.getRequiredType());
        if (!CatalogItemLibraries.class.isAssignableFrom(context.getRequiredType())) {
            return converted;
        }

        Collection<String> bundleUrls = ((CatalogItemLibraries) converted).getBundles();
        Collection<CatalogBundle> result = new ArrayList<CatalogBundle>(bundleUrls.size());
        for (String bundleUrl : bundleUrls) {
            result.add(new CatalogBundleDto(null, null, bundleUrl));
        }
        return result;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java new file mode 100644 index 0000000..cd54053 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.core.mgmt.persist;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;

import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
import org.apache.brooklyn.core.server.BrooklynServerConfig;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.internal.ssh.process.ProcessTool;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.FatalConfigurationRuntimeException;
import org.apache.brooklyn.util.io.FileUtil;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.os.Os.DeletionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
 * A {@link PersistenceObjectStore} which keeps its objects as files beneath a base directory.
 * <p>
 * The store must have a management context injected and {@link #prepareForSharedUse} called
 * before sub-paths or accessors can be created.
 *
 * @author Andrea Turli
 */
public class FileBasedObjectStore implements PersistenceObjectStore {

    private static final Logger log = LoggerFactory.getLogger(FileBasedObjectStore.class);

    private static final int SHUTDOWN_TIMEOUT_MS = 10*1000;

    private final File basedir;
    private final ListeningExecutorService executor;
    private ManagementContext mgmt;
    private boolean prepared = false;
    private boolean deferredBackupNeeded = false;
    private AtomicBoolean doneFirstContentiousWrite = new AtomicBoolean(false);

    /**
     * @param basedir the directory under which objects are stored; it need not exist yet,
     *        but if it does it must be a readable, writable directory
     */
    public FileBasedObjectStore(File basedir) {
        this.basedir = checkPersistenceDirPlausible(basedir);
        this.executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        log.debug("File-based objectStore will use directory {}", basedir);
        // don't check accessible yet, we do that when we prepare
    }

    @Override
    public String getSummaryName() {
        return getBaseDir().getAbsolutePath();
    }

    public File getBaseDir() {
        return basedir;
    }

    /**
     * Performs, at most once, the backup that {@link #prepareForSharedUse} deferred
     * (if any); subsequent calls return immediately.
     */
    public void prepareForMasterUse() {
        if (doneFirstContentiousWrite.get())
            return;
        synchronized (this) {
            if (doneFirstContentiousWrite.get())
                return;
            try {
                if (deferredBackupNeeded) {
                    // defer backup and path creation until first write
                    // this way if node is standby or auto, the backup is not created superfluously

                    File backup = backupDirByCopying(basedir);
                    log.info("Persistence deferred backup, directory "+basedir+" backed up to "+backup.getAbsolutePath());

                    deferredBackupNeeded = false;
                }
            } catch (Exception e) {
                throw Exceptions.propagate(e);
            }
            doneFirstContentiousWrite.set(true);
        }
    }

    /**
     * Creates the given sub-directory of the base directory if it does not already exist
     * (attempting to restrict a newly created directory to mode 700).
     *
     * @throws IllegalStateException if the store is not yet prepared, or the directory cannot be created
     */
    @Override
    public void createSubPath(String subPath) {
        if (!prepared) throw new IllegalStateException("Not yet prepared: "+this);

        File dir = new File(getBaseDir(), subPath);
        if (dir.mkdir()) {
            try {
                FileUtil.setFilePermissionsTo700(dir);
            } catch (IOException e) {
                log.warn("Unable to set sub-directory permissions to 700 (continuing): "+dir);
            }
        } else {
            if (!dir.exists())
                throw new IllegalStateException("Cannot create "+dir+"; call returned false");
        }
        checkPersistenceDirAccessible(dir);
    }

    /**
     * Returns an accessor for the file at the given path beneath the base directory.
     * The temp-file extension is ".tmp", prefixed with the management node id when one is available.
     */
    @Override
    public StoreObjectAccessor newAccessor(String path) {
        if (!prepared) throw new IllegalStateException("Not yet prepared: "+this);

        String tmpExt = ".tmp";
        if (mgmt!=null && mgmt.getManagementNodeId()!=null) tmpExt = "."+mgmt.getManagementNodeId()+tmpExt;
        return new FileBasedStoreObjectAccessor(new File(Os.mergePaths(getBaseDir().getAbsolutePath(), path)), tmpExt);
    }

    /**
     * Lists the entries of the given sub-directory as "parentSubPath/name" strings,
     * skipping names ending in ".tmp" or ".swp". Returns an empty list if the directory cannot be listed.
     */
    @Override
    public List<String> listContentsWithSubPath(final String parentSubPath) {
        if (!prepared) throw new IllegalStateException("Not yet prepared: "+this);

        Preconditions.checkNotNull(parentSubPath);
        File subPathDir = new File(basedir, parentSubPath);

        FileFilter fileFilter = new FileFilter() {
            @Override public boolean accept(File file) {
                // An inclusion filter would be safer than exclusion
                return !file.getName().endsWith(".tmp") && !file.getName().endsWith(".swp");
            }
        };
        File[] subPathDirFiles = subPathDir.listFiles(fileFilter);
        if (subPathDirFiles==null) return ImmutableList.<String>of();
        return FluentIterable.from(Arrays.asList(subPathDirFiles))
                .transform(new Function<File, String>() {
                    @Nullable
                    @Override
                    public String apply(@Nullable File input) {
                        return format("%s/%s", parentSubPath, input.getName());
                    }
                }).toList();
    }

    /** Shuts down the executor, waiting up to {@link #SHUTDOWN_TIMEOUT_MS} for it to terminate. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
    }

    @Override
    public String toString() {
        return Objects.toStringHelper(this).add("basedir", basedir).toString();
    }

    /** @throws IllegalStateException if a different management context was already injected */
    @Override
    public void injectManagementContext(ManagementContext mgmt) {
        if (this.mgmt!=null && !this.mgmt.equals(mgmt))
            throw new IllegalStateException("Cannot change mgmt context of "+this);
        this.mgmt = mgmt;
    }

    /**
     * Validates (and where necessary cleans, backs up or creates) the base directory according to
     * the persist mode, then marks this store as prepared.
     * <p>
     * Legacy backups are only taken when {@code PERSISTENCE_BACKUPS_REQUIRED} is true; in REBIND and
     * AUTO modes the backup is taken immediately for HA mode MASTER and otherwise deferred to
     * {@link #prepareForMasterUse()}.
     *
     * @throws NullPointerException if no management context has been injected
     */
    @Override
    public void prepareForSharedUse(@Nullable PersistMode persistMode, HighAvailabilityMode haMode) {
        if (mgmt==null) throw new NullPointerException("Must inject ManagementContext before preparing "+this);

        if (persistMode==null || persistMode==PersistMode.DISABLED) {
            // TODO is this check needed? shouldn't come here now without persistence on.
            prepared = true;
            return;
        }

        @SuppressWarnings("deprecation")
        Boolean backups = mgmt.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_REQUIRED);
        if (Boolean.TRUE.equals(backups)) {
            log.warn("Using legacy backup for "+this+"; functionality will be removed in future versions, in favor of promotion/demotion-specific backups to a configurable backup location.");
        }
        // default backups behaviour here changed to false, Nov 2014, because these backups are now legacy;
        // we prefer the backups made when persistence is enabled, using routines in BrooklynPersistenceUtils
        if (backups==null) backups = false;

        File dir = getBaseDir();
        try {
            String persistencePath = dir.getAbsolutePath();

            switch (persistMode) {
            case CLEAN:
                if (dir.exists()) {
                    checkPersistenceDirAccessible(dir);
                    try {
                        if (backups) {
                            File old = backupDirByMoving(dir);
                            log.info("Persistence mode CLEAN, directory "+persistencePath+" backed up to "+old.getAbsolutePath());
                        } else {
                            deleteCompletely();
                            log.info("Persistence mode CLEAN, directory "+persistencePath+" deleted");
                        }
                    } catch (IOException e) {
                        throw new FatalConfigurationRuntimeException("Error using existing persistence directory "+dir.getAbsolutePath(), e);
                    }
                } else {
                    log.debug("Persistence mode CLEAN, directory "+persistencePath+", no previous state");
                }
                break;
            case REBIND:
                checkPersistenceDirAccessible(dir);
                checkPersistenceDirNonEmpty(dir);
                try {
                    if (backups) {
                        if (haMode==HighAvailabilityMode.MASTER) {
                            File backup = backupDirByCopying(dir);
                            log.info("Persistence mode REBIND, directory "+persistencePath+" backed up to "+backup.getAbsolutePath());
                        } else {
                            deferredBackupNeeded = true;
                        }
                    }
                } catch (IOException e) {
                    throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
                }
                break;
            case AUTO:
                if (dir.exists()) {
                    checkPersistenceDirAccessible(dir);
                }
                if (dir.exists() && !isMementoDirExistButEmpty(dir)) {
                    try {
                        if (backups) {
                            if (haMode==HighAvailabilityMode.MASTER) {
                                File backup = backupDirByCopying(dir);
                                // message previously claimed mode REBIND although this is the AUTO branch
                                log.info("Persistence mode AUTO, directory "+persistencePath+" backed up to "+backup.getAbsolutePath());
                            } else {
                                deferredBackupNeeded = true;
                            }
                        }
                    } catch (IOException e) {
                        throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
                    }
                } else {
                    log.debug("Persistence mode AUTO, directory "+persistencePath+", no previous state");
                }
                break;
            default:
                throw new FatalConfigurationRuntimeException("Unexpected persist mode "+persistMode+"; modified during initialization?!");
            }

            if (!dir.exists()) {
                boolean success = dir.mkdirs();
                if (success) {
                    FileUtil.setFilePermissionsTo700(dir);
                } else {
                    throw new FatalConfigurationRuntimeException("Failed to create persistence directory "+dir);
                }
            }

        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }

        prepared = true;
    }

    /**
     * Checks that the directory, if it already exists, is a readable and writable directory.
     *
     * @return the given directory
     * @throws FatalConfigurationRuntimeException if it exists but is a file, or is not readable/writable
     */
    protected File checkPersistenceDirPlausible(File dir) {
        checkNotNull(dir, "directory");
        if (!dir.exists()) return dir;
        if (dir.isFile()) throw new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": must not be a file");
        if (!(dir.canRead() && dir.canWrite())) throw new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": " +
                (!dir.canRead() ? "not readable" :
                        (!dir.canWrite() ? "not writable" : "unknown reason")));
        return dir;
    }

    /**
     * Checks that the given path exists and is a readable, writable directory.
     *
     * @throws FatalConfigurationRuntimeException if it is not (previously the problem was built and
     *         logged as "rethrowing" but never actually thrown)
     */
    protected void checkPersistenceDirAccessible(File dir) {
        if (!(dir.exists() && dir.isDirectory() && dir.canRead() && dir.canWrite())) {
            FatalConfigurationRuntimeException problem = new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": " +
                    (!dir.exists() ? "does not exist" :
                            (!dir.isDirectory() ? "not a directory" :
                                    (!dir.canRead() ? "not readable" :
                                            (!dir.canWrite() ? "not writable" : "unknown reason")))));
            log.debug("Invalid persistence directory "+dir+" (rethrowing): "+problem, problem);
            throw problem;
        } else {
            log.debug("Created dir {} for {}", dir, this);
        }
    }

    /** @throws FatalConfigurationRuntimeException if the directory does not exist or holds no files */
    protected void checkPersistenceDirNonEmpty(File persistenceDir) {
        FatalConfigurationRuntimeException problem;
        if (!persistenceDir.exists()) {
            problem = new FatalConfigurationRuntimeException("Invalid persistence directory "+persistenceDir+" because directory does not exist");
            log.debug("Invalid persistence directory "+persistenceDir+" (rethrowing): "+problem, problem);
            throw problem;
        }
        if (isMementoDirExistButEmpty(persistenceDir)) {
            problem = new FatalConfigurationRuntimeException("Invalid persistence directory "+persistenceDir+" because directory is empty");
            log.debug("Invalid persistence directory "+persistenceDir+" (rethrowing): "+problem, problem);
            throw problem;
        }
    }

    /** Copies the directory to a timestamped ".bak" sibling, sets that to mode 700, and returns it. */
    protected File backupDirByCopying(File dir) throws IOException, InterruptedException {
        File backupDir = newTimestampedBackupDir(dir);

        FileUtil.copyDir(dir, backupDir);
        FileUtil.setFilePermissionsTo700(backupDir);

        return backupDir;
    }

    /** Moves the directory to a timestamped ".bak" sibling and returns the new location. */
    protected File backupDirByMoving(File dir) throws InterruptedException, IOException {
        File newDir = newTimestampedBackupDir(dir);

        FileUtil.moveDir(dir, newDir);
        return newDir;
    }

    /** Names a sibling of the given directory of the form {@code name.yyyyMMdd-HHmmssSSS.bak}. */
    private static File newTimestampedBackupDir(File dir) {
        // HH (0-23) rather than hh (1-12): with the 12-hour form a morning and an afternoon backup
        // get indistinguishable, mis-ordered names
        String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmssSSS").format(new Date());
        return new File(dir.getParentFile(), dir.getName()+"."+timestamp+".bak");
    }

    private static boolean WARNED_ON_NON_ATOMIC_FILE_UPDATES = false;
    /**
     * Attempts an fs level atomic move then fall back to pure java rename.
     * Assumes files are on same mount point.
     * <p>
     * The attempts, in order: java rename; {@code mv} (non-Windows only); delete destination then
     * rename; copy then delete source. The last two are not atomic, and a warning is logged the
     * first time they are needed.
     * <p>
     * TODO Java 7 gives an atomic Files.move() which would be preferred.
     *
     * @throws IOException if the final copy fails
     */
    static void moveFile(File srcFile, File destFile) throws IOException, InterruptedException {
        // Try rename first - it is a *much* cheaper call than invoking a system call in Java.
        // However, rename is not guaranteed cross platform to succeed if the destination exists,
        // and not guaranteed to be atomic, but it usually seems to do the right thing...
        boolean result;
        result = srcFile.renameTo(destFile);
        if (result) {
            if (log.isTraceEnabled()) log.trace("java rename of {} to {} completed", srcFile, destFile);
            return;
        }

        if (!Os.isMicrosoftWindows()) {
            // this command, if it succeeds, is guaranteed to be atomic, and it will usually overwrite
            String cmd = "mv '"+srcFile.getAbsolutePath()+"' '"+destFile.getAbsolutePath()+"'";

            // ProcessTool is preferred to Runtime.exec because it wraps the command in the appropriate bash
            int exitStatus = new ProcessTool().execCommands(MutableMap.<String,String>of(), MutableList.of(cmd), null);

            if (log.isTraceEnabled()) log.trace("FS move of {} to {} completed, code {}", new Object[] { srcFile, destFile, exitStatus });
            if (exitStatus == 0) return;
        }

        // finally try a delete - but explicitly warn this is not going to be atomic
        // so if another node reads it might see no master
        if (!WARNED_ON_NON_ATOMIC_FILE_UPDATES) {
            WARNED_ON_NON_ATOMIC_FILE_UPDATES = true;
            log.warn("Unable to perform atomic file update ("+srcFile+" to "+destFile+"); file system not recommended for production HA/DR");
        }
        destFile.delete();
        result = srcFile.renameTo(destFile);
        if (log.isTraceEnabled()) log.trace("java delete and rename of {} to {} completed, code {}", new Object[] { srcFile, destFile, result });
        if (result)
            return;

        // last resort: copy the content across and remove the source. Files.copy throws IOException
        // if the copy fails, so reaching the next line means the destination has been written;
        // this used to throw unconditionally, reporting failure even when the copy had succeeded
        Files.copy(srcFile, destFile);
        if (!srcFile.delete()) {
            log.warn("Copied "+srcFile+" to "+destFile+" but unable to delete the source file");
        }
    }

    /**
     * True if directory exists, but is entirely empty, or only contains empty directories.
     */
    static boolean isMementoDirExistButEmpty(String dir) {
        return isMementoDirExistButEmpty(new File(dir));
    }

    /**
     * True if directory exists, but is entirely empty, or only contains empty directories.
     * A directory (or sub-directory) whose contents cannot be listed is not treated as empty.
     */
    static boolean isMementoDirExistButEmpty(File dir) {
        if (!dir.exists()) return false;
        File[] contents = dir.listFiles();
        if (contents == null) return false;

        for (File sub : contents) {
            if (sub.isFile()) return false;
            if (sub.isDirectory()) {
                // listFiles returns null if the sub-directory cannot be read or vanished meanwhile
                File[] subContents = sub.listFiles();
                if (subContents == null || subContents.length > 0) return false;
            }
        }
        return true;
    }

    @Override
    public void deleteCompletely() {
        deleteCompletely(getBaseDir());
    }

    /** Recursively deletes the given directory, logging a warning if that does not fully succeed. */
    public static void deleteCompletely(File d) {
        DeletionResult result = Os.deleteRecursively(d);
        if (!result.wasSuccessful())
            log.warn("Unable to delete persistence dir "+d);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java new file mode 100644 index 0000000..41aefc0 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.core.mgmt.persist;

import java.io.File;
import java.io.IOException;
import java.util.Date;

import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.io.FileUtil;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.io.Files;

/**
 * Accessor which reads from and writes to a single file. Operations happen immediately and
 * without any synchronisation; wrap in {@link StoreObjectAccessorLocking} if several threads
 * may use the same accessor.
 *
 * @author aled
 */
public class FileBasedStoreObjectAccessor implements PersistenceObjectStore.StoreObjectAccessor {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreObjectAccessor.class);

    private final File file;
    private final File tmpFile;

    /**
     * @param file the file this accessor reads and writes
     * @param tmpExtension suffix appended to the file name to form the temp file used by
     *        {@link #put(String)}; ".tmp" is used if blank
     */
    public FileBasedStoreObjectAccessor(File file, String tmpExtension) {
        String extension = Strings.isBlank(tmpExtension) ? ".tmp" : tmpExtension;
        this.file = file;
        this.tmpFile = new File(file.getParentFile(), file.getName()+extension);
    }

    /** Returns the file content read as UTF-8, or null if the file does not exist. */
    @Override
    public String get() {
        if (!exists()) {
            return null;
        }
        try {
            return Files.asCharSource(file, Charsets.UTF_8).read();
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /** Returns the raw file content, or null if the file does not exist. */
    @Override
    public byte[] getBytes() {
        if (!exists()) {
            return null;
        }
        try {
            return Files.asByteSource(file).read();
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override
    public boolean exists() {
        return file.exists();
    }

    /**
     * Writes the value (null is written as the empty string) to the temp file as UTF-8 and then
     * moves the temp file over the target using {@link FileBasedObjectStore#moveFile}.
     */
    // Restricting permissions to 600 slows objectAccessor.put from roughly 5000 to 3000 calls per second
    // on java 6; Java 7's Files.setPosixFilePermissions may well do better.
    @Override
    public void put(String val) {
        String content = (val==null) ? "" : val;
        try {
            FileUtil.setFilePermissionsTo600(tmpFile);
            Files.write(content, tmpFile, Charsets.UTF_8);
            FileBasedObjectStore.moveFile(tmpFile, file);
        } catch (IOException e) {
            throw Exceptions.propagate(e);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
    }

    /** Appends the value (null is treated as the empty string) directly to the target file as UTF-8. */
    @Override
    public void append(String val) {
        String content = (val==null) ? "" : val;
        try {
            FileUtil.setFilePermissionsTo600(file);
            Files.append(content, file, Charsets.UTF_8);
        } catch (IOException e) {
            throw Exceptions.propagate(e);
        }
    }

    /** Deletes the target file and any leftover temp file; failures are logged, not thrown. */
    @Override
    public void delete() {
        boolean deleted = file.delete();
        if (!deleted) {
            if (file.exists()) {
                LOG.warn("Unable to delete " + file.getAbsolutePath() + ". Probably still locked.");
            } else {
                LOG.debug("Unable to delete " + file.getAbsolutePath() + ". Probably did not exist.");
            }
        }
        // the temp file has usually gone already, so stay silent when it does not exist
        if (tmpFile.exists() && !tmpFile.delete()) {
            LOG.warn("Unable to delete " + tmpFile.getAbsolutePath() + ". Probably still locked.");
        }
    }

    /** Returns the file's last-modified time, or null when {@link File#lastModified()} reports 0. */
    @Override
    public Date getLastModifiedDate() {
        long millis = file.lastModified();
        return (millis==0) ? null : new Date(millis);
    }

    @Override
    public String toString() {
        return Objects.toStringHelper(this).add("file", file).toString();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java new file mode 100644 index 0000000..9a5c693 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.core.mgmt.persist;

/** Marker interface for locations which can create a {@link PersistenceObjectStore} */
public interface LocationWithObjectStore {

    /** Creates a {@link PersistenceObjectStore} pointed at the given container/directory. */
    public PersistenceObjectStore newPersistenceObjectStore(String container);

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java new file mode 100644 index 0000000..1e924ec --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.mgmt.persist; + +import org.apache.brooklyn.api.mgmt.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext; + +/** Serializes the given object; it is often used with {@link BrooklynMemento} for persisting and restoring, + * though it can be used for any object (and is also used for the {@link ManagementNodeSyncRecord} instances) */ +public interface MementoSerializer<T> { + + public static final MementoSerializer<String> NOOP = new MementoSerializer<String>() { + @Override + public String toString(String memento) { + return memento; + } + @Override + public String fromString(String string) { + return string; + } + @Override + public void setLookupContext(LookupContext lookupContext) { + // no-op + } + @Override + public void unsetLookupContext() { + // no-op + } + }; + + String toString(T memento); + T fromString(String string); + void setLookupContext(LookupContext lookupContext); + void unsetLookupContext(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java new file mode 100644 index 0000000..fac764b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * The available persistence modes.
 * NOTE(review): the meaning of each mode is defined by the code that consumes this enum,
 * which is not visible here; constant order is kept as declared originally.
 */
public enum PersistMode {
    DISABLED, AUTO, REBIND, CLEAN
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.mgmt.persist; + +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.time.Duration; + +public class PersistenceActivityMetrics { + + final static int MAX_ERRORS = 200; + + long count=0, failureCount=0; + Long lastSuccessTime, lastDuration, lastFailureTime; + List<Map<String,Object>> errorMessages = MutableList.of(); + + public void noteSuccess(Duration duration) { + count++; + lastSuccessTime = System.currentTimeMillis(); + lastDuration = duration.toMilliseconds(); + } + + public void noteFailure(Duration duration) { + count++; + failureCount++; + lastFailureTime = System.currentTimeMillis(); + lastDuration = duration!=null ? 
duration.toMilliseconds() : -1; + } + + public void noteError(String error) { + noteErrorObject(error); + } + + public void noteError(List<?> error) { + noteErrorObject(error); + } + + /** error should be json-serializable; exceptions can be problematic */ + protected synchronized void noteErrorObject(Object error) { + errorMessages.add(0, MutableMap.<String,Object>of("error", error, "timestamp", System.currentTimeMillis())); + while (errorMessages.size() > MAX_ERRORS) { + errorMessages.remove(errorMessages.size()-1); + } + } + + public synchronized Map<String,Object> asMap() { + Map<String,Object> result = MutableMap.of(); + result.put("count", count); + result.put("lastSuccessTimeUtc", lastSuccessTime); + result.put("lastSuccessTimeMillisSince", since(lastSuccessTime)); + result.put("lastDuration", lastDuration); + result.put("failureCount", failureCount); + result.put("lastFailureTimeUtc", lastFailureTime); + result.put("lastFailureTimeMillisSince", since(lastFailureTime)); + result.put("errorMessages", MutableList.copyOf(errorMessages)); + return result; + } + + private Long since(Long time) { + if (time==null) return null; + return System.currentTimeMillis() - time; + } + +} \ No newline at end of file
