[ https://issues.apache.org/jira/browse/SPARK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-10038: ------------------------------------ Assignee: Davies Liu (was: Apache Spark) > TungstenProject code generation fails when applied to array<binary> > ------------------------------------------------------------------- > > Key: SPARK-10038 > URL: https://issues.apache.org/jira/browse/SPARK-10038 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.5.0 > Reporter: Josh Rosen > Assignee: Davies Liu > Priority: Blocker > > During fuzz testing, I discovered that TungstenProject can crash when applied > to schemas that contain {{array<binary>}} columns. As a minimal example, the > following code crashes in spark-shell: > {code} > sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").rdd.count() > {code} > Here's the stacktrace: > {code} > 15/08/16 17:11:49 ERROR Executor: Exception in task 3.0 in stage 29.0 (TID > 144) > java.util.concurrent.ExecutionException: java.lang.Exception: failed to > compile: org.codehaus.commons.compiler.CompileException: Line 53, Column 63: > '{' expected instead of '[' > public Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] > exprs) { > return new SpecificUnsafeProjection(exprs); > } > class SpecificUnsafeProjection extends > org.apache.spark.sql.catalyst.expressions.UnsafeProjection { > private org.apache.spark.sql.catalyst.expressions.Expression[] expressions; > private UnsafeRow convertedStruct2; > private byte[] buffer3; > private int cursor4; > private UnsafeArrayData convertedArray6; > private byte[] buffer7; > public > SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] > expressions) { > this.expressions = expressions; > this.convertedStruct2 = new UnsafeRow(); > this.buffer3 = new byte[16]; > this.cursor4 = 0; > convertedArray6 = new UnsafeArrayData(); > buffer7 = new byte[64]; > } > // Scala.Function1 need this > public Object apply(Object row) { > return apply((InternalRow) row); > } > public UnsafeRow 
apply(InternalRow i) { > cursor4 = 16; > convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, cursor4); > /* input[0, ArrayType(BinaryType,true)] */ > boolean isNull0 = i.isNullAt(0); > ArrayData primitive1 = isNull0 ? null : (i.getArray(0)); > final boolean isNull8 = isNull0; > if (!isNull8) { > final ArrayData tmp9 = primitive1; > if (tmp9 instanceof UnsafeArrayData) { > convertedArray6 = (UnsafeArrayData) tmp9; > } else { > final int numElements10 = tmp9.numElements(); > final int fixedSize11 = 4 * numElements10; > int numBytes12 = fixedSize11; > final byte[][] elements13 = new byte[][numElements10]; > for (int index15 = 0; index15 < numElements10; index15++) { > if (!tmp9.isNullAt(index15)) { > elements13[index15] = tmp9.getBinary(index15); > numBytes12 += > org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.getSize(elements13[index15]); > } > } > if (numBytes12 > buffer7.length) { > buffer7 = new byte[numBytes12]; > } > int cursor14 = fixedSize11; > for (int index15 = 0; index15 < numElements10; index15++) { > if (elements13[index15] == null) { > // If element is null, write the negative value address into > offset region. > Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * > index15, -cursor14); > } else { > Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * > index15, cursor14); > cursor14 += > org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.write( > buffer7, > Platform.BYTE_ARRAY_OFFSET + cursor14, > elements13[index15]); > } > } > convertedArray6.pointTo( > buffer7, > Platform.BYTE_ARRAY_OFFSET, > numElements10, > numBytes12); > } > } > int numBytes16 = cursor4 + (isNull8 ? 0 : > org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.getSize(convertedArray6)); > if (buffer3.length < numBytes16) { > // This will not happen frequently, because the buffer is re-used. 
> byte[] tmpBuffer5 = new byte[numBytes16 * 2]; > Platform.copyMemory(buffer3, Platform.BYTE_ARRAY_OFFSET, > tmpBuffer5, Platform.BYTE_ARRAY_OFFSET, buffer3.length); > buffer3 = tmpBuffer5; > } > convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, > numBytes16); > if (isNull8) { > convertedStruct2.setNullAt(0); > } else { > cursor4 += > org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.write(convertedStruct2, > 0, cursor4, convertedArray6); > } > return convertedStruct2; > } > } > at > com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) > at > com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) > at > com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) > at > com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) > at > com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620) > at > com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362) > at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2251) > at com.google.common.cache.LocalCache.get(LocalCache.java:4000) > at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:362) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:469) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:425) > at > org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:124) > at > 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:134) > at > org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:85) > at > org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:80) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} > Here's the {{explain}} output: > {code} > scala> sc.parallelize(Seq((Array(Array[Byte](1)), > 1))).toDF.select("_1").explain(true) > == Parsed Logical Plan == > 'Project [unresolvedalias('_1)] > LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at > <console>:22 > == Analyzed Logical Plan == > _1: array<binary> > Project [_1#161] > LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at > <console>:22 > == Optimized Logical Plan == > Project [_1#161] > LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at > <console>:22 > == Physical Plan == > TungstenProject [_1#161] > Scan PhysicalRDD[_1#161,_2#162] > {code} -- 
This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org