[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014310#comment-15014310
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45397478
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import java.util.*;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a local file system backed blob store implementation for
Nimbus.
+ */
+public class LocalFsBlobStore extends BlobStore {
+ public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class);
+ /** Prefix joined with a blob key to name the entry holding the blob's data. */
+ private static final String DATA_PREFIX = "data_";
+ /** Prefix joined with a blob key to name the entry holding the blob's serialized metadata. */
+ private static final String META_PREFIX = "meta_";
+ /** ACL checker; assigned in prepare(). */
+ protected BlobStoreAclHandler _aclHandler;
+ /**
+  * ZooKeeper path prefix; a key's children under this path are counted as its replication.
+  * Declared static because it is a compile-time constant shared by every instance.
+  */
+ private static final String BLOBSTORE_SUBTREE = "/blobstore/";
+ /** Nimbus identity handed to prepare(). */
+ private NimbusInfo nimbusInfo;
+ /** File-backed store that holds both the data and the metadata entries; assigned in prepare(). */
+ private FileBlobStoreImpl fbs;
+ /** Configuration map handed to prepare(); also used to create ZooKeeper clients. */
+ private Map conf;
+
+ /**
+  * Initializes the store.
+  *
+  * The base directory is the first non-null value among: overrideBase,
+  * conf[Config.BLOBSTORE_DIR], conf[Config.STORM_LOCAL_DIR]. Blobs are kept in
+  * the BASE_BLOBS_DIR_NAME child of that directory.
+  *
+  * @param conf         configuration map; retained for later use
+  * @param overrideBase base directory to use instead of the configured one, or null
+  * @param nimbusInfo   nimbus identity; retained for later use
+  * @throws RuntimeException wrapping the IOException if the file store cannot be created
+  */
+ @Override
+ public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+   this.conf = conf;
+   this.nimbusInfo = nimbusInfo;
+
+   // Resolve the root lazily: each fallback is consulted only when the previous one is null.
+   String root = overrideBase;
+   if (root == null) {
+     root = (String) conf.get(Config.BLOBSTORE_DIR);
+   }
+   if (root == null) {
+     root = (String) conf.get(Config.STORM_LOCAL_DIR);
+   }
+
+   File blobsDir = new File(root, BASE_BLOBS_DIR_NAME);
+   try {
+     fbs = new FileBlobStoreImpl(blobsDir, conf);
+   } catch (IOException ioe) {
+     throw new RuntimeException(ioe);
+   }
+   _aclHandler = new BlobStoreAclHandler(conf);
+ }
+
+ @Override
+ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta,
Subject who) throws AuthorizationException, KeyAlreadyExistsException {
+ LOG.debug("Creating Blob for key {}", key);
+ validateKey(key);
+ _aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE |
ADMIN);
+ BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+ _aclHandler.validateACL(meta.get_acl(), READ | WRITE | ADMIN, who,
key);
+ if (fbs.exists(DATA_PREFIX+key)) {
+ throw new KeyAlreadyExistsException(key);
+ }
+ BlobStoreFileOutputStream mOut = null;
+ try {
+ mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key,
true));
+ mOut.write(Utils.thriftSerialize((TBase) meta));
+ mOut.close();
+ mOut = null;
+ return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key,
true));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (mOut != null) {
+ try {
+ mOut.cancel();
+ } catch (IOException e) {
+ //Ignored
+ }
+ }
+ }
+ }
+
+ /**
+  * Opens a stream that rewrites the data entry of an existing blob.
+  *
+  * @param key blob key
+  * @param who subject performing the operation; checked against the stored ACL for WRITE
+  * @return output stream for the blob's data entry
+  * @throws AuthorizationException declared; raised by the ACL check
+  * @throws KeyNotFoundException   if no stored metadata can be read for the key
+  * @throws RuntimeException       wrapping any IOException from the file store
+  */
+ @Override
+ public AtomicOutputStream updateBlob(String key, Subject who)
+     throws AuthorizationException, KeyNotFoundException {
+   checkForBlobOrDownload(key);
+   SettableBlobMeta storedMeta = getStoredBlobMeta(key);
+   _aclHandler.validateACL(storedMeta.get_acl(), WRITE, who, key);
+   validateKey(key);
+
+   final String dataKey = DATA_PREFIX + key;
+   try {
+     return new BlobStoreFileOutputStream(fbs.write(dataKey, false));
+   } catch (IOException ioe) {
+     throw new RuntimeException(ioe);
+   }
+ }
+
+ /**
+  * Reads and deserializes the metadata entry stored for a key.
+  *
+  * @param key blob key
+  * @return the deserialized metadata
+  * @throws KeyNotFoundException if opening the metadata entry raises FileNotFoundException
+  * @throws RuntimeException     wrapping any other IOException
+  */
+ private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
+   InputStream metaIn = null;
+   try {
+     LocalFsBlobStoreFile metaFile = fbs.read(META_PREFIX + key);
+     try {
+       metaIn = metaFile.getInputStream();
+     } catch (FileNotFoundException fnf) {
+       throw new KeyNotFoundException(key);
+     }
+
+     // Drain the stream into memory in 2 KiB chunks.
+     ByteArrayOutputStream collected = new ByteArrayOutputStream();
+     byte[] chunk = new byte[2048];
+     for (int n = metaIn.read(chunk); n > 0; n = metaIn.read(chunk)) {
+       collected.write(chunk, 0, n);
+     }
+
+     metaIn.close();
+     metaIn = null; // already closed; keeps the finally block from closing twice
+     return Utils.thriftDeserialize(SettableBlobMeta.class, collected.toByteArray());
+   } catch (IOException ioe) {
+     throw new RuntimeException(ioe);
+   } finally {
+     if (metaIn != null) {
+       try {
+         metaIn.close();
+       } catch (IOException ignored) {
+         //Ignored
+       }
+     }
+   }
+ }
+
+ @Override
+ public ReadableBlobMeta getBlobMeta(String key, Subject who) throws
AuthorizationException, KeyNotFoundException {
+ if(!checkForBlobOrDownload(key)) {
+ checkForBlobUpdate(key);
+ }
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(meta);
+ try {
+ LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX+key);
+ rbm.set_version(pf.getModTime());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return rbm;
+ }
+
+ @Override
+ public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
throws AuthorizationException, KeyNotFoundException {
+ checkForBlobOrDownload(key);
+ validateKey(key);
+ _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
+ BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+ SettableBlobMeta orig = getStoredBlobMeta(key);
+ _aclHandler.validateACL(orig.get_acl(), ADMIN, who, key);
+ BlobStoreFileOutputStream mOut = null;
+ try {
+ mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key,
false));
+ mOut.write(Utils.thriftSerialize((TBase) meta));
+ mOut.close();
+ mOut = null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (mOut != null) {
+ try {
+ mOut.cancel();
+ } catch (IOException e) {
+ //Ignored
+ }
+ }
+ }
+ }
+
+ /**
+  * Deletes a blob: removes its data entry first, then its metadata entry.
+  *
+  * @param key blob key
+  * @param who subject performing the operation; checked against the stored ACL for WRITE
+  * @throws AuthorizationException declared; raised by the ACL check
+  * @throws KeyNotFoundException   if no stored metadata can be read for the key
+  * @throws RuntimeException       wrapping any IOException from the file store
+  */
+ @Override
+ public void deleteBlob(String key, Subject who)
+     throws AuthorizationException, KeyNotFoundException {
+   checkForBlobOrDownload(key);
+   validateKey(key);
+
+   SettableBlobMeta storedMeta = getStoredBlobMeta(key);
+   _aclHandler.validateACL(storedMeta.get_acl(), WRITE, who, key);
+
+   final String dataKey = DATA_PREFIX + key;
+   final String metaKey = META_PREFIX + key;
+   try {
+     fbs.deleteKey(dataKey);
+     fbs.deleteKey(metaKey);
+   } catch (IOException ioe) {
+     throw new RuntimeException(ioe);
+   }
+ }
+
+ @Override
+ public InputStreamWithMeta getBlob(String key, Subject who) throws
AuthorizationException, KeyNotFoundException {
+ if(!checkForBlobOrDownload(key)) {
+ checkForBlobUpdate(key);
+ }
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ _aclHandler.validateACL(meta.get_acl(), READ, who, key);
+ try {
+ return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX+key));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+  * Lists the keys in the file store, passed through a KeyTranslationIterator
+  * built with DATA_PREFIX.
+  *
+  * @param who subject performing the operation; not consulted by this method
+  * @return iterator over the keys
+  * @throws RuntimeException wrapping any IOException from the file store
+  */
+ @Override
+ public Iterator<String> listKeys(Subject who) {
+   Iterator<String> keys;
+   try {
+     keys = new KeyTranslationIterator(fbs.listKeys(), DATA_PREFIX);
+   } catch (IOException ioe) {
+     throw new RuntimeException(ioe);
+   }
+   return keys;
+ }
+
+ /** Shuts the store down; this implementation performs no work. */
+ @Override
+ public void shutdown() {
+   // Intentionally empty.
+ }
+
+ /**
+  * Returns the replication count of a blob, measured as the number of child
+  * nodes under BLOBSTORE_SUBTREE + key in ZooKeeper.
+  *
+  * @param key blob key; validated with validateKey
+  * @param who subject performing the operation; checked against the stored ACL for READ
+  * @return the number of children of the key's ZooKeeper node, or 0 if the node
+  *         does not exist
+  * @throws Exception on ACL failure, missing metadata, or any ZooKeeper error
+  */
+ @Override
+ public int getBlobReplication(String key, Subject who) throws Exception {
+   validateKey(key);
+   SettableBlobMeta meta = getStoredBlobMeta(key);
+   _aclHandler.validateACL(meta.get_acl(), READ, who, key);
+
+   CuratorFramework zkClient = Utils.createZKClient(conf);
+   try {
+     String path = BLOBSTORE_SUBTREE + key;
+     if (zkClient.checkExists().forPath(path) == null) {
+       return 0;
+     }
+     return zkClient.getChildren().forPath(path).size();
+   } finally {
+     // Close on every path; previously a ZooKeeper failure leaked the client.
+     zkClient.close();
+   }
+ }
+
+ @Override
+ public int updateBlobReplication(String key, int replication, Subject
who) throws AuthorizationException, KeyNotFoundException {
+ int replicationCount = 0;
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ _aclHandler.validateACL(meta.get_acl(), READ, who, key);
--- End diff --
This should be `WRITE`, not `READ`. The supervisor should not be allowed
to update the blob replication.
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the
> process of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)