Re: Getting total number of bolt tasks from Storm's REST API
Help. Anyone?

On Thu, Sep 15, 2016 at 3:47 PM, Navin Ipe wrote:
> Hi,
>
> I've been using Storm's REST API
> (http://domainName:8080/api/v1/topology/summary) to retrieve the number
> of bolts in my topology.
>
>     JSONObject topology = new JSONObject(jsonText);
>     JSONArray topos = (JSONArray) topology.get("bolts");
>     for (int i = 0; i < topos.length(); ++i) {
>         JSONObject j = topos.getJSONObject(i);
>         boltDetails.put("boltId", j.getString("boltId"));
>         boltDetails.put("tasks", j.getInt("tasks"));
>     }
>
> I was earlier using just 1 bolt, MyBolt1, and created 10 tasks with it.
> The REST API correctly informed me that there were 10 bolts.
>
> When I added another bolt, MyBolt2, and created another 10 tasks with it,
> the REST API still shows 10 bolts instead of 20. A single spout (with 5
> spout tasks) is emitting to both bolts.
>
> Moreover, Storm returns tasksTotal = 40.
>
> Why is tasksTotal not 10+10+5 = 25?
> How do I get the total number of bolt tasks only (10+10 = 20)?
>
> --
> Regards,
> Navin

--
Regards,
Navin
Re: How will storm replay the tuple tree?
Thank you Ambud, very comprehensive answer!

On Wed, Sep 14, 2016 at 9:55 PM, Ambud Sharma wrote:
> Two things here, extending what Ravi talked about:
>
> 1. You fail tuples either explicitly, or they time out, as an indicator
> of a recoverable issue in the topology.
>
> If the error is not recoverable, don't fail the tuple; ack it and forward
> the error to another bolt so you can record it somewhere for further
> investigation, like Kafka (we have a topic in Kafka for this).
>
> 2. Real-time processing means you have to worry about latencies at the
> nanosecond level at times, which means a fail-fast strategy must be used.
> Point-to-point failure at the granularity of a single tuple can be
> implemented using transactions with a size of 1. This will slow down the
> topology substantially. You can try an implementation yourself and see.
>
> The XOR-based tuple tree is a genius innovation from Nathan Marz to do
> tuple tracking very, very fast while using predictable memory. So
> regardless of how many hops your tuple has to go through, Storm uses
> 20 bytes to track it.
>
> Now about exactly-once processing. There is no such thing as exactly-once
> processing unless you use transactions with a batch size of 1 (including
> Trident).
>
> What topology developers should focus on is idempotent processing!
>
> What does that mean? Idempotent processing means that if your tuple were
> to replay, the result would not change. So if you are using Trident
> micro-batching, or you wrote your own micro-batching in Storm, the net
> result is that in case of failures your tuples will replay, but you are
> okay with that since your net result will be the same.
>
> Trident will not process the next batch until the current one is
> processed. This means the entire batch has to be handled via rollback
> transactions (as in, you flush to the db at the end of the batch) or,
> better, written to the db in an idempotent manner, where each tuple has
> an id such that if you wrote it again it would just rewrite the same
> info.
>
> Most modern data stores have the concept of a key which can be used for
> this, e.g. Elasticsearch document id, HBase row key, MySQL primary key
> etc.
>
> Now, how to get a UUID for the tuple?
> 1. Handle it in your application logic if you already know what a unique
> event is.
> 2. Worry from Kafka onwards (we do this): use partition id + offset +
> event timestamp (inside the event payload) as the UUID.
> 3. MD5 the payload of the event (there is a risk of collision here,
> depending on your event volume and application logic).
>
> For things like unique counting you can use an in-memory approach like we
> did (Hendrix) or use something like Redis with structures like set and
> hyperloglog.
>
> Thanks,
> Ambud
>
> On Sep 14, 2016 1:38 AM, "Cheney Chen" wrote:
>
>> Thank you guys for the discussion.
>>
>> What if I want exactly-once processing for all nodes (bolts), even when
>> a failure happens? Will Trident be the one?
>>
>> On Wed, Sep 14, 2016 at 3:49 PM, Ravi Sharma wrote:
>>
>>> Hi T.I.
>>> A few reasons why the Spout is responsible for replay rather than the
>>> various Bolts:
>>>
>>> 1. Ack and fail messages carry only the message ID. Usually your spout
>>> generates the message ID and knows which tuple/message is linked to it
>>> (via the source, i.e. JMS etc.). If an ack or fail happens, the Spout
>>> can do various things: on ack, delete from the queue; on fail, put the
>>> message in some dead letter queue. An intermediate Bolt won't know what
>>> message it sent unless you implement something of your own. Technically
>>> you can put "delete message from JMS" in bolts, but then your whole
>>> topology knows where you are getting data from. What if tomorrow you
>>> start processing data from JMS, an HTTP REST service, a database, the
>>> file system etc.?
>>>
>>> 2. BoltB fails and tells BoltA. BoltA retries 3 times and fails 3
>>> times. Now what should BoltA do? Send it to another bolt (say BoltPreA
>>> exists between it and the spout) or send it to the Spout?
>>> If it sends to BoltPreA, that means BoltPreA will retry 3 times (3 is
>>> just an example, consider it N), and for each try by BoltPreA, BoltA
>>> will again retry 3 times, so 9 retries in total. (Basically, total
>>> retries depend on the number of bolts from the Spout to the failing
>>> bolt, TB, and the retries per bolt, TR; it will be like
>>> TR + Power(TR,2) + ... + Power(TR,TB).)
>>> If you send the failure back from BoltA to the Spout, then we can argue
>>> why not send it to the Spout from BoltB directly. As a framework, I
>>> shouldn't be looking into whether BoltB is really costly or BoltA is
>>> really costly.
>>>
>>> 3. Also, failure scenarios are supposed to be really, really rare, and
>>> if your database is down (meaning 100% of tuples will fail), then
>>> performance won't be your only concern. Your concern will be to make
>>> sure the database comes up and to reprocess all failed tuples.
>>>
>>> 4. Also, you would have to take care of retry logic in every Bolt.
>>> Currently it is in only one place.
>>>
>>>
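
Ambud's point about idempotent writes keyed by partition id + offset + event timestamp can be illustrated with a small sketch. This is plain Java with no Storm or Kafka dependency; the class name IdempotentSink, the key format, and the in-memory map standing in for a keyed data store are all assumptions made for illustration, not anything from the thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: replay-safe writes keyed by a deterministic event ID (hypothetical class). */
final class IdempotentSink {
    // Stand-in for a keyed store (document id, row key, primary key).
    private final Map<String, String> store = new HashMap<>();

    /** Builds the ID from the Kafka coordinates plus the event's own timestamp. */
    static String eventId(int partition, long offset, long eventTimestampMillis) {
        return partition + "-" + offset + "-" + eventTimestampMillis;
    }

    /** Upsert: writing the same event again overwrites the same key with the same value. */
    void write(int partition, long offset, long eventTimestampMillis, String payload) {
        store.put(eventId(partition, offset, eventTimestampMillis), payload);
    }

    /** Number of distinct events stored so far. */
    int size() {
        return store.size();
    }

    /** Returns the stored payload for an ID, or null if absent. */
    String get(String id) {
        return store.get(id);
    }
}
```

If a tuple is replayed after a failure, the second write lands on the same key, so the store ends up in the same state as if the tuple had been processed once. A real sink would replace the map with the upsert operation of whatever data store is in use.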
Trident partitioned windows
Hi community,

I am trying to use Storm with the Trident API. My use case is partitioning
a stream and computing aggregations on partitioned sliding windows.
However, when I debug the outputs, I see that the state of the windows in
all partitions is the same. I would expect that if the tuples' keys are
different, they go to different partitions and are processed in different
windows, so the state in the partitioned windows should not be the same.
I am running the application on a local machine. Am I doing something
wrong? Or are partitioned windows not supported in the Trident API?

Here is my code:

    ...
    topology
        .newStream("aggregation", spout)
        .each(new Fields("json"), new SelectFields(),
              new Fields("geo", "val", "max_price", "min_price"))
        .parallelismHint(parallelism)
        .partitionBy(new Fields("geo"))
        .parallelismHint(parallelism)
        .slidingWindow(
            new BaseWindowedBolt.Duration(slideWindowLength, TimeUnit.MILLISECONDS),
            new BaseWindowedBolt.Duration(slideWindowSlide, TimeUnit.MILLISECONDS),
            new InMemoryWindowsStoreFactory(),
            new Fields("geo", "val", "max_price", "min_price"),
            new MinMaxAggregator(),
            new Fields("geo", "val", "max_price", "min_price"))
        .parallelismHint(parallelism)
        .peek(new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                System.out.println(input);
            }
        });
    ...

    @SuppressWarnings("serial")
    class SelectFields extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            JSONObject obj = new JSONObject(tuple.getString(0));
            String geo = obj.getJSONObject("t").getString("geo");
            Double price = obj.getJSONObject("m").getDouble("price");
            collector.emit(new Values(geo, System.nanoTime(), price, price));
        }
    }

    class MinMaxAggregator extends BaseAggregator {
        class State {
            double max = 0.0;
            double min = 0.0;
            long val = 0;
            String id = "";
        }

        @Override
        public State init(Object batchId, TridentCollector collector) {
            return new State();
        }

        @Override
        public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
            Double maxPrice = tuple.getDouble(2);
            Double minPrice = tuple.getDouble(3);
            Long val = tuple.getLong(1);
            String id = tuple.getString(0);
            state.val = val;
            state.max = Math.max(state.max, maxPrice);
            state.min = Math.min(state.min, minPrice);
        }

        @Override
        public void complete(State state, TridentCollector collector) {
            collector.emit(new Values(state.id, state.val, state.max, state.min));
        }
    }

--
-Cheers
Jeyhun
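
Two details in the posted aggregator are worth noting independently of how Trident partitions windows: State.min starts at 0.0, so Math.min can never rise above 0 for positive prices, and state.id is never assigned from the tuple. The following is a minimal sketch, in plain Java without any Storm types, of keeping min/max per key so that different geo values cannot share state. The class PerKeyMinMax and its methods are hypothetical names for illustration; this says nothing about whether Trident itself supports partitioned windows.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: min/max tracked separately for each key (hypothetical class). */
final class PerKeyMinMax {
    /** Running extremes for one key; starts so that the first value always wins. */
    static final class MinMax {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
    }

    private final Map<String, MinMax> byKey = new HashMap<>();

    /** Folds one price into the running min/max of its key. */
    void add(String key, double price) {
        MinMax mm = byKey.computeIfAbsent(key, k -> new MinMax());
        mm.min = Math.min(mm.min, price);
        mm.max = Math.max(mm.max, price);
    }

    /** Returns the extremes for a key, or null if the key was never seen. */
    MinMax get(String key) {
        return byKey.get(key);
    }
}
```

Inside an aggregator, the same idea would mean holding a map like this in the state object and emitting one output tuple per key in complete().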
Frame size issue with DRPC - Storm 1.0.1
Hello Storm group,

We are in the process of migrating from Storm 0.9.3 to 1.0.1 but are
facing an issue with DRPC, where we see an exception of "frame size
greater than 16MB" while processing a DRPC request. We have NOT changed
the default SimpleTransportPlugin.

Solutions tried so far:

1. Updated the nimbus and DRPC max_buffer_size in storm.yaml to a value
greater than 16 MB. But the plug-in still picks up the hardcoded 16 MB
size.
2. SimpleTransportPlugin seems to be deprecated, so we tried using
PlainSaslTransportPlugin, but that fails with an "Invalid status: 0"
exception.

Has anyone encountered a similar issue with DRPC on Storm 1.0.1?

Regards,
Dev
Getting total number of bolt tasks from Storm's REST API
Hi,

I've been using Storm's REST API
(http://domainName:8080/api/v1/topology/summary) to retrieve the number of
bolts in my topology.

    JSONObject topology = new JSONObject(jsonText);
    JSONArray topos = (JSONArray) topology.get("bolts");
    for (int i = 0; i < topos.length(); ++i) {
        JSONObject j = topos.getJSONObject(i);
        boltDetails.put("boltId", j.getString("boltId"));
        boltDetails.put("tasks", j.getInt("tasks"));
    }

I was earlier using just 1 bolt, MyBolt1, and created 10 tasks with it. The
REST API correctly informed me that there were 10 bolts.

When I added another bolt, MyBolt2, and created another 10 tasks with it,
the REST API still shows 10 bolts instead of 20. A single spout (with 5
spout tasks) is emitting to both bolts.

Moreover, Storm returns tasksTotal = 40.

Why is tasksTotal not 10+10+5 = 25?
How do I get the total number of bolt tasks only (10+10 = 20)?

--
Regards,
Navin
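
One thing visible in the loop above: if boltDetails is a single map, each iteration overwrites the "boltId" and "tasks" entries, so only the last bolt's task count survives, which would match seeing 10 rather than 20. A minimal sketch of summing instead is below. It assumes the "bolts" array has one entry per bolt component with "boltId" and "tasks" fields (as the posted code reads them) and that the entries have already been parsed into objects; BoltTaskTotals and BoltStats are hypothetical names, and the JSON parsing step is left out so the example needs no external library.

```java
import java.util.List;

/** Sketch: total bolt tasks as the sum of per-bolt "tasks" values (hypothetical class). */
final class BoltTaskTotals {
    /** One parsed entry of the "bolts" array. */
    static final class BoltStats {
        final String boltId;
        final int tasks;

        BoltStats(String boltId, int tasks) {
            this.boltId = boltId;
            this.tasks = tasks;
        }
    }

    /** Adds up the task counts of all bolt components. */
    static int totalBoltTasks(List<BoltStats> bolts) {
        int total = 0;
        for (BoltStats bolt : bolts) {
            total += bolt.tasks;
        }
        return total;
    }
}
```

With entries for MyBolt1 (10 tasks) and MyBolt2 (10 tasks), totalBoltTasks returns 20. As for tasksTotal, it may count tasks beyond the user-defined spouts and bolts, such as acker tasks, which could account for a value above 25; that is a guess and worth checking against the topology's configuration.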
Storm Integration Tests
Hi Guys,

I recently wrote a small framework for integration tests (including Flux
YAML files) and thought of sharing it with you all. Maybe it can help
someone.

https://github.com/ping2ravi/storm-integration-test

Thanks
Ravi.
KafkaSpout failed - stream: default not found
Hi,

I've defined the following KafkaSpout:

    BrokerHosts hosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "", UUID.randomUUID().toString());
    spoutConfig.scheme = new RawMultiScheme();
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("bytes", kafkaSpout);

But after submitting the topology using the StormSubmitter and executing

    bin/storm monitor topology -m bytes

I get the following error:

    Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
        at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
        at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
        at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
        at clojure.lang.RestFn.applyTo(RestFn.java:137)
        at org.apache.storm.command.monitor.main(Unknown Source)

which to me indicates that Storm, i.e. the KafkaSpout, could not connect
to the Kafka broker and its topic, and hence no stream has been
constructed.

Does anyone have experience with similar issues? Is my definition of the
KafkaSpout valid? For integrating Kafka, I've followed the official Storm
documentation:
http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka.html

How does Storm actually connect to Kafka? In both the docs and my example,
no explicit definition of the Kafka bootstrap servers is given, such as
localhost:9092 (assuming Kafka is using the default 9092 port).

Thanks in advance!
Dominik