Hello, I'm facing an issue with Trident and would appreciate your expertise.
I'm splitting the tuples emitted by a spout into two branches, based on a field value. Each branch applies a different processing algorithm, and one of the branches contains a State operator (a partitionPersist()). At the end I need to merge the two branches back together. This topology does not work when a repartitioning operation is used after the State operator, and I don't understand why (bug? misunderstanding?). The code below is a minimal reproduction of the issue.

When the shuffle() line is commented out, I get all the messages I expect (i.e. the topology works as expected). The last two lines are the ones that go missing in the failing case:

  Stream A after filter : [1, A]
  Stream A end of branch : [1, A, computed A value]
  Merged stream : [1, A, computed A value]
  Stream B after filter : [1, B]
  Stream B end of branch : [1, B, computed B value]
  Merged stream : [1, B, computed B value]

But when the shuffle() line is present, the topology only processes branch B up to the State operator. No "Stream B end of branch" message appears, and the B tuple never reaches the merged stream:

  Stream A after filter : [1, A]
  Stream B after filter : [1, B]
  Stream A end of branch : [1, A, computed A value]
  Merged stream : [1, A, computed A value]
  (after a one-minute pause, Trident detects that the micro-batch has failed and replays it)
  Stream A after filter : [1, A]
  Stream B after filter : [1, B]
  Stream A end of branch : [1, A, computed A value]
  Merged stream : [1, A, computed A value]
  (...)

Tested with the same results under Storm 1.2.3 and 2.0.0.
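For reference, the expected result of the working run can be modelled without Storm at all. The sketch below is a hypothetical plain-Java model (the class BranchMergeModel and its methods are illustrative names, not part of Storm or of the topology in this post). It reproduces only the filter / add-field / merge routing for the first batch and deliberately ignores Trident batching, shuffle() repartitioning, the partitionPersist() state and parallelism, so it cannot reproduce the hang itself; it only pins down which six messages a correct run should print.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical plain-Java model (no Storm dependency) of the logical routing
 * of the JoinIssueMinimal topology for its first batch. Batching, shuffle(),
 * state and parallelism are intentionally not modelled.
 */
public class BranchMergeModel {

    /** One branch: keep tuples of the given type, then append the "val" field. */
    static List<List<Object>> branch(List<List<Object>> batch, String type,
                                     String val, List<String> log) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> tuple : batch) {
            if (!type.equals(tuple.get(1))) {
                continue; // plays the role of keepOnlyType(type)
            }
            log.add("Stream " + type + " after filter : " + tuple);
            List<Object> withVal = new ArrayList<>(tuple); // plays the role of addField(val)
            withVal.add(val);
            log.add("Stream " + type + " end of branch : " + withVal);
            out.add(withVal);
        }
        return out;
    }

    /** Runs both branches on the first batch, merges them and returns every printed message. */
    public static List<String> run() {
        List<List<Object>> batch = Arrays.asList(
                Arrays.<Object>asList(1, "A"),
                Arrays.<Object>asList(1, "B"));
        List<String> log = new ArrayList<>();
        List<List<Object>> merged = new ArrayList<>();
        merged.addAll(branch(batch, "A", "computed A value", log));
        merged.addAll(branch(batch, "B", "computed B value", log));
        for (List<Object> t : merged) {
            log.add("Merged stream : " + t); // plays the role of topology.merge(...).peek(...)
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The model processes branch A completely before branch B, so its message order differs from the real logs; the interleaving seen above depends on how Trident schedules the branches. What matters is the set of messages: both "end of branch" lines and both "Merged stream" lines should appear.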
Thank you,
marc

========================== CODE ================================

import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class JoinIssueMinimal {

    public static void main(final String[] args) throws Exception {
        TridentTopology topology = buildTopology();

        final String topologyName = "JoinIssueMinimal";
        final Config conf = new Config();
        conf.setMaxSpoutPending(1);

        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, topology.build());
        Utils.sleep(700 * 1000L);
        cluster.killTopology(topologyName);
        cluster.shutdown();
    }

    public static TridentTopology buildTopology() {
        final TridentTopology topology = new TridentTopology();

        // this spout will output 2 tuples for the first batch (one A and one B)
        final Stream startStream = topology.newStream("start",
                new FixedBatchSpout(new Fields("key", "type"), 2,
                        new Values(1, "A"),
                        new Values(1, "B")));

        // "A" branch
        Stream AStream = startStream
                .filter(keepOnlyType("A"))
                .peek(printTuples("Stream A after filter"))
                .each(addField("computed A value"), new Fields("val"))
                .peek(printTuples("Stream A end of branch"));
        // tuples have "key", "type" and "val" fields

        // "B" branch
        Stream BStream = startStream
                .filter(keepOnlyType("B"))
                .peek(printTuples("Stream B after filter"))
                .partitionPersist(
                        new MemoryMapState.Factory(),
                        new Fields("key", "type"),
                        reemitTuples(), // state operator which simply returns the input tuple
                        new Fields("key", "type"))
                .newValuesStream()
                .shuffle() // <<<<<<<<<<<<<<<<< when this shuffle is present, the topology no longer works
                .each(addField("computed B value"), new Fields("val"))
                // I would like multiple instances of the above computation (addField),
                // therefore I need to shuffle() earlier
                .parallelismHint(2)
                .peek(printTuples("Stream B end of branch"));
        // tuples have "key", "type" and "val" fields

        // merge the two branches together
        topology.merge(AStream, BStream)
                .peek(printTuples("Merged stream"));

        return topology;
    }

    /** Returns a Filter which keeps only tuples with a given 'type' field */
    static BaseFilter keepOnlyType(String t) {
        return new BaseFilter() {
            @Override
            public boolean isKeep(TridentTuple tuple) {
                return t.equals(tuple.getStringByField("type"));
            }
        };
    }

    /** Returns a Consumer which prints all tuples, along with a given message */
    static Consumer printTuples(String msg) {
        return new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                System.out.println(msg + " : " + input);
            }
        };
    }

    /** Returns a function which adds a field with a given value */
    static BaseFunction addField(String val) {
        return new BaseFunction() {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                collector.emit(new Values(val));
            }
        };
    }

    /** Returns a StateUpdater which always returns all input tuples as-is */
    static StateUpdater<MemoryMapState<Integer>> reemitTuples() {
        return new StateUpdater<MemoryMapState<Integer>>() {
            @Override
            public void prepare(Map conf, TridentOperationContext context) {}

            @Override
            public void cleanup() {}

            @Override
            public void updateState(MemoryMapState<Integer> state, List<TridentTuple> tuples,
                                    TridentCollector collector) {
                for (TridentTuple t : tuples) {
                    collector.emit(t);
                }
            }
        };
    }
}