Hi community,
I am trying to use Storm with Trident API. My use case is, partitioning
stream and making aggregations on partitioned sliding windows.
However, when I debug the outputs, I see that the state of windows in all
partitions are same. So, I would expect, if the tuples' keys are different
then they go to different partitions and are processed on different
windows. Therefore, the state in partitioned windows should not be same. I
am running application on local machine. Am I doing something wrong? or are
partitioned windows not supported in Trident API?
Here is my code:
......
.......
topology
.newStream("aggregation", spout)
.each(new Fields("json"), new SelectFields(), new
Fields("geo","val","max_price","min_price")).parallelismHint(parallelism)
.partitionBy(new Fields("geo")).parallelismHint(parallelism)
.slidingWindow(new
BaseWindowedBolt.Duration(slideWindowLength, TimeUnit.MILLISECONDS),
new BaseWindowedBolt.Duration(slideWindowSlide,
TimeUnit.MILLISECONDS),
new InMemoryWindowsStoreFactory(),
new Fields("geo","val","max_price","min_price") ,
new MinMaxAggregator(),
new
Fields("geo","val","max_price","min_price")).parallelismHint(parallelism).
peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
System.out.println( input);
}
});
........
.........
@SuppressWarnings("serial")
class SelectFields extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
JSONObject obj = new JSONObject(tuple.getString(0));
String geo = obj.getJSONObject("t").getString("geo");
Double price = obj.getJSONObject("m").getDouble("price");
collector.emit( new Values(
geo,
System.nanoTime(),
price,
price
));
}
}
class MinMaxAggregator extends BaseAggregator<MinMaxAggregator.State> {
class State {
double max = 0.0;
double min = 0.0;
long val = 0;
String id = "";
}
@Override
public State init(Object batchId, TridentCollector collector) {
return new State();
}
@Override
public void aggregate(State state, TridentTuple tuple, TridentCollector
collector) {
Double maxPrice = tuple.getDouble(2);
Double minPrice = tuple.getDouble(3);
Long val = tuple.getLong(1);
String id = tuple.getString(0);
state.val = val;
state.max = Math.max(state.max, maxPrice);
state.min = Math.min(state.min, minPrice);
}
@Override
public void complete(State state, TridentCollector collector) {
collector.emit(new Values(state.id, state.val, state.max,
state.min));
}
}
--
-Cheers
Jeyhun