Hi, all:
According to http://storm.apache.org/documentation/Trident-API-Overview.html,
the join fields in the output tuple are followed by:
A list of all non-join fields from all streams, in order of how the streams
were passed to the join method
So if your code is:
Stream datatypeStream = inputStream
    .each(new Fields("datatypeId"), new DatatypeLoader(), new Fields("datatype"))
    .project(new Fields("messageId", "datatype"));
Stream providerStream = inputStream
    .each(new Fields("providerId"), new ProviderLoader(), new Fields("provider"))
    .project(new Fields("messageId", "provider"));
// Join both streams on original 'messageId'
builder.join(providerStream, new Fields("messageId"),
             datatypeStream, new Fields("messageId"),
             new Fields("messageId", "datatype", "provider"))
    .each(new Fields("datatype", "provider"), new Print()); // Print the result
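then, because providerStream is passed to join first, the output field named
"datatype" actually holds the "provider" value from providerStream, and the
output field named "provider" holds the "datatype" value from datatypeStream.
Maybe that is the bug in your application...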
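To make the ordering concrete, here is a small plain-Java sketch of the rule as I understand it from the documentation. It does not use Storm at all: the class JoinFieldOrder and its two methods are hypothetical helpers that only model how the output positions line up, and the sketch assumes both streams use the same join field name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm: it only models the field-ordering rule.
public class JoinFieldOrder {

    // Join fields first, then the non-join fields of each stream,
    // in the order the streams are passed to join().
    static List<String> actualOrder(List<String> joinFields, List<List<String>> streams) {
        List<String> out = new ArrayList<>(joinFields);
        for (List<String> streamFields : streams) {
            for (String field : streamFields) {
                if (!joinFields.contains(field)) {
                    out.add(field);
                }
            }
        }
        return out;
    }

    // Pair each declared output name with the source field at the same position.
    static Map<String, String> labelling(List<String> declared, List<String> actual) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < declared.size(); i++) {
            result.put(declared.get(i), actual.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // providerStream is passed first, datatypeStream second.
        List<String> actual = actualOrder(
                List.of("messageId"),
                List.of(List.of("messageId", "provider"),
                        List.of("messageId", "datatype")));
        System.out.println(actual);
        // prints [messageId, provider, datatype]

        // Output names as declared in the join call above.
        System.out.println(labelling(List.of("messageId", "datatype", "provider"), actual));
        // prints {messageId=messageId, datatype=provider, provider=datatype}
    }
}
```

Under these assumptions, either swapping the two streams in the join call or declaring the output as ("messageId", "provider", "datatype") would make the names match the values.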
If I am wrong, please correct me. Thanks!
2014-10-17
shengyi.pan
From: Thiago Souza <[email protected]>
Sent: 2014-10-17 04:55
Subject: Re: Trying to learn Storm
To: "user"<[email protected]>
Cc:
Got it! Thanks! :)
On Thu, Oct 16, 2014 at 4:26 PM, Chin Huang <[email protected]> wrote:
The tuples emitted from the join must have these fields:
1. First, the fields you are joining on.
2. Next, the other fields, in the same order as the streams passed to the join
method and in the same order as the input tuple fields.
For your example, the join output tuple fields would be
new Fields("messageId", "datatype", "provider")
On Thu, Oct 16, 2014 at 7:25 AM, Thiago Souza <[email protected]> wrote:
I'm a Storm newbie trying to learn it. Could someone please help me? Consider
the following snippet:
TridentTopology builder = new TridentTopology();

// Random data generated with a random 'messageId'
Stream inputStream = builder.newStream("data-supply",
    new RandomDataGenerator() /* this extends BaseRichSpout */);

// Load 'Datatype' referenced by datatypeId from external source
Stream datatypeStream = inputStream
    .each(new Fields("datatypeId"), new DatatypeLoader(), new Fields("datatype"))
    .project(new Fields("messageId", "datatype"));

// Load 'Provider' referenced by providerId from external source
Stream providerStream = inputStream
    .each(new Fields("providerId"), new ProviderLoader(), new Fields("provider"))
    .project(new Fields("messageId", "provider"));

// Join both streams on original 'messageId'
builder.join(providerStream, new Fields("messageId"),
             datatypeStream, new Fields("messageId"),
             new Fields("datatype", "provider"))
    .each(new Fields("datatype", "provider"), new Print()); // Print the result
I expected to get a Tuple with both the 'Datatype' and the 'Provider' previously
loaded, but what I get is a Tuple with only one of them, depending on which
stream is the first argument of the 'join' method (in this case 'providerStream').
Am I doing something wrong, or do I have a misconception of how Storm works?
Thanks,
Thiago Souza