Re: Getting total number of bolt tasks from Storm's REST API

2016-09-15 Thread Navin Ipe
Help. Anyone?

On Thu, Sep 15, 2016 at 3:47 PM, Navin Ipe 
wrote:

> Hi,
>
> I've been using Storm's REST API (http://domainName:8080/api/v1/topology/summary)
> to retrieve the number of bolts in my topology.
>
> JSONObject topology = new JSONObject(jsonText);
> JSONArray topos = (JSONArray) topology.get("bolts");
> for (int i = 0; i < topos.length(); ++i) {
>     JSONObject j = topos.getJSONObject(i);
>     boltDetails.put("boltId", j.getString("boltId"));
>     boltDetails.put("tasks", j.getInt("tasks"));
> }
>
> I was earlier using just 1 bolt MyBolt1, and created 10 tasks with it. The
> REST API correctly informed me that there were 10 bolts.
>
> When I added another bolt MyBolt2, and created another 10 tasks with it,
> the REST API is still showing 10 bolts instead of 20 bolts. A single spout
> (with 5 spout tasks) is emitting to both bolts.
>
> Moreover, Storm returns tasksTotal = 40.
>
> Why is tasksTotal not = 10+10+5 = 25?
> How do I get the total number of bolt tasks only? 10+10=20?
>
> --
> Regards,
> Navin
>



-- 
Regards,
Navin


Re: How will storm replay the tuple tree?

2016-09-15 Thread Cheney Chen
Thank you Ambud, very comprehensive answer!

On Wed, Sep 14, 2016 at 9:55 PM, Ambud Sharma 
wrote:

> Two things here extending what Ravi talked about:
>
> 1. You fail tuples either explicitly or they time out, as an indicator of a
> recoverable issue in the topology.
>
> If the error is not recoverable, don't fail the tuple; ack it and forward
> the error to another bolt so you can record it somewhere, like Kafka, for
> further investigation (we have a Kafka topic for this).
>
> 2. Real-time processing means you sometimes have to worry about latencies at
> the nanosecond level, which means a fail-fast strategy must be used.
> Point-to-point failure handling at the granularity of a single tuple can be
> implemented using transactions with a batch size of 1. This will slow down
> the topology substantially; you can try an implementation yourself and see.
>
> The XOR-based tuple tree is a genius innovation from Nathan Marz for doing
> tuple tracking very, very fast while using predictable memory. Regardless of
> how many hops your tuple has to go through, Storm uses 20 bytes to track it.
>
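[Editor's note: a minimal, self-contained sketch of the XOR trick, not Storm's
actual acker implementation; the random 64-bit anchor ids are an assumption for
illustration. Every edge of the tuple tree is XORed into a single long when it
is created and again when it is acked, so the value returns to 0 exactly when
the whole tree has been acked, whatever its size.]

import java.util.Random;

public class XorAckSketch {
    private long ackVal = 0L;
    private final Random random = new Random();

    // A new edge in the tuple tree: XOR its anchor id into the tracker.
    public long emitAnchored() {
        long anchorId = random.nextLong();
        ackVal ^= anchorId;
        return anchorId;
    }

    // The edge was fully processed: XOR the same id in again.
    public void ack(long anchorId) {
        ackVal ^= anchorId;
    }

    // 0 means every emitted edge has been acked, in constant memory.
    public boolean treeComplete() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        XorAckSketch tracker = new XorAckSketch();
        long a = tracker.emitAnchored();
        long b = tracker.emitAnchored();
        tracker.ack(a);
        tracker.ack(b);
        System.out.println(tracker.treeComplete()); // true
    }
}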
> 
>
> Now about exactly-once processing. There is no such thing as exactly-once
> processing unless you use transactions with a batch size of 1 (including
> Trident).
>
> What topology developers should focus on is idempotent processing!
>
> What does that mean? Idempotent processing means that if your tuple were to
> replay, the result would not change. So whether you are using Trident
> micro-batching or you wrote your own micro-batching in Storm, the net result
> is: in case of failures your tuples will replay, but you are okay with that
> since the net result will be the same.
>
> With Trident, the next batch will not be processed until the current one is
> done. That means the entire batch has to be handled either via rollback
> transactions (as in, you flush to the db at the end of the batch) or, better,
> by writing to the db in an idempotent manner, where each tuple has an id such
> that if you wrote it again it would just rewrite the same info.
>
> Most modern data stores have the concept of a key that can be used for this,
> e.g. the Elasticsearch document id, HBase row key, MySQL primary key, etc.
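[Editor's note: a minimal sketch of such an idempotent write, assuming a
hypothetical MySQL table events(event_id PRIMARY KEY, payload); replaying the
same tuple just rewrites the same row instead of creating a duplicate.]

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class IdempotentWriter {
    // Upsert keyed on the tuple's unique id, so replays are harmless.
    private static final String UPSERT =
        "INSERT INTO events (event_id, payload) VALUES (?, ?) " +
        "ON DUPLICATE KEY UPDATE payload = VALUES(payload)";

    public static void write(Connection conn, String eventId, String payload) throws Exception {
        try (PreparedStatement ps = conn.prepareStatement(UPSERT)) {
            ps.setString(1, eventId);
            ps.setString(2, payload);
            ps.executeUpdate();
        }
    }

    public static void main(String[] args) throws Exception {
        // Connection details are placeholders for illustration only.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/demo", "user", "password")) {
            write(conn, "0-42-1473926400000", "{\"price\": 10.5}");
            write(conn, "0-42-1473926400000", "{\"price\": 10.5}"); // replay: same row
        }
    }
}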
>
> Now, how do you get a UUID for the tuple?
> 1. Handle it in your application logic if you already know what constitutes a
> unique event.
> 2. Worry about it from Kafka onwards (we do this): use partition id + offset +
> event timestamp (inside the event payload) as the UUID (see the sketch below).
> 3. MD5 the payload of the event (there is a risk of collision here depending
> on your event volume and application logic).
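[Editor's note: a minimal sketch of options 2 and 3; the Kafka coordinates and
payload below are hypothetical placeholders.]

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class EventIds {
    // Option 2: a stable id from the event's position in Kafka plus the
    // event timestamp taken from the payload itself.
    public static String fromKafkaCoordinates(int partition, long offset, long eventTimestamp) {
        return partition + "-" + offset + "-" + eventTimestamp;
    }

    // Option 3: hash the raw payload; collisions are possible, so weigh this
    // against your event volume and application logic.
    public static String fromPayload(String payload) throws Exception {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest(payload.getBytes(StandardCharsets.UTF_8));
        return String.format("%032x", new BigInteger(1, digest));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fromKafkaCoordinates(0, 42L, 1473926400000L));
        System.out.println(fromPayload("{\"geo\":\"NL\",\"price\":10.5}"));
    }
}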
>
> For things like unique counting you can use an in-memory approach like we did
> (Hendrix) or use something like Redis with structures like sets and
> HyperLogLog.
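[Editor's note: a minimal sketch with the Jedis client against a local Redis,
both assumptions. The set gives an exact distinct count whose memory grows with
cardinality; the HyperLogLog gives an approximate count in roughly constant
memory.]

import redis.clients.jedis.Jedis;

public class UniqueCountSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String[] userIds = {"u1", "u2", "u2", "u3"};

            for (String id : userIds) {
                jedis.sadd("users:exact", id);   // exact, memory grows with cardinality
                jedis.pfadd("users:approx", id); // approximate, small fixed-size sketch
            }

            System.out.println("exact:  " + jedis.scard("users:exact"));    // 3
            System.out.println("approx: " + jedis.pfcount("users:approx")); // ~3
        }
    }
}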
>
> Thanks,
> Ambud
>
> On Sep 14, 2016 1:38 AM, "Cheney Chen"  wrote:
>
>> Thank you guys for the discussion.
>>
>> What if I want exactly-once processing for all nodes (bolts), even when
>> failures happen? Will Trident be the one to use?
>>
>> On Wed, Sep 14, 2016 at 3:49 PM, Ravi Sharma  wrote:
>>
>>> Hi T.I.
>>> A few things on why the Spout is responsible for replay rather than the
>>> various Bolts.
>>>
>>> 1. ack and fail messages carry only the message ID. Usually your spout
>>> generates the message ID and knows which tuple/message is linked to it (via
>>> the source, i.e. JMS etc.). If an ack or fail happens, the Spout can do
>>> various things, like delete the message from the queue on ack, or put it in
>>> some dead-letter queue on fail. An intermediate Bolt won't know what message
>>> it sent unless you implement something of your own. Technically you could
>>> put the "delete message from JMS" logic in bolts, but then your whole
>>> topology knows where you are getting data from; what if tomorrow you start
>>> processing data from JMS, an HTTP rest service, a database, a file system,
>>> etc.?
>>>
>>> 2. BoltB fails and tells BoltA; BoltA retries 3 times and it fails all 3
>>> times. Now what should BoltA do? Send it to another bolt (say a BoltPreA
>>> that sits between it and the spout), or send it to the Spout?
>>> If it sends it to BoltPreA, that means BoltPreA will retry 3 times
>>> (using 3 here, but consider it as N), and for each retry by BoltPreA, BoltA
>>> will retry another 3 times, so 9 retries in total. Basically, with TB bolts
>>> between the Spout and the failing bolt and TR retries per bolt, the total
>>> number of retries grows like TR + TR^2 + ... + TR^TB; for example, with
>>> TR = 3 and TB = 3 that is already 3 + 9 + 27 = 39 retries for one tuple.
>>> If you send the failure back from BoltA to the Spout, then one can
>>> argue why not send it to the Spout straight from BoltB; as a framework, I
>>> shouldn't be looking into whether BoltB or BoltA is really costly.
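[Editor's note: a small sketch of how that retry count grows under the stated
formula, with TR retries per bolt and TB bolts between the spout and the
failure.]

public class RetryGrowth {
    // Total retries = TR + TR^2 + ... + TR^TB
    public static long totalRetries(int tr, int tb) {
        long total = 0;
        long term = 1;
        for (int i = 1; i <= tb; i++) {
            term *= tr;
            total += term;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalRetries(3, 2)); // 3 + 9 = 12
        System.out.println(totalRetries(3, 3)); // 3 + 9 + 27 = 39
    }
}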
>>>
>>> 3. Also, failure scenarios are supposed to be really, really rare, and if
>>> your database is down (meaning 100% of tuples will fail), then performance
>>> won't be your only concern; your concern will be to make sure the database
>>> comes back up and to reprocess all the failed tuples.
>>>
>>> 4. Also, you would have to take care of retry logic in every Bolt.
>>> Currently it lives in only one place.
>>>
>>>

Trident partitioned windows

2016-09-15 Thread Jeyhun Karimov
Hi community,

I am trying to use Storm with the Trident API. My use case is partitioning a
stream and computing aggregations on partitioned sliding windows.
However, when I debug the outputs, I see that the state of the windows in all
partitions is the same. I would expect that if the tuples' keys are different,
they go to different partitions and are processed in different windows, so the
state in the partitioned windows should not be the same. I am running the
application on a local machine. Am I doing something wrong, or are partitioned
windows not supported in the Trident API?
Here is my code:

..
...
topology
    .newStream("aggregation", spout)
    .each(new Fields("json"), new SelectFields(),
          new Fields("geo", "val", "max_price", "min_price")).parallelismHint(parallelism)
    .partitionBy(new Fields("geo")).parallelismHint(parallelism)
    .slidingWindow(
          new BaseWindowedBolt.Duration(slideWindowLength, TimeUnit.MILLISECONDS),
          new BaseWindowedBolt.Duration(slideWindowSlide, TimeUnit.MILLISECONDS),
          new InMemoryWindowsStoreFactory(),
          new Fields("geo", "val", "max_price", "min_price"),
          new MinMaxAggregator(),
          new Fields("geo", "val", "max_price", "min_price")).parallelismHint(parallelism)
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input);
        }
    });


.

@SuppressWarnings("serial")
class SelectFields extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        JSONObject obj = new JSONObject(tuple.getString(0));
        String geo = obj.getJSONObject("t").getString("geo");
        Double price = obj.getJSONObject("m").getDouble("price");
        collector.emit(new Values(
                geo,
                System.nanoTime(),
                price,
                price
        ));
    }
}

class MinMaxAggregator extends BaseAggregator<MinMaxAggregator.State> {

    class State {
        double max = 0.0;
        double min = 0.0;
        long val = 0;
        String id = "";
    }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State();
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        Double maxPrice = tuple.getDouble(2);
        Double minPrice = tuple.getDouble(3);
        Long val = tuple.getLong(1);
        String id = tuple.getString(0);
        state.val = val;
        state.max = Math.max(state.max, maxPrice);
        state.min = Math.min(state.min, minPrice);
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.id, state.val, state.max, state.min));
    }
}

-- 
-Cheers

Jeyhun


Frame size issue with DRPC - Storm 1.0.1

2016-09-15 Thread Devang Shah
Hello Storm group,

We are in the process of migrating from Storm 0.9.3 to 1.0.1, but we are facing
an issue with DRPC where we see a "frame size greater than 16MB" exception
while processing a DRPC request. We have NOT changed the default
SimpleTransportPlugin.

Solutions tried so far,

1. Updated the nimbus and DRPC max_buffer_size in storm.yaml to a value greater
than 16 MB, but the plugin still picks up the hardcoded 16 MB size.

2. SimpleTransportPlugin seems to be deprecated, so we tried using
PlainSaslTransportPlugin, but that fails with an "Invalid status: 0" exception.

Has anyone encountered a similar issue with DRPC on Storm 1.0.1?

Regards,
Dev


Getting total number of bolt tasks from Storm's REST API

2016-09-15 Thread Navin Ipe
Hi,

I've been using Storm's REST API (http://domainName:8080/api/v1/topology/summary)
to retrieve the number of bolts in my topology.

JSONObject topology = new JSONObject(jsonText);
JSONArray topos = (JSONArray) topology.get("bolts");
for (int i = 0; i < topos.length(); ++i) {
    JSONObject j = topos.getJSONObject(i);
    boltDetails.put("boltId", j.getString("boltId"));
    boltDetails.put("tasks", j.getInt("tasks"));
}

I was earlier using just 1 bolt MyBolt1, and created 10 tasks with it. The
REST API correctly informed me that there were 10 bolts.

When I added another bolt MyBolt2, and created another 10 tasks with it,
the REST API is still showing 10 bolts instead of 20 bolts. A single spout
(with 5 spout tasks) is emitting to both bolts.

Moreover, Storm returns tasksTotal = 40.

Why is tasksTotal not = 10+10+5 = 25?
How do I get the total number of bolt tasks only? 10+10=20?
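[Editor's note: a minimal sketch of summing only the bolt tasks, assuming the
per-topology endpoint http://domainName:8080/api/v1/topology/<topology-id>
exposes a "bolts" array whose entries carry a "tasks" count; the topology id
below is a hypothetical placeholder.]

import java.io.InputStream;
import java.net.URL;
import java.util.Scanner;
import org.json.JSONArray;
import org.json.JSONObject;

public class BoltTaskCounter {
    public static void main(String[] args) throws Exception {
        // Topology id is a placeholder; look it up via /api/v1/topology/summary first.
        String url = "http://domainName:8080/api/v1/topology/mytopo-1-1473926400";
        try (InputStream in = new URL(url).openStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            JSONObject topology = new JSONObject(scanner.next());
            JSONArray bolts = topology.getJSONArray("bolts");

            int boltTasks = 0;
            for (int i = 0; i < bolts.length(); ++i) {
                boltTasks += bolts.getJSONObject(i).getInt("tasks");
            }
            System.out.println("total bolt tasks: " + boltTasks); // 10 + 10 = 20 here
        }
    }
}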

-- 
Regards,
Navin


Storm Integration Tests

2016-09-15 Thread Ravi Sharma
Hi Guys,
Recently I have written a small framework for integration tests (including
flux yaml files) and thought of sharing it with you all. Maybe it can help
someone.

https://github.com/ping2ravi/storm-integration-test


Thanks
Ravi.


KafkaSpout failed - stream: default not found

2016-09-15 Thread Dominik Safaric
Hi, 

I’ve defined the following KafkaSpout: 
// ZooKeeper ensemble from which the spout discovers the Kafka brokers
BrokerHosts hosts = new ZkHosts("localhost:2181");
// topic = "bytes", zkRoot = "", consumer id = random UUID
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "",
        UUID.randomUUID().toString());
spoutConfig.scheme = new RawMultiScheme(); // emit the raw message bytes
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
But after submitting the topology using the StormSubmitter and executing
bin/storm monitor topology -m bytes, I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)

which to me indicates that Storm, i.e. the KafkaSpout, could not connect to
the Kafka broker and its topic, and hence no stream has been constructed.

Does anyone have experience with similar issues? Is my definition of the
KafkaSpout valid? For integrating Kafka, I’ve followed the official Storm
documentation: http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka.html

How does Storm actually connect to Kafka? In both the docs and my example, no
explicit Kafka bootstrap servers are given, such as localhost:9092 (assuming
Kafka is using the default port 9092).

Thanks in advance!

Dominik