I am trying to run a Spark job that performs some database operations
and saves the records that pass to one table and the ones that fail to another.
Here is the code:
```
log.info("Starting the spark job");
String sparkAppName = generateSparkAppName("reading-graph");
SparkConf sparkConf = getSparkConf(sparkAppName);
SparkSession sparkSession =
        SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();
LocalDate startDate = LocalDate.of(2022, 1, 1);
final LongAccumulator accumulator =
        sparkSession.sparkContext().longAccumulator("Successful Processed Count");
LocalDate endDate = startDate.plusDays(90);
Dataset<Row> rows = sparkSession.table("db.graph_details")
        .select(new Column("yyyy_mm_dd"), new Column("timestamp"), new Column("email_id"))
        .filter("yyyy_mm_dd >= '" + startDate + "' AND yyyy_mm_dd < '" + endDate + "'");
Dataset<Tuple2<Boolean, String>> tuple2Dataset = rows.mapPartitions(
        new GetPaymentsGraphFeatures(accumulator),
        Encoders.tuple(Encoders.BOOLEAN(), Encoders.STRING()));
// Cache the tagged records so that both writes below can reuse them
tuple2Dataset.persist();
// Keep the records flagged as successful and convert them to rows with the success schema
Dataset<Row> successfulRows = tuple2Dataset
        .filter((FilterFunction<Tuple2<Boolean, String>>) booleanRowTuple2 -> booleanRowTuple2._1)
        .map((MapFunction<Tuple2<Boolean, String>, Row>) booleanRowTuple2 ->
                mapToRow(booleanRowTuple2._2), RowEncoder.apply(getSchema()));
// Keep the records flagged as failed and convert them to rows with the failure schema
Dataset<Row> failedRows = tuple2Dataset
        .filter((FilterFunction<Tuple2<Boolean, String>>) booleanRowTuple2 -> !booleanRowTuple2._1)
        .map((MapFunction<Tuple2<Boolean, String>, Row>) booleanRowTuple2 ->
                mapToRow(booleanRowTuple2._2), RowEncoder.apply(getFailureSchema()));
// Two separate actions: each saveAsTable call triggers its own Spark job
successfulRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result");
failedRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result_failures");
tuple2Dataset.unpersist();
log.info("Completed the spark job");
```
The Spark job runs the mapPartitions step twice: once to produce
successfulRows and once to produce failedRows. Since tuple2Dataset is
persisted, shouldn't mapPartitions run only once?
The job that writes the successful rows takes more than an hour. Could
that be causing this behaviour?
How can I ensure that mapPartitions runs only once?
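Spark transformations such as mapPartitions are lazy: every action (here, each saveAsTable call) evaluates the lineage it depends on, and the expensive step is skipped only when its output can be reused, for example from data that was persisted and has already been computed. The toy model below illustrates that reasoning in plain Java. It uses no Spark API, and LazyLineageDemo, LazyData, materialize, compute and runJob are hypothetical names made up for this sketch. It counts how many times a stand-in for the expensive per-record step runs when two "writes" consume the same tagged records, with and without evaluating the shared result once up front.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Plain-Java toy model of lazy evaluation; it is NOT Spark and uses no Spark API. */
public class LazyLineageDemo {

    /** Hypothetical lazy collection: holds a recipe for its data, not the data itself. */
    static final class LazyData<T> {
        private final Supplier<List<T>> recipe;
        private List<T> cached; // set only by materialize()

        LazyData(Supplier<List<T>> recipe) {
            this.recipe = recipe;
        }

        /** Transformation: returns a new recipe; nothing is evaluated yet. */
        <R> LazyData<R> map(Function<T, R> f) {
            return new LazyData<R>(() -> compute().stream().map(f).collect(Collectors.toList()));
        }

        /** Transformation: returns a new recipe; nothing is evaluated yet. */
        LazyData<T> filter(Predicate<T> p) {
            return new LazyData<T>(() -> compute().stream().filter(p).collect(Collectors.toList()));
        }

        /** Evaluates the recipe once now and keeps the result (roughly: persist plus an action). */
        LazyData<T> materialize() {
            cached = recipe.get();
            return this;
        }

        /** "Action": re-runs the whole recipe unless a materialized result exists. */
        List<T> compute() {
            return cached != null ? cached : recipe.get();
        }
    }

    /** Returns how many times the expensive per-record step ran across two "writes". */
    static int runJob(boolean materializeFirst) {
        AtomicInteger expensiveCalls = new AtomicInteger();
        LazyData<Integer> source = new LazyData<Integer>(() -> List.of(1, 2, 3, 4));

        // Stand-in for the expensive mapPartitions step: tag each record as passed (even) or failed (odd).
        LazyData<Map.Entry<Boolean, Integer>> tagged = source.map(x -> {
            expensiveCalls.incrementAndGet();
            return Map.entry(x % 2 == 0, x);
        });

        if (materializeFirst) {
            tagged.materialize();
        }

        // Two "actions", like the two saveAsTable calls: each one evaluates its own lineage.
        List<Map.Entry<Boolean, Integer>> passed = tagged.filter(e -> e.getKey()).compute();
        List<Map.Entry<Boolean, Integer>> failed = tagged.filter(e -> !e.getKey()).compute();
        if (passed.size() + failed.size() != 4) {
            throw new IllegalStateException("every record should land in exactly one list");
        }
        return expensiveCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("without materializing: " + runJob(false)); // 8 (4 records x 2 actions)
        System.out.println("with materializing:    " + runJob(true));  // 4 (4 records x 1 evaluation)
    }
}
```

In Spark, the analogous move is to run an action such as count() on tuple2Dataset right after persist(), so the cache is populated before either write. Treat this as a sketch of the expected behaviour rather than a guaranteed fix: cached blocks can still be recomputed if they are lost, for example when an executor dies.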