http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java index f7ce534..3768cb1 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java @@ -35,40 +35,38 @@ import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TransactionalSpoutCoordinator extends BaseRichSpout { +public class TransactionalSpoutCoordinator extends BaseRichSpout { public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutCoordinator.class); - + public static final BigInteger INIT_TXID = BigInteger.ONE; - - + public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/batch"; public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/commit"; private static final String CURRENT_TX = "currtx"; private static final String META_DIR = "meta"; - + private ITransactionalSpout _spout; private ITransactionalSpout.Coordinator _coordinator; private TransactionalState _state; private RotatingTransactionalState _coordinatorState; - + TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>(); - + private SpoutOutputCollector _collector; private Random _rand; BigInteger _currTransaction; int _maxTransactionActive; StateInitializer _initializer; - - + public TransactionalSpoutCoordinator(ITransactionalSpout spout) { _spout = spout; } - + public ITransactionalSpout getSpout() { return _spout; } - + @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _rand = new Random(Utils.secureRandomLong()); @@ -78,7 +76,7 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout { _coordinator = _spout.getCoordinator(conf, context); _currTransaction = getStoredCurrTransaction(_state); Object active = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING); - if(active==null) { + if (active == null) { _maxTransactionActive = 1; } else { _maxTransactionActive = Utils.getInt(active); @@ -100,10 +98,10 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout { public void ack(Object msgId) { TransactionAttempt tx = (TransactionAttempt) msgId; TransactionStatus status = _activeTx.get(tx.getTransactionId()); - if(status!=null && tx.equals(status.attempt)) { - if(status.status==AttemptStatus.PROCESSING) { + if (status != null && tx.equals(status.attempt)) { + if (status.status == AttemptStatus.PROCESSING) { status.status = AttemptStatus.PROCESSED; - } else if(status.status==AttemptStatus.COMMITTING) { + } else if (status.status == AttemptStatus.COMMITTING) { _activeTx.remove(tx.getTransactionId()); _coordinatorState.cleanupBefore(tx.getTransactionId()); _currTransaction = nextTransactionId(tx.getTransactionId()); @@ -117,12 +115,12 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout { public void fail(Object msgId) { TransactionAttempt tx = (TransactionAttempt) msgId; TransactionStatus stored = _activeTx.remove(tx.getTransactionId()); - if(stored!=null && tx.equals(stored.attempt)) { + if (stored != null && tx.equals(stored.attempt)) { _activeTx.tailMap(tx.getTransactionId()).clear(); sync(); } } - + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far, @@ -130,24 +128,23 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout { declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx", "tx-meta", "committed-txid")); declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx")); } - + private void sync() { // note that sometimes the tuples active may be less than max_spout_pending, e.g. // max_spout_pending = 3 // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet), // and there won't be a batch for tx 4 because there's max_spout_pending tx active TransactionStatus maybeCommit = _activeTx.get(_currTransaction); - if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) { + if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) { maybeCommit.status = AttemptStatus.COMMITTING; _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt); } - + try { - if(_activeTx.size() < _maxTransactionActive) { + if (_activeTx.size() < _maxTransactionActive) { BigInteger curr = _currTransaction; - for(int i=0; i<_maxTransactionActive; i++) { - if((_coordinatorState.hasCache(curr) || _coordinator.isReady()) - && !_activeTx.containsKey(curr)) { + for (int i = 0; i < _maxTransactionActive; i++) { + if ((_coordinatorState.hasCache(curr) || _coordinator.isReady()) && !_activeTx.containsKey(curr)) { TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong()); Object state = _coordinatorState.getState(curr, _initializer); _activeTx.put(curr, new TransactionStatus(attempt)); @@ -155,8 +152,8 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout { } curr = nextTransactionId(curr); } - } - } catch(FailedException e) { + } + } catch (FailedException e) { LOG.warn("Failed to get metadata for a transaction", e); } } @@ -167,17 +164,15 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout { ret.setMaxTaskParallelism(1); return ret; } - + private static enum AttemptStatus { - PROCESSING, - PROCESSED, - COMMITTING + PROCESSING, PROCESSED, COMMITTING } - + private static class TransactionStatus { TransactionAttempt attempt; AttemptStatus status; - + public TransactionStatus(TransactionAttempt attempt) { this.attempt = attempt; this.status = AttemptStatus.PROCESSING; @@ -186,28 +181,29 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout { @Override public String toString() { return attempt.toString() + " <" + status.toString() + ">"; - } + } } - - + private BigInteger nextTransactionId(BigInteger id) { return id.add(BigInteger.ONE); } - + private BigInteger previousTransactionId(BigInteger id) { - if(id.equals(INIT_TXID)) { + if (id.equals(INIT_TXID)) { return null; } else { return id.subtract(BigInteger.ONE); } - } - + } + private BigInteger getStoredCurrTransaction(TransactionalState state) { BigInteger ret = (BigInteger) state.getData(CURRENT_TX); - if(ret==null) return INIT_TXID; - else return ret; + if (ret == null) + return INIT_TXID; + else + return ret; } - + private class StateInitializer implements RotatingTransactionalState.StateInitializer { @Override public Object init(BigInteger txid, Object lastState) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java index 98d1163..e775eb5 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java @@ -50,8 +50,7 @@ import java.util.Map; import java.util.Set; /** - * Trident subsumes the functionality provided by transactional topologies, so this - * class is deprecated. + * Trident subsumes the functionality provided by transactional topologies, so this class is deprecated. * */ @Deprecated @@ -62,16 +61,16 @@ public class TransactionalTopologyBuilder { Map<String, Component> _bolts = new HashMap<String, Component>(); Integer _spoutParallelism; List<Map> _spoutConfs = new ArrayList(); - + // id is used to store the state of this transactionalspout in zookeeper - // it would be very dangerous to have 2 topologies active with the same id in the same cluster + // it would be very dangerous to have 2 topologies active with the same id in the same cluster public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout, Number spoutParallelism) { _id = id; _spoutId = spoutId; _spout = spout; _spoutParallelism = (spoutParallelism == null) ? null : spoutParallelism.intValue(); } - + public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout) { this(id, spoutId, spout, null); } @@ -79,27 +78,27 @@ public class TransactionalTopologyBuilder { public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout, Number spoutParallelism) { this(id, spoutId, new PartitionedTransactionalSpoutExecutor(spout), spoutParallelism); } - + public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout) { this(id, spoutId, spout, null); } - + public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout, Number spoutParallelism) { this(id, spoutId, new OpaquePartitionedTransactionalSpoutExecutor(spout), spoutParallelism); } - + public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout) { this(id, spoutId, spout, null); } - + public SpoutDeclarer getSpoutDeclarer() { return new SpoutDeclarerImpl(); } - + public BoltDeclarer setBolt(String id, IBatchBolt bolt) { return setBolt(id, bolt, null); } - + public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) { return setBolt(id, new BatchBoltExecutor(bolt), parallelism, bolt instanceof ICommitter); } @@ -107,86 +106,79 @@ public class TransactionalTopologyBuilder { public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt) { return setCommitterBolt(id, bolt, null); } - + public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt, Number parallelism) { return setBolt(id, new BatchBoltExecutor(bolt), parallelism, true); - } - + } + public BoltDeclarer setBolt(String id, IBasicBolt bolt) { return setBolt(id, bolt, null); - } - + } + public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) { return setBolt(id, new BasicBoltExecutor(bolt), parallelism, false); } - + private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism, boolean committer) { Integer p = null; - if(parallelism!=null) p = parallelism.intValue(); + if (parallelism != null) + p = parallelism.intValue(); Component component = new Component(bolt, p, committer); _bolts.put(id, component); return new BoltDeclarerImpl(component); } - + public TopologyBuilder buildTopologyBuilder() { String coordinator = _spoutId + "/coordinator"; TopologyBuilder builder = new TopologyBuilder(); SpoutDeclarer declarer = builder.setSpout(coordinator, new TransactionalSpoutCoordinator(_spout)); - for(Map conf: _spoutConfs) { + for (Map conf : _spoutConfs) { declarer.addConfigurations(conf); } declarer.addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id); - BoltDeclarer emitterDeclarer = - builder.setBolt(_spoutId, - new CoordinatedBolt(new TransactionalSpoutBatchExecutor(_spout), - null, - null), - _spoutParallelism) - .allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID) - .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id); - if(_spout instanceof ICommitterTransactionalSpout) { + BoltDeclarer emitterDeclarer = + builder.setBolt(_spoutId, new CoordinatedBolt(new TransactionalSpoutBatchExecutor(_spout), null, null), _spoutParallelism) + .allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID) + .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id); + if (_spout instanceof ICommitterTransactionalSpout) { emitterDeclarer.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID); } - for(String id: _bolts.keySet()) { + for (String id : _bolts.keySet()) { Component component = _bolts.get(id); Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>(); - for(String c: componentBoltSubscriptions(component)) { + for (String c : componentBoltSubscriptions(component)) { coordinatedArgs.put(c, SourceArgs.all()); } - + IdStreamSpec idSpec = null; - if(component.committer) { - idSpec = IdStreamSpec.makeDetectSpec(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID); + if (component.committer) { + idSpec = IdStreamSpec.makeDetectSpec(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID); } - BoltDeclarer input = builder.setBolt(id, - new CoordinatedBolt(component.bolt, - coordinatedArgs, - idSpec), - component.parallelism); - for(Map conf: component.componentConfs) { + BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(component.bolt, coordinatedArgs, idSpec), component.parallelism); + for (Map conf : component.componentConfs) { input.addConfigurations(conf); } - for(String c: componentBoltSubscriptions(component)) { + for (String c : componentBoltSubscriptions(component)) { input.directGrouping(c, Constants.COORDINATED_STREAM_ID); } - for(InputDeclaration d: component.declarations) { + for (InputDeclaration d : component.declarations) { d.declare(input); } - if(component.committer) { - input.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID); + if (component.committer) { + input.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID); } } return builder; } - + public StormTopology buildTopology() { return buildTopologyBuilder().createTopology(); } - + private Set<String> componentBoltSubscriptions(Component component) { Set<String> ret = new HashSet<String>(); - for(InputDeclaration d: component.declarations) { + for (InputDeclaration d : component.declarations) { ret.add(d.getComponent()); } return ret; @@ -198,34 +190,35 @@ public class TransactionalTopologyBuilder { public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>(); public List<Map> componentConfs = new ArrayList<Map>(); public boolean committer; - + public Component(IRichBolt bolt, Integer parallelism, boolean committer) { this.bolt = bolt; this.parallelism = parallelism; this.committer = committer; } } - + private static interface InputDeclaration { void declare(InputDeclarer declarer); + String getComponent(); } - + private class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer { @Override public SpoutDeclarer addConfigurations(Map conf) { _spoutConfs.add(conf); return this; - } + } } - + private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer { Component _component; - + public BoltDeclarerImpl(Component component) { _component = component; } - + @Override public BoltDeclarer fieldsGrouping(final String component, final Fields fields) { addDeclaration(new InputDeclaration() { @@ -237,7 +230,7 @@ public class TransactionalTopologyBuilder { @Override public String getComponent() { return component; - } + } }); return this; } @@ -248,12 +241,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.fieldsGrouping(component, streamId, fields); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -264,12 +257,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.globalGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -280,12 +273,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.globalGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -296,12 +289,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.shuffleGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -312,12 +305,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.shuffleGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -328,12 +321,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.localOrShuffleGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -345,7 +338,7 @@ public class TransactionalTopologyBuilder { public void declare(InputDeclarer declarer) { declarer.localOrShuffleGrouping(component, streamId); } - + @Override public String getComponent() { return component; @@ -353,7 +346,7 @@ public class TransactionalTopologyBuilder { }); return this; } - + @Override public BoltDeclarer localFirstGrouping(final String component) { addDeclaration(new InputDeclaration() { @@ -361,7 +354,7 @@ public class TransactionalTopologyBuilder { public void declare(InputDeclarer declarer) { declarer.localFirstGrouping(component); } - + @Override public String getComponent() { return component; @@ -369,7 +362,7 @@ public class TransactionalTopologyBuilder { }); return this; } - + @Override public BoltDeclarer localFirstGrouping(final String component, final String streamId) { addDeclaration(new InputDeclaration() { @@ -377,7 +370,7 @@ public class TransactionalTopologyBuilder { public void declare(InputDeclarer declarer) { declarer.localFirstGrouping(component, streamId); } - + @Override public String getComponent() { return component; @@ -385,19 +378,19 @@ public class TransactionalTopologyBuilder { }); return this; } - + @Override public BoltDeclarer noneGrouping(final String component) { addDeclaration(new InputDeclaration() { @Override public void declare(InputDeclarer declarer) { declarer.noneGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -408,12 +401,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.noneGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -424,12 +417,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.allGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -440,12 +433,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.allGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -456,12 +449,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.directGrouping(component); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -472,12 +465,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.directGrouping(component, streamId); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -498,14 +491,14 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.customGrouping(component, grouping); - } + } @Override public String getComponent() { return component; - } + } }); - return this; + return this; } @Override @@ -514,12 +507,12 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.customGrouping(component, streamId, grouping); - } + } @Override public String getComponent() { return component; - } + } }); return this; } @@ -530,16 +523,16 @@ public class TransactionalTopologyBuilder { @Override public void declare(InputDeclarer declarer) { declarer.grouping(stream, grouping); - } + } @Override public String getComponent() { return stream.get_componentId(); - } + } }); return this; } - + private void addDeclaration(InputDeclaration declaration) { _component.declarations.add(declaration); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java index 8d1f60b..35fb1c6 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java @@ -24,33 +24,34 @@ import backtype.storm.transactional.TransactionAttempt; import java.util.Map; /** - * This defines a transactional spout which does *not* necessarily - * replay the same batch every time it emits a batch for a transaction id. + * This defines a transactional spout which does *not* necessarily replay the same batch every time it emits a batch for a transaction id. */ public interface IOpaquePartitionedTransactionalSpout<T> extends IComponent { public interface Coordinator { /** * Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction). * - * You should sleep here if you want a delay between asking for the next transaction (this will be called - * repeatedly in a loop). + * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop). */ boolean isReady(); + void close(); } - + public interface Emitter<X> { /** - * Emit a batch of tuples for a partition/transaction. + * Emit a batch of tuples for a partition/transaction. * - * Return the metadata describing this batch that will be used as lastPartitionMeta - * for defining the parameters of the next batch. + * Return the metadata describing this batch that will be used as lastPartitionMeta for defining the parameters of the next batch. */ X emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta); + int numPartitions(); + void close(); } - - Emitter<T> getEmitter(Map conf, TopologyContext context); - Coordinator getCoordinator(Map conf, TopologyContext context); + + Emitter<T> getEmitter(Map conf, TopologyContext context); + + Coordinator getCoordinator(Map conf, TopologyContext context); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java index e428328..7b1e4fb 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java @@ -24,46 +24,43 @@ import backtype.storm.coordination.BatchOutputCollector; import java.util.Map; /** - * This interface defines a transactional spout that reads its tuples from a partitioned set of - * brokers. It automates the storing of metadata for each partition to ensure that the same batch - * is always emitted for the same transaction id. The partition metadata is stored in Zookeeper. + * This interface defines a transactional spout that reads its tuples from a partitioned set of brokers. It automates the storing of metadata for each partition + * to ensure that the same batch is always emitted for the same transaction id. The partition metadata is stored in Zookeeper. */ public interface IPartitionedTransactionalSpout<T> extends IComponent { public interface Coordinator { /** - * Return the number of partitions currently in the source of data. The idea is - * is that if a new partition is added and a prior transaction is replayed, it doesn't - * emit tuples for the new partition because it knows how many partitions were in - * that transaction. + * Return the number of partitions currently in the source of data. The idea is is that if a new partition is added and a prior transaction is replayed, + * it doesn't emit tuples for the new partition because it knows how many partitions were in that transaction. */ int numPartitions(); - + /** * Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction). * - * You should sleep here if you want a delay between asking for the next transaction (this will be called - * repeatedly in a loop). + * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop). */ boolean isReady(); - + void close(); } - + public interface Emitter<X> { /** - * Emit a batch of tuples for a partition/transaction that's never been emitted before. - * Return the metadata that can be used to reconstruct this partition/batch in the future. + * Emit a batch of tuples for a partition/transaction that's never been emitted before. Return the metadata that can be used to reconstruct this + * partition/batch in the future. */ X emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta); /** - * Emit a batch of tuples for a partition/transaction that has been emitted before, using - * the metadata created when it was first emitted. + * Emit a batch of tuples for a partition/transaction that has been emitted before, using the metadata created when it was first emitted. */ void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X partitionMeta); + void close(); } - + Coordinator getCoordinator(Map conf, TopologyContext context); - Emitter<T> getEmitter(Map conf, TopologyContext context); + + Emitter<T> getEmitter(Map conf, TopologyContext context); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java index aabcb7a..4f894d9 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java @@ -33,17 +33,16 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; - public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTransactionalSpout<Object> { IOpaquePartitionedTransactionalSpout _spout; - + public class Coordinator implements ITransactionalSpout.Coordinator<Object> { IOpaquePartitionedTransactionalSpout.Coordinator _coordinator; public Coordinator(Map conf, TopologyContext context) { _coordinator = _spout.getCoordinator(conf, context); } - + @Override public Object initializeTransaction(BigInteger txid, Object prevMetadata) { return null; @@ -52,14 +51,14 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr @Override public boolean isReady() { return _coordinator.isReady(); - } + } @Override public void close() { _coordinator.close(); - } + } } - + public class Emitter implements ICommitterTransactionalSpout.Emitter { IOpaquePartitionedTransactionalSpout.Emitter _emitter; TransactionalState _state; @@ -67,21 +66,21 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>(); int _index; int _numTasks; - + public Emitter(Map conf, TopologyContext context) { _emitter = _spout.getEmitter(conf, context); _index = context.getThisTaskIndex(); _numTasks = context.getComponentTasks(context.getThisComponentId()).size(); - _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration()); + _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration()); List<String> existingPartitions = _state.list(""); - for(String p: existingPartitions) { + for (String p : existingPartitions) { int partition = Integer.parseInt(p); - if((partition - _index) % _numTasks == 0) { + if ((partition - _index) % _numTasks == 0) { _partitionStates.put(partition, new RotatingTransactionalState(_state, p)); } } } - + @Override public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, BatchOutputCollector collector) { Map<Integer, Object> metas = new HashMap<Integer, Object>(); @@ -89,21 +88,22 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr int partitions = _emitter.numPartitions(); Entry<BigInteger, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId()); Map<Integer, Object> prevCached; - if(entry!=null) { + if (entry != null) { prevCached = entry.getValue(); } else { prevCached = new HashMap<Integer, Object>(); } - - for(int i=_index; i < partitions; i+=_numTasks) { + + for (int i = _index; i < partitions; i += _numTasks) { RotatingTransactionalState state = _partitionStates.get(i); - if(state==null) { + if (state == null) { state = new RotatingTransactionalState(_state, "" + i); _partitionStates.put(i, state); } state.removeState(tx.getTransactionId()); Object lastMeta = prevCached.get(i); - if(lastMeta==null) lastMeta = state.getLastState(); + if (lastMeta == null) + lastMeta = state.getLastState(); Object meta = _emitter.emitPartitionBatch(tx, collector, i, lastMeta); metas.put(i, meta); } @@ -111,16 +111,16 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr @Override public void cleanupBefore(BigInteger txid) { - for(RotatingTransactionalState state: _partitionStates.values()) { + for (RotatingTransactionalState state : _partitionStates.values()) { state.cleanupBefore(txid); - } + } } @Override public void commit(TransactionAttempt attempt) { BigInteger txid = attempt.getTransactionId(); Map<Integer, Object> metas = _cachedMetas.remove(txid); - for(Integer partition: metas.keySet()) { + for (Integer partition : metas.keySet()) { Object meta = metas.get(partition); _partitionStates.get(partition).overrideState(txid, meta); } @@ -130,12 +130,12 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr public void close() { _emitter.close(); } - } - + } + public OpaquePartitionedTransactionalSpoutExecutor(IOpaquePartitionedTransactionalSpout spout) { _spout = spout; } - + @Override public ITransactionalSpout.Coordinator<Object> getCoordinator(Map conf, TopologyContext context) { return new Coordinator(conf, context); @@ -155,5 +155,5 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr public Map<String, Object> getComponentConfiguration() { return _spout.getComponentConfiguration(); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java index 479dda4..8422576 100644 --- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java @@ -29,30 +29,29 @@ import java.math.BigInteger; import java.util.HashMap; import java.util.Map; - public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpout<Integer> { IPartitionedTransactionalSpout _spout; - + public PartitionedTransactionalSpoutExecutor(IPartitionedTransactionalSpout spout) { _spout = spout; } - + public IPartitionedTransactionalSpout getPartitionedSpout() { return _spout; } - + class Coordinator implements ITransactionalSpout.Coordinator<Integer> { private IPartitionedTransactionalSpout.Coordinator _coordinator; - + public Coordinator(Map conf, TopologyContext context) { _coordinator = _spout.getCoordinator(conf, context); } - + @Override public Integer initializeTransaction(BigInteger txid, Integer prevMetadata) { return _coordinator.numPartitions(); } - + @Override public boolean isReady() { return _coordinator.isReady(); @@ -61,53 +60,51 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou @Override public void close() { _coordinator.close(); - } + } } - + class Emitter implements ITransactionalSpout.Emitter<Integer> { private IPartitionedTransactionalSpout.Emitter _emitter; private TransactionalState _state; private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>(); private int _index; private int _numTasks; - + public Emitter(Map conf, TopologyContext context) { _emitter = _spout.getEmitter(conf, context); - _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration()); + _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration()); _index = context.getThisTaskIndex(); _numTasks = context.getComponentTasks(context.getThisComponentId()).size(); } @Override - public void emitBatch(final TransactionAttempt tx, final Integer partitions, - final BatchOutputCollector collector) { - for(int i=_index; i < partitions; i+=_numTasks) { - if(!_partitionStates.containsKey(i)) { + public void emitBatch(final TransactionAttempt tx, final Integer partitions, final BatchOutputCollector collector) { + for (int i = _index; i < partitions; i += _numTasks) { + if (!_partitionStates.containsKey(i)) { _partitionStates.put(i, new RotatingTransactionalState(_state, "" + i)); } RotatingTransactionalState state = _partitionStates.get(i); final int partition = i; - Object meta = state.getStateOrCreate(tx.getTransactionId(), - new RotatingTransactionalState.StateInitializer() { + Object meta = state.getStateOrCreate(tx.getTransactionId(), new RotatingTransactionalState.StateInitializer() { @Override public Object init(BigInteger txid, Object lastState) { return _emitter.emitPartitionBatchNew(tx, collector, partition, lastState); } }); // it's null if one of: - // a) a later transaction batch was emitted before this, so we should skip this batch - // b) if didn't exist and was created (in which case the StateInitializer was invoked and - // it was emitted - if(meta!=null) { + // a) a later transaction batch was emitted before this, so we should skip this batch + // b) if didn't exist and was created (in which case the StateInitializer was invoked and + // it was emitted + if (meta != null) { _emitter.emitPartitionBatch(tx, collector, partition, meta); } } - + } @Override public void cleanupBefore(BigInteger txid) { - for(RotatingTransactionalState state: _partitionStates.values()) { + for (RotatingTransactionalState state : _partitionStates.values()) { state.cleanupBefore(txid); } } @@ -117,7 +114,7 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou _state.close(); _emitter.close(); } - } + } @Override public ITransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) { @@ -138,5 +135,5 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou public Map<String, Object> getComponentConfiguration() { return _spout.getComponentConfiguration(); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java index 20c5cd3..63aced9 100644 --- a/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java @@ -27,19 +27,19 @@ import java.util.SortedMap; import java.util.TreeMap; /** - * A map from txid to a value. Automatically deletes txids that have been committed. + * A map from txid to a value. Automatically deletes txids that have been committed. */ public class RotatingTransactionalState { public static interface StateInitializer { Object init(BigInteger txid, Object lastState); - } + } private TransactionalState _state; private String _subdir; private boolean _strictOrder; - + private TreeMap<BigInteger, Object> _curr = new TreeMap<BigInteger, Object>(); - + public RotatingTransactionalState(TransactionalState state, String subdir, boolean strictOrder) { _state = state; _subdir = subdir; @@ -51,32 +51,35 @@ public class RotatingTransactionalState { public RotatingTransactionalState(TransactionalState state, String subdir) { this(state, subdir, false); } - + public Object getLastState() { - if(_curr.isEmpty()) return null; - else return _curr.lastEntry().getValue(); + if (_curr.isEmpty()) + return null; + else + return _curr.lastEntry().getValue(); } - + public void overrideState(BigInteger txid, Object state) { _state.setData(txPath(txid), state); _curr.put(txid, state); } public void removeState(BigInteger txid) { - if(_curr.containsKey(txid)) { + if (_curr.containsKey(txid)) { _curr.remove(txid); _state.delete(txPath(txid)); } } - + public Object getState(BigInteger txid, StateInitializer init) { - if(!_curr.containsKey(txid)) { + if (!_curr.containsKey(txid)) { SortedMap<BigInteger, Object> prevMap = _curr.headMap(txid); - SortedMap<BigInteger, Object> afterMap = _curr.tailMap(txid); - + SortedMap<BigInteger, Object> afterMap = _curr.tailMap(txid); + BigInteger prev = null; - if(!prevMap.isEmpty()) prev = prevMap.lastKey(); - + if (!prevMap.isEmpty()) + prev = prevMap.lastKey(); + if (_strictOrder) { if (prev == null && !txid.equals(TransactionalSpoutCoordinator.INIT_TXID)) { throw new IllegalStateException("Trying to initialize transaction for which there should be a previous state"); @@ -88,7 +91,7 @@ public class RotatingTransactionalState { throw new IllegalStateException("Expecting tx state to be initialized in strict order but there are txids after that have state"); } } - + Object data; if (afterMap.isEmpty()) { Object prevData; @@ -106,11 +109,11 @@ public class RotatingTransactionalState { } return _curr.get(txid); } - + public boolean hasCache(BigInteger txid) { return _curr.containsKey(txid); } - + /** * Returns null if it was created, the value otherwise. */ @@ -122,7 +125,7 @@ public class RotatingTransactionalState { return null; } } - + public void cleanupBefore(BigInteger txid) { Set<BigInteger> toDelete = new HashSet<BigInteger>(); toDelete.addAll(_curr.headMap(txid).keySet()); @@ -131,21 +134,21 @@ public class RotatingTransactionalState { _state.delete(txPath(tx)); } } - + private void sync() { List<String> txids = _state.list(_subdir); - for(String txid_s: txids) { + for (String txid_s : txids) { Object data = _state.getData(txPath(txid_s)); _curr.put(new BigInteger(txid_s), data); } } - + private String txPath(BigInteger tx) { return txPath(tx.toString()); } private String txPath(String tx) { return _subdir + "/" + tx; - } - + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java index 3d4a463..02b3d0d 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java @@ -32,16 +32,13 @@ import org.apache.zookeeper.data.ACL; public class TestTransactionalState extends TransactionalState { /** - * Matching constructor in absence of a default constructor in the parent - * class. + * Matching constructor in absence of a default constructor in the parent class. */ protected TestTransactionalState(Map conf, String id, Map componentConf, String subroot) { super(conf, id, componentConf, subroot); } - public static void createNode(CuratorFramework curator, - String rootDir, byte[] data, List<ACL> acls, CreateMode mode) - throws Exception { - TransactionalState.createNode(curator, rootDir, data, acls, mode); + public static void createNode(CuratorFramework curator, String rootDir, byte[] data, List<ACL> acls, CreateMode mode) throws Exception { + TransactionalState.createNode(curator, rootDir, data, acls, mode); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java index 5afcd0a..71d7cc3 100755 --- a/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java +++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java @@ -40,25 +40,23 @@ public class TransactionalState { KryoValuesSerializer _ser; KryoValuesDeserializer _des; List<ACL> _zkAcls = null; - + public static TransactionalState newUserState(Map conf, String id, Map componentConf) { return new TransactionalState(conf, id, componentConf, "user"); } - + public static TransactionalState newCoordinatorState(Map conf, String id, Map componentConf) { - return new TransactionalState(conf, id, componentConf, "coordinator"); + return new TransactionalState(conf, id, componentConf, "coordinator"); } - + protected TransactionalState(Map conf, String id, Map componentConf, String subroot) { try { conf = new HashMap(conf); // ensure that the serialization registrations are consistent with the declarations in this spout - if(componentConf!=null) { - conf.put(Config.TOPOLOGY_KRYO_REGISTER, - componentConf - .get(Config.TOPOLOGY_KRYO_REGISTER)); + if (componentConf != null) { + conf.put(Config.TOPOLOGY_KRYO_REGISTER, componentConf.get(Config.TOPOLOGY_KRYO_REGISTER)); } - String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT); + String transactionalRoot = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT); String rootDir = transactionalRoot + "/" + id + "/" + subroot; List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS); Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT); @@ -74,29 +72,24 @@ public class TransactionalState { } catch (KeeperException.NodeExistsException e) { } initter.close(); - + _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth); _ser = new KryoValuesSerializer(conf); _des = new KryoValuesDeserializer(conf); } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException(e); } } - protected static String forPath(PathAndBytesable<String> builder, - String path, byte[] data) throws Exception { - return (data == null) - ? builder.forPath(path) - : builder.forPath(path, data); + protected static String forPath(PathAndBytesable<String> builder, String path, byte[] data) throws Exception { + return (data == null) ? builder.forPath(path) : builder.forPath(path, data); } - protected static void createNode(CuratorFramework curator, String path, - byte[] data, List<ACL> acls, CreateMode mode) throws Exception { - ProtectACLCreateModePathAndBytesable<String> builder = - curator.create().creatingParentsIfNeeded(); - + protected static void createNode(CuratorFramework curator, String path, byte[] data, List<ACL> acls, CreateMode mode) throws Exception { + ProtectACLCreateModePathAndBytesable<String> builder = curator.create().creatingParentsIfNeeded(); + if (acls == null) { - if (mode == null ) { + if (mode == null) { TransactionalState.forPath(builder, path, data); } else { TransactionalState.forPath(builder.withMode(mode), path, data); @@ -111,17 +104,16 @@ public class TransactionalState { path = "/" + path; byte[] ser = _ser.serializeObject(obj); try { - if(_curator.checkExists().forPath(path)!=null) { + if (_curator.checkExists().forPath(path) != null) { _curator.setData().forPath(path, ser); } else { - TransactionalState.createNode(_curator, path, ser, _zkAcls, - CreateMode.PERSISTENT); + TransactionalState.createNode(_curator, path, ser, _zkAcls, CreateMode.PERSISTENT); } - } catch(Exception e) { + } catch (Exception e) { throw new RuntimeException(e); - } + } } - + public void delete(String path) { path = "/" + path; try { @@ -130,44 +122,45 @@ public class TransactionalState { throw new RuntimeException(e); } } - + public List<String> list(String path) { path = "/" + path; try { - if(_curator.checkExists().forPath(path)==null) { + if (_curator.checkExists().forPath(path) == null) { return new ArrayList<String>(); } else { return _curator.getChildren().forPath(path); } - } catch(Exception e) { + } catch (Exception e) { throw new RuntimeException(e); - } + } } - + public void mkdir(String path) { setData(path, 7); } - + public Object getData(String path) { path = "/" + path; try { - if(_curator.checkExists().forPath(path)!=null) { + if (_curator.checkExists().forPath(path) != null) { return _des.deserializeObject(_curator.getData().forPath(path)); } else { return null; } - } catch(Exception e) { + } catch (Exception e) { throw new RuntimeException(e); } } - + public void close() { _curator.close(); } - + private Object getWithBackup(Map amap, Object primary, Object backup) { Object ret = amap.get(primary); - if(ret==null) return amap.get(backup); + if (ret == null) + return amap.get(backup); return ret; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java index 47df545..eb3d0ce 100644 --- a/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java @@ -20,11 +20,10 @@ package backtype.storm.tuple; import java.util.ArrayList; import java.util.List; - -public class BatchTuple { +public class BatchTuple implements ITupleExt{ private int targetTaskId; - private List<Tuple> batch; + private List<Tuple> batch = new ArrayList<Tuple>(); private int batchSize; public BatchTuple() { @@ -37,15 +36,12 @@ public class BatchTuple { } public void addToBatch(Tuple tuple) { - if (batch == null) { - batch = new ArrayList<Tuple>(); - } batch.add(tuple); } public boolean isBatchFull() { boolean ret = false; - if (batch != null && batch.size() >= batchSize) + if (batch.size() >= batchSize) ret = true; return ret; @@ -60,7 +56,7 @@ public class BatchTuple { } public int currBatchSize() { - return batch == null ? 0 : batch.size(); + return batch.size(); } public void setTargetTaskId(int taskId) { @@ -74,4 +70,16 @@ public class BatchTuple { public void setBatchSize(int batchSize) { this.batchSize = batchSize; } -} + + @Deprecated + public long getCreationTimeStamp() { + // TODO Auto-generated method stub + return 0; + } + + @Deprecated + public void setCreationTimeStamp(long timeStamp) { + // TODO Auto-generated method stub + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java b/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java index 9805ba6..6ba1e5c 100644 --- a/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java @@ -28,26 +28,24 @@ import java.io.Serializable; public class Fields implements Iterable<String>, Serializable { private List<String> _fields; private Map<String, Integer> _index = new HashMap<String, Integer>(); - + public Fields(String... fields) { this(Arrays.asList(fields)); } - + public Fields(List<String> fields) { _fields = new ArrayList<String>(fields.size()); for (String field : fields) { if (_fields.contains(field)) - throw new IllegalArgumentException( - String.format("duplicate field '%s'", field) - ); + throw new IllegalArgumentException(String.format("duplicate field '%s'", field)); _fields.add(field); } index(); } - + public List<Object> select(Fields selector, List<Object> tuple) { List<Object> ret = new ArrayList<Object>(selector.size()); - for(String s: selector) { + for (String s : selector) { ret.add(tuple.get(_index.get(s))); } return ret; @@ -56,7 +54,7 @@ public class Fields implements Iterable<String>, Serializable { public List<String> toList() { return new ArrayList<String>(_fields); } - + public int size() { return _fields.size(); } @@ -68,27 +66,27 @@ public class Fields implements Iterable<String>, Serializable { public Iterator<String> iterator() { return _fields.iterator(); } - + /** * Returns the position of the specified field. */ public int fieldIndex(String field) { Integer ret = _index.get(field); - if(ret==null) { + if (ret == null) { throw new IllegalArgumentException(field + " does not exist"); } return ret; } - + /** * Returns true if this contains the specified name of the field. */ public boolean contains(String field) { return _index.containsKey(field); } - + private void index() { - for(int i=0; i<_fields.size(); i++) { + for (int i = 0; i < _fields.size(); i++) { _index.put(_fields.get(i), i); } } @@ -96,5 +94,5 @@ public class Fields implements Iterable<String>, Serializable { @Override public String toString() { return _fields.toString(); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java index c85848d..21696b5 100755 --- a/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java @@ -52,60 +52,50 @@ public interface ITuple { public Object getValue(int i); /** - * Returns the String at position i in the tuple. If that field is not a String, - * you will get a runtime error. + * Returns the String at position i in the tuple. If that field is not a String, you will get a runtime error. */ public String getString(int i); /** - * Returns the Integer at position i in the tuple. If that field is not an Integer, - * you will get a runtime error. + * Returns the Integer at position i in the tuple. If that field is not an Integer, you will get a runtime error. */ public Integer getInteger(int i); /** - * Returns the Long at position i in the tuple. If that field is not a Long, - * you will get a runtime error. + * Returns the Long at position i in the tuple. If that field is not a Long, you will get a runtime error. */ public Long getLong(int i); /** - * Returns the Boolean at position i in the tuple. If that field is not a Boolean, - * you will get a runtime error. + * Returns the Boolean at position i in the tuple. If that field is not a Boolean, you will get a runtime error. */ public Boolean getBoolean(int i); /** - * Returns the Short at position i in the tuple. If that field is not a Short, - * you will get a runtime error. + * Returns the Short at position i in the tuple. If that field is not a Short, you will get a runtime error. */ public Short getShort(int i); /** - * Returns the Byte at position i in the tuple. If that field is not a Byte, - * you will get a runtime error. + * Returns the Byte at position i in the tuple. If that field is not a Byte, you will get a runtime error. */ public Byte getByte(int i); /** - * Returns the Double at position i in the tuple. If that field is not a Double, - * you will get a runtime error. + * Returns the Double at position i in the tuple. If that field is not a Double, you will get a runtime error. */ public Double getDouble(int i); /** - * Returns the Float at position i in the tuple. If that field is not a Float, - * you will get a runtime error. + * Returns the Float at position i in the tuple. If that field is not a Float, you will get a runtime error. */ public Float getFloat(int i); /** - * Returns the byte array at position i in the tuple. If that field is not a byte array, - * you will get a runtime error. + * Returns the byte array at position i in the tuple. If that field is not a byte array, you will get a runtime error. */ public byte[] getBinary(int i); - public Object getValueByField(String field); public String getStringByField(String field); @@ -131,6 +121,4 @@ public interface ITuple { */ public List<Object> getValues(); - - } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java new file mode 100644 index 0000000..92a7157 --- /dev/null +++ b/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java @@ -0,0 +1,25 @@ +package backtype.storm.tuple; + +public interface ITupleExt { + + /** + * Get Target TaskId + * + * @return + */ + int getTargetTaskId(); + + void setTargetTaskId(int targetTaskId); + + /** + * Get the timeStamp of creating tuple + * + * @return + */ + long getCreationTimeStamp(); + + /* + * set ms + */ + void setCreationTimeStamp(long timeStamp); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java b/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java index 688946d..329a4ae 100755 --- a/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java @@ -29,12 +29,12 @@ import java.util.Set; public class MessageId { private Map<Long, Long> _anchorsToIds; - + @Deprecated public static long generateId() { return Utils.secureRandomLong(); } - + public static long generateId(Random rand) { return rand.nextLong(); } @@ -42,17 +42,17 @@ public class MessageId { public static MessageId makeUnanchored() { return makeId(new HashMap<Long, Long>()); } - + public static MessageId makeId(Map<Long, Long> anchorsToIds) { return new MessageId(anchorsToIds); } - + public static MessageId makeRootId(long id, long val) { Map<Long, Long> anchorsToIds = new HashMap<Long, Long>(); anchorsToIds.put(id, val); return new MessageId(anchorsToIds); } - + protected MessageId(Map<Long, Long> anchorsToIds) { _anchorsToIds = anchorsToIds; } @@ -63,8 +63,8 @@ public class MessageId { public Set<Long> getAnchors() { return _anchorsToIds.keySet(); - } - + } + @Override public int hashCode() { return _anchorsToIds.hashCode(); @@ -72,7 +72,7 @@ public class MessageId { @Override public boolean equals(Object other) { - if(other instanceof MessageId) { + if (other instanceof MessageId) { return _anchorsToIds.equals(((MessageId) other)._anchorsToIds); } else { return false; @@ -86,7 +86,7 @@ public class MessageId { public void serialize(Output out) throws IOException { out.writeInt(_anchorsToIds.size(), true); - for(Entry<Long, Long> anchorToId: _anchorsToIds.entrySet()) { + for (Entry<Long, Long> anchorToId : _anchorsToIds.entrySet()) { out.writeLong(anchorToId.getKey()); out.writeLong(anchorToId.getValue()); } @@ -95,7 +95,7 @@ public class MessageId { public static MessageId deserialize(Input in) throws IOException { int numAnchors = in.readInt(true); Map<Long, Long> anchorsToIds = new HashMap<Long, Long>(); - for(int i=0; i<numAnchors; i++) { + for (int i = 0; i < numAnchors; i++) { anchorsToIds.put(in.readLong(), in.readLong()); } return new MessageId(anchorsToIds); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java index 34dc61a..95253df 100755 --- a/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java @@ -21,38 +21,35 @@ import backtype.storm.generated.GlobalStreamId; import java.util.List; /** - * The tuple is the main data structure in Storm. A tuple is a named list of values, - * where each value can be any type. Tuples are dynamically typed -- the types of the fields - * do not need to be declared. Tuples have helper methods like getInteger and getString - * to get field values without having to cast the result. + * The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed -- the types + * of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result. * - * Storm needs to know how to serialize all the values in a tuple. By default, Storm - * knows how to serialize the primitive types, strings, and byte arrays. If you want to - * use another type, you'll need to implement and register a serializer for that type. - * See {@link http://github.com/nathanmarz/storm/wiki/Serialization} for more info. + * Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If + * you want to use another type, you'll need to implement and register a serializer for that type. See {@link http + * ://github.com/nathanmarz/storm/wiki/Serialization} for more info. */ -public interface Tuple extends ITuple{ +public interface Tuple extends ITuple { /** * Returns the global stream id (component + stream) of this tuple. */ public GlobalStreamId getSourceGlobalStreamid(); - + /** * Gets the id of the component that created this tuple. */ public String getSourceComponent(); - + /** * Gets the id of the task that created this tuple. */ public int getSourceTask(); - + /** * Gets the id of the stream that this tuple was emitted to. */ public String getSourceStreamId(); - + /** * Gets the message id that associated with this tuple. */ http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java index 60676c9..8f004cc 100755 --- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java @@ -17,13 +17,6 @@ */ package backtype.storm.tuple; -public interface TupleExt extends Tuple { - /** - * Get Target TaskId - * - * @return - */ - int getTargetTaskId(); +public interface TupleExt extends Tuple, ITupleExt { - void setTargetTaskId(int targetTaskId); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java index 818eff1..417774e 100755 --- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java @@ -41,31 +41,29 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, private GeneralTopologyContext context; private MessageId id; private IPersistentMap _meta = null; - + public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) { this.values = values; this.taskId = taskId; this.streamId = streamId; this.id = id; this.context = context; - + String componentId = context.getComponentId(taskId); Fields schema = context.getComponentOutputFields(componentId, streamId); - if(values.size()!=schema.size()) { - throw new IllegalArgumentException( - "Tuple created with wrong number of fields. " + - "Expected " + schema.size() + " fields but got " + - values.size() + " fields"); + if (values.size() != schema.size()) { + throw new IllegalArgumentException("Tuple created with wrong number of fields. " + "Expected " + schema.size() + " fields but got " + values.size() + + " fields"); } } public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) { this(context, values, taskId, streamId, MessageId.makeUnanchored()); - } - + } + Long _processSampleStartTime = null; Long _executeSampleStartTime = null; - + public void setProcessSampleStartTime(long ms) { _processSampleStartTime = ms; } @@ -73,7 +71,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public Long getProcessSampleStartTime() { return _processSampleStartTime; } - + public void setExecuteSampleStartTime(long ms) { _executeSampleStartTime = ms; } @@ -81,13 +79,13 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public Long getExecuteSampleStartTime() { return _executeSampleStartTime; } - + long _outAckVal = 0; - + public void updateAckVal(long val) { _outAckVal = _outAckVal ^ val; } - + public long getAckVal() { return _outAckVal; } @@ -95,15 +93,15 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public int size() { return values.size(); } - + public int fieldIndex(String field) { return getFields().fieldIndex(field); } - + public boolean contains(String field) { return getFields().contains(field); } - + public Object getValue(int i) { return values.get(i); } @@ -143,8 +141,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public byte[] getBinary(int i) { return (byte[]) values.get(i); } - - + public Object getValueByField(String field) { return values.get(fieldIndex(field)); } @@ -184,11 +181,11 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public byte[] getBinaryByField(String field) { return (byte[]) values.get(fieldIndex(field)); } - + public List<Object> getValues() { return values; } - + public Fields getFields() { return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId()); } @@ -196,37 +193,37 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public List<Object> select(Fields selector) { return getFields().select(selector, values); } - + public GlobalStreamId getSourceGlobalStreamid() { return new GlobalStreamId(getSourceComponent(), streamId); } - + public String getSourceComponent() { return context.getComponentId(taskId); } - + public int getSourceTask() { return taskId; } - + public String getSourceStreamId() { return streamId; } - + public MessageId getMessageId() { return id; } - + @Override public String toString() { - return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString(); + return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: " + id.toString() + ", " + values.toString(); } - + @Override public boolean equals(Object other) { return this == other; - } - + } + @Override public int hashCode() { return System.identityHashCode(this); @@ -234,25 +231,25 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, private final Keyword makeKeyword(String name) { return Keyword.intern(Symbol.create(name)); - } + } /* ILookup */ @Override public Object valAt(Object o) { try { - if(o instanceof Keyword) { + if (o instanceof Keyword) { return getValueByField(((Keyword) o).getName()); - } else if(o instanceof String) { + } else if (o instanceof String) { return getValueByField((String) o); } - } catch(IllegalArgumentException e) { + } catch (IllegalArgumentException e) { } return null; } /* Seqable */ public ISeq seq() { - if(values.size() > 0) { + if (values.size() > 0) { return new Seq(getFields().toList(), values, 0); } return null; @@ -272,7 +269,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) { super(meta); - this.fields= fields; + this.fields = fields; this.values = values; assert i >= 0; this.i = i; @@ -283,16 +280,16 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, } public ISeq next() { - if(i+1 < fields.size()) { - return new Seq(fields, values, i+1); + if (i + 1 < fields.size()) { + return new Seq(fields, values, i + 1); } return null; } public int count() { - assert fields.size() -i >= 0 : "index out of bounds"; + assert fields.size() - i >= 0 : "index out of bounds"; // i being the position in the fields of this seq, the remainder of the seq is the size - return fields.size() -i; + return fields.size() - i; } public Obj withMeta(IPersistentMap meta) { @@ -302,7 +299,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, /* Indexed */ public Object nth(int i) { - if(i < values.size()) { + if (i < values.size()) { return values.get(i); } else { return null; @@ -311,7 +308,8 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public Object nth(int i, Object notfound) { Object ret = nth(i); - if(ret==null) ret = notfound; + if (ret == null) + ret = notfound; return ret; } @@ -319,33 +317,32 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, public int count() { return values.size(); } - + /* IMeta */ public IPersistentMap meta() { - if(_meta==null) { - _meta = new PersistentArrayMap( new Object[] { - makeKeyword("stream"), getSourceStreamId(), - makeKeyword("component"), getSourceComponent(), - makeKeyword("task"), getSourceTask()}); + if (_meta == null) { + _meta = + new PersistentArrayMap(new Object[] { makeKeyword("stream"), getSourceStreamId(), makeKeyword("component"), getSourceComponent(), + makeKeyword("task"), getSourceTask() }); } return _meta; } private PersistentArrayMap toMap() { - Object array[] = new Object[values.size()*2]; + Object array[] = new Object[values.size() * 2]; List<String> fields = getFields().toList(); - for(int i=0; i < values.size(); i++) { - array[i*2] = fields.get(i); - array[(i*2)+1] = values.get(i); + for (int i = 0; i < values.size(); i++) { + array[i * 2] = fields.get(i); + array[(i * 2) + 1] = values.get(i); } return new PersistentArrayMap(array); } public IPersistentMap getMap() { - if(_map==null) { + if (_map == null) { setMap(toMap()); } return _map; - } - + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java index 2e966a0..4017769 100755 --- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java @@ -22,25 +22,36 @@ import java.util.List; import backtype.storm.task.GeneralTopologyContext; public class TupleImplExt extends TupleImpl implements TupleExt { - + protected int targetTaskId; - + protected long creationTimeStamp = System.currentTimeMillis(); + public TupleImplExt(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) { super(context, values, taskId, streamId); } - + public TupleImplExt(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) { super(context, values, taskId, streamId, id); } - + @Override public int getTargetTaskId() { return targetTaskId; } - + @Override public void setTargetTaskId(int targetTaskId) { this.targetTaskId = targetTaskId; } - + + @Override + public long getCreationTimeStamp() { + return creationTimeStamp; + } + + @Override + public void setCreationTimeStamp(long timeStamp) { + this.creationTimeStamp = timeStamp; + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Values.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Values.java b/jstorm-core/src/main/java/backtype/storm/tuple/Values.java index 41bbc71..c25363b 100755 --- a/jstorm-core/src/main/java/backtype/storm/tuple/Values.java +++ b/jstorm-core/src/main/java/backtype/storm/tuple/Values.java @@ -20,17 +20,16 @@ package backtype.storm.tuple; import java.util.ArrayList; /** - * A convenience class for making tuple values using new Values("field1", 2, 3) - * syntax. + * A convenience class for making tuple values using new Values("field1", 2, 3) syntax. */ -public class Values extends ArrayList<Object>{ +public class Values extends ArrayList<Object> { public Values() { - + } - + public Values(Object... vals) { super(vals.length); - for(Object o: vals) { + for (Object o : vals) { add(o); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java b/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java index 1311d6d..d9fa692 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java @@ -22,7 +22,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; - public class BufferFileInputStream { byte[] buffer; FileInputStream stream; @@ -33,15 +32,15 @@ public class BufferFileInputStream { } public BufferFileInputStream(String file) throws FileNotFoundException { - this(file, 15*1024); + this(file, 15 * 1024); } public byte[] read() throws IOException { int length = stream.read(buffer); - if(length==-1) { + if (length == -1) { close(); return new byte[0]; - } else if(length==buffer.length) { + } else if (length == buffer.length) { return buffer; } else { return Arrays.copyOf(buffer, length);
