Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2345#discussion_r143559023 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java --- @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.localizer; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.KeyNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents a blob that is cached locally on disk by the supervisor. + */ +public abstract class LocallyCachedBlob { + private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedBlob.class); + public static final long NOT_DOWNLOADED_VERSION = -1; + // A callback that does nothing. 
+ private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> {}; + + private long lastUsed = System.currentTimeMillis(); + private final Map<PortAndAssignment, BlobChangingCallback> references = new HashMap<>(); + private final String blobDescription; + private final String blobKey; + private CompletableFuture<Void> doneUpdating = null; + + /** + * Create a new LocallyCachedBlob. + * @param blobDescription a description of the blob this represents. Typically it should at least be the blob key, but ideally also + * include if it is an archive or not, what user or topology it is for, or if it is a storm.jar etc. + */ + protected LocallyCachedBlob(String blobDescription, String blobKey) { + this.blobDescription = blobDescription; + this.blobKey = blobKey; + } + + /** + * Get the version of the blob cached locally. If the version is unknown or it has not been downloaded NOT_DOWNLOADED_VERSION + * should be returned. + * PRECONDITION: this can only be called with a lock on this instance held. + */ + public abstract long getLocalVersion(); + + /** + * Get the version of the blob in the blob store. + * PRECONDITION: this can only be called with a lock on this instance held. + */ + public abstract long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException; + + /** + * Download the latest version to a temp location. This may also include unzipping some or all of the data to a temp location. + * PRECONDITION: this can only be called with a lock on this instance held. + * @param store the store to use to download the data. + * @return the version that was downloaded. + */ + public abstract long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException; + + /** + * Commit the new version and make it available for the end user. + * PRECONDITION: uncompressToTempLocationIfNeeded will have been called. 
+ * PRECONDITION: this can only be called with a lock on this instance held. + * @param version the version of the blob to commit. + */ + public abstract void commitNewVersion(long version) throws IOException; + + /** + * Clean up any temporary files. This will be called after updating a blob, either successfully or if an error has occurred. + * The goal is to find any files that may be left over and remove them so space is not leaked. + * PRECONDITION: this can only be called with a lock on this instance held. + */ + public abstract void cleanupOrphanedData() throws IOException; + + /** + * Completely remove anything that is cached locally for this blob and all tracking files also stored for it. + * This will be called after the blob was determined to no longer be needed in the cache. + * PRECONDITION: this can only be called with a lock on this instance held. + */ + public abstract void completelyRemove() throws IOException; + + /** + * Get the amount of disk space that is used by this blob. If the blob is uncompressed it should be the sum of the space used by all + * of the uncompressed files. In general this will not be called with any locks held so it is a good idea to cache it and update it + * when committing a new version. + */ + public abstract long getSizeOnDisk(); + + /** + * Updates the last updated time. This should be called when references are added or removed. + */ + private synchronized void touch() { + lastUsed = System.currentTimeMillis(); + } + + /** + * Get the last time that this was used for LRU calculations. + */ + public synchronized long getLastUsed() { + return lastUsed; + } + + /** + * Return true if this blob is actively being used, else false (meaning it can be deleted, but might not be). + */ + public synchronized boolean isUsed() { + return !references.isEmpty(); + } + + /** + * Get the size of p in bytes. + * @param p the path to read. + * @return the size of p in bytes. 
+ */ + protected long getSizeOnDisk(Path p) throws IOException { + if (!Files.exists(p)) { + return 0; + } else if (Files.isRegularFile(p)) { + return Files.size(p); + } else { + //We will not follow sym links + return Files.walk(p) + .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS)) + .mapToLong((subp) -> { + try { + return Files.size(subp); + } catch (IOException e) { + LOG.warn("Could not get the size of "); + } + return 0; + }).sum(); + } + } + + /** + * Mark that a given port and assignment are using this. + * @param pna the slot and assignment that are using this blob. + * @param cb an optional callback indicating that they want to know/synchronize when a blob is updated. + */ + public void addReference(final PortAndAssignment pna, BlobChangingCallback cb) { + if (cb == null) { + cb = NOOP_CB; + } + if (references.put(pna, cb) != null) { + LOG.warn("{} already has a reservation for {}", pna, blobDescription); + } + } + + /** + * Removes a reservation for this blob from a given slot and assignment. + * @param pna the slot + assignment that no longer needs this blob. + */ + public void removeReference(final PortAndAssignment pna) { + if (references.remove(pna) == null) { + LOG.warn("{} had no reservation for {}", pna, blobDescription); + } + } + + /** + * Inform all of the callbacks that a change is going to happen and then wait for + * them to all get back that it is OK to make that change. 
+ */ + public synchronized void informAllOfChangeAndWaitForConsensus() { + CountDownLatch cdl = new CountDownLatch(references.size()); + doneUpdating = new CompletableFuture<>(); + for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : references.entrySet()) { + GoodToGo gtg = new GoodToGo(cdl, doneUpdating); + try { + PortAndAssignment pna = entry.getKey(); + BlobChangingCallback cb = entry.getValue(); + //TODO we probably want to not use this, or make it just return something that has less power to modify things + cb.blobChanging(pna.getAssignment(), pna.getPort(), this, gtg); + } finally { + gtg.countDownIfLatchWasNotGotten(); + } + } + try { + cdl.await(3, TimeUnit.MINUTES); + } catch (InterruptedException e) { + //TODO need to think about error handling here in general. --- End diff -- Should we at least log the error being ignored?
---