Hi,

Below is my code snippet, where I am using the checkpointing feature of Spark
Streaming. The SPARK_DURATION that I am using is 5 minutes and the batch
duration is 15 minutes. I am checkpointing the data at each SPARK_DURATION
(5 minutes). When I kill the job and restart it, the next batch takes longer
than usual: a normal batch takes approx. 2.5 minutes, but after killing and
restarting the job it takes around 4.5 minutes or more.
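
To make the intervals concrete, here is a small plain-Java sketch of the arithmetic, assuming that each 15-minute batch is made of consecutive 5-minute SPARK_DURATION intervals. The class and method names (IntervalMath, intervalsPerBatch, isMultipleOf, restartOverheadSeconds) are hypothetical. Because Spark Streaming requires a DStream's checkpoint interval to be a multiple of its slide interval, the sketch also checks the 5-minute checkpoint interval against both a 5-minute and a 15-minute batch interval.

```java
import java.util.concurrent.TimeUnit;

class IntervalMath {
    // Values from the description above, in milliseconds
    // (the unit that org.apache.spark.streaming.Duration also takes).
    static final long SPARK_DURATION_MS = TimeUnit.MINUTES.toMillis(5);
    static final long BATCH_DURATION_MS = TimeUnit.MINUTES.toMillis(15);

    /** True if 'interval' is a whole, positive multiple of 'base'. */
    static boolean isMultipleOf(long interval, long base) {
        return base > 0 && interval > 0 && interval % base == 0;
    }

    /** Hypothetical helper: how many SPARK_DURATION intervals fit in one batch. */
    static long intervalsPerBatch(long batchMs, long intervalMs) {
        if (!isMultipleOf(batchMs, intervalMs)) {
            throw new IllegalArgumentException("batch is not a multiple of the interval");
        }
        return batchMs / intervalMs;
    }

    /** Extra wall-clock time of the first batch after a restart, in seconds. */
    static long restartOverheadSeconds(long normalSeconds, long afterRestartSeconds) {
        return afterRestartSeconds - normalSeconds;
    }

    public static void main(String[] args) {
        // 15-minute batch split into 5-minute intervals
        System.out.println(intervalsPerBatch(BATCH_DURATION_MS, SPARK_DURATION_MS)); // 3
        // 5-minute checkpoint interval vs. the two candidate batch (slide) intervals
        System.out.println(isMultipleOf(SPARK_DURATION_MS, SPARK_DURATION_MS));      // true
        System.out.println(isMultipleOf(SPARK_DURATION_MS, BATCH_DURATION_MS));      // false
        // ~2.5 min normally vs. ~4.5 min after a restart
        System.out.println(restartOverheadSeconds(150, 270));                        // 120
    }
}
```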


In the stage information I can see that mapToPair (in the mapAndReduce
function) is called three times, once for each 5-minute window. Is it correct
that these are calculated again even though I have already checkpointed the
previous RDDs, or am I missing something?
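
For reference, this is the behaviour I expected, written as a toy model in plain Java (ToyRdd, source, map, checkpoint and computeCount are hypothetical names, not Spark's API): once a dataset has been checkpointed, reading it returns the saved result instead of re-running the functions upstream of it, such as the mapToPair step.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class LineageDemo {
    /** Toy stand-in for an RDD (hypothetical, not Spark's API). */
    static final class ToyRdd<T> {
        private final Supplier<T> compute;   // how to (re)build this dataset from its parent
        private boolean isCheckpointed = false;
        private T saved;                     // result kept once checkpointed
        int computeCount = 0;                // how many times 'compute' has run

        private ToyRdd(Supplier<T> compute) {
            this.compute = compute;
        }

        static <S> ToyRdd<S> source(Supplier<S> s) {
            return new ToyRdd<S>(s);
        }

        /** Derives a child whose computation reads this dataset first (its lineage). */
        <U> ToyRdd<U> map(Function<T, U> f) {
            return new ToyRdd<U>(() -> f.apply(this.get()));
        }

        T get() {
            if (isCheckpointed) {
                return saved;                // lineage is cut: parents are not touched
            }
            computeCount++;
            return compute.get();
        }

        /** Computes the result once more, keeps it, and serves it from then on. */
        void checkpoint() {
            saved = get();
            isCheckpointed = true;
        }
    }

    public static void main(String[] args) {
        ToyRdd<Integer> raw = ToyRdd.source(() -> 42);
        ToyRdd<Integer> pairs = raw.map(x -> x * 2);   // plays the role of the mapToPair step
        ToyRdd<Integer> agg = pairs.map(x -> x + 1);   // plays the role of the reduce step

        agg.get();
        agg.get();                                     // no checkpoint: each read walks the whole chain
        System.out.println(pairs.computeCount);        // 2

        agg.checkpoint();                              // one more full computation, then the result is kept
        agg.get();
        agg.get();
        System.out.println(pairs.computeCount);        // 3
    }
}
```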

Also, do I need to checkpoint kafkaStreamRdd and advDataObjectsRdd when I
am already checkpointing advDashboardAggKeyVsMetricRdd?
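
A simplified way to frame this second question, assuming a linear pipeline and that the checkpoint for the batch in question has actually been written (RecomputeModel and stagesToRecompute are hypothetical; the model ignores driver metadata checkpointing and the optional repartition step): walking back from the last stream, recomputation stops at the first checkpointed stage, so in this model checkpoints further upstream are not read when rebuilding advDashboardAggKeyVsMetricRdd.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RecomputeModel {
    /**
     * Walks back from the last stage of a linear pipeline and returns, in pipeline order,
     * the stages that would be recomputed to rebuild it, stopping at the first checkpointed stage.
     */
    static List<String> stagesToRecompute(List<String> pipeline, Set<String> checkpointed) {
        List<String> result = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            String stage = pipeline.get(i);
            if (checkpointed.contains(stage)) {
                break;                       // read from the checkpoint; nothing earlier is needed
            }
            result.add(stage);
        }
        Collections.reverse(result);
        return result;
    }

    public static void main(String[] args) {
        List<String> pipeline = Arrays.asList(
                "kafkaStreamRdd", "filteredDataObjectsRdd",
                "advDataObjectsRdd", "advDashboardAggKeyVsMetricRdd");

        // Only the last stream checkpointed: nothing upstream is needed to rebuild it.
        System.out.println(stagesToRecompute(pipeline,
                new HashSet<>(Arrays.asList("advDashboardAggKeyVsMetricRdd"))));   // []

        // Only the first stream checkpointed: the three later stages are recomputed.
        System.out.println(stagesToRecompute(pipeline,
                new HashSet<>(Arrays.asList("kafkaStreamRdd"))));
        // [filteredDataObjectsRdd, advDataObjectsRdd, advDashboardAggKeyVsMetricRdd]
    }
}
```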

Please let me know if any other information is required. I am using Spark
1.4.0.


JavaPairDStream<String, String> kafkaStreamRdd = KafkaConnector.getKafkaStream(jsc);
JavaPairDStream<String, String> kafkaStream = null;

if (CommandLineArguments.DO_REPARTITION_OF_RAW_STREAM_NB) {
    kafkaStream = kafkaStreamRdd.repartition(
            CommandLineArguments.FINAL_SPARK_PARTITIONS_OF_RAW_STREAM_NB);
} else {
    kafkaStream = kafkaStreamRdd;
}

kafkaStreamRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaPairDStream<String, AdLog> filteredDataObjectsRdd =
        FilterInvalidAdlog.kafkaStreamToAdLogMapper(kafkaStream);
filteredDataObjectsRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaDStream<AdLog> advDataObjectsRdd =
        AdvAggregation.kafkaStreamToAdLogMapper(filteredDataObjectsRdd);
advDataObjectsRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaPairDStream<String, AggregationMetrics> advDashboardAggKeyVsMetricRdd =
        AdvDashboardV1.mapAndReduce(advDataObjectsRdd);   // mapToPair is applied inside mapAndReduce
advDashboardAggKeyVsMetricRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaDStream<AggAdvDashboardStats> advDashboardAggDataRdd =
        AdvDashboardV1.cassandraOutputRowMapper(advDashboardAggKeyVsMetricRdd,
                CommandLineArguments.SPARK_BATCH_DURATION_NB);

Thanks!
Kundan
