Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143559023
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java ---
    @@ -0,0 +1,220 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.localizer;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.LinkOption;
    +import java.nio.file.Path;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.generated.AuthorizationException;
    +import org.apache.storm.generated.KeyNotFoundException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Represents a blob that is cached locally on disk by the supervisor.
    + */
    +public abstract class LocallyCachedBlob {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(LocallyCachedBlob.class);
    +    public static final long NOT_DOWNLOADED_VERSION = -1;
    +    // A callback that does nothing.
    +    private static final BlobChangingCallback NOOP_CB = (assignment, port, 
resource, go) -> {};
    +
    +    private long lastUsed = System.currentTimeMillis();
    +    private final Map<PortAndAssignment, BlobChangingCallback> references 
= new HashMap<>();
    +    private final String blobDescription;
    +    private final String blobKey;
    +    private CompletableFuture<Void> doneUpdating = null;
    +
    +    /**
    +     * Create a new LocallyCachedBlob.
    +     * @param blobDescription a description of the blob this represents.  
Typically it should at least be the blob key, but ideally also
    +     * include if it is an archive or not, what user or topology it is 
for, or if it is a storm.jar etc.
    +     */
    +    protected LocallyCachedBlob(String blobDescription, String blobKey) {
    +        this.blobDescription = blobDescription;
    +        this.blobKey = blobKey;
    +    }
    +
    +    /**
    +     * Get the version of the blob cached locally.  If the version is 
unknown or it has not been downloaded NOT_DOWNLOADED_VERSION
    +     * should be returned.
    +     * PRECONDITION: this can only be called with a lock on this instance 
held.
    +     */
    +    public abstract long getLocalVersion();
    +
    +    /**
    +     * Get the version of the blob in the blob store.
    +     * PRECONDITION: this can only be called with a lock on this instance 
held.
    +     */
    +    public abstract long getRemoteVersion(ClientBlobStore store) throws 
KeyNotFoundException, AuthorizationException;
    +
    +    /**
    +     * Download the latest version to a temp location. This may also 
include unzipping some or all of the data to a temp location.
    +     * PRECONDITION: this can only be called with a lock on this instance 
held.
    +     * @param store the store to use to download the data.
    +     * @return the version that was downloaded.
    +     */
    +    public abstract long downloadToTempLocation(ClientBlobStore store) 
throws IOException, KeyNotFoundException, AuthorizationException;
    +
    +    /**
    +     * Commit the new version and make it available for the end user.
    +     * PRECONDITION: uncompressToTempLocationIfNeeded will have been 
called.
    +     * PRECONDITION: this can only be called with a lock on this instance 
held.
    +     * @param version the version of the blob to commit.
    +     */
    +    public abstract void commitNewVersion(long version) throws IOException;
    +
    +    /**
    +     * Clean up any temporary files.  This will be called after updating a 
blob, either successfully or if an error has occurred.
    +     * The goal is to find any files that may be left over and remove them 
so space is not leaked.
    +     * PRECONDITION: this can only be called with a lock on this instance 
held.
    +     */
    +    public abstract void cleanupOrphanedData() throws IOException;
    +
    +    /**
    +     * Completely remove anything that is cached locally for this blob and 
all tracking files also stored for it.
    +     * This will be called after the blob was determined to no longer be 
needed in the cache.
    +     * PRECONDITION: this can only be called with a lock on this instance 
held.
    +     */
    +    public abstract void completelyRemove() throws IOException;
    +
    +    /**
    +     * Get the amount of disk space that is used by this blob.  If the 
blob is uncompressed it should be the sum of the space used by all
    +     * of the uncompressed files.  In general this will not be called with 
any locks held so it is a good idea to cache it and update it
    +     * when committing a new version.
    +     */
    +    public abstract long getSizeOnDisk();
    +
    +    /**
    +     * Updates the last used time.  This should be called when 
references are added or removed.
    +     */
    +    private synchronized void touch() {
    +        lastUsed = System.currentTimeMillis();
    +    }
    +
    +    /**
    +     * Get the last time that this was used, for LRU calculations.
    +     */
    +    public synchronized long getLastUsed() {
    +        return lastUsed;
    +    }
    +
    +    /**
    +     * Return true if this blob is actively being used, else false 
(meaning it can be deleted, but might not be).
    +     */
    +    public synchronized boolean isUsed() {
    +        return !references.isEmpty();
    +    }
    +
    +    /**
    +     * Get the size of p in bytes.
    +     * @param p the path to read.
    +     * @return the size of p in bytes.
    +     */
    +    protected long getSizeOnDisk(Path p) throws IOException {
    +        if (!Files.exists(p)) {
    +            // Nothing at this path, so it uses no space.
    +            return 0;
    +        } else if (Files.isRegularFile(p)) {
    +            return Files.size(p);
    +        } else {
    +            // Files.walk keeps directory handles open until the stream is
    +            // closed, so it has to live in a try-with-resources block.
    +            try (java.util.stream.Stream<Path> paths = Files.walk(p)) {
    +                //We will not follow sym links
    +                return paths
    +                    .filter((subp) ->
    +                        Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS))
    +                    .mapToLong((subp) -> {
    +                        try {
    +                            return Files.size(subp);
    +                        } catch (IOException e) {
    +                            // Best effort: a file we cannot stat counts as
    +                            // 0 bytes, but report which file and why.
    +                            LOG.warn("Could not get the size of {}", subp, e);
    +                        }
    +                        return 0;
    +                    }).sum();
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Mark that a given port and assignment are using this.
    +     * @param pna the slot and assignment that are using this blob.
    +     * @param cb an optional callback indicating that they want to 
know/synchronize when a blob is updated.
    +     */
    +    public void addReference(final PortAndAssignment pna, 
BlobChangingCallback cb) {
    +        if (cb == null) {
    +            cb = NOOP_CB;
    +        }
    +        if (references.put(pna, cb) != null) {
    +            LOG.warn("{} already has a reservation for {}", pna, 
blobDescription);
    +        }
    +    }
    +
    +    /**
    +     * Removes a reservation for this blob from a given slot and 
assignment.
    +     * @param pna the slot + assignment that no longer needs this blob.
    +     */
    +    public void removeReference(final PortAndAssignment pna) {
    +        // Dropping a reference also counts as a use: refresh the timestamp
    +        // that getLastUsed() reports, so a blob that was just released is
    +        // not immediately the oldest entry for LRU purposes.
    +        touch();
    +        // NOTE(review): references is a plain HashMap and this method is
    +        // not synchronized, unlike isUsed() - confirm that callers hold
    +        // the lock on this instance.
    +        if (references.remove(pna) == null) {
    +            LOG.warn("{} had no reservation for {}", pna, blobDescription);
    +        }
    +    }
    +
    +    /**
    +     * Inform all of the callbacks that a change is going to happen and 
then wait for
    +     * them to all get back that it is OK to make that change.
    +     */
    +    public synchronized void informAllOfChangeAndWaitForConsensus() {
    +        CountDownLatch cdl = new CountDownLatch(references.size());
    +        doneUpdating = new CompletableFuture<>();
    +        for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : 
references.entrySet()) {
    +            GoodToGo gtg = new GoodToGo(cdl, doneUpdating);
    +            try {
    +                PortAndAssignment pna = entry.getKey();
    +                BlobChangingCallback cb = entry.getValue();
    +                //TODO we probably want to not use this, or make it just 
return something that has less power to modify things
    +                cb.blobChanging(pna.getAssignment(), pna.getPort(), this, 
gtg);
    +            } finally {
    +                gtg.countDownIfLatchWasNotGotten();
    +            }
    +        }
    +        try {
    +            cdl.await(3, TimeUnit.MINUTES);
    +        } catch (InterruptedException e) {
    +            //TODO need to think about error handling here in general.
    --- End diff --
    
    Should we at least log the error being ignored?


---

Reply via email to