Hi,

When I reference a variable from another part of the code inside the JdbcRDD connection function, it throws a serialization exception.
Does JdbcRDD not accept references to other parts of the code?
              confMap = ConfFactory.getConf(ParquetStreaming)

              val jdbcRDD = new JdbcRDD(sc, () => {
                Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
                // Throws the exception below:
                DriverManager.getConnection(confMap(PHOENIX_URL))
                // This works:
                // DriverManager.getConnection(ConfFactory.getConf(ParquetStreaming)(PHOENIX_URL))
              },
                s"SELECT tenant_id, data_source_id, mne_id, device_type1_key " +
                  s" FROM XYZ_TYPE1_TEST WHERE DEVICE_TYPE1_KEY >= ? and DEVICE_TYPE1_KEY <= ? and TENANT_ID in ($tenantIds) " +
                  s" AND DATA_SOURCE_ID in ($dataSourceIds) AND ISDELETED = false",
                minKey, maxKey, 10,
                row => DeviceDel(row.getString(1), row.getString(2), row.getLong(3), row.getLong(4))).cache()

It throws a runtime exception. However, passing the URL as a literal, DriverManager.getConnection("jdbc:phoenix:10.20.87.1:2181"), works fine.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
        - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@5bb273b4)
        - field (class: advance_reporting.transformations.DeviceDelETL$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
        - object (class advance_reporting.transformations.DeviceDelETL$$anonfun$main$1, <function1>)
        - field (class: advance_reporting.transformations.DeviceDelETL$$anonfun$main$1$$anonfun$6, name: $outer, type: class $$anonfun$main$1)
        - object (class advance_reporting.transformations.DeviceDelETL$$anonfun$main$1$$anonfun$6, <function0>)
        - field (class: org.apache.spark.rdd.JdbcRDD, name: org$apache$spark$rdd$JdbcRDD$$getConnection, type: interface scala.Function0)
        - object (class org.apache.spark.rdd.JdbcRDD, JdbcRDD[15] at JdbcRDD at DeviceDelETL.scala:91)
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (JdbcRDD[15] at JdbcRDD at DeviceDelETL.scala:91,<function2>))
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:878)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Sep 18, 2015 12:22:59 PM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
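
Reading the serialization stack above, the connection function seems to reach confMap through its $outer reference, and that outer closure also holds sc. If that reading is right, the function drags the SparkContext along when Spark serializes it. Below is a minimal sketch of that effect using plain Java serialization, without Spark. FakeContext, Job, leakyUrlFn, safeUrlFn and ClosureCaptureDemo are hypothetical names made up for the illustration; they are not part of Spark or of my job.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical stand-in for the enclosing scope that holds both the context
// and the configuration map.
class Job(val sc: FakeContext, val confMap: Map[String, String]) {

  // Reads the field inside the closure, so the closure captures `this`
  // (and with it `sc`), similar to the `$outer` entry in the stack above.
  def leakyUrlFn: () => String = () => confMap("PHOENIX_URL")

  // Copies the value into a local val first, so the closure captures
  // only a String.
  def safeUrlFn: () => String = {
    val url = confMap("PHOENIX_URL")
    () => url
  }
}

object ClosureCaptureDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job(new FakeContext, Map("PHOENIX_URL" -> "jdbc:phoenix:10.20.87.1:2181"))
    println(s"leaky closure serializes: ${serializes(job.leakyUrlFn)}")
    println(s"safe closure serializes:  ${serializes(job.safeUrlFn)}")
  }
}
```

If the same thing is happening in my job, the analogous change would be to copy the URL into a local val (for example val phoenixUrl = confMap(PHOENIX_URL)) just before new JdbcRDD(...) and to use only phoenixUrl inside the connection function. I have not verified this against the actual job.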

Any ideas?
