http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java index 2a77f3b..ce4c955 100755 --- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java +++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java @@ -42,87 +42,83 @@ public class BatchSubtopologyBuilder { Map<String, Component> _bolts = new HashMap<String, Component>(); Component _masterBolt; String _masterId; - + public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) { Integer p = boltParallelism == null ? null : boltParallelism.intValue(); _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p); _masterId = masterBoltId; } - + public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) { this(masterBoltId, masterBolt, null); } - + public BoltDeclarer getMasterDeclarer() { return new BoltDeclarerImpl(_masterBolt); } - + public BoltDeclarer setBolt(String id, IBatchBolt bolt) { return setBolt(id, bolt, null); } - + public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) { return setBolt(id, new BatchBoltExecutor(bolt), parallelism); - } - + } + public BoltDeclarer setBolt(String id, IBasicBolt bolt) { return setBolt(id, bolt, null); - } - + } + public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) { return setBolt(id, new BasicBoltExecutor(bolt), parallelism); } - + private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) { Integer p = null; - if(parallelism!=null) p = parallelism.intValue(); + if (parallelism != null) + p = parallelism.intValue(); Component component = new Component(bolt, p); _bolts.put(id, component); return new BoltDeclarerImpl(component); } - + public void extendTopology(TopologyBuilder builder) { BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism); - for(InputDeclaration decl: _masterBolt.declarations) { + for (InputDeclaration decl : _masterBolt.declarations) { decl.declare(declarer); } - for(Map conf: _masterBolt.componentConfs) { + for (Map conf : _masterBolt.componentConfs) { declarer.addConfigurations(conf); } - for(String id: _bolts.keySet()) { + for (String id : _bolts.keySet()) { Component component = _bolts.get(id); Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>(); - for(String c: componentBoltSubscriptions(component)) { + for (String c : componentBoltSubscriptions(component)) { SourceArgs source; - if(c.equals(_masterId)) { + if (c.equals(_masterId)) { source = SourceArgs.single(); } else { source = SourceArgs.all(); } - coordinatedArgs.put(c, source); + coordinatedArgs.put(c, source); } - - - BoltDeclarer input = builder.setBolt(id, - new CoordinatedBolt(component.bolt, - coordinatedArgs, - null), - component.parallelism); - for(Map conf: component.componentConfs) { + + BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(component.bolt, coordinatedArgs, null), component.parallelism); + for (Map conf : component.componentConfs) { input.addConfigurations(conf); } - for(String c: componentBoltSubscriptions(component)) { + for (String c : componentBoltSubscriptions(component)) { input.directGrouping(c, Constants.COORDINATED_STREAM_ID); } - for(InputDeclaration d: component.declarations) { + for (InputDeclaration d : component.declarations) { d.declare(input); } - } + } } - + private Set<String> componentBoltSubscriptions(Component component) { Set<String> ret = new HashSet<String>(); - for(InputDeclaration d: component.declarations) { + for (InputDeclaration d : component.declarations) { ret.add(d.getComponent()); } return ret; @@ -133,25 +129,26 @@ public class BatchSubtopologyBuilder { public Integer parallelism; public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>(); public List<Map> componentConfs = new ArrayList<Map>(); - + public Component(IRichBolt bolt, Integer parallelism) { this.bolt = bolt; this.parallelism = parallelism; } } - + private static interface InputDeclaration { void declare(InputDeclarer declarer); + String getComponent(); } - + private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer { Component _component; - + public BoltDeclarerImpl(Component component) { _component = component; } - + @Override public BoltDeclarer fieldsGrouping(final String component, final Fields fields) { addDeclaration(new InputDeclaration() { @@ -163,7 +160,7 @@ public class BatchSubtopologyBuilder { @Override public String getComponent() { return component; - } + } }); return this; } @@ -174,12 +171,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.fieldsGrouping(component, streamId, fields); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -190,12 +187,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.globalGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -206,12 +203,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.globalGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -222,12 +219,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.shuffleGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -238,12 +235,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.shuffleGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -254,12 +251,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.localOrShuffleGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -270,8 +267,8 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.localOrShuffleGrouping(component, streamId); - } - + } + @Override public String getComponent() { return component; @@ -279,7 +276,7 @@ public class BatchSubtopologyBuilder { }); return this; } - + @Override public BoltDeclarer localFirstGrouping(final String componentId) { addDeclaration(new InputDeclaration() { @@ -287,7 +284,7 @@ public class BatchSubtopologyBuilder { public void declare(InputDeclarer declarer) { declarer.localFirstGrouping(componentId); } - + @Override public String getComponent() { return componentId; @@ -295,7 +292,7 @@ public class BatchSubtopologyBuilder { }); return this; } - + @Override public BoltDeclarer localFirstGrouping(final String component, final String streamId) { addDeclaration(new InputDeclaration() { @@ -303,27 +300,27 @@ public class BatchSubtopologyBuilder { public void declare(InputDeclarer declarer) { declarer.localFirstGrouping(component, streamId); } - + @Override public String getComponent() { return component; - } + } }); return this; } - + @Override public BoltDeclarer noneGrouping(final String component) { addDeclaration(new InputDeclaration() { @Override public void declare(InputDeclarer declarer) { declarer.noneGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -334,12 +331,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.noneGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -350,12 +347,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.allGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -366,12 +363,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.allGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -382,12 +379,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.directGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -398,12 +395,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.directGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -417,21 +414,21 @@ public class BatchSubtopologyBuilder { public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); } - + @Override public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) { addDeclaration(new InputDeclaration() { @Override public void declare(InputDeclarer declarer) { declarer.customGrouping(component, grouping); - } + } @Override public String getComponent() { return component; - } + } }); - return this; + return this; } @Override @@ -440,12 +437,12 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.customGrouping(component, streamId, grouping); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -456,16 +453,16 @@ public class BatchSubtopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.grouping(stream, grouping); - } + } @Override public String getComponent() { return stream.get_componentId(); - } + } }); return this; } - + private void addDeclaration(InputDeclaration declaration) { _component.declarations.add(declaration); }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java index 6f337a6..39a158d 100755 --- a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java @@ -45,8 +45,7 @@ import org.slf4j.LoggerFactory; import static backtype.storm.utils.Utils.get; /** - * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused - * in the case of retries. + * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused in the case of retries. */ public class CoordinatedBolt implements IRichBolt { public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class); @@ -58,8 +57,7 @@ public class CoordinatedBolt implements IRichBolt { public static interface TimeoutCallback { void timeoutId(Object id); } - - + public static class SourceArgs implements Serializable { public boolean singleCount; @@ -74,7 +72,7 @@ public class CoordinatedBolt implements IRichBolt { public static SourceArgs all() { return new SourceArgs(false); } - + @Override public String toString() { return "<Single: " + singleCount + ">"; @@ -101,14 +99,14 @@ public class CoordinatedBolt implements IRichBolt { public void ack(Tuple tuple) { Object id = tuple.getValue(0); - synchronized(_tracked) { + synchronized (_tracked) { TrackingInfo track = _tracked.get(id); if (track != null) track.receivedTuples++; } boolean failed = checkFinishId(tuple, TupleType.REGULAR); - if(failed) { - _delegate.fail(tuple); + if (failed) { + _delegate.fail(tuple); } else { _delegate.ack(tuple); } @@ -116,7 +114,7 @@ public class CoordinatedBolt implements IRichBolt { public void fail(Tuple tuple) { Object id = tuple.getValue(0); - synchronized(_tracked) { + synchronized (_tracked) { TrackingInfo track = _tracked.get(id); if (track != null) track.failed = true; @@ -124,18 +122,17 @@ public class CoordinatedBolt implements IRichBolt { checkFinishId(tuple, TupleType.REGULAR); _delegate.fail(tuple); } - + public void reportError(Throwable error) { _delegate.reportError(error); } - private void updateTaskCounts(Object id, List<Integer> tasks) { - synchronized(_tracked) { + synchronized (_tracked) { TrackingInfo track = _tracked.get(id); if (track != null) { Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples; - for(Integer task: tasks) { + for (Integer task : tasks) { int newCount = get(taskEmittedTuples, task, 0) + 1; taskEmittedTuples.put(task, newCount); } @@ -161,34 +158,30 @@ public class CoordinatedBolt implements IRichBolt { boolean receivedId = false; boolean finished = false; List<Tuple> ackTuples = new ArrayList<Tuple>(); - + @Override public String toString() { - return "reportCount: " + reportCount + "\n" + - "expectedTupleCount: " + expectedTupleCount + "\n" + - "receivedTuples: " + receivedTuples + "\n" + - "failed: " + failed + "\n" + - taskEmittedTuples.toString(); + return "reportCount: " + reportCount + "\n" + "expectedTupleCount: " + expectedTupleCount + "\n" + "receivedTuples: " + receivedTuples + "\n" + + "failed: " + failed + "\n" + taskEmittedTuples.toString(); } } - public static class IdStreamSpec implements Serializable { GlobalStreamId _id; - + public GlobalStreamId getGlobalStreamId() { return _id; } public static IdStreamSpec makeDetectSpec(String component, String stream) { return new IdStreamSpec(component, stream); - } - + } + protected IdStreamSpec(String component, String stream) { _id = new GlobalStreamId(component, stream); } } - + public CoordinatedBolt(IRichBolt delegate) { this(delegate, null, null); } @@ -196,37 +189,35 @@ public class CoordinatedBolt implements IRichBolt { public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) { this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec); } - + public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) { _sourceArgs = sourceArgs; - if(_sourceArgs==null) _sourceArgs = new HashMap<String, SourceArgs>(); + if (_sourceArgs == null) + _sourceArgs = new HashMap<String, SourceArgs>(); _delegate = delegate; _idStreamSpec = idStreamSpec; } - + public void prepare(Map config, TopologyContext context, OutputCollector collector) { TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null; - if(_delegate instanceof TimeoutCallback) { + if (_delegate instanceof TimeoutCallback) { callback = new TimeoutItems(); } _tracked = new TimeCacheMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), callback); _collector = collector; _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector))); - for(String component: Utils.get(context.getThisTargets(), - Constants.COORDINATED_STREAM_ID, - new HashMap<String, Grouping>()) - .keySet()) { - for(Integer task: context.getComponentTasks(component)) { + for (String component : Utils.get(context.getThisTargets(), Constants.COORDINATED_STREAM_ID, new HashMap<String, Grouping>()).keySet()) { + for (Integer task : context.getComponentTasks(component)) { _countOutTasks.add(task); } } - if(!_sourceArgs.isEmpty()) { + if (!_sourceArgs.isEmpty()) { _numSourceReports = 0; - for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) { - if(entry.getValue().singleCount) { - _numSourceReports+=1; + for (Entry<String, SourceArgs> entry : _sourceArgs.entrySet()) { + if (entry.getValue().singleCount) { + _numSourceReports += 1; } else { - _numSourceReports+=context.getComponentTasks(entry.getKey()).size(); + _numSourceReports += context.getComponentTasks(entry.getKey()).size(); } } } @@ -235,57 +226,56 @@ public class CoordinatedBolt implements IRichBolt { private boolean checkFinishId(Tuple tup, TupleType type) { Object id = tup.getValue(0); boolean failed = false; - - synchronized(_tracked) { + + synchronized (_tracked) { TrackingInfo track = _tracked.get(id); try { - if(track!=null) { + if (track != null) { boolean delayed = false; - if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) { + if (_idStreamSpec == null && type == TupleType.COORD || _idStreamSpec != null && type == TupleType.ID) { track.ackTuples.add(tup); delayed = true; } - if(track.failed) { + if (track.failed) { failed = true; - for(Tuple t: track.ackTuples) { + for (Tuple t : track.ackTuples) { _collector.fail(t); } _tracked.remove(id); - } else if(track.receivedId - && (_sourceArgs.isEmpty() || - track.reportCount==_numSourceReports && - track.expectedTupleCount == track.receivedTuples)){ - if(_delegate instanceof FinishedCallback) { - ((FinishedCallback)_delegate).finishedId(id); + } else if (track.receivedId + && (_sourceArgs.isEmpty() || track.reportCount == _numSourceReports && track.expectedTupleCount == track.receivedTuples)) { + if (_delegate instanceof FinishedCallback) { + ((FinishedCallback) _delegate).finishedId(id); } - if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) { + if (!(_sourceArgs.isEmpty() || type != TupleType.REGULAR)) { throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible"); } Iterator<Integer> outTasks = _countOutTasks.iterator(); - while(outTasks.hasNext()) { + while (outTasks.hasNext()) { int task = outTasks.next(); int numTuples = get(track.taskEmittedTuples, task, 0); _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples)); } - for(Tuple t: track.ackTuples) { + for (Tuple t : track.ackTuples) { _collector.ack(t); } track.finished = true; _tracked.remove(id); } - if(!delayed && type!=TupleType.REGULAR) { - if(track.failed) { + if (!delayed && type != TupleType.REGULAR) { + if (track.failed) { _collector.fail(tup); } else { - _collector.ack(tup); + _collector.ack(tup); } } } else { - if(type!=TupleType.REGULAR) _collector.fail(tup); + if (type != TupleType.REGULAR) + _collector.fail(tup); } - } catch(FailedException e) { + } catch (FailedException e) { LOG.error("Failed to finish batch", e); - for(Tuple t: track.ackTuples) { + for (Tuple t : track.ackTuples) { _collector.fail(t); } _tracked.remove(id); @@ -299,29 +289,30 @@ public class CoordinatedBolt implements IRichBolt { Object id = tuple.getValue(0); TrackingInfo track; TupleType type = getTupleType(tuple); - synchronized(_tracked) { + synchronized (_tracked) { track = _tracked.get(id); - if(track==null) { + if (track == null) { track = new TrackingInfo(); - if(_idStreamSpec==null) track.receivedId = true; + if (_idStreamSpec == null) + track.receivedId = true; _tracked.put(id, track); } } - - if(type==TupleType.ID) { - synchronized(_tracked) { + + if (type == TupleType.ID) { + synchronized (_tracked) { track.receivedId = true; } - checkFinishId(tuple, type); - } else if(type==TupleType.COORD) { + checkFinishId(tuple, type); + } else if (type == TupleType.COORD) { int count = (Integer) tuple.getValue(1); - synchronized(_tracked) { + synchronized (_tracked) { track.reportCount++; - track.expectedTupleCount+=count; + track.expectedTupleCount += count; } checkFinishId(tuple, type); - } else { - synchronized(_tracked) { + } else { + synchronized (_tracked) { _delegate.execute(tuple); } } @@ -341,42 +332,38 @@ public class CoordinatedBolt implements IRichBolt { public Map<String, Object> getComponentConfiguration() { return _delegate.getComponentConfiguration(); } - + private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) { Map<String, SourceArgs> ret = new HashMap<String, SourceArgs>(); ret.put(sourceComponent, sourceArgs); return ret; } - + private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> { @Override public void expire(Object id, TrackingInfo val) { - synchronized(_tracked) { + synchronized (_tracked) { // the combination of the lock and the finished flag ensure that // an id is never timed out if it has been finished val.failed = true; - if(!val.finished) { + if (!val.finished) { ((TimeoutCallback) _delegate).timeoutId(id); } } } } - + private TupleType getTupleType(Tuple tuple) { - if(_idStreamSpec!=null - && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) { + if (_idStreamSpec != null && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) { return TupleType.ID; - } else if(!_sourceArgs.isEmpty() - && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) { + } else if (!_sourceArgs.isEmpty() && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) { return TupleType.COORD; } else { return TupleType.REGULAR; } } - + static enum TupleType { - REGULAR, - ID, - COORD + REGULAR, ID, COORD } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java index ee5d9bd..9a1abfa 100755 --- a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java @@ -25,6 +25,8 @@ import java.util.Map; public interface IBatchBolt<T> extends Serializable, IComponent { void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id); + void execute(Tuple tuple); + void finishBatch(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java index 624db3e..d10872f 100755 --- a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java @@ -17,23 +17,22 @@ */ package backtype.storm.drpc; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - +import backtype.storm.generated.AuthorizationException; import backtype.storm.generated.DRPCRequest; import backtype.storm.generated.DistributedRPCInvocations; -import backtype.storm.generated.AuthorizationException; import backtype.storm.security.auth.ThriftClient; import backtype.storm.security.auth.ThriftConnectionType; -import org.apache.thrift.transport.TTransportException; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface { public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class); - private final AtomicReference<DistributedRPCInvocations.Client> client = - new AtomicReference<DistributedRPCInvocations.Client>(); + private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<DistributedRPCInvocations.Client>(); private String host; private int port; @@ -43,14 +42,14 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP this.port = port; client.set(new DistributedRPCInvocations.Client(_protocol)); } - + public String getHost() { return host; } - + public int getPort() { return port; - } + } public void reconnectClient() throws TException { if (client.get() == null) { @@ -70,9 +69,9 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP throw new TException("Client is not connected..."); } c.result(id, result); - } catch(AuthorizationException aze) { + } catch (AuthorizationException aze) { throw aze; - } catch(TException e) { + } catch (TException e) { client.compareAndSet(c, null); throw e; } @@ -85,24 +84,24 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP throw new TException("Client is not connected..."); } return c.fetchRequest(func); - } catch(AuthorizationException aze) { + } catch (AuthorizationException aze) { throw aze; - } catch(TException e) { + } catch (TException e) { client.compareAndSet(c, null); throw e; } - } + } - public void failRequest(String id) throws TException, AuthorizationException { + public void failRequest(String id) throws TException { DistributedRPCInvocations.Client c = client.get(); try { if (c == null) { throw new TException("Client is not connected..."); } c.failRequest(id); - } catch(AuthorizationException aze) { + } catch (AuthorizationException aze) { throw aze; - } catch(TException e) { + } catch (TException e) { client.compareAndSet(c, null); throw e; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java index 4ed24d4..c490efd 100644 --- a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java @@ -17,25 +17,6 @@ */ package backtype.storm.drpc; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.thrift.TException; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.utils.NetWorkUtils; - import backtype.storm.Config; import backtype.storm.ILocalDRPC; import backtype.storm.generated.AuthorizationException; @@ -50,31 +31,38 @@ import backtype.storm.tuple.Values; import backtype.storm.utils.ExtendedThreadPoolExecutor; import backtype.storm.utils.ServiceRegistry; import backtype.storm.utils.Utils; +import com.alibaba.jstorm.utils.NetWorkUtils; +import org.apache.thrift.TException; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; public class DRPCSpout extends BaseRichSpout { - //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS + // ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS static final long serialVersionUID = 2387848310969237877L; public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class); - + SpoutOutputCollector _collector; List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>(); transient LinkedList<Future<Void>> _futures = null; transient ExecutorService _backround = null; String _function; String _local_drpc_id = null; - + private static class DRPCMessageId { String id; int index; - + public DRPCMessageId(String id, int index) { this.id = id; this.index = index; } } - - + public DRPCSpout(String function) { _function = function; } @@ -83,7 +71,7 @@ public class DRPCSpout extends BaseRichSpout { _function = function; _local_drpc_id = drpc.getServiceId(); } - + private class Adder implements Callable<Void> { private String server; private int port; @@ -129,16 +117,12 @@ public class DRPCSpout extends BaseRichSpout { } } } - - - + @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; - if(_local_drpc_id==null) { - _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>()); + if (_local_drpc_id == null) { + _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); _futures = new LinkedList<Future<Void>>(); int numTasks = context.getComponentTasks(context.getThisComponentId()).size(); @@ -146,26 +130,26 @@ public class DRPCSpout extends BaseRichSpout { int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT)); List<String> servers = NetWorkUtils.host2Ip((List<String>) conf.get(Config.DRPC_SERVERS)); - - if(servers == null || servers.isEmpty()) { - throw new RuntimeException("No DRPC servers configured for topology"); + + if (servers == null || servers.isEmpty()) { + throw new RuntimeException("No DRPC servers configured for topology"); } - + if (numTasks < servers.size()) { - for (String s: servers) { + for (String s : servers) { _futures.add(_backround.submit(new Adder(s, port, conf))); } - } else { + } else { int i = index % servers.size(); _futures.add(_backround.submit(new Adder(servers.get(i), port, conf))); } } - + } @Override public void close() { - for(DRPCInvocationsClient client: _clients) { + for (DRPCInvocationsClient client : _clients) { client.close(); } } @@ -173,12 +157,12 @@ public class DRPCSpout extends BaseRichSpout { @Override public void nextTuple() { boolean gotRequest = false; - if(_local_drpc_id==null) { + if (_local_drpc_id == null) { int size = 0; synchronized (_clients) { - size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end + size = _clients.size(); // This will only ever grow, so no need to worry about falling off the end } - for(int i=0; i<size; i++) { + for (int i = 0; i < size; i++) { DRPCInvocationsClient client; synchronized (_clients) { client = _clients.get(i); @@ -188,7 +172,7 @@ public class DRPCSpout extends BaseRichSpout { } try { DRPCRequest req = client.fetchRequest(_function); - if(req.get_request_id().length() > 0) { + if (req.get_request_id().length() > 0) { Map returnInfo = new HashMap(); returnInfo.put("id", req.get_request_id()); returnInfo.put("host", client.getHost()); @@ -210,10 +194,10 @@ public class DRPCSpout extends BaseRichSpout { checkFutures(); } else { DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id); - if(drpc!=null) { // can happen during shutdown of drpc while topology is still up + if (drpc != null) { // can happen during shutdown of drpc while topology is still up try { DRPCRequest req = drpc.fetchRequest(_function); - if(req.get_request_id().length() > 0) { + if (req.get_request_id().length() > 0) { Map returnInfo = new HashMap(); returnInfo.put("id", req.get_request_id()); returnInfo.put("host", _local_drpc_id); @@ -228,7 +212,7 @@ public class DRPCSpout extends BaseRichSpout { } } } - if(!gotRequest) { + if (!gotRequest) { Utils.sleep(1); } } @@ -241,8 +225,8 @@ public class DRPCSpout extends BaseRichSpout { public void fail(Object msgId) { DRPCMessageId did = (DRPCMessageId) msgId; DistributedRPCInvocations.Iface client; - - if(_local_drpc_id == null) { + + if (_local_drpc_id == null) { client = _clients.get(did.index); } else { client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id); @@ -259,5 +243,5 @@ public class DRPCSpout extends BaseRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("args", "return-info")); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java index b74b97e..e9195e7 100755 --- a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java @@ -31,7 +31,6 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class JoinResult extends BaseRichBolt { public static Logger LOG = LoggerFactory.getLogger(JoinResult.class); @@ -43,27 +42,27 @@ public class JoinResult extends BaseRichBolt { public JoinResult(String returnComponent) { this.returnComponent = returnComponent; } - + public void prepare(Map map, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { Object requestId = tuple.getValue(0); - if(tuple.getSourceComponent().equals(returnComponent)) { + if (tuple.getSourceComponent().equals(returnComponent)) { returns.put(requestId, tuple); } else { results.put(requestId, tuple); } - if(returns.containsKey(requestId) && results.containsKey(requestId)) { + if (returns.containsKey(requestId) && results.containsKey(requestId)) { Tuple result = results.remove(requestId); Tuple returner = returns.remove(requestId); LOG.debug(result.getValue(1).toString()); List<Tuple> anchors = new ArrayList<Tuple>(); anchors.add(result); - anchors.add(returner); - _collector.emit(anchors, new Values(""+result.getValue(1), returner.getValue(1))); + anchors.add(returner); + _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1))); _collector.ack(result); _collector.ack(returner); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java index 113163d..2294c54 100755 --- a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java @@ -29,7 +29,6 @@ import backtype.storm.utils.KeyedRoundRobinQueue; import java.util.HashMap; import java.util.Map; - public class KeyedFairBolt implements IRichBolt, FinishedCallback { IRichBolt _delegate; KeyedRoundRobinQueue<Tuple> _rrQueue; @@ -39,14 +38,13 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback { public KeyedFairBolt(IRichBolt delegate) { _delegate = delegate; } - + public KeyedFairBolt(IBasicBolt delegate) { this(new BasicBoltExecutor(delegate)); } - - + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - if(_delegate instanceof FinishedCallback) { + if (_delegate instanceof FinishedCallback) { _callback = (FinishedCallback) _delegate; } _delegate.prepare(stormConf, context, collector); @@ -54,7 +52,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback { _executor = new Thread(new Runnable() { public void run() { try { - while(true) { + while (true) { _delegate.execute(_rrQueue.take()); } } catch (InterruptedException e) { @@ -81,7 +79,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback { } public void finishedId(Object id) { - if(_callback!=null) { + if (_callback != null) { _callback.finishedId(id); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java index d03075e..ddcac35 100755 --- a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java @@ -23,30 +23,39 @@ import backtype.storm.tuple.Fields; public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> { public LinearDRPCInputDeclarer fieldsGrouping(Fields fields); + public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields); public LinearDRPCInputDeclarer globalGrouping(); + public LinearDRPCInputDeclarer globalGrouping(String streamId); public LinearDRPCInputDeclarer shuffleGrouping(); + public LinearDRPCInputDeclarer shuffleGrouping(String streamId); public LinearDRPCInputDeclarer localOrShuffleGrouping(); + public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId); - + public LinearDRPCInputDeclarer noneGrouping(); + public LinearDRPCInputDeclarer noneGrouping(String streamId); public LinearDRPCInputDeclarer allGrouping(); + public LinearDRPCInputDeclarer allGrouping(String streamId); public LinearDRPCInputDeclarer directGrouping(); + public LinearDRPCInputDeclarer directGrouping(String streamId); public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields); + public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields); public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping); + public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping); - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java index ebbbc6d..e8c202e 100755 --- a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java @@ -43,39 +43,38 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - // Trident subsumes the functionality provided by this class, so it's deprecated @Deprecated -public class LinearDRPCTopologyBuilder { +public class LinearDRPCTopologyBuilder { String _function; List<Component> _components = new ArrayList<Component>(); - - + public LinearDRPCTopologyBuilder(String function) { _function = function; } - + public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) { return addBolt(new BatchBoltExecutor(bolt), parallelism); } - + public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) { return addBolt(bolt, 1); } - + @Deprecated public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) { - if(parallelism==null) parallelism = 1; + if (parallelism == null) + parallelism = 1; Component component = new Component(bolt, parallelism.intValue()); _components.add(component); return new InputDeclarerImpl(component); } - + @Deprecated public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) { return addBolt(bolt, null); } - + public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) { return addBolt(new BasicBoltExecutor(bolt), parallelism); } @@ -83,125 +82,119 @@ public class LinearDRPCTopologyBuilder { public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) { return addBolt(bolt, null); } - + public StormTopology createLocalTopology(ILocalDRPC drpc) { return createTopology(new DRPCSpout(_function, drpc)); } - + public StormTopology createRemoteTopology() { return createTopology(new DRPCSpout(_function)); } - - + private StormTopology createTopology(DRPCSpout spout) { final String SPOUT_ID = "spout"; final String PREPARE_ID = "prepare-request"; - + TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout); - builder.setBolt(PREPARE_ID, new PrepareRequest()) - .noneGrouping(SPOUT_ID); - int i=0; - for(; i<_components.size();i++) { + builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID); + int i = 0; + for (; i < _components.size(); i++) { Component component = _components.get(i); - + Map<String, SourceArgs> source = new HashMap<String, SourceArgs>(); - if (i==1) { - source.put(boltId(i-1), SourceArgs.single()); - } else if (i>=2) { - source.put(boltId(i-1), SourceArgs.all()); + if (i == 1) { + source.put(boltId(i - 1), SourceArgs.single()); + } else if (i >= 2) { + source.put(boltId(i - 1), SourceArgs.all()); } IdStreamSpec idSpec = null; - if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) { + if (i == _components.size() - 1 && component.bolt instanceof FinishedCallback) { idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM); } - BoltDeclarer declarer = builder.setBolt( - boltId(i), - new CoordinatedBolt(component.bolt, source, idSpec), - component.parallelism); - - for(Map conf: component.componentConfs) { + BoltDeclarer declarer = builder.setBolt(boltId(i), new CoordinatedBolt(component.bolt, source, idSpec), component.parallelism); + + for (Map conf : component.componentConfs) { declarer.addConfigurations(conf); } - - if(idSpec!=null) { + + if (idSpec != null) { declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request")); } - if(i==0 && component.declarations.isEmpty()) { + if (i == 0 && component.declarations.isEmpty()) { declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM); } else { String prevId; - if(i==0) { + if (i == 0) { prevId = PREPARE_ID; } else { - prevId = boltId(i-1); + prevId = boltId(i - 1); } - for(InputDeclaration declaration: component.declarations) { + for (InputDeclaration declaration : component.declarations) { declaration.declare(prevId, declarer); } } - if(i>0) { - declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); + if (i > 0) { + declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID); } } - - IRichBolt lastBolt = _components.get(_components.size()-1).bolt; + + IRichBolt lastBolt = _components.get(_components.size() - 1).bolt; OutputFieldsGetter getter = new OutputFieldsGetter(); lastBolt.declareOutputFields(getter); Map<String, StreamInfo> streams = getter.getFieldsDeclaration(); - if(streams.size()!=1) { + if (streams.size() != 1) { throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology"); } String outputStream = streams.keySet().iterator().next(); List<String> fields = streams.get(outputStream).get_output_fields(); - if(fields.size()!=2) { - throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result."); + if (fields.size() != 2) { + throw new RuntimeException( + "Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result."); } - builder.setBolt("JoinResult", new JoinResult(PREPARE_ID)) - .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))) + builder.setBolt("JoinResult", new JoinResult(PREPARE_ID)).fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0))) .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request")); i++; - builder.setBolt("ReturnResults", new ReturnResults()) - .noneGrouping("JoinResult"); + builder.setBolt("ReturnResults", new ReturnResults()).noneGrouping("JoinResult"); return builder.createTopology(); } - + private static String boltId(int index) { return "bolt" + index; } - + private static class Component { public IRichBolt bolt; public int parallelism; public List<Map> componentConfs; public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>(); - + public Component(IRichBolt bolt, int parallelism) { this.bolt = bolt; this.parallelism = parallelism; this.componentConfs = new ArrayList(); } } - + private static interface InputDeclaration { public void declare(String prevComponent, InputDeclarer declarer); } - + private class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer { Component _component; - + public InputDeclarerImpl(Component component) { _component = component; } - + @Override public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.fieldsGrouping(prevComponent, fields); - } + } }); return this; } @@ -212,7 +205,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.fieldsGrouping(prevComponent, streamId, fields); - } + } }); return this; } @@ -223,7 +216,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.globalGrouping(prevComponent); - } + } }); return this; } @@ -234,7 +227,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.globalGrouping(prevComponent, streamId); - } + } }); return this; } @@ -245,7 +238,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.shuffleGrouping(prevComponent); - } + } }); return this; } @@ -256,7 +249,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.shuffleGrouping(prevComponent, streamId); - } + } }); return this; } @@ -267,7 +260,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.localOrShuffleGrouping(prevComponent); - } + } }); return this; } @@ -278,18 +271,18 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.localOrShuffleGrouping(prevComponent, streamId); - } + } }); return this; } - + @Override public LinearDRPCInputDeclarer noneGrouping() { addDeclaration(new InputDeclaration() { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.noneGrouping(prevComponent); - } + } }); return this; } @@ -300,7 +293,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.noneGrouping(prevComponent, streamId); - } + } }); return this; } @@ -311,7 +304,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.allGrouping(prevComponent); - } + } }); return this; } @@ -322,7 +315,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.allGrouping(prevComponent, streamId); - } + } }); return this; } @@ -333,7 +326,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.directGrouping(prevComponent); - } + } }); return this; } @@ -344,7 +337,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.directGrouping(prevComponent, streamId); - } + } }); return this; } @@ -365,7 +358,7 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.customGrouping(prevComponent, grouping); - } + } }); return this; } @@ -376,11 +369,11 @@ public class LinearDRPCTopologyBuilder { @Override public void declare(String prevComponent, InputDeclarer declarer) { declarer.customGrouping(prevComponent, streamId, grouping); - } + } }); return this; } - + private void addDeclaration(InputDeclaration declaration) { _component.declarations.add(declaration); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java b/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java index bd32169..fea8b36 100755 --- a/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Random; import backtype.storm.utils.Utils; - public class PrepareRequest extends BaseBasicBolt { public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID; public static final String RETURN_STREAM = "ret"; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java b/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java index 2ca517e..129e2b3 100644 --- a/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java +++ b/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java @@ -37,15 +37,14 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.json.simple.JSONValue; - public class ReturnResults extends BaseRichBolt { - //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS + // ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS static final long serialVersionUID = -774882142710631591L; public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class); OutputCollector _collector; boolean local; - Map _conf; + Map _conf; Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>(); @Override @@ -59,22 +58,24 @@ public class ReturnResults extends BaseRichBolt { public void execute(Tuple input) { String result = (String) input.getValue(0); String returnInfo = (String) input.getValue(1); - //LOG.info("Receive one message, resultInfo:{}, result:{}", returnInfo, result); - if(returnInfo!=null) { + // LOG.info("Receive one message, resultInfo:{}, result:{}", returnInfo, result); + if (returnInfo != null) { Map retMap = (Map) JSONValue.parse(returnInfo); final String host = (String) retMap.get("host"); final int port = Utils.getInt(retMap.get("port")); String id = (String) retMap.get("id"); DistributedRPCInvocations.Iface client; - if(local) { + if (local) { client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host); } else { - List server = new ArrayList() {{ - add(host); - add(port); - }}; - - if(!_clients.containsKey(server)) { + List server = new ArrayList() { + { + add(host); + add(port); + } + }; + + if (!_clients.containsKey(server)) { try { _clients.put(server, new DRPCInvocationsClient(_conf, host, port)); } catch (TTransportException ex) { @@ -83,7 +84,7 @@ public class ReturnResults extends BaseRichBolt { } client = _clients.get(server); } - + try { client.result(id, result); _collector.ack(input); @@ -93,29 +94,29 @@ public class ReturnResults extends BaseRichBolt { if (client instanceof DRPCInvocationsClient) { try { LOG.info("reconnecting... "); - ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call + ((DRPCInvocationsClient) client).reconnectClient(); // Blocking call } catch (TException e2) { throw new RuntimeException(e2); } } - } catch(TException e) { + } catch (TException e) { LOG.error("Failed to return results to DRPC server", e); _collector.fail(input); if (client instanceof DRPCInvocationsClient) { try { LOG.info("reconnecting... "); - ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call + ((DRPCInvocationsClient) client).reconnectClient(); // Blocking call } catch (TException e2) { throw new RuntimeException(e2); } } } } - } + } @Override public void cleanup() { - for(DRPCInvocationsClient c: _clients.values()) { + for (DRPCInvocationsClient c : _clients.values()) { c.close(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java b/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java index 06eadaf..533b112 100644 --- a/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java +++ b/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20") public class AlreadyAliveException extends TException implements org.apache.thrift.TBase<AlreadyAliveException, AlreadyAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<AlreadyAliveException> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AlreadyAliveException"); @@ -264,11 +264,11 @@ public class AlreadyAliveException extends TException implements org.apache.thri return _Fields.findByThriftId(fieldId); } - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException { schemes.get(iprot.getScheme()).getScheme().read(iprot, this); } - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException { schemes.get(oprot.getScheme()).getScheme().write(oprot, this); } @@ -288,10 +288,10 @@ public class AlreadyAliveException extends TException implements org.apache.thri return sb.toString(); } - public void validate() throws org.apache.thrift.TException { + public void validate() throws TException { // check for required fields if (!is_set_msg()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString()); + throw new TProtocolException("Required field 'msg' is unset! Struct:" + toString()); } // check for sub-struct validity @@ -300,7 +300,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { try { write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { + } catch (TException te) { throw new java.io.IOException(te); } } @@ -308,7 +308,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { + } catch (TException te) { throw new java.io.IOException(te); } } @@ -321,7 +321,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri private static class AlreadyAliveExceptionStandardScheme extends StandardScheme<AlreadyAliveException> { - public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) @@ -348,7 +348,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri struct.validate(); } - public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); @@ -372,13 +372,13 @@ public class AlreadyAliveException extends TException implements org.apache.thri private static class AlreadyAliveExceptionTupleScheme extends TupleScheme<AlreadyAliveException> { @Override - public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws TException { TTupleProtocol oprot = (TTupleProtocol) prot; oprot.writeString(struct.msg); } @Override - public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws TException { TTupleProtocol iprot = (TTupleProtocol) prot; struct.msg = iprot.readString(); struct.set_msg_isSet(true); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java b/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java index 02f72f0..0822f50 100644 --- a/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java +++ b/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20") public class AuthorizationException extends TException implements org.apache.thrift.TBase<AuthorizationException, AuthorizationException._Fields>, java.io.Serializable, Cloneable, Comparable<AuthorizationException> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AuthorizationException"); @@ -264,11 +264,11 @@ public class AuthorizationException extends TException implements org.apache.thr return _Fields.findByThriftId(fieldId); } - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException { schemes.get(iprot.getScheme()).getScheme().read(iprot, this); } - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException { schemes.get(oprot.getScheme()).getScheme().write(oprot, this); } @@ -288,10 +288,10 @@ public class AuthorizationException extends TException implements org.apache.thr return sb.toString(); } - public void validate() throws org.apache.thrift.TException { + public void validate() throws TException { // check for required fields if (!is_set_msg()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString()); + throw new TProtocolException("Required field 'msg' is unset! Struct:" + toString()); } // check for sub-struct validity @@ -300,7 +300,7 @@ public class AuthorizationException extends TException implements org.apache.thr private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { try { write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { + } catch (TException te) { throw new java.io.IOException(te); } } @@ -308,7 +308,7 @@ public class AuthorizationException extends TException implements org.apache.thr private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { + } catch (TException te) { throw new java.io.IOException(te); } } @@ -321,7 +321,7 @@ public class AuthorizationException extends TException implements org.apache.thr private static class AuthorizationExceptionStandardScheme extends StandardScheme<AuthorizationException> { - public void read(org.apache.thrift.protocol.TProtocol iprot, AuthorizationException struct) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol iprot, AuthorizationException struct) throws TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) @@ -348,7 +348,7 @@ public class AuthorizationException extends TException implements org.apache.thr struct.validate(); } - public void write(org.apache.thrift.protocol.TProtocol oprot, AuthorizationException struct) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol oprot, AuthorizationException struct) throws TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); @@ -372,13 +372,13 @@ public class AuthorizationException extends TException implements org.apache.thr private static class AuthorizationExceptionTupleScheme extends TupleScheme<AuthorizationException> { @Override - public void write(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws TException { TTupleProtocol oprot = (TTupleProtocol) prot; oprot.writeString(struct.msg); } @Override - public void read(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws TException { TTupleProtocol iprot = (TTupleProtocol) prot; struct.msg = iprot.readString(); struct.set_msg_isSet(true); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java b/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java index e3d0a07..9241322 100644 --- a/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java +++ b/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20") public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.io.Serializable, Cloneable, Comparable<Bolt> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Bolt"); @@ -337,11 +337,11 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i return _Fields.findByThriftId(fieldId); } - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException { schemes.get(iprot.getScheme()).getScheme().read(iprot, this); } - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException { schemes.get(oprot.getScheme()).getScheme().write(oprot, this); } @@ -369,14 +369,14 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i return sb.toString(); } - public void validate() throws org.apache.thrift.TException { + public void validate() throws TException { // check for required fields if (!is_set_bolt_object()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'bolt_object' is unset! Struct:" + toString()); + throw new TProtocolException("Required field 'bolt_object' is unset! Struct:" + toString()); } if (!is_set_common()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'common' is unset! Struct:" + toString()); + throw new TProtocolException("Required field 'common' is unset! Struct:" + toString()); } // check for sub-struct validity @@ -388,7 +388,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { try { write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { + } catch (TException te) { throw new java.io.IOException(te); } } @@ -396,7 +396,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { + } catch (TException te) { throw new java.io.IOException(te); } } @@ -409,7 +409,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i private static class BoltStandardScheme extends StandardScheme<Bolt> { - public void read(org.apache.thrift.protocol.TProtocol iprot, Bolt struct) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol iprot, Bolt struct) throws TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) @@ -446,7 +446,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i struct.validate(); } - public void write(org.apache.thrift.protocol.TProtocol oprot, Bolt struct) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol oprot, Bolt struct) throws TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); @@ -475,14 +475,14 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i private static class BoltTupleScheme extends TupleScheme<Bolt> { @Override - public void write(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws org.apache.thrift.TException { + public void write(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws TException { TTupleProtocol oprot = (TTupleProtocol) prot; struct.bolt_object.write(oprot); struct.common.write(oprot); } @Override - public void read(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws org.apache.thrift.TException { + public void read(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws TException { TTupleProtocol iprot = (TTupleProtocol) prot; struct.bolt_object = new ComponentObject(); struct.bolt_object.read(iprot);
