Can you try with

    val encoder = RowEncoder(schema).resolveAndBind()

...
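For example, a minimal sketch on Spark 2.x (where fromRow still exists), resolving and binding once per partition rather than once per row:

    import org.apache.spark.sql.catalyst.encoders.RowEncoder

    sparkPlan.execute().mapPartitions { itr =>
      // resolveAndBind() resolves the encoder's deserializer expressions
      // (getcolumnbyordinal(...)) against the schema so they can be
      // evaluated; an unresolved encoder fails with exactly the
      // "Cannot evaluate expression" error quoted below.
      val encoder = RowEncoder(schema).resolveAndBind()
      itr.map(internalRow => encoder.fromRow(internalRow))
    }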
On Mon, Nov 30, 2020 at 5:07 PM Jason Jun <jaes...@gmail.com> wrote:

> Thanks Jia, Wenchen for your reply.
>
> I've changed my code as below:
>
>     val sparkPlan =
>       sqlContext.sparkSession.sessionState.planner.plan(pipeLinePlan).next()
>     sparkPlan.execute().mapPartitions { itr =>
>       itr.map { internalRow =>
>         val encoder = RowEncoder(schema)
>         encoder.fromRow(internalRow)
>       }
>     }
>
> but this time I got this exception:
>
> ---
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> getcolumnbyordinal(0, StringType)
> createexternalrow(getcolumnbyordinal(0, StringType).toString,
> getcolumnbyordinal(1, StringType).toString,
> getcolumnbyordinal(2, IntegerType),
> getcolumnbyordinal(3, IntegerType),
> getcolumnbyordinal(4, StringType).toString,
> getcolumnbyordinal(5, LongType),
> StructField(product,StringType,true),
> StructField(category,StringType,true),
> StructField(revenue,IntegerType,true),
> StructField(item_count,IntegerType,true),
> StructField(product_category,StringType,true),
> StructField(tot_revenue,LongType,true))
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
>   at com.zetaris.lightning.sql.vpipeline.PipeLineRelation$$anonfun$3$$anonfun$apply$1.apply(PipeLineRelation.scala:53)
>   at com.zetaris.lightning.sql.vpipeline.PipeLineRelation$$anonfun$3$$anonfun$apply$1.apply(PipeLineRelation.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> ---
>
> Any idea about this error?
>
> Thanks
> Jason
>
> On Mon, 30 Nov 2020 at 19:34, Jia, Ke A <ke.a....@intel.com> wrote:
>
>> The fromRow method is removed in Spark 3.0. And the new API is:
>>
>>     val encoder = RowEncoder(schema)
>>     val row = encoder.createDeserializer().apply(internalRow)
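>>
>> Inside the mapPartitions above it would look roughly like this (a
>> sketch only: the deserializer is created once per partition, and the
>> encoder is resolved and bound first, per the suggestion at the top of
>> this thread, so its expressions are evaluable):
>>
>>     sparkPlan.execute().mapPartitions { itr =>
>>       // resolveAndBind() before createDeserializer(); the returned
>>       // Deserializer is a plain InternalRow => Row function
>>       val deserializer = RowEncoder(schema).resolveAndBind().createDeserializer()
>>       itr.map(internalRow => deserializer(internalRow))
>>     }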
>>
>> Thanks,
>> Jia Ke
>>
>> *From:* Wenchen Fan <cloud0...@gmail.com>
>> *Sent:* Friday, November 27, 2020 9:32 PM
>> *To:* Jason Jun <jaes...@gmail.com>
>> *Cc:* Spark dev list <dev@spark.apache.org>
>> *Subject:* Re: How to convert InternalRow to Row.
>>
>> InternalRow is an internal/developer API that might change over time.
>> Right now, the way to convert it to Row is to use `RowEncoder`, but you
>> need to know the data schema:
>>
>>     val encoder = RowEncoder(schema)
>>     val row = encoder.fromRow(internalRow)
>>
>> On Fri, Nov 27, 2020 at 6:16 AM Jason Jun <jaes...@gmail.com> wrote:
>>
>> Hi dev,
>>
>> I'm working on generating a custom pipeline on the fly, which means I
>> generate a SparkPlan along with each node in my pipeline.
>>
>> So my pipeline ends up with a PipeLineRelation extending BaseRelation,
>> like:
>>
>>     case class PipeLineRelation(schema: StructType, pipeLinePlan: LogicalPlan)
>>         (@transient override val sqlContext: SQLContext)
>>       extends BaseRelation with PrunedFilteredScan {
>>
>>       override def needConversion: Boolean = true
>>       override def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
>>
>>       override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
>>         ...
>>         val sparkPlan =
>>           sqlContext.sparkSession.sessionState.planner.plan(pipeLinePlan).next()
>>         *sparkPlan.execute().mapPartitions* { itr =>
>>           itr.map { internalRow =>
>>             val values = prunedColumnWithIndex.map { case (index, columnType) =>
>>               internalRow.get(index, columnType)
>>             }
>>             *Row.fromSeq(values) // Line 46*
>>           }
>>         }
>>       }
>>     }
>>
>> I'm getting an InternalRow by executing the subsequent SparkPlan, and
>> converting it into a Row using Row.fromSeq().
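>>
>> (prunedColumnWithIndex isn't shown above; a hypothetical
>> reconstruction of it, pairing each required column's ordinal in the
>> full schema with its Catalyst DataType, would be:
>>
>>     import org.apache.spark.sql.types.DataType
>>
>>     val prunedColumnWithIndex: Array[(Int, DataType)] =
>>       requiredColumns.map { name =>
>>         val i = schema.fieldIndex(name)
>>         (i, schema(i).dataType) // ordinal + DataType, as InternalRow.get expects
>>       }
>> )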
>> I saw that the values at Line 46 are exactly what I want:
>>
>> ------
>> values = {Object[5]@14277}
>>   0 = {UTF8String@14280} "Thin"
>>   1 = {UTF8String@14281} "Cell phone"
>>   2 = {Integer@14282} 6000
>>   3 = {Integer@14283} 2
>>   4 = {Integer@14284} 12000
>> ------
>>
>> but execution of Line 46 ended up with this error:
>>
>> ------
>> Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost,
>> executor driver): java.lang.IllegalArgumentException: The value (2) of
>> the type (java.lang.Integer) cannot be converted to the string type
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
>>   at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
>>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:748)
>> ----
>>
>> Is this an existing bug? Otherwise, how do I convert an InternalRow to
>> a Row?
>>
>> Thanks in advance.
>> Jason
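
As for the original Row.fromSeq failure at Line 46: a hedged guess, not
confirmed in this thread, is that with needConversion = true Spark
converts the Rows returned by buildScan back to InternalRows using the
schema of the requested columns (that is the CatalystTypeConverters call
in the trace above), so the values must match requiredColumns in both
order and type. A sketch of emitting them in exactly that order, reusing
sparkPlan, schema and requiredColumns from the code above:

    // Hedged sketch, not from the thread: build each Row strictly in
    // requiredColumns order, pulling ordinals from the full schema, so
    // the post-scan conversion sees the types it expects.
    val ordinals = requiredColumns.map(schema.fieldIndex)
    sparkPlan.execute().mapPartitions { itr =>
      itr.map { internalRow =>
        Row.fromSeq(ordinals.toSeq.map(i => internalRow.get(i, schema(i).dataType)))
      }
    }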