Repository: storm
Updated Branches:
  refs/heads/master ce2d49b92 -> 6babbb08c


http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/blobstore/ClientBlobStoreTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/backtype/storm/blobstore/ClientBlobStoreTest.java 
b/storm-core/test/jvm/backtype/storm/blobstore/ClientBlobStoreTest.java
new file mode 100644
index 0000000..17a8f71
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/blobstore/ClientBlobStoreTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.NimbusClient;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class ClientBlobStoreTest {
+
+  private ClientBlobStore client;
+  public class TestClientBlobStore extends ClientBlobStore {
+
+    private Map<String, SettableBlobMeta> allBlobs;
+    @Override
+    public void prepare(Map conf) {
+      this.conf = conf;
+      allBlobs = new HashMap<String, SettableBlobMeta>();
+    }
+
+    @Override
+    protected AtomicOutputStream createBlobToExtend(String key, 
SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException 
{
+      allBlobs.put(key, meta);
+      return null;
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key) throws 
AuthorizationException, KeyNotFoundException {
+      return null;
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key) throws 
AuthorizationException, KeyNotFoundException {
+      ReadableBlobMeta reableMeta = null;
+      if (allBlobs.containsKey(key)) {
+        reableMeta = new ReadableBlobMeta();
+        reableMeta.set_settable(allBlobs.get(key));
+      }
+      return reableMeta;
+    }
+
+    @Override
+    protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) 
throws AuthorizationException, KeyNotFoundException {
+    }
+
+    @Override
+    public void deleteBlob(String key) throws AuthorizationException, 
KeyNotFoundException {
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key) throws 
AuthorizationException, KeyNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+      return null;
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public int getBlobReplication(String key) {
+      return -1;
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication) {
+      return -1;
+    }
+
+    @Override
+    public boolean setClient(Map conf, NimbusClient client) {
+      return false;
+    }
+
+    @Override
+    public void createStateInZookeeper(String key) {
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+
+    client = new TestClientBlobStore();
+    Map conf = new HashMap<String,String>();
+    client.prepare(conf);
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    client = null;
+  }
+
+  @Test(expected=AuthorizationException.class)
+  public void testDuplicateACLsForCreate() throws Exception {
+    SettableBlobMeta meta = new SettableBlobMeta();
+    AccessControl submitterAcl = 
BlobStoreAclHandler.parseAccessControl("u:tester:rwa");
+    meta.add_to_acl(submitterAcl);
+    AccessControl duplicateAcl = 
BlobStoreAclHandler.parseAccessControl("u:tester:r--");
+    meta.add_to_acl(duplicateAcl);
+    String testKey = "testDuplicateACLsBlobKey";
+    client.createBlob(testKey, meta);
+  }
+
+  @Test
+  public void testGoodACLsForCreate() throws Exception {
+    SettableBlobMeta meta = new SettableBlobMeta();
+    AccessControl submitterAcl = 
BlobStoreAclHandler.parseAccessControl("u:tester:rwa");
+    meta.add_to_acl(submitterAcl);
+    String testKey = "testBlobKey";
+    client.createBlob(testKey, meta);
+    validatedBlobAcls(testKey);
+  }
+
+  @Test(expected=AuthorizationException.class)
+  public void testDuplicateACLsForSetBlobMeta() throws Exception {
+    String testKey = "testDuplicateACLsBlobKey";
+    SettableBlobMeta meta = new SettableBlobMeta();
+    createTestBlob(testKey, meta);
+    AccessControl duplicateAcl = 
BlobStoreAclHandler.parseAccessControl("u:tester:r--");
+    meta.add_to_acl(duplicateAcl);
+    client.setBlobMeta(testKey, meta);
+  }
+
+  @Test
+  public void testGoodACLsForSetBlobMeta() throws Exception {
+    String testKey = "testBlobKey";
+    SettableBlobMeta meta = new SettableBlobMeta();
+    createTestBlob(testKey, meta);
+    meta.add_to_acl(BlobStoreAclHandler.parseAccessControl("u:nextuser:r--"));
+    client.setBlobMeta(testKey,meta);
+    validatedBlobAcls(testKey);
+  }
+
+  private void createTestBlob(String testKey, SettableBlobMeta meta) throws 
AuthorizationException, KeyAlreadyExistsException {
+    AccessControl submitterAcl = 
BlobStoreAclHandler.parseAccessControl("u:tester:rwa");
+    meta.add_to_acl(submitterAcl);
+    client.createBlob(testKey, meta);
+  }
+
+  private void validatedBlobAcls(String testKey) throws KeyNotFoundException, 
AuthorizationException {
+    ReadableBlobMeta blobMeta = client.getBlobMeta(testKey);
+    Assert.assertNotNull("The blob" + testKey + "does not have any readable 
blobMeta.", blobMeta);
+    SettableBlobMeta settableBlob = blobMeta.get_settable();
+    Assert.assertNotNull("The blob" + testKey + "does not have any settable 
blobMeta.", settableBlob);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceRetentionSetTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceRetentionSetTest.java
 
b/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceRetentionSetTest.java
new file mode 100644
index 0000000..15f3da3
--- /dev/null
+++ 
b/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceRetentionSetTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.localizer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class LocalizedResourceRetentionSetTest {
+
+  @Test
+  public void testAddResources() throws Exception {
+    LocalizedResourceRetentionSet lrretset = new 
LocalizedResourceRetentionSet(10);
+    LocalizedResourceSet lrset = new LocalizedResourceSet("user1");
+    LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false, "topo1");
+    LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", false, "topo1");
+    // check adding reference to local resource with topology of same name
+    localresource2.addReference(("topo2"));
+
+    lrset.addResource("key1", localresource1, false);
+    lrset.addResource("key2", localresource2, false);
+    lrretset.addResources(lrset);
+    assertEquals("number to clean is not 0", 0, 
lrretset.getSizeWithNoReferences());
+    localresource1.removeReference(("topo1"));
+    lrretset.addResources(lrset);
+    assertEquals("number to clean is not 1", 1, 
lrretset.getSizeWithNoReferences());
+    localresource2.removeReference(("topo1"));
+    lrretset.addResources(lrset);
+    assertEquals("number to clean is not 1", 1, 
lrretset.getSizeWithNoReferences());
+    localresource2.removeReference(("topo2"));
+    lrretset.addResources(lrset);
+    assertEquals("number to clean is not 2", 2, 
lrretset.getSizeWithNoReferences());
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    LocalizedResourceRetentionSet lrretset = spy(new 
LocalizedResourceRetentionSet(10));
+    LocalizedResourceSet lrset = new LocalizedResourceSet("user1");
+    // no reference to key1
+    LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false);
+    localresource1.setSize(10);
+    // no reference to archive1
+    LocalizedResource archiveresource1 = new LocalizedResource("archive1", 
"testarchive1", true);
+    archiveresource1.setSize(20);
+    // reference to key2
+    LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", false, "topo1");
+    // check adding reference to local resource with topology of same name
+    localresource2.addReference(("topo1"));
+    localresource2.setSize(10);
+    lrset.addResource("key1", localresource1, false);
+    lrset.addResource("key2", localresource2, false);
+    lrset.addResource("archive1", archiveresource1, true);
+
+    lrretset.addResources(lrset);
+    assertEquals("number to clean is not 2", 2, 
lrretset.getSizeWithNoReferences());
+
+    // shouldn't change number since file doesn't exist and delete fails
+    lrretset.cleanup();
+    assertEquals("resource cleaned up", 2, lrretset.getSizeWithNoReferences());
+
+    // make deleteResource return true even though file doesn't exist
+    when(lrretset.deleteResource(localresource1)).thenReturn(true);
+    when(lrretset.deleteResource(localresource2)).thenReturn(true);
+    when(lrretset.deleteResource(archiveresource1)).thenReturn(true);
+    lrretset.cleanup();
+    assertEquals("resource not cleaned up", 0, 
lrretset.getSizeWithNoReferences());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceSetTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceSetTest.java 
b/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceSetTest.java
new file mode 100644
index 0000000..839113f
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/localizer/LocalizedResourceSetTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.localizer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class LocalizedResourceSetTest {
+
+  private final String user1 = "user1";
+
+  @Test
+  public void testGetUser() throws Exception {
+    LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
+    assertEquals("user is wrong", user1, lrset.getUser());
+  }
+
+  @Test
+  public void testGetSize() throws Exception {
+    LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
+    LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false, "topo1");
+    LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", true, "topo1");
+    assertEquals("size is wrong", 0, lrset.getSize());
+    lrset.addResource("key1", localresource1, false);
+    assertEquals("size is wrong", 1, lrset.getSize());
+    lrset.addResource("key2", localresource2, true);
+    assertEquals("size is wrong", 2, lrset.getSize());
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
+    LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false, "topo1");
+    LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", true, "topo1");
+    lrset.addResource("key1", localresource1, false);
+    lrset.addResource("key2", localresource2, true);
+    assertEquals("get doesn't return same object", localresource1, 
lrset.get("key1", false));
+    assertEquals("get doesn't return same object", localresource2, 
lrset.get("key2", true));
+
+  }
+
+  @Test
+  public void testExists() throws Exception {
+    LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
+    LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false, "topo1");
+    LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", true, "topo1");
+    lrset.addResource("key1", localresource1, false);
+    lrset.addResource("key2", localresource2, true);
+    assertEquals("doesn't exist", true, lrset.exists("key1", false));
+    assertEquals("doesn't exist", true, lrset.exists("key2", true));
+    boolean val = lrset.remove(localresource1);
+    assertTrue("remove failed", val);
+    assertEquals("does exist", false, lrset.exists("key1", false));
+    assertEquals("doesn't exist", true, lrset.exists("key2", true));
+    val = lrset.remove(localresource1);
+    assertFalse("remove success when shouldn't have been", val);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java 
b/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
new file mode 100644
index 0000000..b47a3b4
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
@@ -0,0 +1,671 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.localizer;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.blobstore.LocalFsBlobStore;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.utils.Utils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class LocalizerTest {
+
+  private File baseDir;
+
+  private final String user1 = "user1";
+  private final String user2 = "user2";
+  private final String user3 = "user3";
+
+  ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
+
+
+  class TestLocalizer extends Localizer {
+
+    TestLocalizer(Map conf, String baseDir) {
+      super(conf, baseDir);
+    }
+
+    @Override
+    protected ClientBlobStore getClientBlobStore() {
+      return mockblobstore;
+    }
+  }
+
+  class TestInputStreamWithMeta extends InputStreamWithMeta {
+    private InputStream iostream;
+    private byte[] buffer = null;
+    private int offset = 0;
+    private int end = 0;
+    private boolean eof = false;
+
+    public TestInputStreamWithMeta() {
+      iostream = IOUtils.toInputStream("some test data for my input stream");
+    }
+
+    public TestInputStreamWithMeta(InputStream istream) {
+       iostream = istream;
+    }
+
+    @Override
+    public long getVersion() throws IOException {
+      return 1;
+    }
+
+    @Override
+    public synchronized int read() {
+      return 0;
+    }
+
+    @Override
+    public synchronized int read(byte[] b)
+    throws IOException {
+      int length = iostream.read(b);
+      if (length == 0) {
+        return -1;
+      }
+      return length;
+    }
+
+    @Override
+    public long getFileLength() {
+        return 0;
+    }
+  };
+
+  @Before
+  public void setUp() throws Exception {
+    baseDir = new File("/tmp/blob-store-localizer-test-"+ UUID.randomUUID());
+    if (!baseDir.mkdir()) {
+      throw new IOException("failed to create base directory");
+    }
+    ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(baseDir);
+  }
+
+  public String constructUserCacheDir(String base, String user) {
+    return base + "/" + Localizer.USERCACHE + "/" + user;
+  }
+
+  public String constructExpectedFilesDir(String base, String user) {
+    return constructUserCacheDir(base, user) + "/" + Localizer.FILECACHE + "/" 
+ Localizer.FILESDIR;
+  }
+
+  public String constructExpectedArchivesDir(String base, String user) {
+    return constructUserCacheDir(base, user) + "/" + Localizer.FILECACHE + "/" 
+ Localizer
+        .ARCHIVESDIR;
+  }
+
+  @Test
+  public void testDirPaths() throws Exception {
+    Map conf = new HashMap();
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+    String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
+    assertEquals("get local user dir doesn't return right value",
+        expectedDir, localizer.getLocalUserDir(user1).toString());
+
+    String expectedFileDir = expectedDir + "/" + Localizer.FILECACHE;
+    assertEquals("get local user file dir doesn't return right value",
+        expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
+  }
+
+  @Test
+  public void testReconstruct() throws Exception {
+    Map conf = new HashMap();
+
+    String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), 
user1);
+    String expectedArchiveDir1 = 
constructExpectedArchivesDir(baseDir.toString(), user1);
+    String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), 
user2);
+    String expectedArchiveDir2 = 
constructExpectedArchivesDir(baseDir.toString(), user2);
+
+    String key1 = "testfile1.txt";
+    String key2 = "testfile2.txt";
+    String key3 = "testfile3.txt";
+    String key4 = "testfile4.txt";
+
+    String archive1 = "archive1";
+    String archive2 = "archive2";
+
+    File user1file1 = new File(expectedFileDir1, key1 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File user1file2 = new File(expectedFileDir1, key2 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File user2file3 = new File(expectedFileDir2, key3 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File user2file4 = new File(expectedFileDir2, key4 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+    File user1archive1 = new File(expectedArchiveDir1, archive1 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File user2archive2 = new File(expectedArchiveDir2, archive2 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File user1archive1file = new File(user1archive1, "file1");
+    File user2archive2file = new File(user2archive2, "file2");
+
+    // setup some files/dirs to emulate supervisor restart
+    assertTrue("Failed setup filecache dir1", new 
File(expectedFileDir1).mkdirs());
+    assertTrue("Failed setup filecache dir2", new 
File(expectedFileDir2).mkdirs());
+    assertTrue("Failed setup file1", user1file1.createNewFile());
+    assertTrue("Failed setup file2", user1file2.createNewFile());
+    assertTrue("Failed setup file3", user2file3.createNewFile());
+    assertTrue("Failed setup file4", user2file4.createNewFile());
+    assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
+    assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
+    assertTrue("Failed setup file in archivedir1", 
user1archive1file.createNewFile());
+    assertTrue("Failed setup file in archivedir2", 
user2archive2file.createNewFile());
+
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+    ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>();
+    arrUser1Keys.add(new LocalResource(key1, false));
+    arrUser1Keys.add(new LocalResource(archive1, true));
+    localizer.addReferences(arrUser1Keys, user1, "topo1");
+
+    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+    LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+    assertNotNull("Local resource doesn't exist but should", key1rsrc);
+    assertEquals("key doesn't match", key1, key1rsrc.getKey());
+    assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+    LocalizedResource key2rsrc = lrsrcSet.get(key2, false);
+    assertNotNull("Local resource doesn't exist but should", key2rsrc);
+    assertEquals("key doesn't match", key2, key2rsrc.getKey());
+    assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount());
+    LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true);
+    assertNotNull("Local resource doesn't exist but should", archive1rsrc);
+    assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
+    assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount());
+
+    LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+    assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize());
+    assertEquals("user doesn't match", user2, lrsrcSet2.getUser());
+    LocalizedResource key3rsrc = lrsrcSet2.get(key3, false);
+    assertNotNull("Local resource doesn't exist but should", key3rsrc);
+    assertEquals("key doesn't match", key3, key3rsrc.getKey());
+    assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount());
+    LocalizedResource key4rsrc = lrsrcSet2.get(key4, false);
+    assertNotNull("Local resource doesn't exist but should", key4rsrc);
+    assertEquals("key doesn't match", key4, key4rsrc.getKey());
+    assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount());
+    LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true);
+    assertNotNull("Local resource doesn't exist but should", archive2rsrc);
+    assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
+    assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount());
+  }
+
+  @Test
+  public void testArchivesTgz() throws Exception {
+    testArchives("test/jvm/backtype/storm/localizer/localtestwithsymlink.tgz", 
true,
+        21344);
+  }
+
+  @Test
+  public void testArchivesZip() throws Exception {
+    testArchives("test/jvm/backtype/storm/localizer/localtest.zip", false,
+        21348);
+  }
+
+  @Test
+  public void testArchivesTarGz() throws Exception {
+    
testArchives("test/jvm/backtype/storm/localizer/localtestwithsymlink.tar.gz",
+        true, 21344);
+  }
+
+  @Test
+  public void testArchivesTar() throws Exception {
+    testArchives("test/jvm/backtype/storm/localizer/localtestwithsymlink.tar", 
true,
+        21344);
+  }
+
+  @Test
+  public void testArchivesJar() throws Exception {
+    testArchives("test/jvm/backtype/storm/localizer/localtestwithsymlink.jar", 
false,
+        21416);
+  }
+
+  // archive passed in must contain symlink named tmptestsymlink is not a zip 
file
+  public void testArchives(String archivePath, boolean supportSymlinks, int 
size) throws Exception {
+    Map conf = new HashMap();
+    // set clean time really high so doesn't kick in
+    conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60*60*1000);
+
+    String key1 = new File(archivePath).getName();
+    String topo1 = "topo1";
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+    // set really small so will do cleanup
+    localizer.setTargetCacheSize(1);
+
+    ReadableBlobMeta rbm = new ReadableBlobMeta();
+    rbm.set_settable(new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+    when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+    when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta(new
+        FileInputStream(archivePath)));
+
+    long timeBefore = System.nanoTime();
+    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+    assertTrue("failed to create user dir", user1Dir.mkdirs());
+    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), 
user1, topo1,
+        user1Dir);
+    long timeAfter = System.nanoTime();
+
+    String expectedUserDir = baseDir + "/" + Localizer.USERCACHE + "/" + user1;
+    String expectedFileDir = expectedUserDir + "/" + Localizer.FILECACHE + "/" 
+ Localizer.ARCHIVESDIR;
+    assertTrue("user filecache dir not created", new 
File(expectedFileDir).exists());
+    File keyFile = new File(expectedFileDir, key1 + ".0");
+    assertTrue("blob not created", keyFile.exists());
+    assertTrue("blob is not uncompressed", keyFile.isDirectory());
+    File symlinkFile = new File(keyFile, "tmptestsymlink");
+
+    if (supportSymlinks) {
+      assertTrue("blob uncompressed doesn't contain symlink", 
Files.isSymbolicLink(
+          symlinkFile.toPath()));
+    } else {
+      assertTrue("blob symlink file doesn't exist", symlinkFile.exists());
+    }
+
+    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+    LocalizedResource key1rsrc = lrsrcSet.get(key1, true);
+    assertNotNull("Local resource doesn't exist but should", key1rsrc);
+    assertEquals("key doesn't match", key1, key1rsrc.getKey());
+    assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+    assertEquals("file path doesn't match", keyFile.toString(), 
key1rsrc.getFilePathWithVersion());
+    assertEquals("size doesn't match", size, key1rsrc.getSize());
+    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= 
timeBefore && key1rsrc
+        .getLastAccessTime() <= timeAfter));
+
+    timeBefore = System.nanoTime();
+    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true);
+    timeAfter = System.nanoTime();
+
+    lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+    key1rsrc = lrsrcSet.get(key1, true);
+    assertNotNull("Local resource doesn't exist but should", key1rsrc);
+    assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= 
timeBefore && key1rsrc
+        .getLastAccessTime() <= timeAfter));
+
+    // should remove the blob since cache size set really small
+    localizer.handleCacheCleanup();
+
+    lrsrcSet = localizer.getUserResources().get(user1);
+    assertFalse("blob contents not deleted", symlinkFile.exists());
+    assertFalse("blob not deleted", keyFile.exists());
+    assertFalse("blob file dir not deleted", new 
File(expectedFileDir).exists());
+    assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+    assertNull("user set should be null", lrsrcSet);
+
+  }
+
+
+  @Test
+  public void testBasic() throws Exception {
+    Map conf = new HashMap();
+    // set clean time really high so doesn't kick in
+    conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60*60*1000);
+
+    String key1 = "key1";
+    String topo1 = "topo1";
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+    // set really small so will do cleanup
+    localizer.setTargetCacheSize(1);
+
+    ReadableBlobMeta rbm = new ReadableBlobMeta();
+    rbm.set_settable(new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+    when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+    when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta());
+
+    long timeBefore = System.nanoTime();
+    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+    assertTrue("failed to create user dir", user1Dir.mkdirs());
+    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, 
false), user1, topo1,
+        user1Dir);
+    long timeAfter = System.nanoTime();
+
+    String expectedUserDir = baseDir + "/" + Localizer.USERCACHE + "/" + user1;
+    String expectedFileDir = expectedUserDir + "/" + Localizer.FILECACHE + "/" 
+ Localizer.FILESDIR;
+    assertTrue("user filecache dir not created", new 
File(expectedFileDir).exists());
+    File keyFile = new File(expectedFileDir, key1);
+    File keyFileCurrentSymlink = new File(expectedFileDir, key1 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+    assertTrue("blob not created", keyFileCurrentSymlink.exists());
+
+    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+    LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+    assertNotNull("Local resource doesn't exist but should", key1rsrc);
+    assertEquals("key doesn't match", key1, key1rsrc.getKey());
+    assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+    assertEquals("file path doesn't match", keyFile.toString(), 
key1rsrc.getFilePath());
+    assertEquals("size doesn't match", 34, key1rsrc.getSize());
+    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= 
timeBefore && key1rsrc
+        .getLastAccessTime() <= timeAfter));
+
+    timeBefore = System.nanoTime();
+    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+    timeAfter = System.nanoTime();
+
+    lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+    key1rsrc = lrsrcSet.get(key1, false);
+    assertNotNull("Local resource doesn't exist but should", key1rsrc);
+    assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= 
timeBefore && key1rsrc
+        .getLastAccessTime() <= timeAfter));
+
+    // should remove the blob since cache size set really small
+    localizer.handleCacheCleanup();
+
+    lrsrcSet = localizer.getUserResources().get(user1);
+    assertNull("user set should be null", lrsrcSet);
+    assertFalse("blob not deleted", keyFile.exists());
+    assertFalse("blob dir not deleted", new File(expectedFileDir).exists());
+    assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+  }
+
+  @Test
+  public void testMultipleKeysOneUser() throws Exception {
+    Map conf = new HashMap();
+    // set clean time really high so doesn't kick in
+    conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60*60*1000);
+
+    String key1 = "key1";
+    String topo1 = "topo1";
+    String key2 = "key2";
+    String key3 = "key3";
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+    // set to keep 2 blobs (each of size 34)
+    localizer.setTargetCacheSize(68);
+
+    ReadableBlobMeta rbm = new ReadableBlobMeta();
+    rbm.set_settable(new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+    when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+    when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta());
+    when(mockblobstore.getBlob(key2)).thenReturn(new 
TestInputStreamWithMeta());
+    when(mockblobstore.getBlob(key3)).thenReturn(new 
TestInputStreamWithMeta());
+
+    List<LocalResource> keys = Arrays.asList(new LocalResource[]{new 
LocalResource(key1, false),
+        new LocalResource(key2, false), new LocalResource(key3, false)});
+    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+    assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+    List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, 
user1Dir);
+    LocalizedResource lrsrc = lrsrcs.get(0);
+    LocalizedResource lrsrc2 = lrsrcs.get(1);
+    LocalizedResource lrsrc3 = lrsrcs.get(2);
+
+    String expectedFileDir = baseDir + "/" + Localizer.USERCACHE + "/" + user1 
+
+        "/" + Localizer.FILECACHE + "/" + Localizer.FILESDIR;
+    assertTrue("user filecache dir not created", new 
File(expectedFileDir).exists());
+    File keyFile = new File(expectedFileDir, key1 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File keyFile2 = new File(expectedFileDir, key2 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File keyFile3 = new File(expectedFileDir, key3 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+    assertTrue("blob not created", keyFile.exists());
+    assertTrue("blob not created", keyFile2.exists());
+    assertTrue("blob not created", keyFile3.exists());
+    assertEquals("size doesn't match", 34, keyFile.length());
+    assertEquals("size doesn't match", 34, keyFile2.length());
+    assertEquals("size doesn't match", 34, keyFile3.length());
+    assertEquals("size doesn't match", 34, lrsrc.getSize());
+    assertEquals("size doesn't match", 34, lrsrc3.getSize());
+    assertEquals("size doesn't match", 34, lrsrc2.getSize());
+
+    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+
+    long timeBefore = System.nanoTime();
+    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+    localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false);
+    localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false);
+    long timeAfter = System.nanoTime();
+
+    // add reference to one and then remove reference again so it has newer 
timestamp
+    lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, 
user1Dir);
+    assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= 
timeBefore && lrsrc
+        .getLastAccessTime() <= timeAfter));
+    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+
+    // should remove the second blob first
+    localizer.handleCacheCleanup();
+
+    lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 2, lrsrcSet.getSize());
+    assertFalse("blob not deleted", keyFile2.exists());
+    assertTrue("blob deleted", keyFile.exists());
+    assertTrue("blob deleted", keyFile3.exists());
+
+    // set size to cleanup another one
+    localizer.setTargetCacheSize(34);
+
+    // should remove the third blob
+    localizer.handleCacheCleanup();
+
+    lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+    assertTrue("blob deleted", keyFile.exists());
+    assertFalse("blob not deleted", keyFile3.exists());
+  }
+
+  @Test(expected = AuthorizationException.class)
+  public void testFailAcls() throws Exception {
+    Map conf = new HashMap();
+    // set clean time really high so doesn't kick in
+    conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 
1000);
+
+    String topo1 = "topo1";
+    String key1 = "key1";
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+    ReadableBlobMeta rbm = new ReadableBlobMeta();
+    // set acl so user doesn't have read access
+    AccessControl acl = new AccessControl(AccessControlType.USER, 
BlobStoreAclHandler.ADMIN);
+    acl.set_name(user1);
+    rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
+    when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+    when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta());
+    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+    assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+    // This should throw AuthorizationException because auth failed
+    localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+  }
+
+  @Test(expected = KeyNotFoundException.class)
+  public void testKeyNotFoundException() throws Exception {
+    Map conf = new HashMap();
+    String key1 = "key1";
+    conf.put(Config.STORM_LOCAL_DIR, "local");
+    conf.put(Config.BLOBSTORE_SUPERUSER, "superuser");
+    conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, 
"backtype.storm.security.auth.DefaultPrincipalToLocal");
+    LocalFsBlobStore bs = new LocalFsBlobStore();
+    LocalFsBlobStore spy = spy(bs);
+    Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);
+    Mockito.doNothing().when(spy).checkForBlobUpdate(key1);
+    spy.prepare(conf,null,null);
+    spy.getBlob(key1, null);
+  }
+
+    @Test
+  public void testMultipleUsers() throws Exception {
+    Map conf = new HashMap();
+    // set clean time really high so doesn't kick in
+    conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60*60*1000);
+
+    String topo1 = "topo1";
+    String topo2 = "topo2";
+    String topo3 = "topo3";
+    String key1 = "key1";
+    String key2 = "key2";
+    String key3 = "key3";
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+    // set to keep 2 blobs (each of size 34)
+    localizer.setTargetCacheSize(68);
+
+    ReadableBlobMeta rbm = new ReadableBlobMeta();
+    rbm.set_settable(new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+    when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+    when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta());
+    when(mockblobstore.getBlob(key2)).thenReturn(new 
TestInputStreamWithMeta());
+    when(mockblobstore.getBlob(key3)).thenReturn(new 
TestInputStreamWithMeta());
+
+    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+    assertTrue("failed to create user dir", user1Dir.mkdirs());
+    File user2Dir = localizer.getLocalUserFileCacheDir(user2);
+    assertTrue("failed to create user dir", user2Dir.mkdirs());
+    File user3Dir = localizer.getLocalUserFileCacheDir(user3);
+    assertTrue("failed to create user dir", user3Dir.mkdirs());
+
+    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, 
false), user1, topo1,
+        user1Dir);
+    LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, 
false), user2, topo2,
+        user2Dir);
+    LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, 
false), user3, topo3,
+        user3Dir);
+    // make sure we support different user reading same blob
+    LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, 
false), user3,
+        topo3, user3Dir);
+
+    String expectedUserDir1 = baseDir + "/" + Localizer.USERCACHE + "/" + 
user1;
+    String expectedFileDirUser1 = expectedUserDir1 + "/" + Localizer.FILECACHE 
+ "/" +
+        Localizer.FILESDIR;
+    String expectedFileDirUser2 = baseDir + "/" + Localizer.USERCACHE + "/" + 
user2 +
+        "/" + Localizer.FILECACHE + "/" + Localizer.FILESDIR;
+    String expectedFileDirUser3 = baseDir + "/" + Localizer.USERCACHE + "/" + 
user3 +
+        "/" + Localizer.FILECACHE + "/" + Localizer.FILESDIR;
+    assertTrue("user filecache dir user1 not created", new 
File(expectedFileDirUser1).exists());
+    assertTrue("user filecache dir user2 not created", new 
File(expectedFileDirUser2).exists());
+    assertTrue("user filecache dir user3 not created", new 
File(expectedFileDirUser3).exists());
+
+    File keyFile = new File(expectedFileDirUser1, key1 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File keyFile2 = new File(expectedFileDirUser2, key2 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File keyFile3 = new File(expectedFileDirUser3, key3 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    File keyFile1user3 = new File(expectedFileDirUser3, key1 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+    assertTrue("blob not created", keyFile.exists());
+    assertTrue("blob not created", keyFile2.exists());
+    assertTrue("blob not created", keyFile3.exists());
+    assertTrue("blob not created", keyFile1user3.exists());
+
+    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+    LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+    assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize());
+    LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3);
+    assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+    // should remove key1
+    localizer.handleCacheCleanup();
+
+    lrsrcSet = localizer.getUserResources().get(user1);
+    lrsrcSet3 = localizer.getUserResources().get(user3);
+    assertNull("user set should be null", lrsrcSet);
+    assertFalse("blob dir not deleted", new 
File(expectedFileDirUser1).exists());
+    assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
+    assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+    assertTrue("blob deleted", keyFile2.exists());
+    assertFalse("blob not deleted", keyFile.exists());
+    assertTrue("blob deleted", keyFile3.exists());
+    assertTrue("blob deleted", keyFile1user3.exists());
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    Map conf = new HashMap();
+    // set clean time really high so doesn't kick in
+    conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 
60*60*1000);
+
+    String key1 = "key1";
+    String topo1 = "topo1";
+    String topo2 = "topo2";
+    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+    ReadableBlobMeta rbm = new ReadableBlobMeta();
+    rbm.set_version(1);
+    rbm.set_settable(new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+    when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+    when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta());
+
+    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+    assertTrue("failed to create user dir", user1Dir.mkdirs());
+    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, 
false), user1, topo1,
+        user1Dir);
+
+    String expectedUserDir = baseDir + "/" + Localizer.USERCACHE + "/" + user1;
+    String expectedFileDir = expectedUserDir + "/" + Localizer.FILECACHE + "/" 
+ Localizer.FILESDIR;
+    assertTrue("user filecache dir not created", new 
File(expectedFileDir).exists());
+    File keyFile = new File(expectedFileDir, key1);
+    File keyFileCurrentSymlink = new File(expectedFileDir, key1 + 
Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+    assertTrue("blob not created", keyFileCurrentSymlink.exists());
+    File versionFile = new File(expectedFileDir, key1 + 
Utils.DEFAULT_BLOB_VERSION_SUFFIX);
+    assertTrue("blob version file not created", versionFile.exists());
+    assertEquals("blob version not correct", 1, 
Utils.localVersionOfBlob(keyFile.toString()));
+
+    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+
+    // test another topology getting blob with updated version - it should 
update version now
+    rbm.set_version(2);
+
+    localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir);
+    assertTrue("blob version file not created", versionFile.exists());
+    assertEquals("blob version not correct", 2, 
Utils.localVersionOfBlob(keyFile.toString()));
+    assertTrue("blob file with version 2 not created", new File(keyFile + 
".2").exists());
+
+    // now test regular updateBlob
+    rbm.set_version(3);
+
+    ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+    arr.add(new LocalResource(key1, false));
+    localizer.updateBlobs(arr, user1);
+    assertTrue("blob version file not created", versionFile.exists());
+    assertEquals("blob version not correct", 3, 
Utils.localVersionOfBlob(keyFile.toString()));
+    assertTrue("blob file with version 3 not created", new File(keyFile + 
".3").exists());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/localtest.zip
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/localizer/localtest.zip 
b/storm-core/test/jvm/backtype/storm/localizer/localtest.zip
new file mode 100644
index 0000000..00ffd20
Binary files /dev/null and 
b/storm-core/test/jvm/backtype/storm/localizer/localtest.zip differ

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.jar
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.jar 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.jar
new file mode 100644
index 0000000..f60cc7b
Binary files /dev/null and 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.jar differ

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar
new file mode 100644
index 0000000..caa316b
Binary files /dev/null and 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar differ

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar.gz
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar.gz 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar.gz
new file mode 100644
index 0000000..e0f995f
Binary files /dev/null and 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tar.gz 
differ

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tgz
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tgz 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tgz
new file mode 100644
index 0000000..0dd2134
Binary files /dev/null and 
b/storm-core/test/jvm/backtype/storm/localizer/localtestwithsymlink.tgz differ

Reply via email to