Thanks for the reply.

Yes, that's what I wanted to achieve.

On Sun, 8 Oct 2017 at 13:26 Stig Rohde Døssing <s...@apache.org> wrote:

> Hi Noppanit,
>
> Just to make sure we're talking about the same thing: you want OriginBolt
> to send tuples where _c="a" to e.g. BoltOne and tuples where _c="b" to
> BoltTwo, where BoltOne and BoltTwo are two different bolt
> implementations.
>
> You need to make your bolt define multiple output streams, e.g. "stream1",
> "stream2". You then need to make it emit _c="a" to "stream1" and _c="b" to
> "stream2". When you build your topology, you then make BoltOne listen to
> "stream1" and BoltTwo listen to "stream2".
>
> There's some good example code here:
> https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
>
> 2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <noppani...@gmail.com>:
>
>> Hi,
>>
>> I have a Storm cluster connected to a Kinesis stream. The messages look
>> like this:
>>
>> {
>> _c: "a"
>> }
>>
>> or like this:
>>
>> {
>> _c: "b"
>> }
>>
>> I would like to send a tuple with _c="a" to one bolt and _c="b" to a
>> different bolt. How do I achieve this?
>>
>> This is the bolt that parses the message from Kinesis into a JSON object
>> using Gson:
>>
>>     @Override
>>     public void execute(Tuple tuple) {
>>       String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
>>       String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
>>       byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
>>
>>       ByteBuffer buffer = ByteBuffer.wrap(payload);
>>       try {
>>         // Decode the raw Kinesis record bytes into a string.
>>         String data = decoder.decode(buffer).toString();
>>
>>         // Parse the JSON; the declared map type matches the TypeToken.
>>         HashMap<String, Object> map = new Gson().fromJson(data,
>>             new TypeToken<HashMap<String, Object>>() {}.getType());
>>
>>         // Emit anchored to the input tuple, then ack it.
>>         this.outputCollector.emit(tuple, new Values(map));
>>         this.outputCollector.ack(tuple);
>>       } catch (CharacterCodingException e) {
>>         // The payload could not be decoded; fail the tuple.
>>         this.outputCollector.fail(tuple);
>>       }
>>     }
>>
>> Thanks
>>
>
>
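
For anyone finding this thread later, the approach described in the quoted reply above (declare named streams, emit each tuple to the stream matching its _c value, subscribe each downstream bolt to one stream) might be sketched roughly as follows. This is only a sketch under assumptions: StreamRouter, streamFor, the "message" field name, and the component IDs "originBolt", "boltOne" and "boltTwo" are hypothetical names chosen for illustration. The Storm wiring is shown as comments so the routing logic itself stays runnable without a Storm dependency.

```java
import java.util.Map;

// Hypothetical helper: picks the output stream for a parsed message.
final class StreamRouter {
    // Stream names taken from the quoted reply above.
    static final String STREAM_A = "stream1";
    static final String STREAM_B = "stream2";

    // Returns the stream ID for the message, or null if _c is neither "a" nor "b".
    static String streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) return STREAM_A;
        if ("b".equals(c)) return STREAM_B;
        return null;
    }

    // Assumed wiring in Storm (not compiled here):
    //
    // In OriginBolt.declareOutputFields(OutputFieldsDeclarer declarer):
    //   declarer.declareStream(StreamRouter.STREAM_A, new Fields("message"));
    //   declarer.declareStream(StreamRouter.STREAM_B, new Fields("message"));
    //
    // In OriginBolt.execute(Tuple tuple), in place of the single emit:
    //   String stream = StreamRouter.streamFor(map);
    //   if (stream != null) {
    //     this.outputCollector.emit(stream, tuple, new Values(map));
    //   }
    //   this.outputCollector.ack(tuple);
    //
    // When building the topology:
    //   builder.setBolt("boltOne", new BoltOne())
    //          .shuffleGrouping("originBolt", StreamRouter.STREAM_A);
    //   builder.setBolt("boltTwo", new BoltTwo())
    //          .shuffleGrouping("originBolt", StreamRouter.STREAM_B);
}
```

Messages whose _c is missing or has another value are simply acked without being emitted in this sketch; whether to drop them, fail them, or send them to a third stream is a design choice that depends on the topology.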
