[ https://issues.apache.org/jira/browse/SPARK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14702671#comment-14702671 ]
Cheng Lian commented on SPARK-9627:
-----------------------------------

A quick Googling suggests that it's probably related to this Scala reflection API issue: https://issues.scala-lang.org/browse/SI-6240

I'll try to stop using the reflection API here to fix this issue.
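For context, the stack trace below bottoms out in scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass, which is exactly the shared-runtime-mirror path that SI-6240 flags as not thread-safe in Scala 2.10. The following is only a rough sketch of the kind of change hinted at above, not the actual patch, and the object and method names are made up for illustration: the reflective lookup can be replaced by reading the java.lang.Class off a plain ClassTag, which never touches the reflection universe.

{code}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

object ReflectionSketch {
  // Problematic under Scala 2.10 (SI-6240): resolving a runtime Class through the
  // reflection mirror mutates a shared, unsynchronized symbol table, and concurrent
  // tasks can then fail with "java.lang.UnsupportedOperationException: tail of empty list".
  def classViaTypeTag[T: TypeTag]: Class[_] =
    runtimeMirror(getClass.getClassLoader).runtimeClass(typeOf[T])

  // Reflection-free alternative: a ClassTag already carries the java.lang.Class,
  // so nothing in scala-reflect is invoked at call time.
  def classViaClassTag[T](implicit ct: ClassTag[T]): Class[_] = ct.runtimeClass
}
{code}

Whether a ClassTag is actually available at the point where the columnar decoder is constructed depends on how the column types are wired up, so treat this as an illustration of the direction only, not as the concrete fix.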
> SQL job failed if the dataframe with string columns is cached
> -------------------------------------------------------------
>
>                 Key: SPARK-9627
>                 URL: https://issues.apache.org/jira/browse/SPARK-9627
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Davies Liu
>            Assignee: Cheng Lian
>            Priority: Blocker
>
> {code}
> import decimal
> import random
> from datetime import date, timedelta
> from pyspark.sql.types import StructType, DateType, StringType, ShortType, DecimalType
> from pyspark.sql.functions import sum
>
> r = random.Random()
> def gen(i):
>     d = date.today() - timedelta(r.randint(0, 5000))
>     cat = str(r.randint(0, 20)) * 5
>     c = r.randint(0, 1000)
>     price = decimal.Decimal(r.randint(0, 100000)) / 100
>     return (d, cat, c, price)
>
> schema = StructType().add('date', DateType()).add('cat', StringType()).add('count', ShortType()).add('price', DecimalType(5, 2))
> #df = sqlContext.createDataFrame(sc.range(1<<24).map(gen), schema)
> #df.show()
> #df.write.parquet('sales4')
> df = sqlContext.read.parquet('sales4')
> df.cache()
> df.count()
> df.show()
> print df.schema
> raw_input()
> r = df.groupBy(df.date, df.cat).agg(sum(df['count'] * df.price))
> print r.explain(True)
> r.show()
> {code}
> {code}
> StructType(List(StructField(date,DateType,true),StructField(cat,StringType,true),StructField(count,ShortType,true),StructField(price,DecimalType(5,2),true)))
> == Parsed Logical Plan ==
> 'Aggregate [date#0,cat#1], [date#0,cat#1,sum((count#2 * price#3)) AS sum((count * price))#70]
>  Relation[date#0,cat#1,count#2,price#3] org.apache.spark.sql.parquet.ParquetRelation@5ec8f315
> == Analyzed Logical Plan ==
> date: date, cat: string, sum((count * price)): decimal(21,2)
> Aggregate [date#0,cat#1], [date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2))))) AS sum((count * price))#70]
>  Relation[date#0,cat#1,count#2,price#3] org.apache.spark.sql.parquet.ParquetRelation@5ec8f315
> == Optimized Logical Plan ==
> Aggregate [date#0,cat#1], [date#0,cat#1,sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2))))) AS sum((count * price))#70]
>  InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, StorageLevel(true, true, false, true, 1), (PhysicalRDD [date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None
> == Physical Plan ==
> NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2)))))2,mode=Final,isDistinct=false))
>  TungstenSort [date#0 ASC,cat#1 ASC], false, 0
>   ConvertToUnsafe
>    Exchange hashpartitioning(date#0,cat#1)
>     NewAggregate with SortBasedAggregationIterator List(date#0, cat#1) ArrayBuffer((sum((change_decimal_precision(CAST(CAST(count#2, DecimalType(5,0)), DecimalType(11,2))) * change_decimal_precision(CAST(price#3, DecimalType(11,2)))))2,mode=Partial,isDistinct=false))
>      TungstenSort [date#0 ASC,cat#1 ASC], false, 0
>       ConvertToUnsafe
>        InMemoryColumnarTableScan [date#0,cat#1,count#2,price#3], (InMemoryRelation [date#0,cat#1,count#2,price#3], true, 10000, StorageLevel(true, true, false, true, 1), (PhysicalRDD [date#0,cat#1,count#2,price#3], MapPartitionsRDD[3] at), None)
> Code Generation: true
> == RDD ==
> None
> 15/08/04 23:21:53 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
> Traceback (most recent call last):
>   File "t.py", line 34, in <module>
>     r.show()
>   File "/Users/davies/work/spark/python/pyspark/sql/dataframe.py", line 258, in show
>     print(self._jdf.showString(n, truncate))
>   File "/Users/davies/work/spark/python/lib/py4j/java_gateway.py", line 538, in __call__
>     self.target_id, self.name)
>   File "/Users/davies/work/spark/python/pyspark/sql/utils.py", line 36, in deco
>     return f(*a, **kw)
>   File "/Users/davies/work/spark/python/lib/py4j/protocol.py", line 300, in get_return_value
>     format(target_id, '.', name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o36.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 10, localhost): java.lang.UnsupportedOperationException: tail of empty list
>         at scala.collection.immutable.Nil$.tail(List.scala:339)
>         at scala.collection.immutable.Nil$.tail(List.scala:334)
>         at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)
>         at scala.reflect.internal.Symbols$Symbol.typeParams(Symbols.scala:1491)
>         at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:2144)
>         at scala.reflect.internal.Types$TypeRef.initializedTypeParams(Types.scala:2408)
>         at scala.reflect.internal.Types$TypeRef.typeParamsMatchArgs(Types.scala:2409)
>         at scala.reflect.internal.Types$AliasTypeRef$class.dealias(Types.scala:2232)
>         at scala.reflect.internal.Types$TypeRef$$anon$3.dealias(Types.scala:2539)
>         at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1256)
>         at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
>         at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
>         at org.apache.spark.sql.columnar.compression.DictionaryEncoding$Decoder.<init>(compressionSchemes.scala:277)
>         at org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:185)
>         at org.apache.spark.sql.columnar.compression.DictionaryEncoding$.decoder(compressionSchemes.scala:177)
>         at org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.initialize(CompressibleColumnAccessor.scala:31)
>         at org.apache.spark.sql.columnar.NativeColumnAccessor.initialize(ColumnAccessor.scala:64)
>         at org.apache.spark.sql.columnar.ColumnAccessor$class.$init$(ColumnAccessor.scala:33)
>         at org.apache.spark.sql.columnar.BasicColumnAccessor.<init>(ColumnAccessor.scala:44)
>         at org.apache.spark.sql.columnar.NativeColumnAccessor.<init>(ColumnAccessor.scala:64)
>         at org.apache.spark.sql.columnar.StringColumnAccessor.<init>(ColumnAccessor.scala:92)
>         at org.apache.spark.sql.columnar.ColumnAccessor$.apply(ColumnAccessor.scala:130)
>         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:300)
>         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anonfun$15.apply(InMemoryColumnarTableScan.scala:299)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:299)
>         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14.apply(InMemoryColumnarTableScan.scala:297)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:173)
>         at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
>         at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org