Spark driver OOM due to org.apache.spark.status.ElementTrackingStore
Hi there,

We have a Spark driver running 24x7, and we continuously get an OOM in the driver every 10 days. After analyzing a heap dump, I found that org.apache.spark.status.ElementTrackingStore keeps about 85% of heap usage:

[image: image.png]

From this JIRA ticket, https://issues.apache.org/jira/browse/SPARK-26395, these parameters looked like the root cause:

- spark.ui.retainedDeadExecutors
- spark.ui.retainedJobs
- spark.ui.retainedStages

But it didn't work: with these changes the OOM is only delayed from 1 week to 10 days. It would be really appreciated if anyone could suggest a solution.

Thanks
Jason
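For anyone hitting the same thing, a minimal sketch of how these retention limits can be lowered when the session is built, assuming a long-lived driver where UI history can be sacrificed. The values below are illustrative only, not recommendations; note that spark.ui.retainedTasks and spark.sql.ui.retainedExecutions also feed the same ElementTrackingStore:

  import org.apache.spark.sql.SparkSession

  // Illustrative values only -- lower retention means less history in the UI.
  val spark = SparkSession.builder()
    .appName("long-running-driver")
    .config("spark.ui.retainedJobs", "100")           // default 1000
    .config("spark.ui.retainedStages", "100")         // default 1000
    .config("spark.ui.retainedTasks", "10000")        // default 100000
    .config("spark.ui.retainedDeadExecutors", "10")   // default 100
    .config("spark.sql.ui.retainedExecutions", "100") // default 1000
    .getOrCreate()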
Is the Spark fair scheduler available for Kubernetes?
The official doc, https://spark.apache.org/docs/latest/job-scheduling.html, doesn't mention whether it works on a Kubernetes cluster. Can anyone quickly answer this? TIA. Jason
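For reference, within-application fair scheduling is configured on the SparkContext itself rather than on the cluster manager, so a sketch like the one below should apply regardless of the deployment mode; whether Kubernetes changes the behaviour is exactly the question here, so treat this as illustration only, not an answer:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .config("spark.scheduler.mode", "FAIR") // default is FIFO
    .getOrCreate()

  // Jobs submitted from this thread are scheduled in the named pool.
  spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")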
Sharing a class between NonClosableMutableURLClassLoader and MutableURLClassLoader
Hi there,

I'm tweaking the Hive thrift server and Spark session to provide custom SQL capabilities, and I came across a java.lang.ClassNotFoundException when loading my custom session state builder. What I found is that the custom session state builder is being loaded by MutableURLClassLoader, and I have no idea why my class is not visible to this class loader. It happens when a session is opened. I've got this root-cause stack trace:

  Caused by: java.lang.ClassNotFoundException: com.xxx.XXXSessionStateBuilder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:207)
    at com.xxx.XXXSparkSession$.com$xxx$XXXSparkSession$$instantiateSessionState(XXXSparkSession.scala:724)

Any ideas about this?

TIA.
Jason
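A minimal diagnostic sketch for this kind of problem, assuming you can run code inside the same JVM (the class name comes from the stack trace above; everything else is illustrative). It checks which loader can actually see the class and prints each loader's parent chain, since MutableURLClassLoader only finds a class that is on its own URLs or on one of its parents:

  val name = "com.xxx.XXXSessionStateBuilder"
  val loaders = Seq(
    "context" -> Thread.currentThread().getContextClassLoader,
    "system"  -> ClassLoader.getSystemClassLoader)

  for ((label, cl) <- loaders) {
    val visible =
      try { Class.forName(name, false, cl); true }
      catch { case _: ClassNotFoundException => false }
    println(s"$label loader sees $name: $visible")
    // Walk up the parent chain to see where the jar would have to live.
    var p = cl
    while (p != null) { println(s"  $p"); p = p.getParent }
  }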
java.lang.ClassNotFoundException for custom Hive authentication
Hi there,

I'm leveraging the thrift server to provide a SQL service, using a custom Hive authentication provider:

  hive.server2.custom.authentication.class = com.abc.ABCAuthenticationProvider

I've got the error below when logging into the thrift server. The classpath was set using the --jars option. I guess this is because my class is loaded by the system class loader. Please let me know how to fix this.

TIA

  java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.abc.ABCAuthenticationProvider not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
    at org.apache.hive.service.auth.CustomAuthenticationProviderImpl.<init>(CustomAuthenticationProviderImpl.java:39)
    at org.apache.hive.service.auth.AuthenticationProviderFactory.getAuthenticationProvider(AuthenticationProviderFactory.java:64)
    at org.apache.hive.service.auth.PlainSaslHelper$PlainServerCallbackHandler.handle(PlainSaslHelper.java:105)
    at org.apache.hive.service.auth.PlainSaslServer.evaluateResponse(PlainSaslServer.java:102)
    at org.apache.thrift.transport.TSaslTransport$SaslParticipant.evaluateChallengeOrResponse(TSaslTransport.java:537)
    at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:283)
    at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:43)
    at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:223)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: java.lang.ClassNotFoundException: Class com.abc.ABCAuthenticationProvider not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
    ... 12 more
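One hedged observation: the failing frame is Hadoop's Configuration.getClassByName, and a Configuration resolves class names through its own classloader, not through the loader Spark uses for jars added with --jars. That would explain why the provider is invisible here. A common workaround is to put the jar on the driver's boot classpath (e.g. spark.driver.extraClassPath, or dropping it into $SPARK_HOME/jars) so the system loader sees it. If you control the Configuration before HiveServer2 instantiates the provider, a sketch like this is also possible:

  import org.apache.hadoop.conf.Configuration

  // Hadoop's Configuration resolves class names through its own classloader,
  // which defaults to the loader captured at construction time. Pointing it
  // at the context loader (where --jars classes usually live) is one option.
  val conf = new Configuration()
  conf.setClassLoader(Thread.currentThread().getContextClassLoader)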
Re: How to convert InternalRow to Row.
Thanks Jia and Wenchen for your reply. I've changed my code as below:

  val sparkPlan = sqlContext.sparkSession.sessionState.planner.plan(pipeLinePlan).next()
  sparkPlan.execute().mapPartitions { itr =>
    itr.map { internalRow =>
      val encoder = RowEncoder(schema)
      encoder.fromRow(internalRow)
    }
  }

but this time I've got this exception:

---
java.lang.RuntimeException: Error while decoding: java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, StringType)
createexternalrow(getcolumnbyordinal(0, StringType).toString, getcolumnbyordinal(1, StringType).toString, getcolumnbyordinal(2, IntegerType), getcolumnbyordinal(3, IntegerType), getcolumnbyordinal(4, StringType).toString, getcolumnbyordinal(5, LongType), StructField(product,StringType,true), StructField(category,StringType,true), StructField(revenue,IntegerType,true), StructField(item_count,IntegerType,true), StructField(product_category,StringType,true), StructField(tot_revenue,LongType,true))
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
  at com.zetaris.lightning.sql.vpipeline.PipeLineRelation$$anonfun$3$$anonfun$apply$1.apply(PipeLineRelation.scala:53)
  at com.zetaris.lightning.sql.vpipeline.PipeLineRelation$$anonfun$3$$anonfun$apply$1.apply(PipeLineRelation.scala:43)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
---

Any idea about this error?

Thanks
Jason

On Mon, 30 Nov 2020 at 19:34, Jia, Ke A wrote:

> The fromRow method is removed in Spark 3.0, and the new API is:
>
>   val encoder = RowEncoder(schema)
>   val row = encoder.createDeserializer().apply(internalRow)
>
> Thanks,
> Jia Ke
>
> From: Wenchen Fan
> Sent: Friday, November 27, 2020 9:32 PM
> To: Jason Jun
> Cc: Spark dev list
> Subject: Re: How to convert InternalRow to Row.
>
> InternalRow is an internal/developer API that might change over time. Right now, the way to convert it to Row is to use `RowEncoder`, but you need to know the data schema:
>
>   val encoder = RowEncoder(schema)
>   val row = encoder.fromRow(internalRow)
>
> On Fri, Nov 27, 2020 at 6:16 AM Jason Jun wrote:
>
> Hi dev,
>
> I'm working on generating a custom pipeline on the fly, which means I generate a SparkPlan along with each node in my pipeline.
>
> So, my pipeline ends up with PipeLineRelation extending BaseRelation like:
>
>   case class PipeLineRelation(schema: StructType, pipeLinePlan: LogicalPlan)(@transient override val sqlContext: SQLContext)
>     extends BaseRelation with PrunedFilteredScan {
>
>     override def needConversion: Boolean = true
>     override def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
>
>     override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
>       ...
>       val sparkPlan = sqlContext.sparkSession.sessionState.planner.plan(pipeLinePlan).next()
>       sparkPlan.execute().mapPartitions { itr =>
>         itr.map { internalRow =>
>           val values = prunedColumnWithIndex
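For reference on the exception above, hedged: "Cannot evaluate expression: getcolumnbyordinal" is the usual symptom of an unresolved encoder. RowEncoder(schema) returns an encoder whose deserializer still contains unresolved GetColumnByOrdinal expressions, and on Spark 2.x it must be resolved and bound before fromRow can run. A minimal sketch of the change, assuming Spark 2.x where fromRow still exists:

  sparkPlan.execute().mapPartitions { itr =>
    // Resolve and bind once per partition, not once per row.
    val encoder = RowEncoder(schema).resolveAndBind()
    itr.map(internalRow => encoder.fromRow(internalRow))
  }

On Spark 3.x the same idea likely applies to encoder.createDeserializer() from Jia Ke's reply, again after resolveAndBind().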
How to convert InternalRow to Row.
Hi dev,

I'm working on generating a custom pipeline on the fly, which means I generate a SparkPlan along with each node in my pipeline. So my pipeline ends up with PipeLineRelation extending BaseRelation like:

  case class PipeLineRelation(schema: StructType, pipeLinePlan: LogicalPlan)(@transient override val sqlContext: SQLContext)
    extends BaseRelation with PrunedFilteredScan {

    override def needConversion: Boolean = true
    override def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters

    override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
      ...
      val sparkPlan = sqlContext.sparkSession.sessionState.planner.plan(pipeLinePlan).next()
      sparkPlan.execute().mapPartitions { itr =>
        itr.map { internalRow =>
          val values = prunedColumnWithIndex.map { case (index, columnType) =>
            internalRow.get(index, columnType)
          }
          Row.fromSeq(values) // Line 46
        }
      }
    }
  }

I'm getting an InternalRow by executing the subsequent SparkPlan and converting it into a Row using Row.fromSeq(). I can see the values at Line 46 are exactly what I want:

--
values = {Object[5]@14277}
 0 = {UTF8String@14280} "Thin"
 1 = {UTF8String@14281} "Cell phone"
 2 = {Integer@14282} 6000
 3 = {Integer@14283} 2
 4 = {Integer@14284} 12000
--

but execution of Line 46 ended up with this error:

--
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalArgumentException: The value (2) of the type (java.lang.Integer) cannot be converted to the string type
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
  at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
--

Is this an existing bug? Otherwise, how do I convert an InternalRow to a Row?

Thanks in advance.
Jason
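A hedged note on the trace above: the failure happens inside RDDConversions.rowToRowRdd, i.e. while Spark converts the returned Row back to catalyst form because needConversion = true. The likelier culprit is ordering: "value (2) ... cannot be converted to the string type" suggests the values produced by prunedColumnWithIndex do not line up with the column order Spark expects for requiredColumns, so an Integer lands where a String column should be. Separately, Rows handed back from buildScan conventionally carry external types (java.lang.String rather than UTF8String). A minimal sketch under those assumptions:

  import org.apache.spark.unsafe.types.UTF8String

  val values = prunedColumnWithIndex.map { case (index, columnType) =>
    internalRow.get(index, columnType) match {
      case s: UTF8String => s.toString // return external String values
      case other         => other
    }
  }
  Row.fromSeq(values) // order must match requiredColumns

The RowEncoder approach discussed in the replies above is the more general route, since it handles all catalyst-to-external conversions, not just strings.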