[ https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389761#comment-16389761 ]
Kazuaki Ishizaki commented on SPARK-18492:
------------------------------------------

[~imranshaik] I ran the following program with master, and it works without throwing an exception. Could you please try your code with master?

{code:java}
// Assumes a Spark SQL test harness (e.g., QueryTest with SharedSQLContext)
// that provides spark, sparkContext and the $ implicit, plus:
//   import org.apache.spark.sql.Row
//   import org.apache.spark.sql.functions._
//   import org.apache.spark.sql.types._
//   import org.apache.spark.sql.catalyst.util.DateTimeUtils
test("SPARK") {
  val truetradescast = spark.createDataFrame(
    sparkContext.parallelize(
      Row(1L, "abc", 2L, 3L, 2.2F, 3.3F, 4.4F, 1.1F, 12.3F, 10L,
        DateTimeUtils.toJavaTimestamp(10 * 1000)) :: Nil),
    StructType(Seq(
      StructField("Event_Time", LongType),
      StructField("Symbol", StringType),
      StructField("Kline_Start_Time", LongType),
      StructField("Kline_Close_Time", LongType),
      StructField("Open_Price", FloatType),
      StructField("Close_Price", FloatType),
      StructField("High_Price", FloatType),
      StructField("Low_Price", FloatType),
      StructField("Base_Asset_Volume", FloatType),
      StructField("Number_Of_Trades", LongType),
      StructField("TimeStamp", TimestampType))))
  truetradescast.printSchema

  // Sliding 23-minute windows every minute, aggregated per symbol.
  val stremingagg = truetradescast
    .groupBy($"Symbol", window($"TimeStamp", "23 minute", "1 minute", "0 minute"))
    .agg(mean($"Close_Price").as("SMA"), count($"Close_Price").as("Count"),
      stddev($"Close_Price").as("StdDev"), collect_list($"Close_Price").as("Raw_CP"))

  // Slice the collected prices into 20-element windows, aligned by count.
  val slice = udf((array: Seq[Float], from: Int, to: Int) => array.slice(from, to))
  val stremingdec = stremingagg
    .withColumn("bb120",
      when($"Count" === 20, slice($"Raw_CP", lit(0), lit(20)))
        .when($"Count" === 21, slice($"Raw_CP", lit(1), lit(21)))
        .when($"Count" === 22, slice($"Raw_CP", lit(2), lit(22)))
        .when($"Count" === 23, slice($"Raw_CP", lit(3), lit(23))))
    .withColumn("bb121",
      when($"Count" === 21, slice($"Raw_CP", lit(0), lit(20)))
        .when($"Count" === 22, slice($"Raw_CP", lit(1), lit(21)))
        .when($"Count" === 23, slice($"Raw_CP", lit(2), lit(22))))
    .withColumn("bb122",
      when($"Count" === 22, slice($"Raw_CP", lit(0), lit(20)))
        .when($"Count" === 23, slice($"Raw_CP", lit(1), lit(21))))

  // Simple moving average of each slice.
  val sma = udf((array: Seq[Double]) => array.sum / array.length)
  val stremingdec1 = stremingdec
    .withColumn("sma20", when($"Count" >= 20, sma($"bb120")))
    .withColumn("sma21", when($"Count" >= 21, sma($"bb121")))
    .withColumn("sma22", when($"Count" >= 22, sma($"bb122")))

  // Variance of each slice around its moving average.
  val vrn = udf((x: Seq[Double], y: Double) =>
    x.map(_.toDouble).map(a => math.pow(a - y, 2)).sum / x.size)
  val stremingdec2 = stremingdec1
    .withColumn("var20", when($"Count" >= 20, vrn($"bb120", $"sma20")))
    .withColumn("var21", when($"Count" >= 21, vrn($"bb121", $"sma21")))
    .withColumn("var22", when($"Count" >= 22, vrn($"bb122", $"sma22")))

  // Standard deviation from the variance.
  val stdev = udf((x: Double) => math.sqrt(x))
  val stremingdec3 = stremingdec2
    .withColumn("std20", when($"Count" >= 20, stdev($"var20")))
    .withColumn("std21", when($"Count" >= 21, stdev($"var21")))
    .withColumn("std22", when($"Count" >= 22, stdev($"var22")))
  stremingdec3.show
}
{code}

> GeneratedIterator grows beyond 64 KB
> ------------------------------------
>
>                 Key: SPARK-18492
>                 URL: https://issues.apache.org/jira/browse/SPARK-18492
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1
>        Environment: CentOS release 6.7 (Final)
>           Reporter: Norris Merritt
>           Priority: Major
>        Attachments: Screenshot from 2018-03-02 12-57-51.png
>
>
> spark-submit fails with ERROR CodeGenerator: failed to compile:
> org.codehaus.janino.JaninoRuntimeException: Code of method
> "(I[Lscala/collection/Iterator;)V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator"
> grows beyond 64 KB
>
> The error message is followed by a huge dump of the generated source code.
> The generated code declares 1,454 field sequences like the following:
>
> /* 036 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF project_scalaUDF1;
> /* 037 */   private scala.Function1 project_catalystConverter1;
> /* 038 */   private scala.Function1 project_converter1;
> /* 039 */   private scala.Function1 project_converter2;
> /* 040 */   private scala.Function2 project_udf1;
> .... (many omitted lines) ...
> /* 6089 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF project_scalaUDF1454;
> /* 6090 */   private scala.Function1 project_catalystConverter1454;
> /* 6091 */   private scala.Function1 project_converter1695;
> /* 6092 */   private scala.Function1 project_udf1454;
>
> It then proceeds to emit code for several methods (init, processNext), each of which contains the same repetitive sequence of statements for every one of the variable groups declared in the class. For example:
>
> /* 6101 */   public void init(int index, scala.collection.Iterator inputs[]) {
>
> The 64 KB JVM limit on the bytecode of a single method is exceeded because the code generator uses an extremely naive strategy: it emits a sequence like the one shown below for each of the 1,454 groups of variables shown above, in the init method:
>
> /* 6132 */     this.project_udf = (scala.Function1)project_scalaUDF.userDefinedFunc();
> /* 6133 */     this.project_scalaUDF1 = (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10];
> /* 6134 */     this.project_catalystConverter1 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType());
> /* 6135 */     this.project_converter1 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType());
> /* 6136 */     this.project_converter2 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType());
>
> It blows up after emitting 230 such sequences, while trying to emit the 231st:
>
> /* 7282 */     this.project_udf230 = (scala.Function2)project_scalaUDF230.userDefinedFunc();
> /* 7283 */     this.project_scalaUDF231 = (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240];
> /* 7284 */     this.project_catalystConverter231 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType());
> .... many omitted lines ...
>
> An example of the repetitive code sequences emitted for the processNext method:
>
> /* 12253 */     boolean project_isNull247 = project_result244 == null;
> /* 12254 */     MapData project_value247 = null;
> /* 12255 */     if (!project_isNull247) {
> /* 12256 */       project_value247 = project_result244;
> /* 12257 */     }
> /* 12258 */     Object project_arg = sort_isNull5 ? null : project_converter489.apply(sort_value5);
> /* 12259 */
> /* 12260 */     ArrayData project_result249 = null;
> /* 12261 */     try {
> /* 12262 */       project_result249 = (ArrayData)project_catalystConverter248.apply(project_udf248.apply(project_arg));
> /* 12263 */     } catch (Exception e) {
> /* 12264 */       throw new org.apache.spark.SparkException(project_scalaUDF248.udfErrorMessage(), e);
> /* 12265 */     }
> /* 12266 */
> /* 12267 */     boolean project_isNull252 = project_result249 == null;
> /* 12268 */     ArrayData project_value252 = null;
> /* 12269 */     if (!project_isNull252) {
> /* 12270 */       project_value252 = project_result249;
> /* 12271 */     }
> /* 12272 */     Object project_arg1 = project_isNull252 ? null : project_converter488.apply(project_value252);
> /* 12273 */
> /* 12274 */     ArrayData project_result248 = null;
> /* 12275 */     try {
> /* 12276 */       project_result248 = (ArrayData)project_catalystConverter247.apply(project_udf247.apply(project_arg1));
> /* 12277 */     } catch (Exception e) {
> /* 12278 */       throw new org.apache.spark.SparkException(project_scalaUDF247.udfErrorMessage(), e);
> /* 12279 */     }
> /* 12280 */
> /* 12281 */     boolean project_isNull251 = project_result248 == null;
> /* 12282 */     ArrayData project_value251 = null;
> /* 12283 */     if (!project_isNull251) {
> /* 12284 */       project_value251 = project_result248;
> /* 12285 */     }
> /* 12286 */     Object project_arg2 = project_isNull251 ? null : project_converter487.apply(project_value251);
> /* 12287 */
> /* 12288 */     InternalRow project_result247 = null;
> /* 12289 */     try {
> /* 12290 */       project_result247 = (InternalRow)project_catalystConverter246.apply(project_udf246.apply(project_arg2));
> /* 12291 */     } catch (Exception e) {
> /* 12292 */       throw new org.apache.spark.SparkException(project_scalaUDF246.udfErrorMessage(), e);
> /* 12293 */     }
> /* 12294 */
> /* 12295 */     boolean project_isNull250 = project_result247 == null;
> /* 12296 */     InternalRow project_value250 = null;
> /* 12297 */     if (!project_isNull250) {
> /* 12298 */       project_value250 = project_result247;
> /* 12299 */     }
> /* 12300 */     Object project_arg3 = project_isNull250 ? null : project_converter486.apply(project_value250);
> /* 12301 */
> /* 12302 */     InternalRow project_result246 = null;
> /* 12303 */     try {
> /* 12304 */       project_result246 = (InternalRow)project_catalystConverter245.apply(project_udf245.apply(project_arg3));
> /* 12305 */     } catch (Exception e) {
> /* 12306 */       throw new org.apache.spark.SparkException(project_scalaUDF245.udfErrorMessage(), e);
> /* 12307 */     }
> /* 12308 */
>
> It is clear that the code generation strategy is naive. The code generator should use arrays and loops instead of emitting all these repetitive code sequences, which differ only in the numeric suffixes of their variable names.
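To make that last point concrete, here is a minimal sketch of the loop-based init() the description asks for. It is not Spark's actual generated code: the class name GeneratedIteratorSketch and the two array fields are hypothetical, and it assumes, for brevity, that every slot of references[] holds a ScalaUDF. The calls it relies on (references[], createToCatalystConverter, dataType, and the init signature) are the same ones visible in the dump above.

{code:java}
import org.apache.spark.sql.catalyst.CatalystTypeConverters$;
import org.apache.spark.sql.catalyst.expressions.ScalaUDF;
import scala.Function1;

class GeneratedIteratorSketch {          // hypothetical class name
  private Object[] references;           // populated by the codegen framework
  // Arrays replace the 1,454 individually named fields
  // (project_scalaUDF1 ... project_scalaUDF1454, etc.).
  private ScalaUDF[] project_scalaUDFs;
  private Function1[] project_catalystConverters;

  public void init(int index, scala.collection.Iterator[] inputs) {
    int n = references.length;           // simplification: all slots are UDFs
    project_scalaUDFs = new ScalaUDF[n];
    project_catalystConverters = new Function1[n];
    // One loop instead of 1,454 unrolled assignment blocks: the bytecode
    // size of init() is now constant in the number of UDFs, so it can
    // never cross the 64 KB per-method limit.
    for (int i = 0; i < n; i++) {
      project_scalaUDFs[i] = (ScalaUDF) references[i];
      project_catalystConverters[i] = (Function1)
          CatalystTypeConverters$.MODULE$.createToCatalystConverter(
              project_scalaUDFs[i].dataType());
    }
  }
}
{code}

For what it's worth, the fixes that landed in Spark attack the same problem from a slightly different angle: CodegenContext.splitExpressions breaks oversized generated functions into smaller methods, and whole-stage codegen can fall back to non-compiled execution when Janino compilation fails (spark.sql.codegen.fallback), which is consistent with the comment above that the reproducer passes on master.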