Hi all,

According to http://storm.apache.org/documentation/Trident-API-Overview.html, after the join fields the output contains:

 A list of all non-join fields from all streams, in order of how the streams 
were passed to the join method

If your code is:

    Stream datatypeStream = inputStream
            .each(new Fields("datatypeId"), new DatatypeLoader(), new Fields("datatype"))
            .project(new Fields("messageId", "datatype"));

    Stream providerStream = inputStream
            .each(new Fields("providerId"), new ProviderLoader(), new Fields("provider"))
            .project(new Fields("messageId", "provider"));

    // Join both streams on original 'messageId'
    builder.join(providerStream, new Fields("messageId"),
                 datatypeStream, new Fields("messageId"),
                 new Fields("messageId", "datatype", "provider"))
           .each(new Fields("datatype", "provider"), new Print()); // Print the result

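then, because providerStream is passed to join first, the output field named
"datatype" actually holds the "provider" value from providerStream, and the
field named "provider" holds the "datatype" value from datatypeStream. Maybe
that is the bug in your application...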
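
To make the mix-up concrete, here is a small plain-Java sketch (no Storm dependency). The class PositionalNaming, the helper label and the sample values are hypothetical and only model the idea that the names given in the output Fields are attached to the joined values by position; this is not Trident's actual implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PositionalNaming {
    // Hypothetical helper: pairs each declared output name with the value
    // found at the same position in the joined tuple.
    static Map<String, String> label(List<String> names, List<String> values) {
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            out.put(names.get(i), values.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed value order when providerStream is passed to join first:
        // join field, then providerStream's field, then datatypeStream's field.
        List<String> values = Arrays.asList("msg-1", "provider-value", "datatype-value");

        // Names declared as in the snippet above: the two labels end up swapped.
        System.out.println(label(Arrays.asList("messageId", "datatype", "provider"), values));

        // Names declared in the same order as the streams: the labels match.
        System.out.println(label(Arrays.asList("messageId", "provider", "datatype"), values));
    }
}
```

So either declare the output fields as ("messageId", "provider", "datatype"), or pass datatypeStream to join first.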

If I am wrong, please correct me. Thanks!


2014-10-17



shengyi.pan



From: Thiago Souza <[email protected]>
Sent: 2014-10-17 04:55
Subject: Re: Trying to learn Storm
To: "user" <[email protected]>
Cc:

Got it! Thanks! :)


On Thu, Oct 16, 2014 at 4:26 PM, Chin Huang <[email protected]> wrote:

The tuples emitted from the join must have these fields:


1. First, the fields you are joining on.
2. Next, the other fields, in the same order as the streams passed to the join 
method and in the same order as the input tuple fields.
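
As a rough illustration of the two rules above, here is a plain-Java sketch that computes the output field order. It does not use Storm; the class JoinFieldOrder, the helper joinOutputFields and the field names key, a1, a2, b1 are made up for the example, and it assumes the join fields have the same names in every stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinFieldOrder {
    // Hypothetical helper, not part of Storm: join fields first, then the
    // non-join fields of each stream, in the order the streams are given.
    static List<String> joinOutputFields(List<String> joinFields,
                                         List<List<String>> streamFields) {
        List<String> out = new ArrayList<>(joinFields);
        for (List<String> fields : streamFields) {
            for (String field : fields) {
                if (!joinFields.contains(field)) {
                    out.add(field); // keeps the stream's own field order
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("key", "a1", "a2");
        List<String> second = Arrays.asList("key", "b1");
        // Prints the position order: key, then first's fields, then second's.
        System.out.println(joinOutputFields(Arrays.asList("key"),
                                            Arrays.asList(first, second)));
    }
}
```

The names you pass as the output Fields are attached to these positions, so they need to be listed in this order.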


For your example, the join output tuple fields would be


    new Fields("messageId", "datatype", "provider")


On Thu, Oct 16, 2014 at 7:25 AM, Thiago Souza <[email protected]> wrote:

I'm a Storm newbie trying to learn it. Could someone please help me? Consider 
the following snippet:


     TridentTopology builder = new TridentTopology();

     // Random data generated with a random 'messageId'
     Stream inputStream = builder.newStream("data-supply",
             new RandomDataGenerator() /* this extends BaseRichSpout */);

     // Load 'Datatype' referenced by datatypeId from external source
     Stream datatypeStream = inputStream
             .each(new Fields("datatypeId"), new DatatypeLoader(), new Fields("datatype"))
             .project(new Fields("messageId", "datatype"));

     // Load 'Provider' referenced by providerId from external source
     Stream providerStream = inputStream
             .each(new Fields("providerId"), new ProviderLoader(), new Fields("provider"))
             .project(new Fields("messageId", "provider"));

     // Join both streams on original 'messageId'
     builder.join(providerStream, new Fields("messageId"),
                  datatypeStream, new Fields("messageId"),
                  new Fields("datatype", "provider"))
            .each(new Fields("datatype", "provider"), new Print()); // Print the result




I expected to get a Tuple with both the 'Datatype' and the 'Provider' previously
loaded, but what I get is a Tuple with only one of them, depending on which
stream is the first argument of the 'join' method (in this case 'providerStream').


Am I doing something wrong, or do I have a misconception of how Storm works?


Thanks,
Thiago Souza
