Hello all,

We are having a major performance issue with Spark, which is holding us
back from going live.

We have a job that carries out computation on log files and writes the
results into an Oracle DB.

The 'reduceByKey' stage has had its parallelism set to 4, as we don't
want to establish too many DB connections.
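
For reference, the reduce step is set up roughly as below; logPairsRDD
and the combine function are simplified placeholders for our actual
code:

// needs: org.apache.spark.api.java.JavaPairRDD,
//        org.apache.spark.api.java.function.Function2
JavaPairRDD<String, String> reduceResultsRDD =
        logPairsRDD.reduceByKey(new Function2<String, String, String>() {
            @Override
            public String call(String a, String b) {
                // placeholder; the real function merges the computed
                // values for each key
                return a + "," + b;
            }
        }, 4); // numPartitions = 4 to cap the number of DB connections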

We then call foreachPartition on the RDD of pairs that was reduced by
key. Within this foreachPartition function we establish a DB connection,
iterate over the results, prepare the Oracle statement for batch
insertion, then commit the batch and close the connection. All of this
works fine.

However, when we execute the job to process 12GB of data, it takes forever
to complete, especially at the foreachPartition stage.

We submitted the job with 6 executors, 2 cores each, and 6GB of memory,
of which a fraction of 0.3 is assigned to spark.storage.memoryFraction.
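
The submit command is along these lines (the class and jar names are
placeholders, and --num-executors assumes a YARN deployment):

spark-submit \
    --class com.example.LogProcessingJob \
    --num-executors 6 \
    --executor-cores 2 \
    --executor-memory 6G \
    --conf spark.storage.memoryFraction=0.3 \
    log-processing-job.jar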

The job takes about 50 minutes to complete, which is not ideal. I'm not
sure how we could improve the performance. I've provided the main body
of the code below; please take a look and advise:

From the driver:

reduceResultsRDD.foreachPartition(
        new DB.InsertFunction(dbuser, dbpass, batchsize));


DB class:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

public class DB {

    private static final Logger logger = LoggerFactory.getLogger(DB.class);

    public static class InsertFunction implements
            VoidFunction<Iterator<Tuple2<String, String>>> {

        private static final long serialVersionUID = 999955766876878L;
        private String dbuser = "";
        private String dbpass = "";
        private int batchsize;

        public InsertFunction(String dbuser, String dbpass, int batchsize) {
            super();
            this.dbuser = dbuser;
            this.dbpass = dbpass;
            this.batchsize = batchsize;
        }

        @Override
        public void call(Iterator<Tuple2<String, String>> results) {
            Connection connect = null;
            PreparedStatement pstmt = null;
            try {
                // getDBConnection is a helper in this class (omitted
                // here); it is assumed to disable auto-commit so the
                // manual commits below take effect.
                connect = getDBConnection(dbuser, dbpass);

                int count = 0;

                if (batchsize <= 0) {
                    batchsize = 10000;
                }

                pstmt = connect.prepareStatement(
                        "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");

                while (results.hasNext()) {
                    Tuple2<String, String> kv = results.next();
                    String[] data = kv._1().concat("," + kv._2()).split(",");

                    pstmt.setString(1, data[0]);
                    pstmt.setString(2, data[1]);
                    .....

                    pstmt.addBatch();
                    count++;

                    // flush only once a full batch has accumulated
                    if (count == batchsize) {
                        logger.info("BulkCount : " + count);
                        pstmt.executeBatch();
                        connect.commit();
                        count = 0;
                    }
                }

                // flush the final, partially filled batch
                pstmt.executeBatch();
                connect.commit();

            } catch (Exception e) {
                logger.error("InsertFunction error: " + e.getMessage());
            } finally {
                try {
                    if (pstmt != null) {
                        pstmt.close();
                    }
                } catch (SQLException e) {
                    logger.error("InsertFunction Statement Close error: "
                            + e.getMessage());
                }
                try {
                    if (connect != null) {
                        connect.close();
                    }
                } catch (SQLException e) {
                    logger.error("InsertFunction Connection Close error: "
                            + e.getMessage());
                }
            }
        }
    }
}
