This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 9c47a02 [STORM-3501] local cluster worker restarts because of missing resources folder (#3120) 9c47a02 is described below commit 9c47a02e52173b55c0184f3d9ecfa4b7d2a79984 Author: Diogo Monteiro <diogo.monteir...@gmail.com> AuthorDate: Sat Aug 31 18:19:14 2019 +0100 [STORM-3501] local cluster worker restarts because of missing resources folder (#3120) * [STORM-3501] local cluster worker restarts because of missing resources folder --- .../storm/daemon/supervisor/AdvancedFSOps.java | 12 +++ .../storm/daemon/supervisor/IAdvancedFSOps.java | 10 +++ .../storm/localizer/LocallyCachedTopologyBlob.java | 9 +- .../java/org/apache/storm/utils/ServerUtils.java | 14 +++ .../apache/storm/localizer/AsyncLocalizerTest.java | 99 ++++++++++++++++++++++ 5 files changed, 141 insertions(+), 3 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java index 23e91bf..d2a9ced 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java @@ -97,6 +97,18 @@ public class AdvancedFSOps implements IAdvancedFSOps { } /** + * Moves a file to a given destination. + * + * @param fromFile file to move + * @param toFile where to move it + * @throws IOException on any error + */ + @Override + public void moveFile(File fromFile, File toFile) throws IOException { + Files.move(fromFile.toPath(), toFile.toPath()); + } + + /** * Check whether supports atomic directory move. * @return true if an atomic directory move works, else false */ diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java index f55ba7f..ccb4a1b 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java @@ -46,6 +46,16 @@ public interface IAdvancedFSOps { */ void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException; + + /** + * Moves a file to a given destination. + * + * @param fromFile file to move + * @param toFile where to move it + * @throws IOException on any error + */ + void moveFile(File fromFile, File toFile) throws IOException; + /** * Check whether supports atomic directory move. * @return true if an atomic directory move works, else false diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java index 4ba41f9..babed56 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java @@ -139,9 +139,8 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { if (isLocalMode && type == TopologyBlobType.TOPO_JAR) { LOG.debug("DOWNLOADING LOCAL JAR to TEMP LOCATION... {}", topologyId); //This is a special case where the jar was not uploaded so we will not download it (it is already on the classpath) - ClassLoader classloader = Thread.currentThread().getContextClassLoader(); String resourcesJar = resourcesJar(); - URL url = classloader.getResource(ServerConfigUtils.RESOURCES_SUBDIR); + URL url = ServerUtils.getResourceFromClassloader(ServerConfigUtils.RESOURCES_SUBDIR); Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(LOCAL_MODE_JAR_VERSION)); if (resourcesJar != null) { LOG.info("Extracting resources from jar at {} to {}", resourcesJar, extractionDest); @@ -154,6 +153,10 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { } else { fsOps.copyDirectory(new File(url.getFile()), extractionDest.toFile()); } + } else if (!fsOps.fileExists(extractionDest)) { + // if we can't find the resources directory in a resources jar or in the classpath just create an empty + // resources directory. This way we can check later that the topology jar was fully downloaded. + fsOps.forceMkdir(extractionDest); } return LOCAL_MODE_JAR_VERSION; } @@ -225,7 +228,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { } if (!(isLocalMode && type == TopologyBlobType.TOPO_JAR)) { //Don't try to move the JAR file in local mode, it does not exist because it was not uploaded - Files.move(tempLoc, dest); + fsOps.moveFile(tempLoc.toFile(), dest.toFile()); } synchronized (LocallyCachedTopologyBlob.class) { //This is a bit ugly, but it works. In order to maintain the same directory structure that existed before diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java index eb72594..c026160 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java +++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.RandomAccessFile; +import java.net.URL; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; @@ -234,6 +235,14 @@ public class ServerUtils { return _instance.currentClasspathImpl(); } + + /** + * Returns the current thread classloader. + */ + public static URL getResourceFromClassloader(String name) { + return _instance.getResourceFromClassloaderImpl(name); + } + /** * Determines if a zip archive contains a particular directory. * @@ -748,6 +757,11 @@ public class ServerUtils { return System.getProperty("java.class.path"); } + + public URL getResourceFromClassloaderImpl(String name) { + return Thread.currentThread().getContextClassLoader().getResource(name); + } + public void downloadResourcesAsSupervisorImpl(String key, String localFile, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException { final int maxRetryAttempts = 2; diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index 8228702..550c808 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -18,6 +18,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory; import static org.apache.storm.blobstore.BlobStoreAclHandler.WORLD_EVERYTHING; import static org.apache.storm.localizer.LocalizedResource.USERCACHE; +import static org.apache.storm.localizer.LocallyCachedTopologyBlob.LOCAL_MODE_JAR_VERSION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -249,6 +251,103 @@ public class AsyncLocalizerTest { } } + + @Test + public void testRequestDownloadTopologyBlobsLocalMode() throws Exception { + // tests download of topology blobs in local mode on a topology without resources folder + final String topoId = "TOPO-12345"; + final String user = "user"; + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + la.set_owner(user); + ExecutorInfo ei = new ExecutorInfo(); + ei.set_task_start(1); + ei.set_task_end(1); + la.add_to_executors(ei); + final String topoName = "TOPO"; + final int port = 8080; + final String simpleLocalName = "simple.txt"; + final String simpleKey = "simple"; + + final String stormLocal = "/tmp/storm-local/"; + final File userDir = new File(stormLocal, user); + final String stormRoot = stormLocal + topoId + "/"; + + final String localizerRoot = getTestLocalizerRoot(); + + final StormTopology st = new StormTopology(); + st.set_spouts(new HashMap<>()); + st.set_bolts(new HashMap<>()); + st.set_state_spouts(new HashMap<>()); + + Map<String, Map<String, Object>> topoBlobMap = new HashMap<>(); + Map<String, Object> simple = new HashMap<>(); + simple.put("localname", simpleLocalName); + simple.put("uncompress", false); + topoBlobMap.put(simpleKey, simple); + + Map<String, Object> conf = new HashMap<>(); + conf.put(Config.STORM_LOCAL_DIR, stormLocal); + conf.put(Config.STORM_CLUSTER_MODE, "local"); + AdvancedFSOps ops = mock(AdvancedFSOps.class); + ConfigUtils mockedCU = mock(ConfigUtils.class); + ServerUtils mockedSU = mock(ServerUtils.class); + + Map<String, Object> topoConf = new HashMap<>(conf); + topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap); + topoConf.put(Config.TOPOLOGY_NAME, topoName); + + List<LocalizedResource> localizedList = new ArrayList<>(); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user, metricsRegistry); + localizedList.add(simpleLocal); + + AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot, metricsRegistry)); + ConfigUtils orig = ConfigUtils.setInstance(mockedCU); + ServerUtils origSU = ServerUtils.setInstance(mockedSU); + + try { + when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot); + when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf); + when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st); + + doReturn(mockblobstore).when(bl).getClientBlobStore(); + doReturn(userDir).when(bl).getLocalUserFileCacheDir(user); + doReturn(localizedList).when(bl).getBlobs(any(List.class), any(), any()); + doReturn(mock(OutputStream.class)).when(ops).getOutputStream(any()); + + ReadableBlobMeta blobMeta = new ReadableBlobMeta(); + blobMeta.set_version(1); + doReturn(blobMeta).when(mockblobstore).getBlobMeta(any()); + when(mockblobstore.getBlob(any())).thenAnswer(invocation -> new TestInputStreamWithMeta(LOCAL_MODE_JAR_VERSION)); + + Future<Void> f = bl.requestDownloadTopologyBlobs(la, port, null); + f.get(20, TimeUnit.SECONDS); + + verify(bl).getLocalUserFileCacheDir(user); + + verify(ops).fileExists(userDir); + verify(ops).forceMkdir(userDir); + + verify(bl).getBlobs(any(List.class), any(), any()); + + Path extractionDir = Paths.get(stormRoot, + LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR.getTempExtractionDir(LOCAL_MODE_JAR_VERSION)); + + // make sure resources dir is created. + verify(ops).forceMkdir(extractionDir); + + } finally { + try { + ConfigUtils.setInstance(orig); + ServerUtils.setInstance(origSU); + bl.close(); + } catch (Throwable e) { + LOG.error("ERROR trying to close an object", e); + } + } + } + @Before public void setUp() throws Exception { baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-" + UUID.randomUUID());