I am trying to run a Spark job that performs some database operations and saves the records that pass in one table and those that fail in another.
Here is the code for the same:

```
log.info("Starting the spark job");
String sparkAppName = generateSparkAppName("reading-graph");
SparkConf sparkConf = getSparkConf(sparkAppName);
SparkSession sparkSession = SparkSession.builder()
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate();

final LongAccumulator accumulator =
        sparkSession.sparkContext().longAccumulator("Successful Processed Count");

LocalDate startDate = LocalDate.of(2022, 1, 1);
LocalDate endDate = startDate.plusDays(90);

Dataset<Row> rows = sparkSession.table("db.graph_details")
        .select(new Column("yyyy_mm_dd"), new Column("timestamp"), new Column("email_id"))
        .filter("yyyy_mm_dd >= '" + startDate + "' AND yyyy_mm_dd < '" + endDate + "'");

Dataset<Tuple2<Boolean, String>> tuple2Dataset = rows.mapPartitions(
        new GetPaymentsGraphFeatures(accumulator),
        Encoders.tuple(Encoders.BOOLEAN(), Encoders.STRING()));
tuple2Dataset.persist();

Dataset<Row> successfulRows = tuple2Dataset
        .filter((FilterFunction<Tuple2<Boolean, String>>) booleanRowTuple2 -> booleanRowTuple2._1)
        .map((MapFunction<Tuple2<Boolean, String>, Row>) booleanRowTuple2 -> mapToRow(booleanRowTuple2._2),
                RowEncoder.apply(getSchema()));

Dataset<Row> failedRows = tuple2Dataset
        .filter((FilterFunction<Tuple2<Boolean, String>>) booleanRowTuple2 -> !booleanRowTuple2._1)
        .map((MapFunction<Tuple2<Boolean, String>, Row>) booleanRowTuple2 -> mapToRow(booleanRowTuple2._2),
                RowEncoder.apply(getFailureSchema()));

successfulRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result");
failedRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result_failures");

tuple2Dataset.unpersist();
log.info("Completed the spark job");
```

The job is running the mapPartitions stage twice: once to produce successfulRows and once to produce failedRows. Ideally mapPartitions should run only once, right? Processing the successful records alone takes more than an hour. Could that be causing this behaviour? How can I ensure that mapPartitions runs only once?
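For reference, here is a sketch of the workaround I am considering (untested, reusing the tuple2Dataset variable from the job above). My understanding is that persist() is lazy and only marks the dataset for caching; triggering an action such as count() immediately after it should materialize the cache once, so both subsequent writes read the cached partitions instead of re-running mapPartitions:

```java
// Sketch, not the full job. Assumes tuple2Dataset as defined above and
// import org.apache.spark.storage.StorageLevel.
// MEMORY_AND_DISK spills to disk instead of evicting, so a long-running
// job is less likely to silently recompute the stage.
tuple2Dataset.persist(StorageLevel.MEMORY_AND_DISK());

// persist() alone computes nothing. An action here forces mapPartitions
// to execute exactly once and fill the cache; the two filter/map/write
// branches that follow then scan the cached tuples.
long cached = tuple2Dataset.count();
log.info("Cached {} tuples", cached);
```

Is this the right approach, or is there a better way to write both tables from a single pass?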