Repository: storm Updated Branches: refs/heads/1.x-branch a1a4df7a1 -> 2d9f9036f
STORM-1602 Blobstore UTs fail on Windows * ensures objects of InputStream / OutputStream are closed after use * clojure: with-open * java: try-with-resources * skip checking symbolic links in LocalizerTest when on Windows * Windows does not seem to handle symbolic links in compressed files properly Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8965a6c2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8965a6c2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8965a6c2 Branch: refs/heads/1.x-branch Commit: 8965a6c2459841c607fb9587897502581f1e3cdf Parents: eeeb7b9 Author: Jungtaek Lim <kabh...@gmail.com> Authored: Thu Mar 17 15:53:15 2016 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Thu Mar 17 15:53:15 2016 +0900 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/supervisor.clj | 6 +- .../org/apache/storm/blobstore/BlobStore.java | 5 + .../apache/storm/blobstore/BlobStoreTest.java | 171 +++++++++---------- .../apache/storm/localizer/LocalizerTest.java | 14 +- 4 files changed, 105 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8965a6c2/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 1287d77..dc47988 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -1168,8 +1168,10 @@ blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)] (try (FileUtils/forceMkdir (File. tmproot)) - (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. 
(supervisor-stormcode-path tmproot)) nil) - (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil) + (with-open [fos-storm-code (FileOutputStream. (supervisor-stormcode-path tmproot)) + fos-storm-conf (FileOutputStream. (supervisor-stormconf-path tmproot))] + (.readBlobTo blob-store (master-stormcode-key storm-id) fos-storm-code nil) + (.readBlobTo blob-store (master-stormconf-key storm-id) fos-storm-conf nil)) (finally (.shutdown blob-store))) (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) http://git-wip-us.apache.org/repos/asf/storm/blob/8965a6c2/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java index 09093a2..14879b4 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -396,6 +396,11 @@ public abstract class BlobStore implements Shutdownable { public long getFileLength() throws IOException { return part.getFileLength(); } + + @Override + public void close() throws IOException { + in.close(); + } } /** http://git-wip-us.apache.org/repos/asf/storm/blob/8965a6c2/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java index 712537a..5f6f50a 100644 --- a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java +++ b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java @@ -181,30 +181,30 @@ public class BlobStoreTest { Subject admin = getSubject("admin"); assertStoreHasExactly(store); SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - AtomicOutputStream out = store.createBlob("test", metadata, admin); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } store.deleteBlob("test", admin); //Test for Supervisor Admin Subject supervisor = getSubject("supervisor"); assertStoreHasExactly(store); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, supervisor); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } store.deleteBlob("test", supervisor); //Test for Nimbus itself as a user Subject nimbus = getNimbusSubject(); assertStoreHasExactly(store); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, nimbus); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } store.deleteBlob("test", nimbus); // Test with a dummy test_subject for cases where subject !=null (security turned on) @@ -214,9 +214,9 @@ public class BlobStoreTest { // Tests for case when subject != null (security turned on) and // acls for the blob are set to WORLD_EVERYTHING metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - out = store.createBlob("test", metadata, who); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } assertStoreHasExactly(store, "test"); // Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ 
-230,9 +230,9 @@ public class BlobStoreTest { // acls are not set for the blob (DEFAULT) LOG.info("Creating test again"); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, who); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(2); + } assertStoreHasExactly(store, "test"); // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have @@ -241,28 +241,29 @@ public class BlobStoreTest { readAssertEqualsWithAuth(store, who, "test", 2); LOG.info("Updating test"); - out = store.updateBlob("test", who); - out.write(3); - out.close(); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(3); + } assertStoreHasExactly(store, "test"); readAssertEqualsWithAuth(store, who, "test", 3); LOG.info("Updating test again"); - out = store.updateBlob("test", who); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + } // Test for subject with no principals and acls set to WORLD_EVERYTHING who = new Subject(); metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); LOG.info("Creating test"); - out = store.createBlob("test-empty-subject-WE", metadata, who); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) { + out.write(2); + } assertStoreHasExactly(store, "test-empty-subject-WE", "test"); // Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not 
contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ -272,9 +273,10 @@ public class BlobStoreTest { who = new Subject(); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); LOG.info("Creating other"); - out = store.createBlob("test-empty-subject-DEF", metadata, who); - out.write(2); - out.close(); + + try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) { + out.write(2); + } assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); // Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ -285,12 +287,6 @@ public class BlobStoreTest { } else { fail("Error the blobstore is of unknowntype"); } - try { - out.close(); - } catch (IOException e) { - // This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. 
- } } public void testBasic(BlobStore store) throws Exception { @@ -300,9 +296,9 @@ public class BlobStoreTest { // acls for the blob are set to WORLD_EVERYTHING SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler .WORLD_EVERYTHING); - AtomicOutputStream out = store.createBlob("test", metadata, null); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(1); + } assertStoreHasExactly(store, "test"); // Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ -316,37 +312,38 @@ public class BlobStoreTest { // update blob interface metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); LOG.info("Creating test again"); - out = store.createBlob("test", metadata, null); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(2); + } assertStoreHasExactly(store, "test"); if (store instanceof LocalFsBlobStore) { assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); } readAssertEquals(store, "test", 2); LOG.info("Updating test"); - out = store.updateBlob("test", null); - out.write(3); - out.close(); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } assertStoreHasExactly(store, "test"); readAssertEquals(store, "test", 3); LOG.info("Updating test again"); - out = store.updateBlob("test", null); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + } // Tests for case when subject == null (security turned off) and // acls for the blob are set to DEFAULT (Empty ACL List) only for LocalFsBlobstore if (store instanceof LocalFsBlobStore) { 
metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); LOG.info("Creating test for empty acls when security is off"); - out = store.createBlob("test-empty-acls", metadata, null); - LOG.info("metadata {}", metadata); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test-empty-acls", metadata, null)) { + LOG.info("metadata {}", metadata); + out.write(2); + } assertStoreHasExactly(store, "test-empty-acls", "test"); // Testing whether acls are set to WORLD_EVERYTHING, Here we are testing only for LocalFsBlobstore // as the HdfsBlobstore gets the subject information of the local system user and behaves as it is @@ -362,12 +359,6 @@ public class BlobStoreTest { } else { fail("Error the blobstore is of unknowntype"); } - try { - out.close(); - } catch (IOException e) { - // This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. - } } @@ -375,26 +366,26 @@ public class BlobStoreTest { assertStoreHasExactly(store); LOG.info("Creating test"); - AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING), null); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null)) { + out.write(1); + } assertStoreHasExactly(store, "test"); readAssertEquals(store, "test", 1); LOG.info("Creating other"); - out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 1); readAssertEquals(store, "other", 2); LOG.info("Updating other"); - out = store.updateBlob("other", null); - out.write(5); - out.close(); + try (AtomicOutputStream out = 
store.updateBlob("other", null)) { + out.write(5); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 1); readAssertEquals(store, "other", 5); @@ -405,18 +396,18 @@ public class BlobStoreTest { readAssertEquals(store, "other", 5); LOG.info("Creating test again"); - out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 2); readAssertEquals(store, "other", 5); LOG.info("Updating test"); - out = store.updateBlob("test", null); - out.write(3); - out.close(); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 3); readAssertEquals(store, "other", 5); @@ -427,7 +418,9 @@ public class BlobStoreTest { readAssertEquals(store, "test", 3); LOG.info("Updating test again"); - out = store.updateBlob("test", null); + + // intended to not guarding with try-with-resource since otherwise test will fail + AtomicOutputStream out = store.updateBlob("test", null); out.write(4); out.flush(); LOG.info("SLEEPING"); @@ -451,10 +444,12 @@ public class BlobStoreTest { public void testGetFileLength() throws AuthorizationException, KeyNotFoundException, KeyAlreadyExistsException, IOException { LocalFsBlobStore store = initLocalFs(); - AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING), null); - out.write(1); - out.close(); - assertEquals(1, store.getBlob("test", null).getFileLength()); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null)) { + out.write(1); + } + try (InputStreamWithMeta blobInputStream = 
store.getBlob("test", null)) { + assertEquals(1, blobInputStream.getFileLength()); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/8965a6c2/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java b/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java index 096c4b0..45ba108 100644 --- a/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java +++ b/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java @@ -110,7 +110,7 @@ public class LocalizerTest { @Before public void setUp() throws Exception { - baseDir = new File("/tmp/blob-store-localizer-test-"+ UUID.randomUUID()); + baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID()); if (!baseDir.mkdir()) { throw new IOException("failed to create base directory"); } @@ -259,6 +259,11 @@ public class LocalizerTest { // archive passed in must contain symlink named tmptestsymlink if not a zip file public void testArchives(String archivePath, boolean supportSymlinks, int size) throws Exception { + if (isOnWindows()) { + // Windows should set this to false cause symlink in compressed file doesn't work properly. + supportSymlinks = false; + } + Map conf = new HashMap(); // set clean time really high so doesn't kick in conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); @@ -664,4 +669,11 @@ public class LocalizerTest { assertEquals("blob version not correct", 3, Utils.localVersionOfBlob(keyFile.toString())); assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists()); } + + private boolean isOnWindows() { + if (System.getenv("OS") != null) { + return System.getenv("OS").equals("Windows_NT"); + } + return false; + } }