Thanks for the reply. Please find the pseudocode below. We fetch DStreams from
a Kinesis stream every 10 seconds, perform transformations, and finally
persist the results to HBase tables using batch insertions.

dStream = dStream.repartition(jssc.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords -> {
    Connection hbaseConnection = ConnectionUtil.getHbaseConnection();
    List<byte[]> listOfRecords = new ArrayList<>();
    while (partitionOfRecords.hasNext()) {
        try {
            listOfRecords.add(partitionOfRecords.next());

            // Keep accumulating until we have 10 records or the partition is exhausted.
            if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
                continue;

            List<byte[]> finalListOfRecords = listOfRecords;
            doJob(finalListOfRecords, primaryConnection, lookupsConnection);
            listOfRecords = new ArrayList<>();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}));

We batch the records in groups of 10 and pass each group to the doJob method,
where the actual transformations happen. Each group is then batch-inserted
into the HBase table.
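
To make the batching logic easier to reason about apart from Spark and HBase,
here is a minimal standalone sketch of the same loop. The class BatchSketch
and the helper toBatches are hypothetical names used only for illustration;
in the real job each batch would be handed to doJob instead of being
collected into a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class BatchSketch {
    // Hypothetical helper: drains an iterator into batches of at most batchSize elements.
    static <T> List<List<T>> toBatches(Iterator<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        while (records.hasNext()) {
            current.add(records.next());
            // Flush when the batch is full or the iterator is exhausted,
            // mirroring the "size() < 10 && hasNext()" check in the job.
            if (current.size() >= batchSize || !records.hasNext()) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            records.add(i);
        }
        // 25 records with a batch size of 10 give batches of 10, 10 and 5.
        for (List<Integer> batch : toBatches(records.iterator(), 10)) {
            System.out.println(batch.size());
        }
    }
}
```

The last batch of a partition may hold fewer than 10 records, so a partition
with few records still results in at least one call per partition.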

Given the code above, can we guess what is happening in Job 1 (6 tasks), and
how to reduce its run time?
Mainly, how can we set parallelism effectively without using the
repartition() method?

Thanks in advance.


