[ https://issues.apache.org/jira/browse/SPARK-21828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141138#comment-16141138 ]
Otis Smart commented on SPARK-21828: ------------------------------------ Hi KI: I thank you for the expedient reply! * Here (below text) is example code that generates the error in PySpark 2.1. * Please forgive me...I initially inadvertently applied this code on a Spark 2.1 (rather than Spark 2.2) cluster; but I moments ago began a test on a Spark 2.2 cluster (definitely this time). Nonetheless, a troubleshoot + investigation of the aforementioned error may aid others on Spark 2.1 if my ongoing test yields no error in Pyspark 2.2. Gratefully + Best, OS # OTIS SMART: 24.08.2017 (https://issues.apache.org/jira/browse/SPARK-21828) # ---------------------------------------------------------------------------------------------------------------------- # # ---------------------------------------------------------------------------------------------------------------------- # from pyspark import SparkConf, SparkContext # from pyspark.sql import HiveContext from pyspark.sql.functions import col, lit # from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.evaluation import MulticlassClassificationEvaluator from pyspark.ml.feature import StringIndexer, VectorAssembler from pyspark.ml.tuning import CrossValidator, ParamGridBuilder # from pyspark.mllib.random import RandomRDDs as randrdd # ---------------------------------------------------------------------------------------------------------------------- # # ---------------------------------------------------------------------------------------------------------------------- # def computeDT(df): # namevAll = df.columns namevP = namevAll[:-1] namevC = namevAll[-1] # training_data, testing_data = df.randomSplit([0.70, 0.30]) print "done: randomSplit" # variableoutStringIndexer = StringIndexer(inputCol=namevC, outputCol='variableout') print "done: StringIndexer" # variablesinAssembler = VectorAssembler(inputCols=namevP, outputCol='variablesin') print "done: 
VectorAssembler" # dTree = DecisionTreeClassifier(labelCol='variableout', featuresCol='variablesin', impurity='gini', maxBins=3) print "done: DecisionTreeClassifier" # pipeline = Pipeline(stages=[variableoutStringIndexer, variablesinAssembler, dTree]) print "done: Pipeline" # paramgrid = ParamGridBuilder().addGrid(dTree.maxDepth, [3, 7, 8]).build() print "done: ParamGridBuilder" # evaluator = MulticlassClassificationEvaluator(labelCol='variableout', predictionCol='prediction', metricName="f1") print "done: MulticlassClassificationEvaluator" # CrossValidator(estimator=pipeline, estimatorParamMaps=paramgrid, evaluator=evaluator, numFolds=10).fit(training_data) print "done: CrossValidatorModel" # return True # ---------------------------------------------------------------------------------------------------------------------- # # ---------------------------------------------------------------------------------------------------------------------- # def main(): # numcols = 1234 numrows = 23456 # x = randrdd.normalVectorRDD(hiveContext, numrows, numcols, seed=1) y = randrdd.normalVectorRDD(hiveContext, numrows, numcols, seed=2) # dfx = HiveContext.createDataFrame(hiveContext, x.map(lambda v: tuple([float(ii) for ii in v])).collect(), ["v{0:0>4}".format(jj) for jj in range(0, numcols)]) # dfy = HiveContext.createDataFrame(hiveContext, y.map(lambda v: tuple([float(ii) for ii in v])).collect(), ["v{0:0>4}".format(jj) for jj in range(0, numcols)]) # dfx = dfx.withColumn("v{0:0>4}".format(numcols), dfx["v{0:0>4}".format(numcols - 1)] + 5).withColumn("V", lit('a')) dfy = dfy.withColumn("v{0:0>4}".format(numcols), dfy["v{0:0>4}".format(numcols - 1)] - 5).withColumn("V", lit('b')) # df = dfx.union(dfy) # df.cache() # df.printSchema() df.show(n=10, truncate=False) # computeDT(df) # df.unpersist() # return True # ---------------------------------------------------------------------------------------------------------------------- # CONFIGURE SPARK CONTEXT THEN RUN 'MAIN' 
ANALYSIS # ---------------------------------------------------------------------------------------------------------------------- # if __name__ == "__main__": # conf = SparkConf().setAppName("TroubleshootPyspark.ASFJira21828.Otis+Kazuaki"). \ set("spark.sql.tungsten.enabled", "false") # sc = SparkContext(conf=conf) # hiveContext = HiveContext(sc) # main() > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering" > grows beyond 64 KB...again > ----------------------------------------------------------------------------------------------------- > > Key: SPARK-21828 > URL: https://issues.apache.org/jira/browse/SPARK-21828 > Project: Spark > Issue Type: Bug > Components: ML, SQL > Affects Versions: 2.1.0 > Reporter: Otis Smart > Priority: Critical > > Hello! > 1. I encounter a similar issue (see below text) on PySpark 2.2 (e.g., > dataframe with ~50000 rows x 1100+ columns as input to ".fit()" method of > CrossValidator() that includes Pipeline() that includes StringIndexer(), > VectorAssembler() and DecisionTreeClassifier()). > 2. Was the aforementioned patch (a.k.a. the > fix, https://github.com/apache/spark/pull/15480) not included in the latest > release; what is the reason for, and the solution to, this persistent > issue, please? > py4j.protocol.Py4JJavaError: An error occurred while calling o9396.fit. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 > in stage 18.0 failed 4 times, most recent failure: Lost task 38.3 in stage > 18.0 (TID 1996, ip-10-0-14-83.ec2.internal, executor 4): > java.util.concurrent.ExecutionException: java.lang.Exception: failed to > compile: org.codehaus.janino.JaninoRuntimeException: Code of method > "compare(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I" > of class > "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering" > grows beyond 64 KB > /* 001 */ public SpecificOrdering generate(Object[] references) > { /* 002 */ return new SpecificOrdering(references); /* 003 */ } > /* 004 */ > /* 005 */ class SpecificOrdering extends > org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering { > /* 006 */ > /* 007 */ private Object[] references; > /* 008 */ > /* 009 */ > /* 010 */ public SpecificOrdering(Object[] references) > { /* 011 */ this.references = references; /* 012 */ /* 013 */ } > /* 014 */ > /* 015 */ > /* 016 */ > /* 017 */ public int compare(InternalRow a, InternalRow b) { > /* 018 */ InternalRow i = null; // Holds current row being evaluated. 
> /* 019 */ > /* 020 */ i = a; > /* 021 */ boolean isNullA; > /* 022 */ double primitiveA; > /* 023 */ > { /* 024 */ /* 025 */ double value = i.getDouble(0); /* 026 */ isNullA = > false; /* 027 */ primitiveA = value; /* 028 */ } > /* 029 */ i = b; > /* 030 */ boolean isNullB; > /* 031 */ double primitiveB; > /* 032 */ > { /* 033 */ /* 034 */ double value = i.getDouble(0); /* 035 */ isNullB = > false; /* 036 */ primitiveB = value; /* 037 */ } > /* 038 */ if (isNullA && isNullB) > { /* 039 */ // Nothing /* 040 */ } > else if (isNullA) > { /* 041 */ return -1; /* 042 */ } > else if (isNullB) > { /* 043 */ return 1; /* 044 */ } > else { > /* 045 */ int comp = > org.apache.spark.util.Utils.nanSafeCompareDoubles(primitiveA, primitiveB); > /* 046 */ if (comp != 0) > { /* 047 */ return comp; /* 048 */ } > /* 049 */ } > /* 050 */ > /* 051 */ > ... -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org