Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555840062



##########
File path: 
external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);

Review comment:
       nit: `modTime` can be changed to `updateTime`

##########
File path: storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
##########
@@ -182,5 +183,12 @@ public final void setBlobMeta(String key, SettableBlobMeta 
meta) throws Authoriz
         void run(ClientBlobStore blobStore) throws Exception;
     }
 
-
+    /**
+     * Client facing API to get the last update time of existing blobs in a 
blobstore.  This only required for use on
+     * supervisors.
+     *
+     * @return the timestamp of when the blobstore was last modified.  -1L if 
the blobstore

Review comment:
       nit: "last modified" can be changed to "last updated"

##########
File path: 
storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a 
remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     * @return true of the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long 
remoteBlobstoreUpdateTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, 
don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop 
namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreUpdateTime > 0 && this.localUpdateTime == 
remoteBlobstoreUpdateTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote 
timestamp {}", this, remoteBlobstoreUpdateTime);

Review comment:
       nit: updatedModTime can be changed to "localUpdateTime"
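
For context, the short-circuit in this hunk can be sketched in isolation roughly as follows. This is a minimal stand-in, not Storm's `LocallyCachedBlob`: the class `UpdateCheckSketch`, its constructor, the `remoteLookups` counter, and the `LongSupplier` used in place of `getRemoteVersion(blobStore)` are all hypothetical names introduced for illustration.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the update check; not Storm's actual class. */
public class UpdateCheckSketch {
    private final boolean used;
    private final boolean fullyDownloaded;
    private final long localVersion;

    /** Remote blobstore update time we last confirmed we were in sync with. */
    long localUpdateTime = -1L;
    /** Counts remote version lookups so the saving is observable (assumption). */
    int remoteLookups = 0;

    public UpdateCheckSketch(boolean used, boolean fullyDownloaded, long localVersion) {
        this.used = used;
        this.fullyDownloaded = fullyDownloaded;
        this.localVersion = localVersion;
    }

    boolean requiresUpdate(LongSupplier remoteVersion, long remoteBlobstoreUpdateTime) {
        if (!used) {
            return false;
        }
        if (!fullyDownloaded) {
            return true;
        }
        // Nothing in the remote blobstore changed since we last checked:
        // skip the per-blob remote lookup entirely.
        if (remoteBlobstoreUpdateTime > 0 && localUpdateTime == remoteBlobstoreUpdateTime) {
            return false;
        }
        remoteLookups++;
        if (localVersion != remoteVersion.getAsLong()) {
            return true;
        }
        // Versions match: remember the update time we are now in sync with.
        localUpdateTime = remoteBlobstoreUpdateTime;
        return false;
    }
}
```

The point of the sketch is that once a blob has been confirmed current at a given remote update time, repeated checks at that same time never reach the remote lookup.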

##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -1427,6 +1427,18 @@ public void launchServer() throws Exception {
                     }
                 });
 
+            // Periodically make sure the blobstore modtime is up to date.  
This could have failed if Nimbus encountered
+            // an exception updating the mod time, or due to bugs causing a 
missed update of the blobstore mod time on a blob

Review comment:
       nit: both occurrences of "mod time" can be changed to "update time"

##########
File path: 
external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);
+            return modTime;
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore modtime {} in file {}", timestamp, 
modTimeFile);
+            return -1L;
+        }
+    }
+
+    /**
+     * Updates the last updated time of existing blobstores to the current 
time.
+     *
+     * @throws IOException on any error
+     */
+    public synchronized void updateLastBlobUpdateTime() throws IOException {
+        Long timestamp = Time.currentTimeMillis();
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);

Review comment:
       nit: `modTimeFile` can be changed to `updateTimeFile`

##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -1427,6 +1427,18 @@ public void launchServer() throws Exception {
                     }
                 });
 
+            // Periodically make sure the blobstore modtime is up to date.  
This could have failed if Nimbus encountered

Review comment:
       nit: "modtime" can be changed to "update time"

##########
File path: 
external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);
+            return modTime;
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore modtime {} in file {}", timestamp, 
modTimeFile);

Review comment:
       nit: "modtime" in the log message can be changed to "update time"
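
For reference, the read-and-parse path in this hunk, with the suggested naming applied, might look like the sketch below. It is an assumption-laden illustration: it uses `java.nio.file` on a local path instead of the Hadoop `FileSystem` the PR uses, the class `UpdateTimeFileSketch` and both method names are hypothetical, and the `trim()` call is an addition that is not in the PR.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical local-file analogue of the update-time file handling. */
public class UpdateTimeFileSketch {

    /** Parses the file contents; returns -1L when they are not a valid long. */
    static long parseUpdateTime(String contents) {
        try {
            // trim() is an assumption here, tolerating a trailing newline.
            return Long.parseLong(contents.trim());
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    /** Returns the stored update time, or -1L if the file is missing or invalid. */
    static long readUpdateTime(Path updateTimeFile) throws IOException {
        if (!Files.exists(updateTimeFile)) {
            return -1L;
        }
        byte[] raw = Files.readAllBytes(updateTimeFile);
        return parseUpdateTime(new String(raw, StandardCharsets.UTF_8));
    }
}
```

Splitting the parse step out keeps the -1L fallback testable without touching a filesystem.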

##########
File path: 
external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);

Review comment:
       nit: `modTimeFile` can be changed to `updateTimeFile`

##########
File path: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -261,6 +257,9 @@ private LocalizedResource getUserFile(String user, String 
key) {
     }
 
     private CompletableFuture<Void> downloadOrUpdate(Collection<? extends 
LocallyCachedBlob> blobs) {
+
+        final long remoteBlobstoreModTime = getRemoteBlobstoreUpdateTime();

Review comment:
       nit: `remoteBlobstoreModTime` can be changed to 
`remoteBlobstoreUpdateTime`

##########
File path: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -312,6 +289,17 @@ private LocalizedResource getUserFile(String user, String 
key) {
         return CompletableFuture.allOf(all);
     }
 
+    private long getRemoteBlobstoreUpdateTime() {
+        try (ClientBlobStore blobStore = getClientBlobStore()) {
+            try {
+                return blobStore.getRemoteBlobstoreUpdateTime();
+            } catch (IOException e) {
+                LOG.error("Failed to get remote blobstore modtime", e);

Review comment:
       nit: can change `modtime` to "update time"

##########
File path: storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
##########
@@ -182,5 +183,12 @@ public final void setBlobMeta(String key, SettableBlobMeta 
meta) throws Authoriz
         void run(ClientBlobStore blobStore) throws Exception;
     }
 
-
+    /**
+     * Client facing API to get the last update time of existing blobs in a 
blobstore.  This only required for use on

Review comment:
       nit: "This only required" should be "This is only required"

##########
File path: 
storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a 
remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     * @return true of the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long 
remoteBlobstoreUpdateTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, 
don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop 
namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreUpdateTime > 0 && this.localUpdateTime == 
remoteBlobstoreUpdateTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote 
timestamp {}", this, remoteBlobstoreUpdateTime);
+            return false;
+        }
+
+        long localVersion = this.getLocalVersion();
+        long remoteVersion = this.getRemoteVersion(blobStore);
+        if (localVersion != remoteVersion) {
+            return true;
+        } else {
+            // track that we are now up to date with respect to last time the 
remote blobstore was updated
+            this.localUpdateTime = remoteBlobstoreUpdateTime;
+            return false;
+        }
+    }
+
+    /**
+     * Downloads a blob locally.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    private void download(ClientBlobStore blobStore, long 
remoteBlobstoreModTime)

Review comment:
       nit: `remoteBlobstoreModTime` can be changed to `remoteBlobstoreUpdateTime`

##########
File path: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -271,29 +270,7 @@ private LocalizedResource getUserFile(String user, String 
key) {
                     long failures = 0;
                     while (!done) {
                         try {
-                            synchronized (blob) {
-                                if (blob.isUsed()) {
-                                    long localVersion = blob.getLocalVersion();
-                                    long remoteVersion = 
blob.getRemoteVersion(blobStore);
-                                    if (localVersion != remoteVersion || 
!blob.isFullyDownloaded()) {
-                                        if (blob.isFullyDownloaded()) {
-                                            //Avoid case of different blob 
version
-                                            // when blob is not downloaded 
(first time download)
-                                            numBlobUpdateVersionChanged.mark();
-                                        }
-                                        Timer.Context t = 
singleBlobLocalizationDuration.time();
-                                        try {
-                                            long newVersion = 
blob.fetchUnzipToTemp(blobStore);
-                                            
blob.informReferencesAndCommitNewVersion(newVersion);
-                                            t.stop();
-                                        } finally {
-                                            blob.cleanupOrphanedData();
-                                        }
-                                    }
-                                } else {
-                                    LOG.debug("Skipping update of unused blob 
{}", blob);
-                                }
-                            }
+                            blob.update(blobStore, remoteBlobstoreModTime);

Review comment:
       nit: `remoteBlobstoreModTime` can be changed to `remoteBlobstoreUpdateTime`

##########
File path: 
storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocallyCachedBlobTest {
+    private static ClientBlobStore blobStore = 
Mockito.mock(ClientBlobStore.class);
+    private static PortAndAssignment pna = new PortAndAssignmentImpl(6077, new 
LocalAssignment());
+    private static Map<String, Object> conf = new HashMap<>();
+
+    @Test
+    public void testNotUsed() throws KeyNotFoundException, 
AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", 
Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        Assert.assertFalse(blob.isUsed());
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testNotDownloaded() throws KeyNotFoundException, 
AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", 
Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertFalse(blob.isFullyDownloaded());
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testOutOfDate() throws KeyNotFoundException, 
AuthorizationException {
+        TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertTrue(blob.isFullyDownloaded());
+
+        // validate blob needs update due to version mismatch
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+        // when blob update time matches remote blobstore modtime, validate 
blob
+        // will skip looking at remote version and assume it's up to date
+        blob.localUpdateTime = 101L;
+        Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+
+        // now when the mod time on the remote blobstore differs, we should 
again see that the

Review comment:
       nit: can change "mod time" to "update time"

##########
File path: 
storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocallyCachedBlobTest {
+    private static ClientBlobStore blobStore = 
Mockito.mock(ClientBlobStore.class);
+    private static PortAndAssignment pna = new PortAndAssignmentImpl(6077, new 
LocalAssignment());
+    private static Map<String, Object> conf = new HashMap<>();
+
+    @Test
+    public void testNotUsed() throws KeyNotFoundException, 
AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", 
Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        Assert.assertFalse(blob.isUsed());
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testNotDownloaded() throws KeyNotFoundException, 
AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", 
Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertFalse(blob.isFullyDownloaded());
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testOutOfDate() throws KeyNotFoundException, 
AuthorizationException {
+        TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertTrue(blob.isFullyDownloaded());
+
+        // validate blob needs update due to version mismatch
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+        // when blob update time matches remote blobstore modtime, validate 
blob

Review comment:
       nit: can change "modtime" to "update time"

##########
File path: 
storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocallyCachedBlobTest {
+    private static ClientBlobStore blobStore = 
Mockito.mock(ClientBlobStore.class);
+    private static PortAndAssignment pna = new PortAndAssignmentImpl(6077, new 
LocalAssignment());
+    private static Map<String, Object> conf = new HashMap<>();
+
+    @Test
+    public void testNotUsed() throws KeyNotFoundException, 
AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", 
Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        Assert.assertFalse(blob.isUsed());
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testNotDownloaded() throws KeyNotFoundException, 
AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", 
Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertFalse(blob.isFullyDownloaded());
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testOutOfDate() throws KeyNotFoundException, 
AuthorizationException {
+        TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new 
StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertTrue(blob.isFullyDownloaded());
+
+        // validate blob needs update due to version mismatch
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+        // when blob update time matches remote blobstore modtime, validate 
blob
+        // will skip looking at remote version and assume it's up to date
+        blob.localUpdateTime = 101L;
+        Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+
+        // now when the mod time on the remote blobstore differs, we should 
again see that the
+        // blob version differs from the remote blobstore
+        Assert.assertTrue(blob.requiresUpdate(blobStore, 102L));
+
+        // now validate we don't need any update as versions match, regardless 
of remote blobstore mod time

Review comment:
       nit: can change "mod time" to "update time"

##########
File path: 
storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a 
remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     * @return true of the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long 
remoteBlobstoreUpdateTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, 
don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop 
namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreUpdateTime > 0 && this.localUpdateTime == 
remoteBlobstoreUpdateTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote 
timestamp {}", this, remoteBlobstoreUpdateTime);
+            return false;
+        }
+
+        long localVersion = this.getLocalVersion();
+        long remoteVersion = this.getRemoteVersion(blobStore);
+        if (localVersion != remoteVersion) {
+            return true;
+        } else {
+            // track that we are now up to date with respect to last time the 
remote blobstore was updated
+            this.localUpdateTime = remoteBlobstoreUpdateTime;
+            return false;
+        }
+    }
+
+    /**
+     * Downloads a blob locally.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    private void download(ClientBlobStore blobStore, long 
remoteBlobstoreModTime)
+            throws AuthorizationException, IOException, KeyNotFoundException {
+        if (this.isFullyDownloaded()) {
+            numBlobUpdateVersionChanged.mark();
+        }
+        Timer.Context timer = singleBlobLocalizationDuration.time();
+        try {
+            long newVersion = this.fetchUnzipToTemp(blobStore);
+            this.informReferencesAndCommitNewVersion(newVersion);
+            this.localUpdateTime = remoteBlobstoreModTime;
+            LOG.debug("local blob {} downloaded, in sync with remote blobstore 
to time {}", this, remoteBlobstoreModTime);
+        } finally {
+            timer.stop();
+            this.cleanupOrphanedData();
+        }
+    }
+
+    /**
+     * Checks and downloads a blob locally as necessary.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    public void update(ClientBlobStore blobStore, long remoteBlobstoreModTime)

Review comment:
       nit: `remoteBlobstoreModTime` can be changed to `remoteBlobstoreUpdateTime`

##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastBlobUpdateTime();

Review comment:
       Do we need this in the `updateBlobReplication` method too?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

