Hello, 

I'm facing an issue with Trident and would appreciate your expertise.

I'm separating the tuples output by a spout into two branches, based on a field value. Each branch applies different processing algorithms, and one of my branches contains a State operator (a partitionPersist()). At the end, I need to merge the two branches back together. This topology does not work when a partitioning operator is used after the State operator, but I don't understand why (bug? misunderstanding?).
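
Roughly, the shape of the topology in the demonstrator below is as follows (operator names taken from the code):

    newStream("start") emits [key, type]
         |                         |
    filter("A")               filter("B")
         |                         |
    each(addField)      partitionPersist(MemoryMapState)
         |                         |
         |                 newValuesStream()
         |                         |
         |                 shuffle()   <-- the problematic step
         |                         |
         |          each(addField) + parallelismHint(2)
         +------------+------------+
                      |
         topology.merge(AStream, BStream)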

The following code is a minimal demonstrator of the issue. When the shuffle() line is commented out, I get all the messages I'm expecting (i.e. the topology works as expected):

Stream A after filter : [1, A]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]
Stream B after filter : [1, B]
Stream B end of branch : [1, B, computed B value]
Merged stream : [1, B, computed B value]

But when the shuffle() line is present, the topology only processes branch B up to the State operator. No "Stream B end of branch" message appears:

Stream A after filter : [1, A]
Stream B after filter : [1, B]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]

(1 minute pause, then Trident figures out that the micro-batch has failed and replays it)

Stream A after filter : [1, A]
Stream B after filter : [1, B]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]

(...) 

Tested with the same results under both Storm 1.2.3 and 2.0.0.


Thank you,
marc 

========================== CODE ================================

import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class JoinIssueMinimal {

    public static void main(final String[] args) throws Exception {
        TridentTopology topology = buildTopology();
        final String topologyName = "JoinIssueMinimal";
        final Config conf = new Config();
        conf.setMaxSpoutPending(1);
        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, topology.build());
        Utils.sleep(700 * 1000L);
        cluster.killTopology(topologyName);
        cluster.shutdown();
    }

    public static TridentTopology buildTopology() {
        final TridentTopology topology = new TridentTopology();

        // this spout will output 2 tuples for the first batch (one A and one B)
        final Stream startStream = topology.newStream("start", new FixedBatchSpout(
                new Fields("key", "type"),
                2,
                new Values(1, "A"),
                new Values(1, "B")));

        // "A" branch
        Stream AStream = startStream
                .filter(keepOnlyType("A"))
                .peek(printTuples("Stream A after filter"))
                .each(addField("computed A value"), new Fields("val"))
                .peek(printTuples("Stream A end of branch"));
                // tuples have "key", "type" and "val" fields

        // "B" branch
        Stream BStream = startStream
                .filter(keepOnlyType("B"))
                .peek(printTuples("Stream B after filter"))
                .partitionPersist(
                        new MemoryMapState.Factory(),
                        new Fields("key", "type"),
                        reemitTuples(), // state operator which simply returns the input tuple
                        new Fields("key", "type"))
                .newValuesStream()
                .shuffle() // <<<<<<<<<<<<<<<<< when this shuffle is present, the topology no longer works
                .each(addField("computed B value"), new Fields("val"))
                .parallelismHint(2) // I would like multiple instances of the above
                                    // computation (addField), therefore I need to shuffle() earlier
                .peek(printTuples("Stream B end of branch"));
                // tuples have "key", "type" and "val" fields

        // merge the two branches together
        topology.merge(AStream, BStream)
                .peek(printTuples("Merged stream"));

        return topology;
    }

    /** Returns a Filter which keeps only tuples with a given 'type' field */
    static BaseFilter keepOnlyType(String t) {
        return new BaseFilter() {
            @Override
            public boolean isKeep(TridentTuple tuple) {
                return t.equals(tuple.getStringByField("type"));
            }
        };
    }

    /** Returns a Consumer which prints all tuples, along with a given message */
    static Consumer printTuples(String msg) {
        return new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                System.out.println(msg + " : " + input);
            }
        };
    }

    /** Returns a function which adds a field with a given value */
    static BaseFunction addField(String val) {
        return new BaseFunction() {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                collector.emit(new Values(val));
            }
        };
    }

    /** Returns a StateUpdater which always returns all input tuples as-is */
    static StateUpdater<MemoryMapState<Integer>> reemitTuples() {
        return new StateUpdater<MemoryMapState<Integer>>() {
            @Override
            public void prepare(Map conf, TridentOperationContext context) {}

            @Override
            public void cleanup() {}

            @Override
            public void updateState(MemoryMapState<Integer> state, List<TridentTuple> tuples, TridentCollector collector) {
                for (TridentTuple t : tuples) {
                    collector.emit(t);
                }
            }
        };
    }
}

 
