[ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389761#comment-16389761
 ] 

Kazuaki Ishizaki commented on SPARK-18492:
------------------------------------------

[~imranshaik]
I ran the following program with master, then it works without throwing an 
exception. Could you please try your code with master?


{code:java}
  test("SPARK") {
    val truetradescast = spark.createDataFrame(sparkContext.parallelize(
      Row(1L, "abc", 2L, 3L, 2.2F, 3.3F, 4.4F, 1.1F, 12.3F, 10L, 
DateTimeUtils.toJavaTimestamp(10 * 1000)) :: Nil),
      StructType(Seq(
        StructField("Event_Time", LongType),
        StructField("Symbol", StringType),
        StructField("Kline_Start_Time", LongType),
        StructField("Kline_Close_Time", LongType),
        StructField("Open_Price", FloatType),
        StructField("Close_Price", FloatType),
        StructField("High_Price", FloatType),
        StructField("Low_Price", FloatType),
        StructField("Base_Asset_Volume", FloatType),
        StructField("Number_Of_Trades", LongType),
        StructField("TimeStamp", TimestampType)
      )))
    truetradescast.printSchema

    val stremingagg = truetradescast.groupBy($"Symbol",window($"TimeStamp","23 
minute","1 minute","0 
minute")).agg(mean($"Close_Price").as("SMA"),count($"Close_Price").as("Count"),stddev($"Close_Price").as("StdDev"),collect_list($"Close_Price").as("Raw_CP"))
    val slice = udf((array : Seq[Float], from : Int, to : Int) => 
array.slice(from,to))
    val stremingdec = stremingagg.withColumn("bb120",when($"Count" === 20 , 
slice($"Raw_CP",lit(0),lit(20))).when($"Count" === 21 , 
slice($"Raw_CP",lit(1),lit(21))).when($"Count" === 22 , 
slice($"Raw_CP",lit(2),lit(22))).when($"Count" === 23 , 
slice($"Raw_CP",lit(3),lit(23)))).withColumn("bb121",when($"Count" === 21 , 
slice($"Raw_CP",lit(0),lit(20))).when($"Count" === 22, 
slice($"Raw_CP",lit(1),lit(21))).when($"Count" === 
23,slice($"Raw_CP",lit(2),lit(22)))).withColumn("bb122",when($"Count" === 22 , 
slice($"Raw_CP",lit(0),lit(20))).when($"Count" === 23, 
slice($"Raw_CP",lit(1),lit(21))))
    val sma = udf((array : Seq[Double]) => array.sum / array.length)
    val stremingdec1 = stremingdec.withColumn("sma20", when($"Count" >= 
20,sma($"bb120"))).withColumn("sma21",when($"Count" >= 21, 
sma($"bb121"))).withColumn("sma22",when($"Count" >= 22, sma($"bb122")))
    val vrn = udf((x:Seq[Double],y:Double) => x.map(_.toDouble).map(a => 
math.pow(a - y, 2)).sum / x.size)
    val stremingdec2 = stremingdec1.withColumn("var20",when($"Count" >= 20, 
vrn($"bb120",$"sma20"))).withColumn("var21",when($"Count" >= 21, 
vrn($"bb121",$"sma21"))).withColumn("var22",when($"Count" >= 22, 
vrn($"bb122",$"sma22")))
    val stdev = udf((x:Double) => math.sqrt(x))
    val stremingdec3 = stremingdec2.withColumn("std20",when($"Count" >= 20, 
stdev($"var20"))).withColumn("std21",when($"Count" >= 21, 
stdev($"var21"))).withColumn("std22",when($"Count" >= 22, stdev($"var22")))

    stremingdec3.show
  }
{code}

> GeneratedIterator grows beyond 64 KB
> ------------------------------------
>
>                 Key: SPARK-18492
>                 URL: https://issues.apache.org/jira/browse/SPARK-18492
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1
>         Environment: CentOS release 6.7 (Final)
>            Reporter: Norris Merritt
>            Priority: Major
>         Attachments: Screenshot from 2018-03-02 12-57-51.png
>
>
> spark-submit fails with ERROR CodeGenerator: failed to compile: 
> org.codehaus.janino.JaninoRuntimeException: Code of method 
> "(I[Lscala/collection/Iterator;)V" of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
> grows beyond 64 KB
> Error message is followed by a huge dump of generated source code.
> The generated code declares 1,454 field sequences like the following:
> /* 036 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1;
> /* 037 */   private scala.Function1 project_catalystConverter1;
> /* 038 */   private scala.Function1 project_converter1;
> /* 039 */   private scala.Function1 project_converter2;
> /* 040 */   private scala.Function2 project_udf1;
>  .... (many omitted lines) ...
> /* 6089 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1454;
> /* 6090 */   private scala.Function1 project_catalystConverter1454;
> /* 6091 */   private scala.Function1 project_converter1695;
> /* 6092 */   private scala.Function1 project_udf1454;
> It then proceeds to emit code for several methods (init, processNext) each of 
> which has totally repetitive sequences of statements pertaining to each of 
> the sequences of variables declared in the class.  For example:
> /* 6101 */   public void init(int index, scala.collection.Iterator inputs[]) {
> The reason that the 64KB JVM limit for code for a method is exceeded is 
> because the code generator is using an incredibly naive strategy.  It emits a 
> sequence like the one shown below for each of the 1,454 groups of variables 
> shown above, in 
> /* 6132 */     this.project_udf = 
> (scala.Function1)project_scalaUDF.userDefinedFunc();
> /* 6133 */     this.project_scalaUDF1 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10];
> /* 6134 */     this.project_catalystConverter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType());
> /* 6135 */     this.project_converter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType());
> /* 6136 */     this.project_converter2 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType());
> It blows up after emitting 230 such sequences, while trying to emit the 231st:
> /* 7282 */     this.project_udf230 = 
> (scala.Function2)project_scalaUDF230.userDefinedFunc();
> /* 7283 */     this.project_scalaUDF231 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240];
> /* 7284 */     this.project_catalystConverter231 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType());
>  .... many omitted lines ...
>  Example of repetitive code sequences emitted for processNext method:
> /* 12253 */       boolean project_isNull247 = project_result244 == null;
> /* 12254 */       MapData project_value247 = null;
> /* 12255 */       if (!project_isNull247) {
> /* 12256 */         project_value247 = project_result244;
> /* 12257 */       }
> /* 12258 */       Object project_arg = sort_isNull5 ? null : 
> project_converter489.apply(sort_value5);
> /* 12259 */
> /* 12260 */       ArrayData project_result249 = null;
> /* 12261 */       try {
> /* 12262 */         project_result249 = 
> (ArrayData)project_catalystConverter248.apply(project_udf248.apply(project_arg));
> /* 12263 */       } catch (Exception e) {
> /* 12264 */         throw new 
> org.apache.spark.SparkException(project_scalaUDF248.udfErrorMessage(), e);
> /* 12265 */       }
> /* 12266 */
> /* 12267 */       boolean project_isNull252 = project_result249 == null;
> /* 12268 */       ArrayData project_value252 = null;
> /* 12269 */       if (!project_isNull252) {
> /* 12270 */         project_value252 = project_result249;
> /* 12271 */       }
> /* 12272 */       Object project_arg1 = project_isNull252 ? null : 
> project_converter488.apply(project_value252);
> /* 12273 */
> /* 12274 */       ArrayData project_result248 = null;
> /* 12275 */       try {
> /* 12276 */         project_result248 = 
> (ArrayData)project_catalystConverter247.apply(project_udf247.apply(project_arg1));
> /* 12277 */       } catch (Exception e) {
> /* 12278 */         throw new 
> org.apache.spark.SparkException(project_scalaUDF247.udfErrorMessage(), e);
> /* 12279 */       }
> /* 12280 */
> /* 12281 */       boolean project_isNull251 = project_result248 == null;
> /* 12282 */       ArrayData project_value251 = null;
> /* 12283 */       if (!project_isNull251) {
> /* 12284 */         project_value251 = project_result248;
> /* 12285 */       }
> /* 12286 */       Object project_arg2 = project_isNull251 ? null : 
> project_converter487.apply(project_value251);
> /* 12287 */
> /* 12288 */       InternalRow project_result247 = null;
> /* 12289 */       try {
> /* 12290 */         project_result247 = 
> (InternalRow)project_catalystConverter246.apply(project_udf246.apply(project_arg2));
> /* 12291 */       } catch (Exception e) {
> /* 12292 */         throw new 
> org.apache.spark.SparkException(project_scalaUDF246.udfErrorMessage(), e);
> /* 12293 */       }
> /* 12294 */
> /* 12295 */       boolean project_isNull250 = project_result247 == null;
> /* 12296 */       InternalRow project_value250 = null;
> /* 12297 */       if (!project_isNull250) {
> /* 12298 */         project_value250 = project_result247;
> /* 12299 */       }
> /* 12300 */       Object project_arg3 = project_isNull250 ? null : 
> project_converter486.apply(project_value250);
> /* 12301 */
> /* 12302 */       InternalRow project_result246 = null;
> /* 12303 */       try {
> /* 12304 */         project_result246 = 
> (InternalRow)project_catalystConverter245.apply(project_udf245.apply(project_arg3));
> /* 12305 */       } catch (Exception e) {
> /* 12306 */         throw new 
> org.apache.spark.SparkException(project_scalaUDF245.udfErrorMessage(), e);
> /* 12307 */       }
> /* 12308 */
> It is pretty clear that the code generation strategy is naive. The code 
> generator should use arrays and loops instead of emitting all these 
> repetitive code sequences which only differ by a few numerical digits used to 
> generate the name of the variables.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to