We are building a Storm streaming job with many components and are seeing
some odd behavior with the parallelismHint assignments. It is my
understanding that a parallelismHint applies to the functions up to the
point where the hint is defined. However, that does not seem to be the case.
We are seeing that the hints are applied in reverse order and sometimes not
applied at all.

For the newStream with the Kafka spout, the hint is set to the number of
partitions for the Kafka topic.
We then usually see the recordFilter section using the same parallelism as
we have set for the topic partitions.
extractLogData is then set to the parallelismHint used for the
getUserScoreData section, and the getUserScoreData logic is set to the
parallelismHint used for extractLogData.
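
To make the mismatch concrete, here is a rough sketch that tabulates the hint configured in the topology code against the parallelism described above. The class and method names are made up for illustration, and the "observed" numbers are only a reading of the description, assuming the topic has 4 partitions (matching the hint on the spout stream); they are not copied from the Storm UI.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HintMismatch {
    // Component name -> {configured parallelismHint, parallelism described as observed}.
    // Observed values are assumptions derived from the prose, not measured output.
    static Map<String, int[]> table() {
        Map<String, int[]> t = new LinkedHashMap<>();
        t.put("recordFilter",     new int[] {8, 4});   // follows the topic partition count
        t.put("extractLogData",   new int[] {8, 16});  // gets the getUserScoreData hint
        t.put("getUserScoreData", new int[] {16, 8});  // gets the extractLogData hint
        return t;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, int[]> e : table().entrySet()) {
            int hint = e.getValue()[0];
            int seen = e.getValue()[1];
            System.out.printf("%-18s hint=%-3d observed=%-3d%s%n",
                    e.getKey(), hint, seen, hint == seen ? "" : " MISMATCH");
        }
    }
}
```

Under those assumptions, none of the three components ends up with the hint that was attached to it.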

Hopefully this makes sense. Also, any general tuning advice
for streaming in Storm 1.1.0 would be appreciated. It appears many knobs
have changed between Storm 0.10.0 and Storm 1.1.0.

Any help would be appreciated.

{code}

ITridentDataSource tridentSpout =
    SpoutsAndBolts.getKafkaTridentSpout(topologyProperties, consumerId);
TridentKafkaStateFactory stateFactory =
    SpoutsAndBolts.getKafkaTridentProducerBolt(topologyProperties);

final TridentTopology tridentTopology = new TridentTopology();
final Stream spoutStream = tridentTopology
    .newStream("kafkaSpout", tridentSpout).name("KafkaSpout")
    .localOrShuffle().parallelismHint(4);

spoutStream
    // filters data from the logs for just records we want
    .each(new Fields("logData"), new FilterLogs())
        .name("recordFilter").parallelismHint(8)
    .localOrShuffle()
    // extracts 3 fields from the log record
    .each(new Fields("logData"), new ProcessLogData(),
          new Fields("logExtractData"))
        .name("extractLogData").parallelismHint(8)
    .localOrShuffle()
    // Looks up data in Hbase/Phoenix
    .each(new Fields("logExtractData"), new GetUserScoreMetricData(),
          new Fields("UserScoreData"))
        .name("getUserScoreData").parallelismHint(16)
    .localOrShuffle()
    // Pushes data to Hbase/Phoenix
    .each(new Fields("UserScoreData"), new PushToPhoenixTable(),
          new Fields("adtrkLogData"))
        .name("pushTOPhoenix").parallelismHint(16)
    .localOrShuffle()
    // Filters for records w/out null values
    .each(new Fields("adtrkLogData"), new FilterBwUserId())
        .name("filterOnBwUserId").parallelismHint(4)
    .localOrShuffle()
    .each(new Fields("adtrkLogData"), new GetKensisObjectJson(),
          new Fields("key", "kensisObjectJson"))
        .name("createKensisRecord")
    // persists the data back to another Kafka topic
    .localOrShuffle()
    .partitionPersist(stateFactory, new Fields("key", "kensisObjectJson"),
                      new TridentKafkaUpdater(), new Fields())
        .parallelismHint(8);

return tridentTopology.build();

{code}
