Hi Viraj,

Thank you for your response. I have been through that page countless times,
and if you see the code I provided, I am registering the required classes
for serialization. The problem is that WorkerTopologyContext is not a
serializable class. This question is geared towards the late tuple stream.
WindowedBoltExecutor
<https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313>
simply emits any late tuple, and BoltOutputCollectorImpl
<https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110>
rewraps that tuple into another Tuple. Under normal circumstances, this
collector receives Values and wraps them in a Tuple, but in this case, it
wraps a late tuple (which contains WorkerTopologyContext) in another tuple.
Now, when it tries to serialize this tuple, I get the serialization error.
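
To make the wrapping concrete, here is a rough, Storm-free model of what I
believe is happening. It is only a sketch: FakeContext and FakeTuple are
hypothetical stand-ins for WorkerTopologyContext and TupleImpl, and plain
Java serialization stands in for Kryo, so the failure is analogous rather
than identical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LateTupleModel {
    // Hypothetical stand-in for WorkerTopologyContext: deliberately not Serializable.
    public static class FakeContext { }

    // Hypothetical stand-in for TupleImpl: payload values plus a context reference.
    public static class FakeTuple implements Serializable {
        public final ArrayList<Object> values;
        public final FakeContext context;

        public FakeTuple(List<Object> values, FakeContext context) {
            this.values = new ArrayList<>(values);
            this.context = context;
        }
    }

    // Mimics a values-only serializer: only the emitted values are written,
    // so the context of the outer (wrapping) tuple is never touched.
    public static boolean valuesSerialize(List<Object> values) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new ArrayList<>(values));
            return true;
        } catch (IOException e) {
            // NotSerializableException (an IOException) lands here.
            return false;
        }
    }

    public static void main(String[] args) {
        FakeTuple late = new FakeTuple(Arrays.asList(1, 1000L), new FakeContext());

        List<Object> normalEmit = Arrays.asList(1, 1000L); // plain values
        List<Object> lateEmit = Arrays.asList(late);       // the late tuple itself as a value
        List<Object> unwrapped = late.values;              // payload copied out of the late tuple

        System.out.println("plain values:       " + valuesSerialize(normalEmit));
        System.out.println("wrapped late tuple: " + valuesSerialize(lateEmit));
        System.out.println("unwrapped payload:  " + valuesSerialize(unwrapped));
    }
}
```

In this model, emitting only the payload values serializes fine, while
emitting the tuple object itself fails because the serializer reaches the
inner tuple's context. Whether the real late tuple stream can be consumed
without hitting this is what I am trying to find out.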

Could you share a minimal working example that shows how to process late
tuples?

Regards,
Jawad Tahir.

On Sat, Nov 11, 2023, 04:40 v.s kadu <virajkadu...@gmail.com> wrote:

> Hi Jawad,
> Please go through the following page:
>
> https://storm.apache.org/releases/current/Serialization.html
>
> Regards,
> Viraj Kadu
>
> On Sat, 11 Nov, 2023, 1:19 am Jawad Tahir, <ranajawadta...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am developing an Apache Storm (v2.5.0) topology that reads events from
>> a spout (`BaseRichSpout`), counts the number of events in tumbling windows
>> (`BaseWindowedBolt`), and prints the count (`BaseRichBolt`). The topology
>> works fine, but there are some out-of-order events in my dataset.
>> `BaseWindowedBolt` provides the `withLateTupleStream` method to route late
>> events to a separate stream. However, when I try to process late events, I
>> get a serialization exception:
>>
>> ```
>> Caused by: com.esotericsoftware.kryo.KryoException:
>> java.lang.IllegalArgumentException: Class is not registered:
>> org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
>> Note: To register this class use:
>> kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
>> Serialization trace:
>> defaultResources (org.apache.storm.task.WorkerTopologyContext)
>> context (org.apache.storm.tuple.TupleImpl)
>> at
>> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
>> ~[kryo-4.0.2.jar:?]
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
>> ~[kryo-4.0.2.jar:?]
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
>> ~[kryo-4.0.2.jar:?]
>> at
>> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
>> ~[kryo-4.0.2.jar:?]
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
>> ~[kryo-4.0.2.jar:?]
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
>> ~[kryo-4.0.2.jar:?]
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
>> ~[kryo-4.0.2.jar:?]
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
>> ~[kryo-4.0.2.jar:?]
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557)
>> ~[kryo-4.0.2.jar:?]
>> at
>> org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at
>> org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> at org.apache.storm.executor.Executor.accept(Executor.java:294)
>> ~[storm-client-2.5.0.jar:2.5.0]
>> ... 6 more
>> ```
>>
>> I have defined my topology as below:
>>
>> ```
>> public class TestTopology {
>>     public static void main (String[] args) throws Exception {
>>         Config config = new Config();
>>         config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
>>         config.registerSerialization(TupleImpl.class);
>>         config.registerSerialization(Fields.class);
>>
>>         LocalCluster cluster = new LocalCluster();
>>         try (LocalCluster.LocalTopology topology =
>>                 cluster.submitTopology("testTopology", config,
>>                         getTopology().createTopology())) {
>>             Thread.sleep(50000);
>>         }
>>         cluster.shutdown();
>>     }
>>     static TopologyBuilder getTopology(){
>>         TopologyBuilder builder = new TopologyBuilder();
>>         builder.setSpout("eventSpout", new LateEventSpout());
>>         builder.setBolt("windowBolt", new WindowBolt().
>>                         withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)).
>>                         withTimestampField("time").
>>                         withLateTupleStream("lateEvents")).
>>                         shuffleGrouping("eventSpout");
>>         builder.setBolt("latePrintBolt", new LatePrintBolt()).
>>                         shuffleGrouping("windowBolt", "lateEvents");
>>         builder.setBolt("printBolt", new PrintBolt()).
>>                         shuffleGrouping("windowBolt");
>>         return builder;
>>     }
>> }
>> ```
>> Where `LateEventSpout` is
>>
>> ```
>> public class LateEventSpout extends BaseRichSpout {
>>     private SpoutOutputCollector collector;
>>     private List<Long> eventTimes;
>>     private int currentTime = 0;
>>     private int id = 1;
>>     public LateEventSpout () {
>>         eventTimes = new ArrayList<>();
>>         for (int i = 1; i<= 61; i++) {
>>             eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
>>         } // [epoch+1, epoch+2, .., epoch+61]
>>     }
>>     @Override
>>     public void open(Map<String, Object> conf, TopologyContext context,
>>                      SpoutOutputCollector collector) {
>>         this.collector = collector;
>>     }
>>     @Override
>>     public void nextTuple() {
>>         int eventId = id++;
>>         Long eventTime = eventTimes.get(currentTime++);
>>         if (currentTime == eventTimes.size()){
>>             currentTime = 0; // reset to zero so we get out-of-order events
>>         }
>>         collector.emit(new Values(eventId, eventTime));
>>     }
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         declarer.declare(new Fields("id", "time"));
>>     }
>> }
>> ```
>> And `WindowBolt` is:
>>
>> ```
>> public class WindowBolt extends BaseWindowedBolt {
>>     OutputCollector collector;
>>     @Override
>>     public void prepare(Map<String, Object> topoConf,
>>                         TopologyContext context, OutputCollector collector) {
>>         this.collector = collector;
>>     }
>>     @Override
>>     public void execute(TupleWindow inputWindow) {
>>         int sum = 0;
>>         for (Tuple event : inputWindow.get()){
>>             sum++;
>>         }
>>         collector.emit(new Values(inputWindow.getStartTimestamp(),
>>                 inputWindow.getEndTimestamp(), sum));
>>     }
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         declarer.declare(new Fields("start", "end", "sum"));
>>     }
>> }
>> ```
>> And `PrintBolt` just prints the `windowBolt` output. (`LatePrintBolt` is
>> similar)
>>
>> ```
>> public class PrintBolt extends BaseRichBolt {
>>     @Override
>>     public void prepare(Map<String, Object> topoConf,
>>                         TopologyContext context, OutputCollector collector) {
>>     }
>>     @Override
>>     public void execute(Tuple input) {
>>         System.out.println(String.format("Start: %d, End: %d, Sum:%d",
>>                 input.getLongByField("start"), input.getLongByField("end"),
>>                 input.getIntegerByField("sum")));
>>     }
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>     }
>> }
>> ```
>> If I don't set the `LatePrintBolt` in `TopologyBuilder`, I get the
>> correct results.
>>
>> ```
>> Start: 0, End: 10000, Sum:10
>> Start: 10000, End: 20000, Sum:10
>> Start: 20000, End: 30000, Sum:10
>> Start: 30000, End: 40000, Sum:10
>> Start: 40000, End: 50000, Sum:10
>> Start: 50000, End: 60000, Sum:10
>> ```
>> However, when I try to print the `lateEvents` stream, I get the same
>> output, but on the first late event I get the above-mentioned exception.
>>
>> I have debugged the issue. When [WindowedBoltExecutor](
>> https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313)
>> receives a late tuple, it emits the late tuple but
>> [BoltOutputCollectorImpl](
>> https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110)
>> rewraps it in a new Tuple. Now, this new tuple contains
>> `WorkerTopologyContext` which is not serializable, hence, the error.
>>
>> I would like to know how I can process the late tuples.
>>
>> Regards,
>> Jawad Tahir.
>>
>
