Getting an error for the following code snippet:

object SparkTaskTry extends Logging {
  /**
   * Extends the normal Try constructor to allow TaskKilledExceptions to propagate
   */
  def apply[T](r: => T): Try[T] =
    try scala.util.Success(r) catch {
      case e: TaskKilledException => throw e
      case NonFatal(e) =>
        logInfo("Caught and Ignored Exception: " + e.toString)
        e.printStackTrace()
        Failure(e)
    }
}
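For context, the behaviour this wrapper is meant to give (a bad record becomes a Failure value that can be filtered out later, instead of failing the whole task) can be sketched without any Spark classes. Everything below (TryIteratorDemo, safeReads, the sample data) is purely illustrative and not part of the snippet above:

import scala.util.Try

object TryIteratorDemo {
  // Wrap each read in Try so one bad record does not abort the whole iteration.
  def safeReads[T](it: Iterator[T]): Iterator[Try[T]] = new Iterator[Try[T]] {
    override def hasNext: Boolean = Try(it.hasNext).getOrElse(false)
    override def next(): Try[T] = Try(it.next())
  }

  def main(args: Array[String]): Unit = {
    val raw = Iterator("1", "2", "oops", "4").map(_.toInt) // "oops" throws NumberFormatException when read
    val kept = safeReads(raw).filter(_.isSuccess).map(_.get).toList
    println(kept) // List(1, 2, 4) -- the bad record is dropped, the rest survive
  }
}

The wrapper is then used the same way inside the overridden buildScan below, in compute(), to turn the Parquet record iterator into an Iterator[Try[InternalRow]]: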
override def buildScan(
    requiredColumns: Array[String],
    filters: Array[Filter],
    inputFiles: Array[FileStatus],
    broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
  val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
  val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
  val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
  val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
  val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec

  // When merging schemas is enabled and the column of the given filter does not exist,
  // Parquet emits an exception which is an issue of Parquet (PARQUET-389).
  val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown

  // Parquet row group size. We will use this value as the value for
  // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
  // of these flags are smaller than the parquet row group size.
  val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)

  // Create the function to set variable Parquet confs at both driver and executor side.
  val initLocalJobFuncOpt =
    ParquetRelation.initializeLocalJobFunc(
      requiredColumns,
      filters,
      dataSchema,
      parquetBlockSize,
      useMetadataCache,
      safeParquetFilterPushDown,
      assumeBinaryIsString,
      assumeInt96IsTimestamp,
      followParquetFormatSpec) _

  // Create the function to set input paths at the driver side.
  val setInputPaths =
    ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _

  Utils.withDummyCallSite(sqlContext.sparkContext) {
    new RDD[Try[InternalRow]](sqlContext.sparkContext, Nil) with Logging {

      override def getPartitions: Array[SparkPartition] = internalRDD.getPartitions

      override def getPreferredLocations(split: SparkPartition): Seq[String] =
        internalRDD.getPreferredLocations(split)

      override def checkpoint() {
        // Do nothing. Hadoop RDD should not be checkpointed.
      }

      override def persist(storageLevel: StorageLevel): this.type = {
        super.persist(storageLevel)
      }

      val internalRDD: SqlNewHadoopRDD[InternalRow] = new SqlNewHadoopRDD(
        sc = sqlContext.sparkContext,
        broadcastedConf = broadcastedConf,
        initDriverSideJobFuncOpt = Some(setInputPaths),
        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
        inputFormatClass = if (isSplittable) {
          classOf[ParquetInputFormat[InternalRow]]
        } else {
          classOf[ParquetRowInputFormatIndivisible]
        },
        valueClass = classOf[InternalRow]) {

        val cacheMetadata = useMetadataCache

        @transient val cachedStatuses = inputFiles.map { f =>
          // In order to encode the authority of a Path containing special characters such as '/'
          // (which does happen in some S3N credentials), we need to use the string returned by the
          // URI of the path to create a new Path.
          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
          new FileStatus(
            f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
        }.toSeq

        private def escapePathUserInfo(path: Path): Path = {
          val uri = path.toUri
          new Path(new URI(
            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
            uri.getQuery, uri.getFragment))
        }

        // Overridden so we can inject our own cached files statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = new ParquetInputFormat[InternalRow] {
            override def listStatus(jobContext: JobContext): JList[FileStatus] = {
              if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
            }
          }

          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
          val rawSplits = inputFormat.getSplits(jobContext)

          Array.tabulate[SparkPartition](rawSplits.size) { i =>
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
          }
        }
      }

      override def compute(part: SparkPartition, context: TaskContext):
          InterruptibleIterator[Try[InternalRow]] = {
        val iter: Iterator[InternalRow] = internalRDD.constructIter(part, context)
        val tryIter = new Iterator[Try[InternalRow]] {
          override def next(): Try[InternalRow] = {
            val readAttempt = SparkTaskTry(iter.next())
            readAttempt
          }

          override def hasNext: Boolean = {
            SparkTaskTry[Boolean](iter.hasNext) match {
              case scala.util.Success(r) => r
              case _ => false
            }
          }
        }
        new InterruptibleIterator[Try[InternalRow]](context, tryIter)
      }

    }.filter(_.isSuccess).map(_.get)
      .asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
  }
}

Error StackTrace:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation, value: ParquetRelation)
    - field (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1, name: $outer, type: class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation)
    - object (class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1, <function0>)
    - field (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2, name: $outer, type: class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1)
    - object (class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2, $anonfun$buildInternalScan$1$$anon$2[2] at )
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@47293a0b)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@47293a0b))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4] at )
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: prev, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[5] at )
    - field (class: org.apache.spark.sql.execution.PhysicalRDD, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.sql.execution.PhysicalRDD, Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe +- Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
    - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
  ... 78 more

Please help!
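In case it helps with diagnosing this: my reading of the serialization stack is that the anonymous RDD built inside buildScan keeps a $outer reference back to the enclosing ParquetRelation (which is not serializable), so the whole relation gets dragged into the task closure. The same capture behaviour can be reproduced outside Spark with plain Java serialization; NonSerializableRelation and the method names below are purely illustrative, not Spark API:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a driver-side object that is not java.io.Serializable (like ParquetRelation).
class NonSerializableRelation {
  val tag = "driver-side state"

  // Referencing an instance member from the closure pulls the whole enclosing
  // instance in via $outer / captured this, so serializing the closure fails.
  def capturesOuter: () => String = () => tag

  // Copying the field into a local val first means only the String is captured.
  def capturesLocalCopy: () => String = {
    val localTag = tag
    () => localTag
  }
}

object ClosureCaptureDemo {
  private def trySerialize(label: String, obj: AnyRef): Unit =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      println(label + ": serialized fine")
    } catch {
      case e: NotSerializableException => println(label + ": failed - " + e)
    }

  def main(args: Array[String]): Unit = {
    val relation = new NonSerializableRelation
    trySerialize("closure referencing outer field", relation.capturesOuter) // fails
    trySerialize("closure over a local copy", relation.capturesLocalCopy)   // succeeds
  }
}

If that reading is right, I assume the fix is along the lines of what Spark's own sources do: copy every value the anonymous RDD and its closures need (the confs, dataSchema, the broadcast) into local vals before the new RDD[...] block, so nothing inside it references the relation itself.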