I am trying to run a spark job which performs some database operations
and saves passed records in one table and the failed ones in another.

Here is the code for the same:

```
log.info("Starting the spark job {}");

String sparkAppName = generateSparkAppName("reading-graph");
SparkConf sparkConf = getSparkConf(sparkAppName);
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();

LocalDate startDate = LocalDate.of(2022, 1, 1);
final LongAccumulator accumulator =
sparkSession.sparkContext().longAccumulator("Successful Processed
Count");
LocalDate endDate = startDate.plusDays(90);

Dataset<Row> rows = sparkSession.table("db.graph_details")
    .select(new Column("yyyy_mm_dd"), new Column("timestamp"), new
Column("email_id")))
    .filter("yyyy_mm_dd >= '" + startDate + "' AND yyyy_mm_dd < '" + endDate);

Dataset<Tuple2<Boolean, String>> tuple2Dataset =
rows.mapPartitions(new GetPaymentsGraphFeatures(accumulator),
    Encoders.tuple(Encoders.BOOLEAN(), Encoders.STRING()));
tuple2Dataset.persist();

Dataset<Row> successfulRows =
tuple2Dataset.filter((FilterFunction<Tuple2<Boolean, String>>)
booleanRowTuple2 -> booleanRowTuple2._1).map(
    (MapFunction<Tuple2<Boolean, String>, Row>) booleanRowTuple2 ->
mapToRow(booleanRowTuple2._2), RowEncoder.apply(getSchema()));

Dataset<Row> failedRows =
tuple2Dataset.filter((FilterFunction<Tuple2<Boolean, String>>)
booleanRowTuple2 -> !booleanRowTuple2._1).map(
    (MapFunction<Tuple2<Boolean, String>, Row>) booleanRowTuple2 ->
mapToRow(booleanRowTuple2._2), RowEncoder.apply(getFailureSchema()));

successfulRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result");
failedRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result_failures");
tuple2Dataset.unpersist();
log.info("Completed the spark job");
```

The spark job is running the mapPartitions twice, once to get the
successfulRows and once to get the failedRows. But ideally the
mapPartitions should be run once right ?

My job to process the successful action takes more than 1 hour. Can
that be causing this behaviour ?

How can I ensure that the map partitions run only once ?

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to