Hi, I'm new to Spark and am facing a peculiar problem.
I'm writing a simple Java driver program that creates a key/value data
structure and collects it once it has been created. The problem is that when
I increase the iteration count of the for loop that builds the ArrayList of
Long values (which I then key and store in Spark as a Java collection), the
serialized size of the tasks grows as well.
e.g.:
  for loop count : 10          Task size : 1120 bytes
  for loop count : 10000       Task size : 33402 bytes
  for loop count : 10000000    Task size : 453434 bytes
etc.
I can't understand why the task size increases. I tried running the same
example in the Spark shell and noticed that the task size stays the same,
irrespective of the loop iteration count.
Code :
    @Override
    public void execute() {
        // do something
        List<Long> numbers = new ArrayList<Long>();
        JavaRDD<Long> distData = null;
        JavaPairRDD<String, Long> mapOfKeys = null;
        JavaRDD<String> keysRDD = null;

        // Local key function: maps n to the string "n,n+1".
        class ByKeyImpl implements Function<Long, String>, Serializable {
            private static final long serialVersionUID = 5749098182016143296L;

            public String call(Long paramT1) throws Exception {
                StringBuilder builder = new StringBuilder();
                builder.append(paramT1).append(',').append(paramT1 + 1);
                return builder.toString();
            }
        }
                
        System.out.println(" ************** STARTING BENCHMARK  EXAMPLE ...*****************");

        while (true) {
            System.out.println(" ************** DO YOU WANT TO CONTINUE ? (YES/NO) *****************");
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            try {
                String continueString = reader.readLine();
                                        
                if ("yes".equalsIgnoreCase(continueString)) {
                    if (numbers.size() == 0) {
                        // List not populated
                        for (long i = 0; i < num; i++) {
                            numbers.add(i);
                        }
                    }
                    // at this time numbers has long values in it.
                    // check for RDD if already created or not.
                    if (distData == null) {
                        System.out.println("******************** NEW RDD CREATED.********************");
                        if (numPartitions > 0) {
                            distData = sc.parallelize(numbers, numPartitions);
                        } else {
                            distData = sc.parallelize(numbers);
                        }
                    }
                    // at this time, RDD is already present or newly created
                    // check if map is null or not
                    if (mapOfKeys == null) {
                        mapOfKeys = distData.keyBy(new ByKeyImpl());
                        keysRDD = mapOfKeys.keys();
                        keysRDD.persist(StorageLevel.MEMORY_ONLY());
                    }
                    System.out.println("******** DO YOU WANT TO COUNT OR COLLECT THE COLLECTION ? *******************");
                    String inputOperation = reader.readLine();

                    if ("count".equalsIgnoreCase(inputOperation)) {
                        long startTime = Calendar.getInstance().getTimeInMillis();
                        System.out.println(" START Time of Function ... *** " + startTime);

                        System.out.println("***************  KEYS COUNT IS **************** "
                                + keysRDD.count());

                        long endTime = Calendar.getInstance().getTimeInMillis();
                        System.out.println(" END Time of Function ... *** " + endTime
                                + " and difference is ************"
                                + ((endTime - startTime) / 1000) + " sec(s)...");
                    } else if ("collect".equalsIgnoreCase(inputOperation)) {
                        long startTime = Calendar.getInstance().getTimeInMillis();
                        System.out.println(" START Time of Function ... *** " + startTime);

                        System.out.println("*************  AFTER COLLECTING KEYS COUNT IS **************** "
                                + keysRDD.collect().size());

                        long endTime = Calendar.getInstance().getTimeInMillis();
                        System.out.println(" END Time of Function ... *** " + endTime
                                + " and difference is ************"
                                + ((endTime - startTime) / 1000) + " sec(s)...");
                    }
                } else if ("no".equalsIgnoreCase(continueString)) {
                    System.exit(0);
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
In the code, the higher the loop iteration count, the more data collect /
count sends along with each task, i.e. the larger the task size becomes. Is
there any way to reduce this?
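
For a baseline outside Spark, a minimal sketch along these lines measures how
the plain Java-serialized size of a List<Long> grows with the loop count. It
is only an illustration: it uses java.io serialization directly and says
nothing about how Spark actually assembles a task, and the class and method
names (SerializedSizeDemo, serializedSize, buildNumbers) are made up for this
example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark: measures plain Java serialization only.
public class SerializedSizeDemo {

    // Returns the number of bytes java.io serialization produces for obj.
    public static int serializedSize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Builds the list 0..count-1, like the for loop in the code above.
    public static List<Long> buildNumbers(long count) {
        List<Long> numbers = new ArrayList<Long>();
        for (long i = 0; i < count; i++) {
            numbers.add(i);
        }
        return numbers;
    }

    public static void main(String[] args) {
        // Loop counts chosen for illustration; larger values need more heap.
        for (long count : new long[] {10, 10_000, 100_000}) {
            System.out.println("loop count = " + count
                    + ", serialized bytes = " + serializedSize(buildNumbers(count)));
        }
    }
}
```

If the task sizes reported by Spark track these numbers, that would suggest
the list contents are being shipped with each task; if they do not, the
growth is coming from somewhere else.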



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Task-Size-Increases-when-using-loops-tp17694.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
