Hi 

Im getting the following error when trying to process a csv based data file.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost 
task 1.3 in stage 10.0 (TID 262, hc2r1m3.semtech-solutions.co.nz): 
java.lang.ArrayIndexOutOfBoundsException: 0
        at 
org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
        at 
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
        at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

I have made sure that none of my data rows are empty and that they all have 15 
records. I have also physically checked the
data. The error occurs when I run the actual spark sql on the last line. The 
script is as follows. 

  val server    = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
  val path      = "/data/spark/h2o/"

  val train_csv =  server + path + "adult.train.data" // 32,562 rows
  val test_csv  =  server + path + "adult.test.data"  // 16,283 rows

  // load the data

  val rawTrainData = sparkCxt.textFile(train_csv)
  val rawTestData  = sparkCxt.textFile(test_csv)

  // create a spark sql schema for the row

  val schemaString = "age workclass fnlwgt education educationalnum 
maritalstatus" +
                     " occupation relationship race gender capitalgain 
capitalloss" +
                     " hoursperweek nativecountry income"

  val schema = StructType( schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, false)))

  // create an RDD from the raw training data

  val trainRDD  = rawTrainData
         .filter(!_.isEmpty)
         .map(rawRow => Row.fromSeq(rawRow.split(",")
         .filter(_.length == 15)
         .map(_.toString).map(_.trim) ))

  println( ">>>>> Raw Training Data Count = " + trainRDD.count() )

  val testRDD   = rawTestData
         .filter(!_.isEmpty)
         .map(rawRow  => Row.fromSeq(rawRow.split(",")
         .filter(_.length == 15)
         .map(_.toString).map(_.trim) ))

  println( ">>>>> Raw Testing Data Count = " + testRDD.count() )

  // create a schema RDD

  val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
  val testSchemaRDD  = sqlContext.applySchema(testRDD,  schema)

  // register schema RDD as a table

  trainSchemaRDD.registerTempTable("trainingTable")
  testSchemaRDD.registerTempTable("testingTable")

  println( ">>>>> Schema RDD Training Data Count = " + trainSchemaRDD.count() )
  println( ">>>>> Schema RDD Testing Data Count  = " + testSchemaRDD.count() )

  // now run sql against the table to filter the data

  val schemaRddTrain = sqlContext.sql(
    "SELECT "+
       "age,workclass,education,maritalstatus,occupation,relationship,race,"+
       "gender,hoursperweek,nativecountry,income "+
    "FROM trainingTable LIMIT 5000")

  println( ">>>>> Training Data Count = " + schemaRddTrain.count() )

Any advice is appreciated :)

                                          

Reply via email to