[jira] [Commented] (STORM-1167) Add sliding & tumbling window support for core storm

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015339#comment-15015339
 ] 

ASF GitHub Bot commented on STORM-1167:
---

Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/855#issuecomment-158301323
  
Thanks @arunmahadevan, merged into trunk.


> Add sliding & tumbling window support for core storm
> 
>
> Key: STORM-1167
> URL: https://issues.apache.org/jira/browse/STORM-1167
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>
> Currently, topologies that need windowing support require writing custom 
> logic inside bolts, making it tedious to handle the windowing and acking 
> logic.
> We can add framework-level support so that core storm bolts can process 
> tuples in a time-based or count-based window. Sliding and tumbling windows 
> can be supported.
> Later this can be extended to the trident APIs as well.
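The two window types the issue proposes can be illustrated with a small, framework-independent sketch (a hypothetical helper class for exposition, not the API added by this change):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of count-based windows (not Storm's actual API).
public class Windows {

    // Tumbling window: non-overlapping chunks of `size` tuples each.
    public static <T> List<List<T>> tumbling(List<T> tuples, int size) {
        return sliding(tuples, size, size); // a tumbling window slides by its own size
    }

    // Sliding window: windows of `size` tuples, advancing by `slide` tuples.
    public static <T> List<List<T>> sliding(List<T> tuples, int size, int slide) {
        List<List<T>> windows = new ArrayList<>();
        for (int i = 0; i + size <= tuples.size(); i += slide) {
            windows.add(new ArrayList<>(tuples.subList(i, i + size)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> in = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(tumbling(in, 2));    // [[1, 2], [3, 4], [5, 6]]
        System.out.println(sliding(in, 3, 1));  // [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
    }
}
```

The framework-level version additionally has to decide when a tuple may be acked (for example, once it has fallen out of every window it belongs to), which is exactly the bookkeeping this issue wants to lift out of user bolts.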



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

2015-11-19 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/855#issuecomment-158301323
  
Thanks @arunmahadevan, merged into trunk.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1167) Add sliding & tumbling window support for core storm

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015338#comment-15015338
 ] 

ASF GitHub Bot commented on STORM-1167:
---

Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/855


> Add sliding & tumbling window support for core storm
> 
>
> Key: STORM-1167
> URL: https://issues.apache.org/jira/browse/STORM-1167
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>
> Currently, topologies that need windowing support require writing custom 
> logic inside bolts, making it tedious to handle the windowing and acking 
> logic.
> We can add framework-level support so that core storm bolts can process 
> tuples in a time-based or count-based window. Sliding and tumbling windows 
> can be supported.
> Later this can be extended to the trident APIs as well.





[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

2015-11-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/855




Re: [DISCUSS] Storm 2.0 plan

2015-11-19 Thread Boyang(Jerry) Peng
I am not such a fan of simply substituting the Apache Storm clojure core with 
the JStorm java core either.  Don't get me wrong, I would very much like to 
merge the two projects together and build a stronger community, but we cannot 
hastily rush this process. As others have said, the projects have diverged 
quite a bit over the last couple of years.  I am nervous about taking the 
JStorm core wholesale without carefully examining the implementation.  I am 
sure JStorm has a lot of good features that Apache Storm doesn't have, but 
compatibility is a key issue that determines the future of the project.  We can 
use the JStorm code base as a reference, but we really need to examine and 
potentially rewrite every line of clojure code to do this right and to make 
sure we minimize compatibility issues, even for the small features.  
This may sound like it could take a while, but with the potential number of full-
time developers working on this, I see it taking only a month or so.
Also, Apache Storm has received numerous improvements (heartbeat server, 
backpressure, disruptor queue optimizations, etc.) over the last year, so it 
would be nice if we could run some performance tests between JStorm and 
Apache Storm to see the strengths and weaknesses of both implementations.  
Perhaps after that we can better gauge which pieces need to be taken 
from where for Storm 2.0.   
As for being scared of the other projects: I think each project has its own 
fortes and areas it does well in, but Storm definitely still does well in 
the areas it was designed to excel in, so I am not that concerned.
Just some of my thoughts.  In the end, I am really excited for the future of 
this project, especially with the addition of developers from the JStorm 
project.
Best,
Boyang Jerry Peng 


On Thursday, November 19, 2015 9:59 PM, Sean Zhong wrote:
 

 Hi All,

I think there may be some improper word choices or misunderstandings.

Here is where I can see both sides agree on these goals:
1. We want to migrate the clojure code to java.
2. We want to merge important features together.
3. We want to do this in a step-by-step, transparent, reviewable way,
especially with close examination and reviews for code that has
architecture changes.
4. We want the final version to maintain compatibility.

The only difference is the process we use to achieve these goals.
Longda's view:
1. Do a parallel migration from the clojure core to java, part by part.
Parallel means equivalent: no new features are added in this step. He suggests
following the order "modules/client/utils/nimbus/supervisor/drpc/worker & task/
web ui/others".
He uses the word "copy", which I think is the wrong term; it is more like a
merging.
Quoting his words:

> 2.1 Copy modules from JStorm, one module at a time
> 2.2 The sequence is extern modules/client/utils/nimbus/
> supervisor/drpc/worker & task/web ui/others

2. On top of the java core code base, incrementally add new feature blocks.
Quoting his words:

> 3.1 Discuss solution for each difference (jira)
> 3.2 Once the solution is finalized, we can start the
> merging. (Some issues could be started concurrently. It
> depends on the discussion.)

3. His goal is to maintain compatibility. "this version is stable and
compatible with API of Storm 1.0" is not an accurate statement from my point
of view, at least not for the security feature.
4. He shares his concern about other streaming engines.


Bobby and Jungtaek's view:
1. "Copy" is not acceptable; it will impact the security features. ("Copy" is
the wrong phrase to use; I think Longda means more of a merging.)
2. With the JStorm team, start with the clojure -> java translation first.
3. On an optimistic view, with the JStorm team, one month should be enough for
the above stage.
4. Add new features after the whole code base is migrated to java.
5. No need to worry that much about other engines.

If my understanding of both parties is correct, I think we agree on most
things about the process.
first: clojure -> java
second: merge features.

With a slight difference about how aggressively we want to do "clojure ->
java", and how long it takes.


@Longda, can you clarify whether my understanding of your opinion is right?


Sean


On Fri, Nov 20, 2015 at 11:40 AM, P. Taylor Goetz  wrote:

> Very well stated, Jungtaek.
>
> I should also point out that there's nothing stopping the JStorm team from
> releasing new versions of JStorm, or adding new features. But you would
> have to be careful to note that any such release is "JStorm" and not
> "Apache Storm." And any such release cannot be hosted on Apache
> infrastructure.
>
> We also shouldn't be too worried about competition with other stream
> processing frameworks. Competition is healthy and leads to improvements
> across the board. Spark Streaming borrowed ideas from Storm for its Kafka
> integration. It also borrowed memory management ideas from Flink. I don't
> see that as a problem. This is open source. We can, and should, do the same
> where applicable.
>
> Did w

Why am I getting OffsetOutOfRange: Updating offset from offset?

2015-11-19 Thread Sachin Pasalkar
Hi,

We are developing an application where, after some days of the topology 
running, we get continuous warning messages:


2015-11-20 05:05:42.226 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7238824446]

2015-11-20 05:05:42.229 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7238824446 to offset = 7241183683

2015-11-20 05:05:43.207 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7022945051]

2015-11-20 05:05:43.208 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7022945051 to offset = 7025309343

2015-11-20 05:05:44.260 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7170559432]

2015-11-20 05:05:44.264 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7170559432 to offset = 7172920769

2015-11-20 05:05:45.332 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7132495867]……


After some point the topology stops processing messages, and I need to 
rebalance it to start it again.
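For context on the warnings above: `OffsetOutOfRange` typically means the spout requested an offset that Kafka has already deleted under its retention policy (the consumer fell behind), so the spout skips forward to the earliest offset still available. A minimal sketch of that clamping logic (a hypothetical helper, not the actual storm-kafka code; the offsets are taken from the log lines above, the latest bound is illustrative):

```java
// Hypothetical sketch of clamping a stale offset into the broker's valid
// range, as storm-kafka's "Updating offset" warning describes.
public class OffsetClamp {

    // earliest/latest: the broker's retained offset bounds for the partition.
    public static long nextValidOffset(long requested, long earliest, long latest) {
        if (requested < earliest) {
            return earliest; // messages expired by retention: skip forward (data loss)
        }
        return Math.min(requested, latest);
    }

    public static void main(String[] args) {
        // The spout asked for 7238824446, but the earliest retained offset
        // had advanced to 7241183683.
        long next = nextValidOffset(7238824446L, 7241183683L, 7300000000L);
        System.out.println(next); // 7241183683, matching the warning's "to offset"
    }
}
```

A single spout reading all 12 partitions makes this more likely: one consumer has 12x the data to keep up with, so its stored offsets age past the retention window sooner.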


My spout config is


BrokerHosts brokers = new ZkHosts((String) stormConfiguration.get(ZOOKEEPER_HOSTS));

TridentKafkaConfig spoutConfig = new TridentKafkaConfig(brokers, (String) stormConfiguration.get(KAFKA_INPUT_TOPIC));

spoutConfig.scheme = getSpoutScheme(stormConfiguration);

Boolean forceFromStart = (Boolean) stormConfiguration.get(FORCE_FROM_START);

spoutConfig.ignoreZkOffsets = false;

spoutConfig.fetchSizeBytes = stormConfiguration.getIntProperty(KAFKA_CONSUMER_FETCH_SIZE_BYTE, KAFKA_CONSUMER_DEFAULT_FETCH_SIZE_BYTE);

spoutConfig.bufferSizeBytes = stormConfiguration.getIntProperty(KAFKA_CONSUMER_BUFFER_SIZE_BYTE, KAFKA_CONSUMER_DEFAULT_BUFFER_SIZE_BYTE);

As far as I know, the only thing we are doing wrong is that the topic has 12 
partitions but we are reading with only 1 spout; that is a limitation on our 
side. I am not sure why it is getting halted. It just keeps printing the lines 
below and does nothing:


2015-11-20 05:44:41.574 b.s.m.n.Server [INFO] Getting metrics for server on 
port 6700

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6700

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6709

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6707


Thanks,

Sachin


Re: [DISCUSS] Storm 2.0 plan

2015-11-19 Thread Harsha
Longda,

 "2.1 Copy modules from JStorm, one module from one module
> 2.2 The sequence is extern
> modules/client/utils/nimbus/supervisor/drpc/worker & task/web ui/others"

Are you suggesting we just copy the JStorm code in place of the clojure? If
so, that's not going to work. There might be some code that can easily be
replaced with JStorm's, but not everything will be that straightforward,
especially with the feature disparity between Storm & JStorm.
We should be moving code to java, i.e., rewriting parts of the code where
needed, and if something can be picked up from JStorm, we should do that.

Thanks,
Harsha

 

On Wed, Nov 18, 2015, at 10:13 PM, Longda Feng wrote:
> Sorry for changing the Subject.
> 
> I am +1 for releasing Storm 2.0 with java core, which is merged with
> JStorm.
> 
> I think the change in this release will be the biggest one in Storm's
> history. It will probably take a long time to develop. At the same time,
> Heron is going to be open sourced, and the latest release of Flink provides
> compatibility with Storm's API. These could be threats to Storm, so I
> suggest we start the development of Storm 2.0 as quickly as possible. In
> order to accelerate the development cycle, I propose taking the JStorm
> 2.1.0 core and UI as the base version, since this version is stable and
> compatible with the Storm 1.0 API. Please refer to the phases below for
> the detailed merging plan.
> 
> Note: We provide a demo of JStorm's web UI. Please refer to
> storm.taobao.org. I think JStorm will give you a totally different view.
> 
> I would like to share the experience of the initial development of JStorm
> (migrating from the clojure core to a java core).
> Our team (4 developers) spent almost a year finishing the migration. We took
> 4 months to release the first JStorm version, and 6 months to make JStorm
> stable. During this period, we switched more than 100 online applications
> with different scenarios from Storm to JStorm, and many bugs were fixed.
> Then more and more applications were switched to JStorm in Alibaba.
> Currently, there are 7000+ nodes in JStorm clusters at Alibaba, and 2000+
> applications are running on them. The JStorm clusters there handle 1.5
> PB / 2 trillion messages per day. The use cases are not only in the BigData
> field but also in many other online scenarios.
> Besides that, we have been through Alibaba's November 11th Shopping Festival
> for the last three years. On that day, the computation in our clusters
> increased to several times the usual level. All applications worked well
> during the peak time. I can say the stability of JStorm is beyond doubt
> today. Actually, besides Alibaba, the most powerful Chinese IT companies
> are also using JStorm.
> 
> 
> Phase 1:
>  
> Define the target of Storm 2.0. List the requirements of Storm 2.0.
> 1. Open a new Umbrella Jira
> (https://issues.apache.org/jira/browse/STORM-717)
> 2. Create a 2.0 branch
> 2.1 Copy modules from JStorm, one module at a time
> 2.2 The sequence is extern
> modules/client/utils/nimbus/supervisor/drpc/worker & task/web ui/others
> 3. Create a jira for each difference between JStorm 2.1.0 and Storm 1.0
> 3.1 Discuss the solution for each difference (jira)
> 3.2 Once the solution is finalized, we can start the merging. (Some
> issues could be started concurrently. It depends on the discussion.)
> 
> This phase mainly tries to define the target and finalize the solution.
> Hopefully it can be finished in 2 months (before 2016/1/31).
> 
> 
> Phase 2:
> Release Storm 2.0 beta
> 1. Based on phase 1's discussion, finish all features of Storm 2.0.
> 2. Integrate all modules; make sure the simplest storm example can run on
> the system.
> 3. Test with all examples and modules in the Storm code base.
> 4. All daily tests can be passed.
>  
> Hopefully this phase can be finished in 2 months (before 2016/3/31).
> 
> 
> Phase 3:
> Persuade some users to give it a try.
> Alibaba will try to run some online applications on the beta version.
> 
> Hopefully this phase can be finished in 1 month (before 2016/4/30).
> 
> 
> Any comments are welcome.
> 
> 
> Thanks
> Longda
>
> --
> From: P. Taylor Goetz
> Sent: Thursday, November 19, 2015 06:23
> To: dev, Bobby Evans
> Subject: Re: [DISCUSS] Plan for Merging JStorm Code
> All I have at this point is a placeholder wiki entry [1], and a lot of
> local notes that likely would only make sense to me.
> 
> Let me know your wiki username and I’ll give you permissions. The same
> goes for anyone else who wants to help.
> 
> -Taylor
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61328109
> 
> > On Nov 18, 2015, at 2:08 PM, Bobby Evans wrote:
> >
> > Taylor and others, I was hoping to get started filing JIRAs and planning
> > how we are going to do the java migration + JStorm merger. Is anyone else
> > starting to do this? If not, would anyone object to me starting on it?
> > - Bobby
> > 
> > 
> >On Thursday, November 12, 2

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015260#comment-15015260
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158284724
  
Revert changes to ConfigValidationAnnotations.java; it's only white-space


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.





[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158284724
  
Revert changes to ConfigValidationAnnotations.java; it's only white-space




[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158284600
  
ConfigValidation.java
```
480 /**
481  * Validate topology.map config
482  */
483 public static class MapOfStringToMapOfStringToObjectValidator extends Validator {
```
It's more useful to describe what the validator does, if necessary, not who 
might be using it.




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015257#comment-15015257
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158284600
  
ConfigValidation.java
```
480 /**
481  * Validate topology.map config
482  */
483 public static class MapOfStringToMapOfStringToObjectValidator extends Validator {
```
It's more useful to describe what the validator does, if necessary, not who 
might be using it.


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.





[jira] [Commented] (STORM-1222) Support Kafka as external tables in StormSQL

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015254#comment-15015254
 ] 

ASF GitHub Bot commented on STORM-1222:
---

GitHub user haohui opened a pull request:

https://github.com/apache/storm/pull/896

STORM-1222. Support Kafka as external tables in StormSQL

This PR adds the support of Kafka as external tables in StormSQL.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/storm STORM-1222

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #896


commit 7d442d084a61c15056d86977a5238160124a10f7
Author: Haohui Mai 
Date:   2015-11-11T18:03:07Z

Allow deserializing Java class with custom classloaders in tests.

commit cdc0538038faf35efdcc5418a54967f8e0caac85
Author: Haohui Mai 
Date:   2015-11-19T22:28:55Z

STORM-1221. Create a common interface for all Trident spout.

commit 337b902ef68beb4efaf4458ece898f3992e1d3b5
Author: Haohui Mai 
Date:   2015-11-06T23:50:31Z

Refactor to support compiling StormSQL to Trident topology.

commit 3c12d0b4e410923bacffee8e7a0121a9a188d4e8
Author: Haohui Mai 
Date:   2015-11-16T22:48:43Z

STORM-1181. Compile SQLs into Tridient topology and execute them in 
LocalCluster.

commit 0eebb38221debddf6824b0cff1d8faa395d5df80
Author: Haohui Mai 
Date:   2015-11-19T22:09:26Z

STORM-1222. Support Kafka as external tables in StormSQL.




> Support Kafka as external tables in StormSQL
> 
>
> Key: STORM-1222
> URL: https://issues.apache.org/jira/browse/STORM-1222
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-sql
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> This jira proposes to support Kafka as both the data ingress and egress 
> point in StormSQL.





[GitHub] storm pull request: STORM-1222. Support Kafka as external tables i...

2015-11-19 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/storm/pull/896

STORM-1222. Support Kafka as external tables in StormSQL

This PR adds the support of Kafka as external tables in StormSQL.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/storm STORM-1222

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #896


commit 7d442d084a61c15056d86977a5238160124a10f7
Author: Haohui Mai 
Date:   2015-11-11T18:03:07Z

Allow deserializing Java class with custom classloaders in tests.

commit cdc0538038faf35efdcc5418a54967f8e0caac85
Author: Haohui Mai 
Date:   2015-11-19T22:28:55Z

STORM-1221. Create a common interface for all Trident spout.

commit 337b902ef68beb4efaf4458ece898f3992e1d3b5
Author: Haohui Mai 
Date:   2015-11-06T23:50:31Z

Refactor to support compiling StormSQL to Trident topology.

commit 3c12d0b4e410923bacffee8e7a0121a9a188d4e8
Author: Haohui Mai 
Date:   2015-11-16T22:48:43Z

STORM-1181. Compile SQLs into Tridient topology and execute them in 
LocalCluster.

commit 0eebb38221debddf6824b0cff1d8faa395d5df80
Author: Haohui Mai 
Date:   2015-11-19T22:09:26Z

STORM-1222. Support Kafka as external tables in StormSQL.






[jira] [Created] (STORM-1222) Support Kafka as external tables in StormSQL

2015-11-19 Thread Haohui Mai (JIRA)
Haohui Mai created STORM-1222:
-

 Summary: Support Kafka as external tables in StormSQL
 Key: STORM-1222
 URL: https://issues.apache.org/jira/browse/STORM-1222
 Project: Apache Storm
  Issue Type: New Feature
  Components: storm-sql
Reporter: Haohui Mai
Assignee: Haohui Mai


This jira proposes to support Kafka as both the data ingress and egress point 
in StormSQL.
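As a sketch of what this could look like for users, StormSQL external tables are declared with a `CREATE EXTERNAL TABLE` statement whose `LOCATION` points at the Kafka topic. The table name, columns, and location URI below are illustrative assumptions, not the DDL this jira actually defines:

```
-- Hypothetical sketch: a Kafka topic declared as a StormSQL external table.
-- Table name, columns, and the kafka:// URI are illustrative assumptions.
CREATE EXTERNAL TABLE ORDERS (
  ID INT PRIMARY KEY,
  UNIT_PRICE INT,
  QUANTITY INT
) LOCATION 'kafka://localhost:2181/brokers?topic=orders'
```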





[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015250#comment-15015250
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158283901
  
Utils.java

* In general, the new methods we are adding here might be better off 
somewhere in the blobstore package.
* Check indentation of changed code.  I see some indented at 2 spaces, some 
at 4.  Also, I see some inconsistent initial indentation.
* Some of these methods are synchronized and hold a lock on Utils while 
they access ZK.  I can see this causing lots of problems.  Can we try and use 
more granular locking?  Some of this can be helped by moving the code to a more 
appropriate class or namespace.


```
447 nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, new Boolean(true));
```
I have read that it is dangerous to create `Boolean` instances with given 
values, and that it is preferable to use `Boolean.TRUE` and `Boolean.FALSE`.


```
 452   // Meant to be called only by the supervisor for stormjar/stormconf/stormcode files.
 453   public static void downloadResourcesAsSupervisor(Map conf, String key, String localFile,
 454       ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
 455 final int MAX_RETRY_ATTEMPTS = 2;
 456 final int ATTEMPTS_INTERVAL_TIME = 100;
 457 for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; 
retryAttempts++) {
 458   if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) {
 459 break;
 460   }
 461   Utils.sleep(ATTEMPTS_INTERVAL_TIME);
 462 }
 463 //NO Exception on error the supervisor will try again after a while
 464   }
```

* `conf` parameter not used.
* The values here are hard-coded and are small.  What benefit does the 
sleep have?  Could we unroll the `for` loop?
* If this is meant for the supervisor, does it belong in supervisor.clj?


```
 486 } finally {
 487   try {
 488 if (out != null) out.close();
 489   } catch (IOException ignored) {}
 490   try {
 491 if (in != null) in.close();
 492   } catch (IOException ignored) {}
 493 }
```

* minor: braces for the `if` statements

```
 510 try {
 511   ReadableBlobMeta metadata = cb.getBlobMeta(key);
 512   nimbusBlobVersion = metadata.get_version();
 513 } catch (AuthorizationException | KeyNotFoundException exp) {
 514   throw exp;
 515 } catch (TException e) {
 516   throw new RuntimeException(e);
 517 }
```

Can we ever catch a `TException` here?

```
 562   // only works on operating  systems that support posix
 563   public static void restrictPermissions(String baseDir) {
 564 try {
 565   Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>(
 566       Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
 567           PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
 568           PosixFilePermission.GROUP_EXECUTE));
 569   Files.setPosixFilePermissions(FileSystems.getDefault().getPath(baseDir), perms);
 570 } catch (IOException e) {
 571   throw new RuntimeException(e);
 572 }
 573   }
```

If this is only used by the supervisor, should we move it to supervisor.clj?

```
 627 public static byte[] thriftSerialize(TBase t) {

 641 public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length) {

 652 public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
```

* `thriftDeserialize(Class c, byte[] b, int offset, int length)` does not 
appear to be used anywhere
* The other two methods look nearly identical to code that is in Trident.  
Could we make these common?

```
 756* @param in InputStrem to read from
```

`InputStream`


```
 928   // java equivalent of util.on-windows?
 929   public static boolean onWindows() {
 930 return System.getenv("OS") == "Windows_NT";
 931   }
```

* Use `.equals` for String
* Remove comment
* Remove `util.on-windows?` and have clojure code call into this one.

```
 933   public static long unpack(File localrsrc, File dst) throws 
IOException {
```

This always returns `0`, and the only place it's called ignores the return 
value.  We can make it `void`.

```
1102  * Takes an input dir or file and returns the du on that local 
directory. Very ba

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158283901
  
Utils.java

* In general, the new methods we are adding here might be better off 
somewhere in the blobstore package.
* Check indentation of changed code.  I see some indented at 2 spaces, some 
at 4.  Also, I see some inconsistent initial indentation.
* Some of these methods are synchronized and hold a lock on Utils while 
they access ZK.  I can see this causing lots of problems.  Can we try and use 
more granular locking?  Some of this can be helped by moving the code to a more 
appropriate class or namespace.


```
447 nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, new Boolean(true));
```
I have read that it is dangerous to create `Boolean` instances with given 
values, and that it is preferable to use `Boolean.TRUE` and `Boolean.FALSE`.


```
 452   // Meant to be called only by the supervisor for stormjar/stormconf/stormcode files.
 453   public static void downloadResourcesAsSupervisor(Map conf, String key, String localFile,
 454       ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
 455 final int MAX_RETRY_ATTEMPTS = 2;
 456 final int ATTEMPTS_INTERVAL_TIME = 100;
 457 for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; 
retryAttempts++) {
 458   if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) {
 459 break;
 460   }
 461   Utils.sleep(ATTEMPTS_INTERVAL_TIME);
 462 }
 463 //NO Exception on error the supervisor will try again after a while
 464   }
```

* `conf` parameter not used.
* The values here are hard-coded and are small.  What benefit does the 
sleep have?  Could we unroll the `for` loop?
* If this is meant for the supervisor, does it belong in supervisor.clj?


```
 486 } finally {
 487   try {
 488 if (out != null) out.close();
 489   } catch (IOException ignored) {}
 490   try {
 491 if (in != null) in.close();
 492   } catch (IOException ignored) {}
 493 }
```

* minor: braces for the `if` statements
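
As an aside, the null-checked close dance in the snippet above is exactly what Java 7's try-with-resources replaces; a minimal generic sketch:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class CloseDemo {
    // Both streams are closed automatically, in reverse order, even if
    // readLine() throws -- no null checks or nested try/catch needed.
    public static String firstLine(byte[] data) throws IOException {
        try (InputStream in = new ByteArrayInputStream(data);
             BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
            return r.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(firstLine("hello\nworld".getBytes("UTF-8"))); // hello
    }
}
```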

```
 510 try {
 511   ReadableBlobMeta metadata = cb.getBlobMeta(key);
 512   nimbusBlobVersion = metadata.get_version();
 513 } catch (AuthorizationException | KeyNotFoundException exp) {
 514   throw exp;
 515 } catch (TException e) {
 516   throw new RuntimeException(e);
 517 }
```

Can we ever catch a `TException` here?

```
 562   // only works on operating systems that support posix
 563   public static void restrictPermissions(String baseDir) {
 564     try {
 565       Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>(
 566           Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
 567           PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
 568           PosixFilePermission.GROUP_EXECUTE));
 569       Files.setPosixFilePermissions(FileSystems.getDefault().getPath(baseDir), perms);
 570     } catch (IOException e) {
 571       throw new RuntimeException(e);
 572     }
 573   }
```

If this is only used by the supervisor, should we move it to supervisor.clj?
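
Incidentally, the same permission set can be built from the compact string form (a generic sketch, not part of the patch):

```java
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class Perms {
    // Same permission set as the snippet above -- owner rwx, group r-x,
    // others none -- built from the compact "rwxr-x---" string form.
    public static Set<PosixFilePermission> restricted() {
        return PosixFilePermissions.fromString("rwxr-x---");
    }

    public static void main(String[] args) {
        System.out.println(PosixFilePermissions.toString(restricted())); // rwxr-x---
    }
}
```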

```
 627 public static byte[] thriftSerialize(TBase t) {

 641 public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length) {

 652 public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
```

* `thriftDeserialize(Class<T> c, byte[] b, int offset, int length)` does not 
appear to be used anywhere
* The other two methods look nearly identical to code that is in Trident.  
Could we make these common?

```
 756* @param in InputStrem to read from
```

`InputStream`


```
 928   // java equivalent of util.on-windows?
 929   public static boolean onWindows() {
 930 return System.getenv("OS") == "Windows_NT";
 931   }
```

* Use `.equals` for String
* Remove comment
* Remove `util.on-windows?` and have clojure code call into this one.
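
A small generic sketch of why `==` is wrong here (`onWindows` below is a hypothetical rewrite, not the patch code):

```java
public class StrEq {
    // == compares references; equals() compares contents. Putting the
    // literal first also makes the check null-safe.
    public static boolean onWindows(String osEnv) {
        return "Windows_NT".equals(osEnv);
    }

    public static void main(String[] args) {
        // A string built at runtime is a distinct object from the literal.
        String dynamic = new StringBuilder("Windows").append("_NT").toString();
        System.out.println(dynamic == "Windows_NT"); // false: different objects
        System.out.println(onWindows(dynamic));      // true: same contents
        System.out.println(onWindows(null));         // false, no NullPointerException
    }
}
```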

```
 933   public static long unpack(File localrsrc, File dst) throws 
IOException {
```

This always returns `0`, and the only place it's called ignores the return 
value.  We can make it `void`.

```
1102  * Takes an input dir or file and returns the du on that local 
directory. Very basic
```

`du` -> `disk usage`


```
1375   if (isSuccess != true) {
```

`!isSuccess`


```
1405 public static synchronized void updateKeyForBlobStore (Map conf, BlobStore blobStore, Curat
```

[jira] [Commented] (STORM-1220) Avoid double copying in the Kafka spout

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015237#comment-15015237
 ] 

ASF GitHub Bot commented on STORM-1220:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45436425
  
--- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
@@ -18,11 +18,13 @@
 package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import static backtype.storm.utils.Utils.tuple;
 
 public class RawScheme implements Scheme {
-public List deserialize(byte[] ser) {
+public List deserialize(ByteBuffer ser) {
--- End diff --

@harshach Actually I was referring to the receiver (the bolts) that might 
be currently doing something like `byte[] bytes = 
inputTuple.getBinaryByField("bytes");` to get the data emitted from kafka 
spout. It appears that the `deserialize` method returns a tuple that wraps the 
input ByteBuffer which gets emitted.
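
To make the compatibility concern concrete, a hypothetical receiver-side sketch (`unwrap` is illustrative, not code from the PR):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class FieldCompat {
    // Before the change a bolt could read the field directly:
    //   byte[] bytes = input.getBinaryByField("bytes");
    // If the tuple now carries a ByteBuffer, the receiver has to unwrap it:
    public static byte[] unwrap(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        buf.duplicate().get(out); // duplicate() leaves the caller's position untouched
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3});
        System.out.println(Arrays.toString(unwrap(buf))); // [1, 2, 3]
    }
}
```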


> Avoid double copying in the Kafka spout
> ---
>
> Key: STORM-1220
> URL: https://issues.apache.org/jira/browse/STORM-1220
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> Currently the kafka spout takes a {{ByteBuffer}} from Kafka. However, the 
> serialization scheme takes a {{byte[]}} array as input. Therefore the current 
> implementation copies the {{ByteBuffer}} to a new {{byte[]}} array in order 
> to hook everything together.
> This jira proposes to changes the interfaces of serialization scheme to avoid 
> copying the data twice in the spout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

2015-11-19 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45436425
  
--- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
@@ -18,11 +18,13 @@
 package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import static backtype.storm.utils.Utils.tuple;
 
 public class RawScheme implements Scheme {
-public List deserialize(byte[] ser) {
+public List deserialize(ByteBuffer ser) {
--- End diff --

@harshach Actually I was referring to the receiver (the bolts) that might 
be currently doing something like `byte[] bytes = 
inputTuple.getBinaryByField("bytes");` to get the data emitted from kafka 
spout. It appears that the `deserialize` method returns a tuple that wraps the 
input ByteBuffer which gets emitted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1155: Supervisor recurring health checks

2015-11-19 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/849#discussion_r45436295
  
--- Diff: storm-core/src/clj/backtype/storm/command/healthcheck.clj ---
@@ -0,0 +1,88 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.command.healthcheck
+  (:require [backtype.storm
+ [config :refer :all]
+ [log :refer :all]]
+[clojure.java [io :as io]]
+[clojure [string :refer [split]]])
+  (:gen-class))
+
+(defn interrupter
+  "Interrupt a given thread after ms milliseconds."
+  [thread ms]
+  (let [interrupter (Thread.
+                      (fn []
+                        (try
+                          (Thread/sleep ms)
+                          (.interrupt thread)
+                          (catch InterruptedException e))))]
+    (.start interrupter)
+    interrupter))
+
+(defn check-output [lines]
+  (if (some #(.startsWith % "ERROR") lines)
+:failed
+:success))
+
+(defn process-script [conf script]
+  (let [script-proc (. (Runtime/getRuntime) (exec script))
+        curthread (Thread/currentThread)
+        interrupter-thread (interrupter curthread
+                                        (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
+(try
+  (.waitFor script-proc)
+  (.interrupt interrupter-thread)
--- End diff --

@revans2 If script-proc is blocked, an InterruptedException is thrown and 
"Script" script "timed out." is printed, but script-proc isn't actually 
stopped. Like this:

admin  12755      1  0 12:49 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  12978      1  0 12:50 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  13228      1  0 12:51 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  13504      1  0 12:52 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  13644  13465  0 12:52 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh

Maybe we can stop the process instead?

(defn interrupter
  "Destroy a given process after ms milliseconds."
  [script-proc ms]
  (let [interrupter (Thread.
                      (fn []
                        (try
                          (Thread/sleep ms)
                          (.destroy script-proc)
                          (catch InterruptedException e))))]
    (.start interrupter)
    interrupter))
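
The same idea in Java terms, for comparison (a hypothetical sketch; `Process.waitFor(long, TimeUnit)` requires Java 8+):

```java
import java.util.concurrent.TimeUnit;

public class ScriptRunner {
    // Waits for the health-check script up to timeoutMs; on timeout the
    // child process is destroyed so it does not linger like the processes
    // in the ps output above.
    public static boolean runWithTimeout(ProcessBuilder pb, long timeoutMs)
            throws java.io.IOException, InterruptedException {
        Process p = pb.start();
        if (!p.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
            p.destroy(); // or destroyForcibly() if the script ignores SIGTERM
            return false;
        }
        return p.exitValue() == 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithTimeout(new ProcessBuilder("sh", "-c", "exit 0"), 5000));
    }
}
```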





[jira] [Commented] (STORM-1155) Supervisor recurring health checks

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015233#comment-15015233
 ] 

ASF GitHub Bot commented on STORM-1155:
---

Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/849#discussion_r45436295
  
--- Diff: storm-core/src/clj/backtype/storm/command/healthcheck.clj ---
@@ -0,0 +1,88 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.command.healthcheck
+  (:require [backtype.storm
+ [config :refer :all]
+ [log :refer :all]]
+[clojure.java [io :as io]]
+[clojure [string :refer [split]]])
+  (:gen-class))
+
+(defn interrupter
+  "Interrupt a given thread after ms milliseconds."
+  [thread ms]
+  (let [interrupter (Thread.
+                      (fn []
+                        (try
+                          (Thread/sleep ms)
+                          (.interrupt thread)
+                          (catch InterruptedException e))))]
+    (.start interrupter)
+    interrupter))
+
+(defn check-output [lines]
+  (if (some #(.startsWith % "ERROR") lines)
+:failed
+:success))
+
+(defn process-script [conf script]
+  (let [script-proc (. (Runtime/getRuntime) (exec script))
+        curthread (Thread/currentThread)
+        interrupter-thread (interrupter curthread
+                                        (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
+(try
+  (.waitFor script-proc)
+  (.interrupt interrupter-thread)
--- End diff --

@revans2 If script-proc is blocked, an InterruptedException is thrown and 
"Script" script "timed out." is printed, but script-proc isn't actually 
stopped. Like this:

admin  12755      1  0 12:49 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  12978      1  0 12:50 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  13228      1  0 12:51 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  13504      1  0 12:52 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh
admin  13644  13465  0 12:52 pts/0    00:00:00 /bin/sh /home/admin/test/healthCheck.sh

Maybe we can stop the process instead?

(defn interrupter
  "Destroy a given process after ms milliseconds."
  [script-proc ms]
  (let [interrupter (Thread.
                      (fn []
                        (try
                          (Thread/sleep ms)
                          (.destroy script-proc)
                          (catch InterruptedException e))))]
    (.start interrupter)
    interrupter))



> Supervisor recurring health checks
> --
>
> Key: STORM-1155
> URL: https://issues.apache.org/jira/browse/STORM-1155
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Thomas Graves
>Assignee: Thomas Graves
> Fix For: 0.11.0
>
>
> Add the ability for the supervisor to call out to health check scripts to 
> allow some validation of the health of the node the supervisor is running on.
> It could regularly run scripts in a directory provided by the cluster admin. 
> If any scripts fail, it should kill the workers and stop itself.
> This could work very much like the Hadoop scripts and if ERROR is returned on 
> stdout it means the node has some issue and we should shut down.
> If a non-zero exit code is returned it indicates that the scripts failed to 
> execute properly so you don't want to mark the node as unhealthy.





[jira] [Commented] (STORM-1220) Avoid double copying in the Kafka spout

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015227#comment-15015227
 ] 

ASF GitHub Bot commented on STORM-1220:
---

Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45436089
  
--- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
@@ -18,11 +18,13 @@
 package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import static backtype.storm.utils.Utils.tuple;
 
 public class RawScheme implements Scheme {
-public List deserialize(byte[] ser) {
+public List deserialize(ByteBuffer ser) {
--- End diff --

@arunmahadevan it might, but I don't see anyone using this apart from 
KafkaSpout.


> Avoid double copying in the Kafka spout
> ---
>
> Key: STORM-1220
> URL: https://issues.apache.org/jira/browse/STORM-1220
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> Currently the kafka spout takes a {{ByteBuffer}} from Kafka. However, the 
> serialization scheme takes a {{byte[]}} array as input. Therefore the current 
> implementation copies the {{ByteBuffer}} to a new {{byte[]}} array in order 
> to hook everything together.
> This jira proposes to changes the interfaces of serialization scheme to avoid 
> copying the data twice in the spout.





[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

2015-11-19 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45436089
  
--- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
@@ -18,11 +18,13 @@
 package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import static backtype.storm.utils.Utils.tuple;
 
 public class RawScheme implements Scheme {
-public List deserialize(byte[] ser) {
+public List deserialize(ByteBuffer ser) {
--- End diff --

@arunmahadevan it might, but I don't see anyone using this apart from 
KafkaSpout.




[jira] [Commented] (STORM-1220) Avoid double copying in the Kafka spout

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015197#comment-15015197
 ] 

ASF GitHub Bot commented on STORM-1220:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45435067
  
--- Diff: external/storm-kafka/src/jvm/storm/kafka/StringScheme.java ---
@@ -20,21 +20,23 @@
 import backtype.storm.spout.Scheme;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 public class StringScheme implements Scheme {
 
 public static final String STRING_SCHEME_KEY = "str";
 
-public List deserialize(byte[] bytes) {
+public List deserialize(ByteBuffer bytes) {
 return new Values(deserializeString(bytes));
 }
 
-public static String deserializeString(byte[] string) {
+public static String deserializeString(ByteBuffer string) {
 try {
-return new String(string, "UTF-8");
+return new String(Utils.toByteArray(string), "UTF-8");
--- End diff --

This would create a copy. If the ByteBuffer is array backed, you could use 
`ByteBuffer.array()` to avoid copy.
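
A hedged sketch of the array-backed fast path (illustrative only; note that for read-only or direct buffers `hasArray()` is false and one copy is unavoidable):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufToString {
    // Decodes the remaining bytes of a ByteBuffer as UTF-8. For an
    // array-backed buffer this skips the intermediate byte[] copy by
    // handing the backing array straight to the String constructor.
    public static String decodeUtf8(ByteBuffer buf) {
        if (buf.hasArray()) {
            return new String(buf.array(), buf.arrayOffset() + buf.position(),
                              buf.remaining(), StandardCharsets.UTF_8);
        }
        byte[] copy = new byte[buf.remaining()];
        buf.duplicate().get(copy); // direct buffers still need one copy
        return new String(copy, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("str".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeUtf8(buf)); // str
    }
}
```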


> Avoid double copying in the Kafka spout
> ---
>
> Key: STORM-1220
> URL: https://issues.apache.org/jira/browse/STORM-1220
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> Currently the kafka spout takes a {{ByteBuffer}} from Kafka. However, the 
> serialization scheme takes a {{byte[]}} array as input. Therefore the current 
> implementation copies the {{ByteBuffer}} to a new {{byte[]}} array in order 
> to hook everything together.
> This jira proposes to changes the interfaces of serialization scheme to avoid 
> copying the data twice in the spout.





[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

2015-11-19 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45435067
  
--- Diff: external/storm-kafka/src/jvm/storm/kafka/StringScheme.java ---
@@ -20,21 +20,23 @@
 import backtype.storm.spout.Scheme;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 public class StringScheme implements Scheme {
 
 public static final String STRING_SCHEME_KEY = "str";
 
-public List deserialize(byte[] bytes) {
+public List deserialize(ByteBuffer bytes) {
 return new Values(deserializeString(bytes));
 }
 
-public static String deserializeString(byte[] string) {
+public static String deserializeString(ByteBuffer string) {
 try {
-return new String(string, "UTF-8");
+return new String(Utils.toByteArray(string), "UTF-8");
--- End diff --

This would create a copy. If the ByteBuffer is array backed, you could use 
`ByteBuffer.array()` to avoid copy.




[jira] [Commented] (STORM-1220) Avoid double copying in the Kafka spout

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015195#comment-15015195
 ] 

ASF GitHub Bot commented on STORM-1220:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45434875
  
--- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
@@ -18,11 +18,13 @@
 package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import static backtype.storm.utils.Utils.tuple;
 
 public class RawScheme implements Scheme {
-public List deserialize(byte[] ser) {
+public List deserialize(ByteBuffer ser) {
--- End diff --

Will this break compatibility at the receiver? Earlier the tuple 
contained a `byte[]`, and with the change it would contain a `ByteBuffer`.


> Avoid double copying in the Kafka spout
> ---
>
> Key: STORM-1220
> URL: https://issues.apache.org/jira/browse/STORM-1220
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> Currently the kafka spout takes a {{ByteBuffer}} from Kafka. However, the 
> serialization scheme takes a {{byte[]}} array as input. Therefore the current 
> implementation copies the {{ByteBuffer}} to a new {{byte[]}} array in order 
> to hook everything together.
> This jira proposes to changes the interfaces of serialization scheme to avoid 
> copying the data twice in the spout.





[GitHub] storm pull request: STORM-1220. Avoid double copying in the Kafka ...

2015-11-19 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/894#discussion_r45434875
  
--- Diff: storm-core/src/jvm/backtype/storm/spout/RawScheme.java ---
@@ -18,11 +18,13 @@
 package backtype.storm.spout;
 
 import backtype.storm.tuple.Fields;
+
+import java.nio.ByteBuffer;
 import java.util.List;
 import static backtype.storm.utils.Utils.tuple;
 
 public class RawScheme implements Scheme {
-public List deserialize(byte[] ser) {
+public List deserialize(ByteBuffer ser) {
--- End diff --

Will this break compatibility at the receiver? Earlier the tuple 
contained a `byte[]`, and with the change it would contain a `ByteBuffer`.




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015176#comment-15015176
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158273465
  
LocalizedResourceRetentionSet.java
```
140 public boolean equals(Object other) {
141   return this == other;
142 }
```

Object is the superclass, and this is what Object#equals does.



> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.





[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158273465
  
LocalizedResourceRetentionSet.java
```
140 public boolean equals(Object other) {
141   return this == other;
142 }
```

Object is the superclass, and this is what Object#equals does.





Re: [DISCUSS] Storm 2.0 plan

2015-11-19 Thread Sean Zhong
Hi All,

I think there may be some improper wording or misunderstanding here.

Here is what I can see both parties agree on as goals:
1. We want to migrate Clojure to Java.
2. We want to merge important features together.
3. We want to do this in a step-by-step, transparent, reviewable way,
especially with close examination and reviews for code that involves
architecture changes.
4. We want the final version to remain compatible.

The only difference is the process we use to achieve these goals.
Longda's view:
1. Do a parallel migration from the Clojure core to Java, part by part.
Parallel means equivalent: no new features added in this step. He suggests
following the order "modules/client/utils/nimbus/supervisor/drpc/worker &
task/web ui/others".
He used the word "copy", which is improper in my view. It is more like a
merging.
Quoting his words:

>  2.1 Copy modules from JStorm, one module from one module

2.2 The sequence is extern modules/client/utils/nimbus/
> supervisor/drpc/worker & task/web ui/others

2. Upon the Java core code base, incrementally add new feature blocks.
Quoting his words:

> 3.1 Discuss solution for each difference(jira)
> 3.2 Once the solution is finalized, we can start the
> merging. (Some issues could be start concurrently. It
> depends on the discussion.)

3. His goal is to retain compatibility. "This version is stable and
compatible with the API of Storm 1.0." is not an accurate statement from my
point of view, at least not for the security feature.
4. He shares his concern about other streaming engines.


Bobby and Jungtaek's view:
1. "Copy" is not acceptable; it will impact the security features. ("Copy" is
the wrong phrase to use; I think Longda means more of a merging.)
2. With the JStorm team, we start with the Clojure -> Java translation first.
3. On an optimistic view, with the JStorm team, one month should be enough for
the above stage.
4. Add new features after the whole code is migrated to Java.
5. No need to worry that much about other engines.

If my understanding of both parties is correct, I think we agree on most
things about the process:
first: Clojure -> Java
second: merge features.

With a slight difference about how aggressively we want to do the "Clojure ->
Java" step, and how long it takes.


@Longda, can you clarify whether my understanding of your opinion is right?


Sean


On Fri, Nov 20, 2015 at 11:40 AM, P. Taylor Goetz  wrote:

> Very well stated Juntaek.
>
> I should also point out that there's nothing stopping the JStorm team from
> releasing new versions of JStorm, or adding new features. But you would
> have to be careful to note that any such release is "JStorm" and not
> "Apache Storm." And any such release cannot be hosted on Apache
> infrastructure.
>
> We also shouldn't be too worried about competition with other stream
> processing frameworks. Competition is healthy and leads to improvements
> across the board. Spark Streaming borrowed ideas from Storm for its Kafka
> integration. It also borrowed memory management ideas from Flink. I don't
> see that as a problem. This is open source. We can, and should, do the same
> where applicable.
>
> Did we learn anything from the Heron paper? Nothing we didn't already
> know. And a lot of the points have been addressed. We dealt with security
> first, which is more important for adoption, especially in the enterprise.
> Now we've addressed many performance, scaling, and usability issues. Most of
> the production deployments I've seen are nowhere near the magnitude of what
> Twitter requires. But I've seen many deployments that only exist because
> we offer security. I doubt Heron has that.
>
> We've also seen an uptick in community and developer involvement, which
> means a likely increase in committers, which likely means a faster
> turnaround for patch reviews, which means a tighter release cycle for new
> features, which means we will be moving faster. This is healthy for an
> Apache project.
>
> And the inclusion of the JStorm team will only make that more so.
>
> I feel we are headed in the right direction, and there are good things to
> come.
>
> -Taylor
>
>
> > On Nov 19, 2015, at 6:38 PM, 임정택  wrote:
> >
> > Sorry Longda, but I can't help telling that I also disagree about
> changing codebase.
> >
> > The feature matrix shows us how far Apache Storm and JStorm have diverged,
> just from a feature point of view. We can't safely change things even where
> feature matrixes are identical, because the feature matrix doesn't contain
> the details.
> >
> > I mean, users could be scared when expected behaviors are not in place
> although they're small. User experience is the one of the most important
> part of the project, and if UX changes are huge, barrier for upgrading
> their Storm cluster to 2.0 is not far easier than migrating to Heron. It
> should be the worst scenario I can imagine after merging.
> >
> > The safest way to merge is applying JStorm's great features to Apache
> Storm.
> > I think porting the language of Apache Storm to Java is not tightly related
> to merging JStorm. I agree

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45433430
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java 
---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import java.util.*;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a local file system backed blob store implementation for Nimbus.
+ */
+public class LocalFsBlobStore extends BlobStore {
+  public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class);
+  private static final String DATA_PREFIX = "data_";
+  private static final String META_PREFIX = "meta_";
+  protected BlobStoreAclHandler _aclHandler;
+  private final String BLOBSTORE_SUBTREE = "/blobstore/";
+  private NimbusInfo nimbusInfo;
+  private FileBlobStoreImpl fbs;
+  private Map conf;
+
+  @Override
+  public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+    this.conf = conf;
+    this.nimbusInfo = nimbusInfo;
+    if (overrideBase == null) {
+      overrideBase = (String) conf.get(Config.BLOBSTORE_DIR);
+      if (overrideBase == null) {
+        overrideBase = (String) conf.get(Config.STORM_LOCAL_DIR);
+      }
+    }
+    File baseDir = new File(overrideBase, BASE_BLOBS_DIR_NAME);
+    try {
+      fbs = new FileBlobStoreImpl(baseDir, conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    _aclHandler = new BlobStoreAclHandler(conf);
+  }
+
+  @Override
+  public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who)
+      throws AuthorizationException, KeyAlreadyExistsException {
+    LOG.debug("Creating Blob for key {}", key);
+    validateKey(key);
+    _aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
+    BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+    _aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+    if (fbs.exists(DATA_PREFIX + key)) {
+      throw new KeyAlreadyExistsException(key);
+    }
+    BlobStoreFileOutputStream mOut = null;
+    try {
+      mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX + key, true));
+      mOut.write(Utils.thriftSerialize((TBase) meta));
+      mOut.close();
+      mOut = null;
+      return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, true));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (mOut != null) {
+        try {
+          mOut.cancel();
+        } catch (IOException e) {
+          // Ignored
+        }
+      }
+    }
+  }
+
+  @Override
+  public AtomicOutputStream updateBlob(String key, Subject who)
+      throws AuthorizationException, KeyNotFoundException {
+    validateKey(key);
+    checkForBlobOrDownload(key);
+    SettableBlobMeta meta = getStore
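
The createBlob flow quoted above follows a write-meta-then-data idiom: the serialized metadata is written and closed first, the handle is set to null on success, and the finally block cancels any partially written metadata if handing back the data stream fails. A standalone sketch of that idiom under illustrative names (CancellableFileStream and the file layout here are stand-ins, not Storm's API):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical illustration of the createBlob pattern above: write the
// metadata artifact first; if returning the data stream fails, cancel
// (delete) the partial metadata so no orphaned meta_ file is left behind.
public class CreateBlobSketch {

  /** Minimal stand-in for Storm's AtomicOutputStream: cancel() discards the file. */
  static class CancellableFileStream extends OutputStream {
    private final File file;
    private final FileOutputStream out;

    CancellableFileStream(File file) throws IOException {
      this.file = file;
      this.out = new FileOutputStream(file);
    }

    @Override public void write(int b) throws IOException { out.write(b); }
    @Override public void close() throws IOException { out.close(); }

    /** Close and remove the partially written file. */
    void cancel() throws IOException {
      out.close();
      file.delete();
    }
  }

  /** Writes meta_&lt;key&gt; first, then returns a stream for data_&lt;key&gt;. */
  static OutputStream createBlob(File dir, String key, byte[] meta) throws IOException {
    if (new File(dir, "data_" + key).exists()) {
      throw new IOException("key already exists: " + key);
    }
    CancellableFileStream mOut = null;
    try {
      mOut = new CancellableFileStream(new File(dir, "meta_" + key));
      mOut.write(meta);
      mOut.close();
      mOut = null; // success: suppress the cancel in the finally block
      return new CancellableFileStream(new File(dir, "data_" + key));
    } finally {
      if (mOut != null) {
        try {
          mOut.cancel(); // failure path: drop the partial metadata file
        } catch (IOException e) {
          // Ignored, mirroring the original code
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    File dir = new File(System.getProperty("java.io.tmpdir"), "blob-sketch");
    dir.mkdirs();
    OutputStream data = createBlob(dir, "k1", new byte[]{1, 2, 3});
    data.close();
    if (!new File(dir, "meta_k1").exists()) throw new AssertionError("meta missing");
    if (!new File(dir, "data_k1").exists()) throw new AssertionError("data missing");
  }
}
```

Nulling mOut before the return is what distinguishes the success path from the failure path once control reaches the finally block.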

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015158#comment-15015158
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45433430
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java 
---
@@ -0,0 +1,306 @@

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015155#comment-15015155
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45433397
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java 
---
@@ -0,0 +1,306 @@


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015151#comment-15015151
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user redsanket commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45433271
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper.
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet<Integer> versions = new TreeSet<Integer>();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+            .forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one
+      // contains the most updated blob if all go down, which is unlikely.
+      // Hence there might be a need to update the blob if all go down.
+      List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign
+      // the version; check if all have the same version, and if not assign the
+      // highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns the version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
--- End diff --

I will document it properly and explain my thought process
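
The branching questioned in the review above is easier to see as a pure function. Below is a hypothetical restatement with the ZooKeeper reads replaced by plain parameters; note that the body of the final update branch is cut off in the quoted diff, so the return values for the last two branches are assumptions, not the actual Storm code:

```java
import java.util.List;
import java.util.TreeSet;

// Illustrative restatement of the getKeyVersion branching above.
// All names and the last two branch bodies are hypothetical.
public class KeyVersionSketch {

  /**
   * @param versions       versions reported by live nimbus state nodes (non-empty)
   * @param currentCounter value of the per-key version counter
   * @param hostHasState   whether this nimbus host appears in the state list
   * @param isLeader       whether this nimbus is the leader
   */
  static int pickVersion(List<Integer> versions, int currentCounter,
                         boolean hostHasState, boolean isLeader) {
    TreeSet<Integer> sorted = new TreeSet<Integer>(versions);
    // A nimbus that crashed and came back (not leader, no state entry)
    // trusts the counter, stepping back one if the counter is not ahead.
    if (!hostHasState && !isLeader) {
      return sorted.last() < currentCounter ? currentCounter : currentCounter - 1;
    }
    // Update scenario (assumed): every live nimbus agrees on one version,
    // so the next version is one past the highest seen.
    if (sorted.size() == 1) {
      return sorted.last() + 1;
    }
    // Otherwise (assumed): replicate the highest version already present.
    return sorted.last();
  }
}
```

Framing it this way makes each branch individually testable without a live ZooKeeper, which is roughly what the reviewer is asking to have documented.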


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assi



Re: [DISCUSS] Storm 2.0 plan

2015-11-19 Thread P. Taylor Goetz
Very well stated Juntaek.

I should also point out that there's nothing stopping the JStorm team from 
releasing new versions of JStorm, or adding new features. But you would have to 
be careful to note that any such release is "JStorm" and not "Apache Storm." 
And any such release cannot be hosted on Apache infrastructure.

We also shouldn't be too worried about competition with other stream processing 
frameworks. Competition is healthy and leads to improvements across the board. 
Spark Streaming borrowed ideas from Storm for its Kafka integration. It also 
borrowed memory management ideas from Flink. I don't see that as a problem. 
This is open source. We can, and should, do the same where applicable.

Did we learn anything from the Heron paper? Nothing we didn't already know. And 
a lot of the points have been addressed. We dealt with security first, which is 
more important for adoption, especially in the enterprise. Now we've addressed 
many performance, scaling, and usability issues. Most of the production 
deployments I've seen are nowhere near the magnitude of what Twitter requires. 
But I've seen many deployments that only exist because we offer security. I 
doubt Heron has that.

We've also seen an uptick in community and developer involvement, which means a 
likely increase in committers, which likely means a faster turnaround for patch 
reviews, which means a tighter release cycle for new features, which means we 
will be moving faster. This is healthy for an Apache project.

And the inclusion of the JStorm team will only make that more so.

I feel we are headed in the right direction, and there are good things to come.

-Taylor


> On Nov 19, 2015, at 6:38 PM, 임정택  wrote:
> 
> Sorry Longda, but I can't help saying that I also disagree about changing the 
> codebase.
> 
> The feature matrix shows us how far Apache Storm and JStorm have diverged, just 
> from a feature point of view. We can't safely change even if the feature matrixes 
> are identical, because a feature matrix doesn't contain the details.
> 
> I mean, users could be scared when expected behaviors are not in place, even 
> small ones. User experience is one of the most important parts of the project, 
> and if the UX changes are huge, the barrier to upgrading their Storm cluster to 
> 2.0 is not much lower than that of migrating to Heron. That would be the worst 
> scenario I can imagine after merging.
> 
> The safest way to merge is applying JStorm's great features to Apache Storm.
> I think porting the language of Apache Storm to Java is not tightly related to 
> merging JStorm. I agree that merging could be a trigger, but Apache Storm itself 
> can be ported to other languages like Java or Scala, or something else which is 
> more popular than Clojure.
> 
> And I'm also not scared of Flink, Heron, Spark, etc.
> It doesn't mean other projects are not greater than Storm. I'm just saying 
> each project has its own strengths.
> For example, all the conferences are talking about Spark, and as a user of 
> Spark, I can say Spark is really great. If you are a little bit familiar with 
> Scala, you can just apply Scala-like functional methods to an RDD. Really easy 
> to use. But it doesn't mean that Spark can replace Storm in all kinds of use 
> cases. Recently I've seen some articles on why Storm is preferred for realtime 
> stream processing.
> 
> Competition should give us positive motivation. I hope that our roadmap isn't 
> focused on defeating competitors, but on presenting great features, better 
> performance, and a better UX to the Storm community. It's not a commercial 
> product, it's an open source project!
> 
> tl;dr. Please don't change the codebase unless we plan to release a brand new 
> project. It breaks UX completely, which could make users leave.
> 
> I'm also open to other opinions as well.
> 
> Best,
> Jungtaek Lim (HeartSaVioR)
> 
> 
> 2015-11-20 0:00 GMT+09:00 Bobby Evans :
>> I disagree completely.  You claim that JStorm is compatible with storm 1.0.  
>> I don't believe that it is 100% compatible.  There has been more than 2 
>> years of software development happening on both sides.  Security was not 
>> done in a day, and porting it over to JStorm is not going to happen quickly, 
>> and because of the major architectural changes between storm and JStorm I 
>> believe we would have to make some serious enhancements to fully support a 
>> secure TopologyMaster, but I need to look into it more.  The blob store is 
>> another piece of code that has taken a very long time to develop.  There are 
>> numerous others.  The big features are not the ones that make me nervous 
>> because we can plan for them, it is the hundreds of small JIRAs and features 
>> that will result in minor incompatibilities.  If we start with storm itself, 
>> and follow the same process that we have been doing up until now, if there 
>> is a reason to stop the port, add in an important feature, and do a release, 
>> we can.  We will know that we have compatibility vs starting with 

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432871
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java 
---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import java.util.*;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a local file system backed blob store implementation for Nimbus.
+ */
+public class LocalFsBlobStore extends BlobStore {
+  public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class);
+  private static final String DATA_PREFIX = "data_";
+  private static final String META_PREFIX = "meta_";
+  protected BlobStoreAclHandler _aclHandler;
+  private final String BLOBSTORE_SUBTREE = "/blobstore/";
+  private NimbusInfo nimbusInfo;
+  private FileBlobStoreImpl fbs;
+  private Map conf;
+
+  @Override
+  public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+    this.conf = conf;
+    this.nimbusInfo = nimbusInfo;
+    if (overrideBase == null) {
+      overrideBase = (String) conf.get(Config.BLOBSTORE_DIR);
+      if (overrideBase == null) {
+        overrideBase = (String) conf.get(Config.STORM_LOCAL_DIR);
+      }
+    }
+    File baseDir = new File(overrideBase, BASE_BLOBS_DIR_NAME);
+    try {
+      fbs = new FileBlobStoreImpl(baseDir, conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    _aclHandler = new BlobStoreAclHandler(conf);
+  }
+
+  @Override
+  public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException {
+    LOG.debug("Creating Blob for key {}", key);
+    validateKey(key);
+    _aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
+    BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+    _aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+    if (fbs.exists(DATA_PREFIX+key)) {
+      throw new KeyAlreadyExistsException(key);
+    }
+    BlobStoreFileOutputStream mOut = null;
+    try {
+      mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, true));
+      mOut.write(Utils.thriftSerialize((TBase) meta));
+      mOut.close();
+      mOut = null;
+      return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, true));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (mOut != null) {
+        try {
+          mOut.cancel();
+        } catch (IOException e) {
+          // Ignored
+        }
+      }
+    }
+  }
+
+  @Override
+  public AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
+    validateKey(key);
+    checkForBlobOrDownload(key);
+    SettableBlobMeta meta = getStore
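The `createBlob` body quoted above uses a close-then-null idiom: `mOut` is set to `null` immediately after a successful `close()`, so the `finally` block calls `cancel()` only when the metadata write failed partway. A minimal standalone sketch of that control flow (the `FakeStream` class below is a hypothetical stand-in for `BlobStoreFileOutputStream`, not Storm code):

```java
public class CloseThenNullSketch {
  // Hypothetical stand-in for BlobStoreFileOutputStream.
  static class FakeStream {
    boolean closed = false;
    boolean cancelled = false;
    final boolean failOnClose;
    FakeStream(boolean failOnClose) { this.failOnClose = failOnClose; }
    void close() throws java.io.IOException {
      if (failOnClose) throw new java.io.IOException("simulated write failure");
      closed = true;
    }
    void cancel() { cancelled = true; }
  }

  // Mirrors createBlob's control flow: null out the reference after a
  // successful close so the finally block rolls back only on failure.
  static boolean writeMeta(FakeStream stream) {
    FakeStream mOut = stream;
    try {
      mOut.close();
      mOut = null; // success: disarm the rollback in finally
      return true;
    } catch (java.io.IOException e) {
      return false;
    } finally {
      if (mOut != null) {
        mOut.cancel(); // failure path: discard the partial write
      }
    }
  }

  public static void main(String[] args) {
    FakeStream good = new FakeStream(false);
    System.out.println(writeMeta(good) + " cancelled=" + good.cancelled);
    FakeStream bad = new FakeStream(true);
    System.out.println(writeMeta(bad) + " cancelled=" + bad.cancelled);
  }
}
```

The same shape works for any two-phase write where a partially written resource must be discarded rather than left behind.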

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015141#comment-15015141
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432871
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015138#comment-15015138
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432818
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432818
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432788
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---
+    BlobStoreFileOutputStream mOut = null;
+    try {
+      mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, true));
+      mOut.write(Utils.thriftSerialize((TBase) meta));
--- End diff --

`meta` is already a `TBase`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
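The review point above ("`meta` is already a `TBase`") concerns a redundant cast: Thrift-generated structs such as `SettableBlobMeta` implement `TBase`, so `Utils.thriftSerialize((TBase) meta)` casts needlessly. A hedged sketch with hypothetical stand-in types (not the Thrift API):

```java
// Hypothetical stand-ins: TBaseLike plays the role of org.apache.thrift.TBase,
// MetaLike the role of the generated SettableBlobMeta struct.
interface TBaseLike {}

class MetaLike implements TBaseLike {}

public class RedundantCastSketch {
  // Stand-in for Utils.thriftSerialize: accepts any TBaseLike.
  static String serialize(TBaseLike msg) {
    return msg.getClass().getSimpleName();
  }

  public static void main(String[] args) {
    MetaLike meta = new MetaLike();
    System.out.println(serialize(meta));             // no cast needed
    System.out.println(serialize((TBaseLike) meta)); // legal but redundant cast
  }
}
```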


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015136#comment-15015136
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432788
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---
+      mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, true));
+      mOut.write(Utils.thriftSerialize((TBase) meta));
--- End diff --

`meta` is already a `TBase`


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015131#comment-15015131
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432555
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java ---
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.NimbusPrincipal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of acls for Blobstores.
+ * Also contains some static utility functions related to Blobstores.
+ */
+public class BlobStoreAclHandler {
+  public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
+  private final IPrincipalToLocal _ptol;
+
+  public static final int READ = 0x01;
+  public static final int WRITE = 0x02;
+  public static final int ADMIN = 0x04;
+  public static final List<AccessControl> WORLD_EVERYTHING =
+      Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+  public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
+  private Set<String> _supervisors;
+  private Set<String> _admins;
+
+  public BlobStoreAclHandler(Map conf) {
+    _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+    _supervisors = new HashSet<String>();
+    _admins = new HashSet<String>();
+    if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+      _supervisors.addAll((List<String>) conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+    }
+    if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+      _admins.addAll((List<String>) conf.get(Config.NIMBUS_ADMINS));
+    }
+  }
+
+  private static AccessControlType parseACLType(String type) {
+    if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
+      return AccessControlType.OTHER;
+    } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) {
+      return AccessControlType.USER;
+    }
+    throw new IllegalArgumentException(type+" is not a valid access control type");
+  }
+
+  private static int parseAccess(String access) {
+    int ret = 0;
+    for (char c: access.toCharArray()) {
+      if ('r' == c) {
+        ret = ret | READ;
+      } else if ('w' == c) {
+        ret = ret | WRITE;
+      } else if ('a' == c) {
+        ret = ret | ADMIN;
+      } else if ('-' == c) {
+        // ignored
+      } else {
+        throw new IllegalArgumentException("");
+      }
+    }
+    return ret;
+  }
+
+  public static AccessControl parseAccessControl(String str) {
+    String[] parts = str.split(":");
+    String type = "other";
+    String name = "";
+    String access = "-";
+    if (parts.length > 3) {
+      throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value");
+    } else if (parts.length == 1) {
+      type = "other";
+      name = "";
+      access = parts[0];
+    } else if (parts.length == 2) {
+      type = "user";
+      name = parts[0];
+      access = parts[1];
+    } else if (parts.length == 3) {
+      type = parts[0];
+      n
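The `parseAccessControl` method quoted above implements a small `type:name:access` grammar: a single part is an access string applied to OTHER, two parts mean `user:access`, and three parts spell out all fields. A minimal sketch of that grammar under the same `r`/`w`/`a` bit conventions (a hypothetical class, not the Storm implementation):

```java
public class AclGrammarSketch {
  static final int READ = 0x01, WRITE = 0x02, ADMIN = 0x04;

  // "rwa" -> READ|WRITE|ADMIN; '-' contributes nothing.
  static int parseAccess(String access) {
    int ret = 0;
    for (char c : access.toCharArray()) {
      if (c == 'r') ret |= READ;
      else if (c == 'w') ret |= WRITE;
      else if (c == 'a') ret |= ADMIN;
      else if (c != '-') throw new IllegalArgumentException("bad access char: " + c);
    }
    return ret;
  }

  // Returns {type, name, accessBits} following the 1/2/3-part grammar.
  static Object[] parse(String str) {
    String[] parts = str.split(":");
    if (parts.length > 3) {
      throw new IllegalArgumentException("Don't know how to parse " + str);
    }
    String type = "other", name = "", access = "-";
    if (parts.length == 1) {
      access = parts[0];                                   // e.g. "rwa"
    } else if (parts.length == 2) {
      type = "user"; name = parts[0]; access = parts[1];   // e.g. "bob:rw"
    } else {
      type = parts[0]; name = parts[1]; access = parts[2]; // e.g. "u:bob:r"
    }
    return new Object[] { type, name, parseAccess(access) };
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(parse("rwa")));
    System.out.println(java.util.Arrays.toString(parse("bob:rw")));
    System.out.println(java.util.Arrays.toString(parse("u:alice:r")));
  }
}
```

In the real class the three-part type string is additionally resolved through `parseACLType`; the sketch keeps it raw for brevity.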

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015130#comment-15015130
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432533
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java ---

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432555
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java ---
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.NimbusPrincipal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of acls for Blobstores.
+ * Also contains some static utility functions related to Blobstores.
+ */
+public class BlobStoreAclHandler {
+  public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
+  private final IPrincipalToLocal _ptol;
+
+  public static final int READ = 0x01;
+  public static final int WRITE = 0x02;
+  public static final int ADMIN = 0x04;
+  public static final List<AccessControl> WORLD_EVERYTHING =
+      Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+  public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
+  private Set<String> _supervisors;
+  private Set<String> _admins;
+
+  public BlobStoreAclHandler(Map conf) {
+    _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+    _supervisors = new HashSet<String>();
+    _admins = new HashSet<String>();
+    if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+      _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+    }
+    if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+      _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS));
+    }
+  }
+
+  private static AccessControlType parseACLType(String type) {
+    if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
+      return AccessControlType.OTHER;
+    } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) {
+      return AccessControlType.USER;
+    }
+    throw new IllegalArgumentException(type+" is not a valid access control type");
+  }
+
+  private static int parseAccess(String access) {
+    int ret = 0;
+    for (char c: access.toCharArray()) {
+      if ('r' == c) {
+        ret = ret | READ;
+      } else if ('w' == c) {
+        ret = ret | WRITE;
+      } else if ('a' == c) {
+        ret = ret | ADMIN;
+      } else if ('-' == c) {
+        //ignored
+      } else {
+        throw new IllegalArgumentException("");
+      }
+    }
+    return ret;
+  }
+
+  public static AccessControl parseAccessControl(String str) {
+    String[] parts = str.split(":");
+    String type = "other";
+    String name = "";
+    String access = "-";
+    if (parts.length > 3) {
+      throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value");
+    } else if (parts.length == 1) {
+      type = "other";
+      name = "";
+      access = parts[0];
+    } else if (parts.length == 2) {
+      type = "user";
+      name = parts[0];
+      access = parts[1];
+    } else if (parts.length == 3) {
+      type = parts[0];
+      name = parts[1];
+      access = parts[2];
+    }
+    AccessControl ret = new AccessControl();
+    ret.set_type(parseACLType(type));
+    ret.set_name(name);
+    ret.set_access(parseAccess(access));
+    return ret;
+  

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432533
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java ---
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.NimbusPrincipal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of acls for Blobstores.
+ * Also contains some static utility functions related to Blobstores.
+ */
+public class BlobStoreAclHandler {
+  public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
+  private final IPrincipalToLocal _ptol;
+
+  public static final int READ = 0x01;
+  public static final int WRITE = 0x02;
+  public static final int ADMIN = 0x04;
+  public static final List<AccessControl> WORLD_EVERYTHING =
+      Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+  public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
+  private Set<String> _supervisors;
+  private Set<String> _admins;
+
+  public BlobStoreAclHandler(Map conf) {
+    _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+    _supervisors = new HashSet<String>();
+    _admins = new HashSet<String>();
+    if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+      _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+    }
+    if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+      _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS));
+    }
+  }
+
+  private static AccessControlType parseACLType(String type) {
+    if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
+      return AccessControlType.OTHER;
+    } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) {
+      return AccessControlType.USER;
+    }
+    throw new IllegalArgumentException(type+" is not a valid access control type");
+  }
+
+  private static int parseAccess(String access) {
+    int ret = 0;
+    for (char c: access.toCharArray()) {
+      if ('r' == c) {
+        ret = ret | READ;
+      } else if ('w' == c) {
+        ret = ret | WRITE;
+      } else if ('a' == c) {
+        ret = ret | ADMIN;
+      } else if ('-' == c) {
+        //ignored
+      } else {
+        throw new IllegalArgumentException("");
+      }
+    }
+    return ret;
+  }
+
+  public static AccessControl parseAccessControl(String str) {
+    String[] parts = str.split(":");
+    String type = "other";
+    String name = "";
+    String access = "-";
+    if (parts.length > 3) {
+      throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value");
+    } else if (parts.length == 1) {
+      type = "other";
+      name = "";
+      access = parts[0];
+    } else if (parts.length == 2) {
+      type = "user";
+      name = parts[0];
+      access = parts[1];
+    } else if (parts.length == 3) {
+      type = parts[0];
+      name = parts[1];
+      access = parts[2];
+    }
+    AccessControl ret = new AccessControl();
+    ret.set_type(parseACLType(type));
+    ret.set_name(name);
+    ret.set_access(parseAccess(access));
+    return ret;
+  
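For readers following the ACL discussion, here is a standalone sketch of the `type:name:access` string convention implemented by `parseAccess`/`parseAccessControl` above. It is an illustration only: it uses plain strings and ints instead of Storm's generated `AccessControl` thrift class so it can run on its own, and the class and method names are ours, not Storm's.

```java
// Standalone sketch of the ACL-string convention shown in BlobStoreAclHandler.
// READ/WRITE/ADMIN values match the constants in the diff above.
public class AclParseSketch {
  static final int READ = 0x01, WRITE = 0x02, ADMIN = 0x04;

  // Mirrors parseAccess: each character adds one permission bit; '-' is a placeholder.
  static int parseAccess(String access) {
    int ret = 0;
    for (char c : access.toCharArray()) {
      if (c == 'r') ret |= READ;
      else if (c == 'w') ret |= WRITE;
      else if (c == 'a') ret |= ADMIN;
      else if (c != '-') throw new IllegalArgumentException("bad access char: " + c);
    }
    return ret;
  }

  public static void main(String[] args) {
    // "rwa" grants everything; "r--" grants read only.
    if (parseAccess("rwa") != (READ | WRITE | ADMIN)) throw new AssertionError();
    if (parseAccess("r--") != READ) throw new AssertionError();
    // A two-part string like "alice:rw" is a USER acl: name "alice", read+write.
    String[] parts = "alice:rw".split(":");
    if (!(parts.length == 2 && parts[0].equals("alice")
        && parseAccess(parts[1]) == (READ | WRITE))) throw new AssertionError();
    System.out.println("ok");
  }
}
```

So `u:alice:rwa` names a user ACL, `o::r--` a world-readable one, and a bare `rwa` is shorthand for OTHER with full access, per the one-part branch in the quoted code.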

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015124#comment-15015124
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432465
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java ---
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import backtype.storm.nimbus.NimbusInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key value based
+ * store. Key being a string and value being the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support Security it must validate
+ * that all ACLs set are always WORLD, everything.
+ *
+ * The users can upload their blobs through the blob store command
+ * line. The command line also allows us to update and delete blobs.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore
+ * as for the LocalFsBlobStore the replication is dependent on
+ * the number of Nimbodes available.
+ */
+public abstract class BlobStore implements Shutdownable {
+  public static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
+  private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
+  protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+  /**
+   * Allows us to initialize the blob store
+   * @param conf The storm configuration
+   * @param baseDir The directory path to store the blobs
+   * @param nimbusInfo Contains the nimbus host, port and leadership information.
+   */
+  public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
+
+  /**
+   * Creates the blob.
+   * @param key Key for the blob.
+   * @param meta Metadata which contains the acls information
+   * @param who Is the subject creating the blob.
+   * @return AtomicOutputStream returns a stream into which the data
+   * can be written.
+   * @throws AuthorizationException
+   * @throws KeyAlreadyExistsException
+   */
+  public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
+
+  /**
+   * Updates the blob data.
+   * @param key Key for the blob.
+   * @param who Is the subject having the write privilege for the blob.
+   * @return AtomicOutputStream returns a stream into which the data
+   * can be written.
+   * @throws AuthorizationException
+   * @throws KeyNotFoundException
+   */
+  public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+  /**
+   * Gets the current version of metadata for a blob
+   * to be viewed by the user or downloaded by the supervisor.
+   * @param key Key for the blob.
   

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432465
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java ---
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import backtype.storm.nimbus.NimbusInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key value based
+ * store. Key being a string and value being the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support Security it must validate
+ * that all ACLs set are always WORLD, everything.
+ *
+ * The users can upload their blobs through the blob store command
+ * line. The command line also allows us to update and delete blobs.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore
+ * as for the LocalFsBlobStore the replication is dependent on
+ * the number of Nimbodes available.
+ */
+public abstract class BlobStore implements Shutdownable {
+  public static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
+  private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
+  protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+  /**
+   * Allows us to initialize the blob store
+   * @param conf The storm configuration
+   * @param baseDir The directory path to store the blobs
+   * @param nimbusInfo Contains the nimbus host, port and leadership information.
+   */
+  public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
+
+  /**
+   * Creates the blob.
+   * @param key Key for the blob.
+   * @param meta Metadata which contains the acls information
+   * @param who Is the subject creating the blob.
+   * @return AtomicOutputStream returns a stream into which the data
+   * can be written.
+   * @throws AuthorizationException
+   * @throws KeyAlreadyExistsException
+   */
+  public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
+
+  /**
+   * Updates the blob data.
+   * @param key Key for the blob.
+   * @param who Is the subject having the write privilege for the blob.
+   * @return AtomicOutputStream returns a stream into which the data
+   * can be written.
+   * @throws AuthorizationException
+   * @throws KeyNotFoundException
+   */
+  public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+  /**
+   * Gets the current version of metadata for a blob
+   * to be viewed by the user or downloaded by the supervisor.
+   * @param key Key for the blob.
+   * @param who Is the subject having the read privilege for the blob.
+   * @return AtomicOutputStream returns a stream into which the data
+   * can be written.
+   * @throws AuthorizationException
+   * @throws KeyNotFoundException

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015112#comment-15015112
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432068
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import java.util.*;
--- End diff --

Let's expand the include.


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.





[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45432068
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import java.util.*;
--- End diff --

Let's expand the include.




[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45431937
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE="/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE="/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+this.key = key;
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet<Integer> versions = new TreeSet<Integer>();
+CuratorFramework zkClient = Utils.createZKClient(conf);
+try {
+  // Key has not been created yet and it is the first time it is being created
+  if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+    zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+    return 1;
+  }
+
+  // When all nimbodes go down and one or few of them come up
+  // Unfortunately there might not be an exact way to know which one contains the most updated blob
+  // if all go down which is unlikely. Hence there might be a need to update the blob if all go down
+  List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+  LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+  if(stateInfoList.isEmpty()) {
+    return getKeyVersionCounterValue(zkClient, key);
+  }
+
+  LOG.debug("stateInfoSize {}", stateInfoList.size());
+  // In all other cases check for the latest version on the nimbus and assign the version
+  // check if all are have same version, if not assign the highest version
+  for (String stateInfo:stateInfoList) {
+    versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+  }
+
+  int currentCounter = getKeyVersionCounterValue(zkClient, key);
+  // This condition returns version when a nimbus crashes and comes up
+  if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+if (versions.last() < currentCounter) {
+  return currentCounter;
+} else {
+  return currentCounter - 1;
+}
+  }
+  // Condition checks for an update scenario
+  if (stateInfoList.size() >= 1 && versions.size() == 1) {
+if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+  incrementCounter(zkClient, key, currentCounter);
+  return currentCounter + 1;
+} else {
+  incrementCounter(zkClient, key, currentCounter);
+  return versions.first() + 1;
--- End diff --

I need help understanding what is happening here.  I think what would help me is an explanation of the relationship between the values stored under `BLOBSTORE_KEY_COUNTER_SUBTREE` and the versions derived from the data stored under `BLOBSTORE_SUBTREE`.  The comments about handling crashing nimbus daemons do not make it clear enough to
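To make the reviewer's question concrete, here is a side-effect-free restatement of the two decision branches quoted in the diff above. This is a sketch only: the class and method names are ours, and the ZooKeeper counter increment performed by the real `getKeyVersion` is omitted.

```java
// Pure restatement of the version-selection branches in KeyVersion.getKeyVersion.
public class KeyVersionLogic {

  // Branch for a non-leader nimbus whose host is absent from the key's state:
  // trust the counter only if it is ahead of every version seen under /blobstore/<key>.
  static int crashedNimbusVersion(int highestSeenVersion, int currentCounter) {
    return highestSeenVersion < currentCounter ? currentCounter : currentCounter - 1;
  }

  // Branch for the update scenario in which all nimbodes agree on one version
  // (the real code also increments the counter under /blobstorekeycounter).
  static int updateScenarioVersion(int agreedVersion, int currentCounter) {
    return agreedVersion < currentCounter ? currentCounter + 1 : agreedVersion + 1;
  }

  public static void main(String[] args) {
    // Crashed non-leader whose counter equals the highest seen version: back off by one.
    if (crashedNimbusVersion(3, 3) != 2) throw new AssertionError();
    // Crashed non-leader whose counter is ahead: use the counter as-is.
    if (crashedNimbusVersion(3, 4) != 4) throw new AssertionError();
    // Update scenario: either way the result is one past the larger of the two.
    if (updateScenarioVersion(2, 3) != 4) throw new AssertionError();
    if (updateScenarioVersion(3, 3) != 4) throw new AssertionError();
    System.out.println("ok");
  }
}
```

Written this way, both update-scenario branches reduce to "max(agreed version, counter) + 1", which may be the invariant the comments are trying to express.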

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015103#comment-15015103
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45431937
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE="/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE="/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+this.key = key;
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet<Integer> versions = new TreeSet<Integer>();
+CuratorFramework zkClient = Utils.createZKClient(conf);
+try {
+  // Key has not been created yet and it is the first time it is being created
+  if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+    zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+    return 1;
+  }
+
+  // When all nimbodes go down and one or few of them come up
+  // Unfortunately there might not be an exact way to know which one contains the most updated blob
+  // if all go down which is unlikely. Hence there might be a need to update the blob if all go down
+  List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+  LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+  if(stateInfoList.isEmpty()) {
+    return getKeyVersionCounterValue(zkClient, key);
+  }
+
+  LOG.debug("stateInfoSize {}", stateInfoList.size());
+  // In all other cases check for the latest version on the nimbus and assign the version
+  // check if all are have same version, if not assign the highest version
+  for (String stateInfo:stateInfoList) {
+    versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+  }
+
+  int currentCounter = getKeyVersionCounterValue(zkClient, key);
+  // This condition returns version when a nimbus crashes and comes up
+  if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+if (versions.last() < currentCounter) {
+  return currentCounter;
+} else {
+  return currentCounter - 1;
+}
+  }
+  // Condition checks for an update scenario
+  if (stateInfoList.size() >= 1 && versions.size() == 1) {
+if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+  incrementCounter(zkClient, key, currentCounter);
+  return currentCounter + 1;
+} else {
+  incrementCounter(zkClient, key, currentCounter);
+  return versions.first() + 1;
--- End diff --

I need help understanding what is happening here.  I think what would help 
me is an explanation.

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158264627
  
```
396 struct ReadableBlobMeta {
397   1: required SettableBlobMeta settable;
398   //This is some indication of a version of a BLOB.  The only guarantee is
399   // if the data changed in the blob the version will be different.
400   2: required i64 version;
401 }
```

Since the code now deals with more than one nimbus daemon, is it still true 
that there are no other guarantees?

Can a blob be updated such that its version equals a previous version's 
value?  Is there ordering to blob versions?
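
If the only guarantee really is "different data implies a different version", a client can use the version solely as a change token, never for ordering. A minimal illustrative sketch of that distinction (hypothetical class, not Storm's actual API):

```java
// Hypothetical sketch: treating a blob "version" purely as a change token.
// Under the weak guarantee (different data => different version), comparing
// versions with < or > is unsafe; only an equality check is meaningful.
public final class BlobChangeDetector {
    private long lastSeenVersion;
    private boolean initialized = false;

    /** Returns true on the first observation or whenever the version differs. */
    public boolean changed(long currentVersion) {
        boolean first = !initialized;
        boolean differs = initialized && currentVersion != lastSeenVersion;
        lastSeenVersion = currentVersion;
        initialized = true;
        return first || differs;
    }
}
```

If versions were additionally guaranteed to be monotonically increasing, the detector could also tell stale reads apart from updates; without that guarantee it cannot.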



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015088#comment-15015088
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158264627
  
```
396 struct ReadableBlobMeta {
397   1: required SettableBlobMeta settable;
398   //This is some indication of a version of a BLOB.  The only guarantee is
399   // if the data changed in the blob the version will be different.
400   2: required i64 version;
401 }
```

Since the code now deals with more than one nimbus daemon, is it still true 
that there are no other guarantees?

Can a blob be updated such that its version equals a previous version's 
value?  Is there ordering to blob versions?



> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.
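
The atomic symlink switch described above can be sketched as follows (an illustration under POSIX rename semantics, not Storm's actual implementation; class and method names are hypothetical):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SymlinkSwap {
    // Create a new symlink pointing at the freshly downloaded blob, then
    // atomically rename it over the old link. Readers following the link
    // see either the old blob or the new one, never a missing file.
    static void swap(Path link, Path newTarget) throws Exception {
        Path tmp = link.resolveSibling(link.getFileName() + ".tmp");
        Files.deleteIfExists(tmp);
        Files.createSymbolicLink(tmp, newTarget);
        Files.move(tmp, link,
                StandardCopyOption.REPLACE_EXISTING,
                StandardCopyOption.ATOMIC_MOVE);
    }
}
```

On POSIX filesystems the underlying rename(2) replaces the destination atomically, which is what makes the "switch the symlink atomically" requirement feasible.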





[jira] [Updated] (STORM-1199) Create HDFS Spout

2015-11-19 Thread Roshan Naik (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roshan Naik updated STORM-1199:
---
Attachment: HDFSSpoutforStorm v2.pdf

Uploading v2 of the design and requirements doc.
In this version I added a section on 'Progress Tracking'.

> Create HDFS Spout
> -
>
> Key: STORM-1199
> URL: https://issues.apache.org/jira/browse/STORM-1199
> Project: Apache Storm
>  Issue Type: New Feature
>Reporter: Roshan Naik
>Assignee: Roshan Naik
> Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf
>
>
> Create an HDFS spout so that Storm can suck in data from files in a HDFS 
> directory



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: MINOR: Use /usr/bin/env to find bash

2015-11-19 Thread lujinhong
Github user lujinhong commented on the pull request:

https://github.com/apache/storm/pull/892#issuecomment-158262273
  
+1




[jira] [Assigned] (STORM-1199) Create HDFS Spout

2015-11-19 Thread Roshan Naik (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roshan Naik reassigned STORM-1199:
--

Assignee: Roshan Naik

> Create HDFS Spout
> -
>
> Key: STORM-1199
> URL: https://issues.apache.org/jira/browse/STORM-1199
> Project: Apache Storm
>  Issue Type: New Feature
>Reporter: Roshan Naik
>Assignee: Roshan Naik
> Attachments: HDFSSpoutforStorm.pdf
>
>
> Create an HDFS spout so that Storm can suck in data from files in a HDFS 
> directory





[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158261254
  
Utils.java:

```
1252 // Normalize state
1253 public static String[] normalizeVersionInfo(String nimbusKeyVersionInfo) {
1254   String[] normalizeVersionInfo = new String[2];
1255   int lastIndex = nimbusKeyVersionInfo.lastIndexOf("-");
1256   normalizeVersionInfo[0] = nimbusKeyVersionInfo.substring(0, lastIndex);
1257   normalizeVersionInfo[1] = nimbusKeyVersionInfo.substring(lastIndex + 1);
1258   return normalizeVersionInfo;
1259 }
```

We should make a class out of BlobVersion, with `name` and `version` as 
members. It's easier to understand than a `String[]` value.
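
One possible shape for such a class (a sketch of the reviewer's suggestion, not code from the PR; the `BlobVersion` name comes from the comment, everything else is an assumption):

```java
// Sketch: a value type replacing the String[] returned by normalizeVersionInfo.
// Parses strings of the form "<name>-<version>", where <name> may itself
// contain dashes, so the split happens at the last '-'.
public final class BlobVersion {
    private final String name;
    private final int version;

    public BlobVersion(String name, int version) {
        this.name = name;
        this.version = version;
    }

    public static BlobVersion parse(String nimbusKeyVersionInfo) {
        int lastIndex = nimbusKeyVersionInfo.lastIndexOf('-');
        return new BlobVersion(
                nimbusKeyVersionInfo.substring(0, lastIndex),
                Integer.parseInt(nimbusKeyVersionInfo.substring(lastIndex + 1)));
    }

    public String getName() { return name; }
    public int getVersion() { return version; }
}
```

Callers then read `parse(info).getVersion()` instead of the opaque `normalizeVersionInfo(info)[1]`.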





[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015058#comment-15015058
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-158261254
  
Utils.java:

```
1252 // Normalize state
1253 public static String[] normalizeVersionInfo(String nimbusKeyVersionInfo) {
1254   String[] normalizeVersionInfo = new String[2];
1255   int lastIndex = nimbusKeyVersionInfo.lastIndexOf("-");
1256   normalizeVersionInfo[0] = nimbusKeyVersionInfo.substring(0, lastIndex);
1257   normalizeVersionInfo[1] = nimbusKeyVersionInfo.substring(lastIndex + 1);
1258   return normalizeVersionInfo;
1259 }
```

We should make a class out of BlobVersion, with `name` and `version` as 
members. It's easier to understand than a `String[]` value.



> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.





[GitHub] storm pull request: MINOR: Use /usr/bin/env to find bash

2015-11-19 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/892#issuecomment-158260413
  
+1




[GitHub] storm pull request: STORM-1009: link on http://storm.apache.org/do...

2015-11-19 Thread wuchong
Github user wuchong commented on the pull request:

https://github.com/apache/storm/pull/882#issuecomment-158260200
  
LGTM. +1




[jira] [Commented] (STORM-1009) link on http://storm.apache.org/documentation/Kestrel-and-Storm.html is wrong

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015047#comment-15015047
 ] 

ASF GitHub Bot commented on STORM-1009:
---

Github user wuchong commented on the pull request:

https://github.com/apache/storm/pull/882#issuecomment-158260200
  
LGTM. +1


> link on http://storm.apache.org/documentation/Kestrel-and-Storm.html is wrong
> -
>
> Key: STORM-1009
> URL: https://issues.apache.org/jira/browse/STORM-1009
> Project: Apache Storm
>  Issue Type: Documentation
>  Components: documentation
>Reporter: Golda Velez
>Assignee: Jayapriya Surendran
>Priority: Trivial
>
> trivial but annoying: the link to Setting Up a Development Environment should 
> go to  
> https://storm.apache.org/documentation/Setting-up-development-environment.html
> but it goes to a 404  
> http://storm.apache.org/documentation/Setting-up-a-development-environment.html





[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015039#comment-15015039
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45429107
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or few of them come up
+      // Unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down which is unlikely. Hence there might be a need to update the blob if all go down
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version
+      // check if all are have same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
--- End diff --

If `versions.size()` is `1`, it is redundant to check for 
`stateInfoList.size()` since it must be 1 or greater. We don't add to 
`versions` unless there is an element in `stateInfoList`.
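
The simplification the reviewer suggests can be sketched in isolation (an illustration of the invariant, not the PR's code; the demo class and method are hypothetical):

```java
import java.util.List;
import java.util.TreeSet;

public class RedundantCheckDemo {
    // Mirrors the loop in getKeyVersion: `versions` is populated only from
    // elements of the state list, so versions.size() == 1 already implies the
    // list is non-empty; the extra size() >= 1 check is redundant.
    static boolean isUpdateScenario(List<Integer> stateVersions) {
        TreeSet<Integer> versions = new TreeSet<>(stateVersions);
        return versions.size() == 1;
    }
}
```

For example, `isUpdateScenario(List.of(3, 3, 3))` is true (all nimbus hosts agree on one version) while `isUpdateScenario(List.of(2, 3))` is false.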


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> 

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45429107
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or few of them come up
+      // Unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down which is unlikely. Hence there might be a need to update the blob if all go down
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version
+      // check if all are have same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
--- End diff --

If `versions.size()` is `1`, it is redundant to check for 
`stateInfoList.size()` since it must be 1 or greater. We don't add to 
`versions` unless there is an element in `stateInfoList`.




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015030#comment-15015030
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45428570
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or few of them come up
+      // Unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down which is unlikely. Hence there might be a need to update the blob if all go down
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version
+      // check if all are have same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
--- End diff --

Could we add a comment explaining why we want to return `currentCounter - 
1` here?


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45428570
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or few of them come up
+      // Unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down which is unlikely. Hence there might be a need to update the blob if all go down
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version
+      // check if all are have same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
--- End diff --

Could we add a comment explaining why we want to return `currentCounter - 
1` here?




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015020#comment-15015020
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427940
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or few of them come up
+      // Unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down which is unlikely. Hence there might be a need to update the blob if all go down
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version
+      // check if all are have same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
+        if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+          incrementCounter(zkClient, key, currentCounter);
+          return currentCounter + 1;
+        } else {
+          incrementCounter(zkClient, key, currentCounter);
+          return versions.first() + 1;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Exception {}", e);
+    } finally

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015018#comment-15015018
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427929
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version;
+      // check if all have the same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns the version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
+        if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+          incrementCounter(zkClient, key, currentCounter);
+          return currentCounter + 1;
+        } else {
+          incrementCounter(zkClient, key, currentCounter);
+          return versions.first() + 1;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Exception {}", e);
+    } finally

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015019#comment-15015019
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427932
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version;
+      // check if all have the same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns the version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
+        if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+          incrementCounter(zkClient, key, currentCounter);
+          return currentCounter + 1;
+        } else {
+          incrementCounter(zkClient, key, currentCounter);
+          return versions.first() + 1;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Exception {}", e);
+    } finally

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427932
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version;
+      // check if all have the same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns the version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
+        if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+          incrementCounter(zkClient, key, currentCounter);
+          return currentCounter + 1;
+        } else {
+          incrementCounter(zkClient, key, currentCounter);
+          return versions.first() + 1;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Exception {}", e);
+    } finally {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
+    return versions.last();
+  }
+
+  public boolean checkIfStateContainsCurrentNimbusHost(List stateInfoList, NimbusInfo nimbusInfo) {
+
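The getKeyVersion logic quoted above collects each nimbus's advertised version into a TreeSet and, when no earlier branch returns, falls back to versions.last(), i.e. the highest version seen. A minimal sketch of just that selection step; the "host:port-version" state-string layout and the normalize helper below are assumptions standing in for Utils.normalizeVersionInfo, not the actual Storm implementation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class VersionPick {
    // Hypothetical stand-in for Utils.normalizeVersionInfo: splits an assumed
    // "host:port-version" state string into { "host:port", "version" }.
    static String[] normalize(String stateInfo) {
        int dash = stateInfo.lastIndexOf('-');
        return new String[] { stateInfo.substring(0, dash), stateInfo.substring(dash + 1) };
    }

    static int highestVersion(List<String> stateInfoList) {
        TreeSet<Integer> versions = new TreeSet<>();
        for (String stateInfo : stateInfoList) {
            versions.add(Integer.parseInt(normalize(stateInfo)[1]));
        }
        return versions.last(); // TreeSet keeps entries sorted; last() is the maximum
    }

    public static void main(String[] args) {
        System.out.println(highestVersion(Arrays.asList("n1:6627-2", "n2:6627-3", "n3:6627-2")));
        // prints 3
    }
}
```

This is only meant to show why a TreeSet is a convenient choice here: duplicates from nimbodes that agree on a version collapse, and the max is O(1) to read off.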

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427940
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version;
+      // check if all have the same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns the version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
+        if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+          incrementCounter(zkClient, key, currentCounter);
+          return currentCounter + 1;
+        } else {
+          incrementCounter(zkClient, key, currentCounter);
+          return versions.first() + 1;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Exception {}", e);
+    } finally {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
+    return versions.last();
+  }
+
+  public boolean checkIfStateContainsCurrentNimbusHost(List stateInfoList, NimbusInfo nimbusInfo) {
+

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427929
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
+      if (stateInfoList.isEmpty()) {
+        return getKeyVersionCounterValue(zkClient, key);
+      }
+
+      LOG.debug("stateInfoSize {}", stateInfoList.size());
+      // In all other cases check for the latest version on the nimbus and assign the version;
+      // check if all have the same version, if not assign the highest version
+      for (String stateInfo : stateInfoList) {
+        versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+      }
+
+      int currentCounter = getKeyVersionCounterValue(zkClient, key);
+      // This condition returns the version when a nimbus crashes and comes up
+      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+        if (versions.last() < currentCounter) {
+          return currentCounter;
+        } else {
+          return currentCounter - 1;
+        }
+      }
+      // Condition checks for an update scenario
+      if (stateInfoList.size() >= 1 && versions.size() == 1) {
+        if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+          incrementCounter(zkClient, key, currentCounter);
+          return currentCounter + 1;
+        } else {
+          incrementCounter(zkClient, key, currentCounter);
+          return versions.first() + 1;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Exception {}", e);
+    } finally {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
+    return versions.last();
+  }
+
+  public boolean checkIfStateContainsCurrentNimbusHost(List stateInfoList, NimbusInfo nimbusInfo) {
--

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015014#comment-15015014
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427729
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
--- End diff --

Check log message.
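The "Check log message." note above flags the debug call that labels both placeholders `stateInfoList` even though the first argument is the list's size. A small self-contained sketch of the mismatch; the `format` helper below is a hypothetical stand-in for SLF4J's `{}` substitution so the example runs without the slf4j dependency, and the clearer wording is only a suggestion:

```java
import java.util.Arrays;
import java.util.List;

public class LogMessageDemo {
    // Minimal stand-in for SLF4J-style "{}" substitution (assumption, not the real API)
    static String format(String msg, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIdx = 0, from = 0, at;
        while ((at = msg.indexOf("{}", from)) >= 0 && argIdx < args.length) {
            sb.append(msg, from, at).append(args[argIdx++]);
            from = at + 2;
        }
        return sb.append(msg.substring(from)).toString();
    }

    public static void main(String[] args) {
        List<String> stateInfoList = Arrays.asList("nimbus1:6627-1", "nimbus2:6627-1");
        // As in the diff: both values are labeled "stateInfoList", which is confusing
        System.out.println(format("stateInfoList {} stateInfoList {}",
                stateInfoList.size(), stateInfoList));
        // A clearer message (suggested wording only) labels each value distinctly
        System.out.println(format("stateInfoList size {} content {}",
                stateInfoList.size(), stateInfoList));
    }
}
```

The actual fix, if any, belongs in the quoted diff; this just illustrates why the reviewer asked for the message to be checked.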


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427729
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE = "/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE = "/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+    this.key = key;
+    this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+    TreeSet versions = new TreeSet();
+    CuratorFramework zkClient = Utils.createZKClient(conf);
+    try {
+      // Key has not been created yet and it is the first time it is being created
+      if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
+        return 1;
+      }
+
+      // When all nimbodes go down and one or a few of them come up,
+      // unfortunately there might not be an exact way to know which one contains the most updated blob
+      // if all go down, which is unlikely. Hence there might be a need to update the blob if all go down.
+      List stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), stateInfoList);
--- End diff --

Check log message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015007#comment-15015007
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427569
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE="/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE="/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+this.key = key;
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKeyVersion(Map conf) {
+TreeSet versions = new TreeSet();
+CuratorFramework zkClient = Utils.createZKClient(conf);
+try {
+  // Key has not been created yet and it is the first time it is being created
+  if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + "/" + key + "/" + 1);
--- End diff --

`+ "/1");`
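The suggestion replaces `+ "/" + 1`, which leans on int-to-String conversion, with the single literal `+ "/1"`. Both build the same ZooKeeper path; a quick sketch (the key name is illustrative):

```java
public class KeyCounterPathDemo {
    public static void main(String[] args) {
        String key = "somekey";  // hypothetical key name for illustration
        // Patch as written: "/" + 1 relies on converting the int 1 to "1".
        String viaConcat  = "/blobstorekeycounter" + "/" + key + "/" + 1;
        // Reviewer's suggestion: the same path spelled as one literal.
        String viaLiteral = "/blobstorekeycounter" + "/" + key + "/1";
        System.out.println(viaConcat.equals(viaLiteral));  // true
    }
}
```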


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015006#comment-15015006
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427507
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE="/blobstore";
+  private final String BLOBSTORE_KEY_COUNTER_SUBTREE="/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
--- End diff --

`key` and `nimbusInfo` can be `final`, and we do not need to pass them 
around as parameters in several places below if they are already class 
variables.
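A sketch of what the suggestion amounts to: with the fields `final`, they are assigned exactly once in the constructor and methods can read them directly instead of taking them as parameters. `NimbusInfo` below is a minimal stand-in for `backtype.storm.nimbus.NimbusInfo`, and `versionPath` is a hypothetical method added only to show a field being read without a parameter:

```java
// Minimal stand-in for backtype.storm.nimbus.NimbusInfo; only the field
// shape matters for this sketch.
class NimbusInfo {
    private final String host;
    NimbusInfo(String host) { this.host = host; }
    String getHost() { return host; }
}

public class KeyVersionSketch {
    // final: assigned exactly once in the constructor, so the compiler
    // guarantees the references never change afterwards.
    private final String key;
    private final NimbusInfo nimbusInfo;

    public KeyVersionSketch(String key, NimbusInfo nimbusInfo) {
        this.key = key;
        this.nimbusInfo = nimbusInfo;
    }

    // Hypothetical method: no parameters needed, it reads the class's own
    // fields rather than having them passed in again.
    public String versionPath() {
        return "/blobstore/" + key + "/" + nimbusInfo.getHost();
    }
}
```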


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014997#comment-15014997
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45427102
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java ---
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+public interface KeyFilter {
+  public R filter(String key);
--- End diff --

Don't need the `public`
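Interface methods are implicitly `public abstract` in Java, so the modifier in the patch is redundant. A trimmed declaration mirroring the `KeyFilter` shape under review, with an illustrative lambda (the prefix check is made up for the example):

```java
public class KeyFilterDemo {
    // All interface methods are implicitly public, so no modifier is needed.
    interface KeyFilter<R> {
        R filter(String key);
    }

    public static void main(String[] args) {
        // Illustrative filter: reports whether a key has a given prefix.
        KeyFilter<Boolean> startsWithBlob = key -> key.startsWith("blob");
        System.out.println(startsWithBlob.filter("blobstore"));  // true
    }
}
```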




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014993#comment-15014993
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45426985
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/IBlobWatcher.java ---
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+/**
+ * Provides a way to know when a blob changes.
+ */
+public interface IBlobWatcher {
+public void blobChanged(String key);
+}
--- End diff --

This interface may not be needed if we remove watching methods from the 
API.  See [comment](https://github.com/apache/storm/pull/845/files#r45425945)





[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014990#comment-15014990
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45426783
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Very basic blob store impl with no ACL handling.
+ */
+public class FileBlobStoreImpl {
+  private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
+  private static final int BUCKETS = 1024;
+  private static final Logger LOG = LoggerFactory.getLogger(FileBlobStoreImpl.class);
+  private static final Timer timer = new Timer("FileBlobStore cleanup thread", true);
+
+  public class KeyInHashDirIterator implements Iterator {
+private int currentBucket = 0;
+private Iterator it = null;
+private String next = null;
+
+public KeyInHashDirIterator() throws IOException {
+  primeNext();
+}
+
+private void primeNext() throws IOException {
+  while (it == null && currentBucket < BUCKETS) {
+String name = String.valueOf(currentBucket);
+File dir = new File(_fullPath, name);
+try {
+  it = listKeys(dir);
+} catch (FileNotFoundException e) {
+  it = null;
+}
+if (it == null || !it.hasNext()) {
+  it = null;
+  currentBucket++;
+} else {
+  next = it.next();
+}
+  }
+}
+
+@Override
+public boolean hasNext() {
+  return next != null;
+}
+
+@Override
+public String next() {
+  if (!hasNext()) {
+throw new NoSuchElementException();
+  }
+  String current = next;
+  next = null;
+  if (it != null) {
+if (!it.hasNext()) {
+  it = null;
+  currentBucket++;
+  try {
+primeNext();
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+} else {
+  next = it.next();
+}
+  }
+  return current;
+}
+
+@Override
+public void remove() {
+  throw new UnsupportedOperationException("Delete Not Supported");
+}
+  }
+
+  private File _fullPath;
+  private TimerTask cleanup = null;
+
+  public FileBlobStoreImpl(File path, Map conf) throws IOException {
+LOG.info("Creating new blob store based in {}", path);
+_fullPath = path;
+_fullPath.mkdirs();
+Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
+if (Utils.getBoolean(shouldCleanup, false)) {
+  LOG.debug("Starting File blobstore cleaner");
+  cleanup = new TimerTask() {
+@Override
+public void run() {
+  try {
+fullCleanup(FULL_CLEANUP_FREQ);
+  } catch (IOException e) {
+LOG.error("Error trying to cleanup", e);
+  }
+}
+  };
+  timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
+}
+  }
+
+  /**
+   *

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45426783
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Very basic blob store impl with no ACL handling.
+ */
+public class FileBlobStoreImpl {
+  private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
+  private static final int BUCKETS = 1024;
+  private static final Logger LOG = LoggerFactory.getLogger(FileBlobStoreImpl.class);
+  private static final Timer timer = new Timer("FileBlobStore cleanup thread", true);
+
+  public class KeyInHashDirIterator implements Iterator {
+private int currentBucket = 0;
+private Iterator it = null;
+private String next = null;
+
+public KeyInHashDirIterator() throws IOException {
+  primeNext();
+}
+
+private void primeNext() throws IOException {
+  while (it == null && currentBucket < BUCKETS) {
+String name = String.valueOf(currentBucket);
+File dir = new File(_fullPath, name);
+try {
+  it = listKeys(dir);
+} catch (FileNotFoundException e) {
+  it = null;
+}
+if (it == null || !it.hasNext()) {
+  it = null;
+  currentBucket++;
+} else {
+  next = it.next();
+}
+  }
+}
+
+@Override
+public boolean hasNext() {
+  return next != null;
+}
+
+@Override
+public String next() {
+  if (!hasNext()) {
+throw new NoSuchElementException();
+  }
+  String current = next;
+  next = null;
+  if (it != null) {
+if (!it.hasNext()) {
+  it = null;
+  currentBucket++;
+  try {
+primeNext();
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+} else {
+  next = it.next();
+}
+  }
+  return current;
+}
+
+@Override
+public void remove() {
+  throw new UnsupportedOperationException("Delete Not Supported");
+}
+  }
+
+  private File _fullPath;
+  private TimerTask cleanup = null;
+
+  public FileBlobStoreImpl(File path, Map conf) throws IOException {
+LOG.info("Creating new blob store based in {}", path);
+_fullPath = path;
+_fullPath.mkdirs();
+Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
+if (Utils.getBoolean(shouldCleanup, false)) {
+  LOG.debug("Starting File blobstore cleaner");
+  cleanup = new TimerTask() {
+@Override
+public void run() {
+  try {
+fullCleanup(FULL_CLEANUP_FREQ);
+  } catch (IOException e) {
+LOG.error("Error trying to cleanup", e);
+  }
+}
+  };
+  timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
+}
+  }
+
+  /**
+   * @return all keys that are available for reading.
+   * @throws IOException on any error.
+   */ 
+  public Iterator listKeys() throws IOException {
+return new KeyInHashDirIterator();
+  }
+
+  /**
+   * Get an input

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45426264
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Very basic blob store impl with no ACL handling.
+ */
+public class FileBlobStoreImpl {
+  private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
+  private static final int BUCKETS = 1024;
+  private static final Logger LOG = LoggerFactory.getLogger(FileBlobStoreImpl.class);
+  private static final Timer timer = new Timer("FileBlobStore cleanup thread", true);
--- End diff --

Let's pick a convention for the naming of private variables.  We are mixing 
the underscore `_` prefix with no prefix.
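A sketch of one consistent choice (no underscore prefix). Either convention works; the reviewer's point is simply to use one of them throughout. The field and method names below mirror the patch but are illustrative:

```java
import java.io.File;

public class FileBlobStoreFieldsSketch {
    private final File fullPath;      // patch had "_fullPath"
    private Runnable cleanup = null;  // patch already had no prefix here

    public FileBlobStoreFieldsSketch(File path) {
        this.fullPath = path;
    }

    // Hypothetical accessor, added only so the sketch is exercisable.
    public String baseName() {
        return fullPath.getName();
    }
}
```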




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014982#comment-15014982
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45426264
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Very basic blob store impl with no ACL handling.
+ */
+public class FileBlobStoreImpl {
+  private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
+  private static final int BUCKETS = 1024;
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileBlobStoreImpl.class);
+  private static final Timer timer = new Timer("FileBlobStore cleanup 
thread", true);
--- End diff --

Let's pick a convention for the naming of private variables.  We are mixing 
the underscore `_` prefix with no prefix.
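To illustrate the convention question above, here is a minimal hypothetical sketch (class and accessor names are illustrative, not from the Storm code base) of one consistent choice: no prefix on private fields, matching the `FULL_CLEANUP_FREQ`/`BUCKETS` style already in the diff:

```java
public class NamingSketch {
    // Consistent no-prefix style for all private fields (one possible choice;
    // the alternative would be an underscore prefix on every field instead).
    private final long fullCleanupFreqMs = 60 * 60 * 1000L;
    private final int buckets = 1024;

    public long fullCleanupFreqMs() { return fullCleanupFreqMs; }
    public int buckets() { return buckets; }

    public static void main(String[] args) {
        NamingSketch s = new NamingSketch();
        System.out.println(s.fullCleanupFreqMs() + " " + s.buckets());
    }
}
```

Whichever style is picked matters less than using it uniformly, since mixed prefixes make it harder to scan a class for its state.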


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014973#comment-15014973
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45425945
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java 
---
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+import java.util.Iterator;
+import java.util.Map;
+
+
+public abstract class ClientBlobStore implements Shutdownable {
+  protected Map conf;
+
+  public abstract void prepare(Map conf);
+  protected abstract AtomicOutputStream createBlobToExtend(String key, 
SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException;
+  public abstract AtomicOutputStream updateBlob(String key) throws 
AuthorizationException, KeyNotFoundException;
+  public abstract ReadableBlobMeta getBlobMeta(String key) throws 
AuthorizationException, KeyNotFoundException;
+  protected abstract void setBlobMetaToExtend(String key, SettableBlobMeta 
meta) throws AuthorizationException, KeyNotFoundException;
+  public abstract void deleteBlob(String key) throws 
AuthorizationException, KeyNotFoundException;
+  public abstract InputStreamWithMeta getBlob(String key) throws 
AuthorizationException, KeyNotFoundException;
+  public abstract Iterator listKeys();
+  public abstract void watchBlob(String key, IBlobWatcher watcher) throws 
AuthorizationException;
+  public abstract void stopWatchingBlob(String key) throws 
AuthorizationException;
--- End diff --

All implementations of `watchBlob` and `stopWatchingBlob` either do nothing 
or throw RuntimeException.  Should we remove these for now?  If we have a 
useful implementation, we can add them back later.
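If the signatures must stay in the abstract API for compatibility, an alternative to removal (a hypothetical sketch, not what the patch does) is a concrete default body that fails loudly, so the unimplemented state is explicit instead of each subclass silently doing nothing:

```java
public abstract class WatchSketch {
    // Hypothetical default: make "not implemented" visible to callers
    // rather than varying per subclass between no-op and RuntimeException.
    public void watchBlob(String key) {
        throw new UnsupportedOperationException(
            "watchBlob(" + key + ") is not implemented");
    }

    public static void main(String[] args) {
        WatchSketch store = new WatchSketch() {};
        try {
            store.watchBlob("topology-jar");
        } catch (UnsupportedOperationException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```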






[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014958#comment-15014958
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45425624
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java 
---
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
--- End diff --

Unused imports:
* `Config`
* `AccessControl`
* `AccessControlType`
* `Utils`







[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45425519
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java 
---
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;;
+
+/**
+ * Is called periodically and updates the nimbus with blobs based on the 
state stored inside the zookeeper
+ */
+public class BlobSynchronizer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobSynchronizer.class);
+  private CuratorFramework zkClient;
+  private Map conf;
+  private BlobStore blobStore;
+  private Set blobStoreKeySet = new HashSet();
+  private Set zookeeperKeySet = new HashSet();
+  private NimbusInfo nimbusInfo;
+
+  public BlobSynchronizer(BlobStore blobStore, Map conf) {
+this.blobStore = blobStore;
+this.conf = conf;
+  }
+
+  public void setNimbusInfo(NimbusInfo nimbusInfo) {
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public void setZookeeperKeySet(Set zookeeperKeySet) {
+this.zookeeperKeySet = zookeeperKeySet;
+  }
+
+  public void setBlobStoreKeySet(Set blobStoreKeySet) {
+this.blobStoreKeySet = blobStoreKeySet;
+  }
+
+  public Set getBlobStoreKeySet() {
+Set keySet = new HashSet();
+keySet.addAll(blobStoreKeySet);
+return keySet;
+  }
+
+  public Set getZookeeperKeySet() {
+Set keySet = new HashSet();
+keySet.addAll(zookeeperKeySet);
+return keySet;
+  }
+
+  public synchronized void syncBlobs() {
+try {
+LOG.debug("Sync blobs - blobstore {} keys {} zookeeperkeys {}", 
blobStore, getBlobStoreKeySet(), getZookeeperKeySet());
+zkClient = Utils.createZKClient(conf);
+deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), 
getZookeeperKeySet());
+updateKeySetForBlobStore(getBlobStoreKeySet());
+Set keySetToDownload = 
getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
+LOG.debug("Key Set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", 
getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);
+
+for (String key : keySetToDownload) {
+  Set nimbusInfoSet = 
Utils.getNimbodesWithLatestVersionOfBlob(zkClient, key);
+  if(Utils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
+Utils.createStateInZookeeper(conf, key, nimbusInfo);
+  }
+}
+if (zkClient !=null) {
+  zkClient.close();
+}
+} catch(InterruptedException exp) {
+LOG.error("InterruptedException {}", exp);
+} catch(TTransportException exp) {
+throw new RuntimeException(exp);
+} catch(Exception exp) {
+throw new RuntimeException(exp);
+}
--- End diff --

Do we need to catch `TTransportException` separately from `Exception`?
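Since the `TTransportException` branch and the generic `Exception` branch in the quoted code both just wrap and rethrow, they can be collapsed into one catch. A minimal sketch under stated assumptions (the method names and the exceptions thrown are hypothetical stand-ins for the calls inside `syncBlobs()`):

```java
import java.io.IOException;

public class SyncCatchSketch {
    // Hypothetical stand-in for the work done inside syncBlobs().
    static void doSync(boolean fail) throws IOException, InterruptedException {
        if (fail) {
            throw new IOException("transport failure");
        }
    }

    // InterruptedException keeps its own branch (the original only logs it);
    // every other checked exception is wrapped identically, so one
    // catch (Exception ...) replaces the two rethrowing blocks.
    public static String syncOnce(boolean fail) {
        try {
            doSync(fail);
            return "synced";
        } catch (InterruptedException exp) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (Exception exp) {
            throw new RuntimeException(exp);
        }
    }

    public static void main(String[] args) {
        System.out.println(syncOnce(false)); // prints "synced"
        try {
            syncOnce(true);
        } catch (RuntimeException e) {
            System.out.println("wrapped: " + e.getCause().getMessage());
        }
    }
}
```

A side note on the original: swallowing `InterruptedException` with only a log loses the interrupt; restoring the flag with `Thread.currentThread().interrupt()` (as above) is the usual pattern.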


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-19 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45425302
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 ---
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift7.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.Map;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a HDFS file system backed blob store implementation.
+ * Note that this provides an api for having HDFS be the backing store for the blobstore,
+ * it is not a service/daemon.
+ */
+public class HdfsBlobStore extends BlobStore {
+  public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
+  private static final String DATA_PREFIX = "data_";
+  private static final String META_PREFIX = "meta_";
+  private BlobStoreAclHandler _aclHandler;
+  private HdfsBlobStoreImpl _hbs;
+  private Subject _localSubject;
+  private Map conf;
+
+  /**
+   * Get the subject from Hadoop so we can use it to validate the acls. There is no direct
+   * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
+   * We could probably run everything in the doAs but for now just grab the subject.
+   */
+  private Subject getHadoopUser() {
+    Subject subj;
+    try {
+      subj = UserGroupInformation.getCurrentUser().doAs(
+          new PrivilegedAction<Subject>() {
+            @Override
+            public Subject run() {
+              return Subject.getSubject(AccessController.getContext());
+            }
+          });
+    } catch (IOException e) {
+      throw new RuntimeException("Error creating subject and logging user in!", e);
+    }
+    return subj;
+  }
+
+  /**
+   * If who is null then we want to use the user hadoop says we are.
+   * Required for the supervisor to call these routines as its not
+   * logged in as anyone.
+   */
+  private Subject checkAndGetSubject(Subject who) {
+    if (who == null) {
+      return _localSubject;
+    }
+    return who;
+  }
+
+  @Override
+  public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+    this.conf = conf;
+    prepareInternal(conf, overrideBase, null);
+  }
+
+  /**
+   * Allow a Hadoop Configuration to be passed for testing. If it's null then the hadoop configs
+   * must be in your classpath.

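The doAs trick that the quoted `getHadoopUser()` javadoc describes — there is no direct getter for the Subject, so you run a `PrivilegedAction` and pull it out of the `AccessControlContext` — can be reproduced with plain JAAS and no Hadoop on the classpath. A minimal sketch; the class name `SubjectFromContext` is invented here, and the shown behavior assumes JDK 22 or earlier (later JDKs make `Subject.getSubject` throw `UnsupportedOperationException` unless a security manager is allowed):

```java
import javax.security.auth.Subject;
import java.security.AccessController;
import java.security.PrivilegedAction;

public class SubjectFromContext {
    public static void main(String[] args) {
        Subject subject = new Subject();
        // Subject.doAs binds the subject to the AccessControlContext of the
        // action, so code inside the action can recover it without any direct
        // getter -- the same mechanism getHadoopUser() leans on via
        // UserGroupInformation.getCurrentUser().doAs(...).
        Subject recovered = Subject.doAs(subject, (PrivilegedAction<Subject>) () ->
                Subject.getSubject(AccessController.getContext()));
        System.out.println(recovered == subject);
    }
}
```

As the javadoc notes, Storm could run everything inside the doAs; grabbing the subject once up front and reusing it for acl checks is the simpler choice.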
[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014942#comment-15014942
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45425181
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 ---
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift7.TBase;
--- End diff --

Are we still using org.apache.thrift7, or have we changed to 
org.apache.thrift?


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014941#comment-15014941
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45425150
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 ---
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.NimbusClient;
--- End diff --

Unused?




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014940#comment-15014940
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45425144
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 ---
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.AtomicOutputStream;
--- End diff --

Duplicate import




