Getting error for the following code snippet:

object SparkTaskTry extends Logging {
  63   /**
  64    * Extends the normal Try constructor to allow TaskKilledExceptions
to propagate
  65    */
  66   def apply[T](r: => T): Try[T] =
  67     try scala.util.Success(r) catch {
  68       case e: TaskKilledException => throw e
  69       case NonFatal(e) =>
  70         logInfo("Caught and Ignored Exception: " + e.toString)
  71         e.printStackTrace()
  72         Failure(e)
  73     }
  74 }

override def buildScan(
 349       requiredColumns: Array[String],
 350       filters: Array[Filter],
 351       inputFiles: Array[FileStatus],
 352       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row]
= {
 353     val useMetadataCache =
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
 354     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
 355     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
 356     val assumeInt96IsTimestamp =
sqlContext.conf.isParquetINT96AsTimestamp
 357     val followParquetFormatSpec =
sqlContext.conf.followParquetFormatSpec
 358 
 359     // When merging schemas is enabled and the column of the given
filter does not exist,
 360     // Parquet emits an exception which is an issue of Parquet
(PARQUET-389).
 361     val safeParquetFilterPushDown = !shouldMergeSchemas &&
parquetFilterPushDown
 362 
 363     // Parquet row group size. We will use this value as the value for
 364     // mapreduce.input.fileinputformat.split.minsize and
mapred.min.split.size if the value
 365     // of these flags are smaller than the parquet row group size.
 366     val parquetBlockSize =
ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
 367 
 368     // Create the function to set variable Parquet confs at both driver
and executor side.
 369     val initLocalJobFuncOpt =
 370       ParquetRelation.initializeLocalJobFunc(
 371         requiredColumns,
 372         filters,
 373         dataSchema,
 374         parquetBlockSize,
 375         useMetadataCache,
 376         safeParquetFilterPushDown,
 377         assumeBinaryIsString,
 378         assumeInt96IsTimestamp,
 379         followParquetFormatSpec) _
 380 
 381     // Create the function to set input paths at the driver side.
 382     val setInputPaths =
 383       ParquetRelation.initializeDriverSideJobFunc(inputFiles,
parquetBlockSize) _
 384 
 385     Utils.withDummyCallSite(sqlContext.sparkContext) {
 386       new RDD[Try[InternalRow]](sqlContext.sparkContext, Nil) with
Logging {
 387 
 388         override def getPartitions: Array[SparkPartition] =
internalRDD.getPartitions
 389 
 390         override def getPreferredLocations(split: SparkPartition):
Seq[String] =
 391           internalRDD.getPreferredLocations(split)
 392 
 393         override def checkpoint() {
 394           // Do nothing. Hadoop RDD should not be checkpointed.
 395         }
 396 
 397         override def persist(storageLevel: StorageLevel): this.type = {
 398           super.persist(storageLevel)
 399         }
 400 
 401         val internalRDD: SqlNewHadoopRDD[InternalRow] = new
SqlNewHadoopRDD(
 402         sc = sqlContext.sparkContext,
 403         broadcastedConf = broadcastedConf,
 404         initDriverSideJobFuncOpt = Some(setInputPaths),
 405         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
 406         inputFormatClass = if (isSplittable) {
 407           classOf[ParquetInputFormat[InternalRow]]
 408         } else {
 409           classOf[ParquetRowInputFormatIndivisible]
 410         },
 411         valueClass = classOf[InternalRow]) {
 412 
 413         val cacheMetadata = useMetadataCache
 414 
 415         @transient val cachedStatuses = inputFiles.map { f =>
 416           // In order to encode the authority of a Path containing
special characters such as '/'
 417           // (which does happen in some S3N credentials), we need to
use the string returned by the
 418           // URI of the path to create a new Path.
 419           val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
 420           new FileStatus(
 421             f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime,
 422             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup,
pathWithEscapedAuthority)
 423         }.toSeq
 424 
 425         private def escapePathUserInfo(path: Path): Path = {
 426           val uri = path.toUri
 427           new Path(new URI(
 428             uri.getScheme, uri.getRawUserInfo, uri.getHost,
uri.getPort, uri.getPath,
 429             uri.getQuery, uri.getFragment))
 430         }
 431 
 432         // Overridden so we can inject our own cached files statuses.
 433         override def getPartitions: Array[SparkPartition] = {
 434           val inputFormat = new ParquetInputFormat[InternalRow] {
 435             override def listStatus(jobContext: JobContext):
JList[FileStatus] = {
 436               if (cacheMetadata) cachedStatuses else
super.listStatus(jobContext)
 437             }
 438           }
 439 
 440           val jobContext = newJobContext(getConf(isDriverSide = true),
jobId)
 441           val rawSplits = inputFormat.getSplits(jobContext)
 442 
 443           Array.tabulate[SparkPartition](rawSplits.size) { i =>
 444             new SqlNewHadoopPartition(id, i,
rawSplits(i).asInstanceOf[InputSplit with Writable])
 445           }
 446         }
 447       }
 448 
 449         override def compute(part: SparkPartition, context:
TaskContext):
 450         InterruptibleIterator[Try[InternalRow]] = {
 451           val iter: Iterator[InternalRow] =
internalRDD.constructIter(part, context)
 452           val tryIter = new Iterator[Try[InternalRow]] {
 453             override def next(): Try[InternalRow] = {
 454               val readAttempt = SparkTaskTry(iter.next())
 455               readAttempt
 456             }
 457 
 458             override def hasNext: Boolean = {
 459               SparkTaskTry[Boolean](iter.hasNext) match {
 460                 case scala.util.Success(r) => r
 461                 case _ => false
 462               }
 463             }
 464           }
 465           new InterruptibleIterator[Try[InternalRow]](context, tryIter)
 466         }
 467 
 468       }.filter(_.isSuccess).map(_.get)
 469         .asInstanceOf[RDD[Row]]  // type erasure hack to pass
RDD[InternalRow] as RDD[Row]
 470     }
 471   }

Error StackTrace :
Caused by: java.io.NotSerializableException:
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
Serialization stack:
        - object not serializable (class:
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation, value:
ParquetRelation)
        - field (class:
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1,
name: $outer, type: class
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation)
        - object (class
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1,
<function0>)
        - field (class:
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2,
name: $outer, type: class
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1)
        - object (class
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2,
$anonfun$buildInternalScan$1$$anon$2[2] at )
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: 
class
org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.OneToOneDependency,
org.apache.spark.OneToOneDependency@47293a0b)
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon,
List(org.apache.spark.OneToOneDependency@47293a0b))
        - field (class: org.apache.spark.rdd.RDD, name:
org$apache$spark$rdd$RDD$$dependencies_, type: interface
scala.collection.Seq)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, 
MapPartitionsRDD[4]
at )
        - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: prev, type:
class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, 
MapPartitionsRDD[5]
at )
        - field (class: org.apache.spark.sql.execution.PhysicalRDD, name: rdd,
type: class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.sql.execution.PhysicalRDD, Scan
ParquetRelation[_1#0] InputPaths:
hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet
)
        - field (class: org.apache.spark.sql.execution.ConvertToSafe, name: 
child,
type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.ConvertToSafe, 
ConvertToSafe
+- Scan ParquetRelation[_1#0] InputPaths:
hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet
)
        - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2,
name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
        - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2,
<function1>)
        at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 78 more

Please help!



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Task-not-Serializable-Exception-tp20417.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to