[ https://issues.apache.org/jira/browse/SPARK-21828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141138#comment-16141138 ]

Otis Smart commented on SPARK-21828:
------------------------------------

Hi KI: thank you for the prompt reply!
* Here (below) is example code that generates the error in PySpark 2.1.
* Please forgive me: I initially and inadvertently ran this code on a Spark
2.1 (rather than Spark 2.2) cluster, but I moments ago began a test on a Spark
2.2 cluster (definitely this time; see the version-check sketch after this
list). Nonetheless, troubleshooting and investigating the aforementioned error
may aid others on Spark 2.1 if my ongoing test yields no error in PySpark 2.2.
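
To confirm which Spark version a cluster is actually running before a repro
attempt, one can print it from the SparkContext (a minimal sketch; "sc" is the
SparkContext created near the bottom of the script below):

# Report the Spark version of the running cluster (2.1.x vs 2.2.x).
print(sc.version)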

Gratefully + Best,

OS


# OTIS SMART: 24.08.2017 (https://issues.apache.org/jira/browse/SPARK-21828)


# ----------------------------------------------------------------------------------------------------------------------
#
# ----------------------------------------------------------------------------------------------------------------------

#
from pyspark import SparkConf, SparkContext

#
from pyspark.sql import HiveContext
from pyspark.sql.functions import col, lit

#
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#
from pyspark.mllib.random import RandomRDDs as randrdd


# ----------------------------------------------------------------------------------------------------------------------
#
# ----------------------------------------------------------------------------------------------------------------------

# 
def computeDT(df):

    # Treat the last column as the class label and all others as predictors.
    namevAll = df.columns
    namevP = namevAll[:-1]
    namevC = namevAll[-1]

    #
    training_data, testing_data = df.randomSplit([0.70, 0.30])
    print("done: randomSplit")

    # Encode the string class label into a numeric column.
    variableoutStringIndexer = StringIndexer(inputCol=namevC, outputCol='variableout')
    print("done: StringIndexer")

    # Assemble the predictor columns into a single feature vector.
    variablesinAssembler = VectorAssembler(inputCols=namevP, outputCol='variablesin')
    print("done: VectorAssembler")

    #
    dTree = DecisionTreeClassifier(labelCol='variableout',
                                   featuresCol='variablesin',
                                   impurity='gini',
                                   maxBins=3)
    print("done: DecisionTreeClassifier")

    #
    pipeline = Pipeline(stages=[variableoutStringIndexer, variablesinAssembler, dTree])
    print("done: Pipeline")

    #
    paramgrid = ParamGridBuilder().addGrid(dTree.maxDepth, [3, 7, 8]).build()
    print("done: ParamGridBuilder")

    #
    evaluator = MulticlassClassificationEvaluator(labelCol='variableout',
                                                  predictionCol='prediction',
                                                  metricName="f1")
    print("done: MulticlassClassificationEvaluator")

    # The reported error surfaces inside this fit() call.
    cvModel = CrossValidator(estimator=pipeline,
                             estimatorParamMaps=paramgrid,
                             evaluator=evaluator,
                             numFolds=10).fit(training_data)
    print("done: CrossValidatorModel")

    #
    return True


# ----------------------------------------------------------------------------------------------------------------------
#
# ----------------------------------------------------------------------------------------------------------------------

#
def main():

    #
    numcols = 1234
    numrows = 23456

    # normalVectorRDD expects the SparkContext (sc), not the HiveContext.
    x = randrdd.normalVectorRDD(sc, numrows, numcols, seed=1)
    y = randrdd.normalVectorRDD(sc, numrows, numcols, seed=2)

    #
    dfx = hiveContext.createDataFrame(
        x.map(lambda v: tuple([float(ii) for ii in v])).collect(),
        ["v{0:0>4}".format(jj) for jj in range(0, numcols)])

    #
    dfy = hiveContext.createDataFrame(
        y.map(lambda v: tuple([float(ii) for ii in v])).collect(),
        ["v{0:0>4}".format(jj) for jj in range(0, numcols)])

    # Add one separable predictor plus the class label ('a' vs 'b').
    dfx = dfx.withColumn("v{0:0>4}".format(numcols),
                         dfx["v{0:0>4}".format(numcols - 1)] + 5).withColumn("V", lit('a'))
    dfy = dfy.withColumn("v{0:0>4}".format(numcols),
                         dfy["v{0:0>4}".format(numcols - 1)] - 5).withColumn("V", lit('b'))

    #
    df = dfx.union(dfy)

    #
    df.cache()

    #
    df.printSchema()
    df.show(n=10, truncate=False)

    #
    computeDT(df)

    #
    df.unpersist()

    #
    return True


# ----------------------------------------------------------------------------------------------------------------------
# CONFIGURE SPARK CONTEXT THEN RUN 'MAIN' ANALYSIS
# ----------------------------------------------------------------------------------------------------------------------

#
if __name__ == "__main__":

    #
    conf = SparkConf().setAppName("TroubleshootPyspark.ASFJira21828.Otis+Kazuaki") \
        .set("spark.sql.tungsten.enabled", "false")

    #
    sc = SparkContext(conf=conf)

    #
    hiveContext = HiveContext(sc)

    #
    main()
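
Aside: the stack trace below points at the generated SpecificOrdering rather
than at any single ML stage, so a much smaller script that merely sorts a very
wide DataFrame may trip the same 64 KB limit (an assumption on my part, not a
confirmed equivalent repro; hiveContext is the context created above):

# Hypothetical minimal repro: ordering codegen emits one comparison block per
# sort column, so ordering by ~1200 columns can exceed the JVM's 64 KB
# per-method limit.
wide = hiveContext.createDataFrame(
    [tuple(float(jj) for jj in range(1234)) for _ in range(2)],
    ["v{0:0>4}".format(jj) for jj in range(1234)])
wide.orderBy(wide.columns).show()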


> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering" grows beyond 64 KB...again
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21828
>                 URL: https://issues.apache.org/jira/browse/SPARK-21828
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, SQL
>    Affects Versions: 2.1.0
>            Reporter: Otis Smart
>            Priority: Critical
>
> Hello!
> 1. I encounter a similar issue (see below text) on PySpark 2.2 (e.g., a
> DataFrame with ~50000 rows x 1100+ columns as input to the .fit() method of a
> CrossValidator() whose Pipeline() includes StringIndexer(), VectorAssembler(),
> and DecisionTreeClassifier()).
> 2. Was the aforementioned patch (a.k.a. the fix,
> https://github.com/apache/spark/pull/15480) not included in the latest
> release? What are the reason for, source of, and solution to this persistent
> issue, please?
> py4j.protocol.Py4JJavaError: An error occurred while calling o9396.fit.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 
> in stage 18.0 failed 4 times, most recent failure: Lost task 38.3 in stage 
> 18.0 (TID 1996, ip-10-0-14-83.ec2.internal, executor 4): 
> java.util.concurrent.ExecutionException: java.lang.Exception: failed to 
> compile: org.codehaus.janino.JaninoRuntimeException: Code of method 
> "compare(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I"
>  of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering" 
> grows beyond 64 KB
> /* 001 */ public SpecificOrdering generate(Object[] references) {
> /* 002 */   return new SpecificOrdering(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificOrdering extends org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */
> /* 009 */
> /* 010 */   public SpecificOrdering(Object[] references) {
> /* 011 */     this.references = references;
> /* 012 */
> /* 013 */   }
> /* 014 */
> /* 015 */
> /* 016 */
> /* 017 */   public int compare(InternalRow a, InternalRow b) {
> /* 018 */     InternalRow i = null; // Holds current row being evaluated.
> /* 019 */
> /* 020 */     i = a;
> /* 021 */     boolean isNullA;
> /* 022 */     double primitiveA;
> /* 023 */     {
> /* 024 */
> /* 025 */       double value = i.getDouble(0);
> /* 026 */       isNullA = false;
> /* 027 */       primitiveA = value;
> /* 028 */     }
> /* 029 */     i = b;
> /* 030 */     boolean isNullB;
> /* 031 */     double primitiveB;
> /* 032 */     {
> /* 033 */
> /* 034 */       double value = i.getDouble(0);
> /* 035 */       isNullB = false;
> /* 036 */       primitiveB = value;
> /* 037 */     }
> /* 038 */     if (isNullA && isNullB) {
> /* 039 */       // Nothing
> /* 040 */     } else if (isNullA) {
> /* 041 */       return -1;
> /* 042 */     } else if (isNullB) {
> /* 043 */       return 1;
> /* 044 */     } else {
> /* 045 */       int comp = org.apache.spark.util.Utils.nanSafeCompareDoubles(primitiveA, primitiveB);
> /* 046 */       if (comp != 0) {
> /* 047 */         return comp;
> /* 048 */       }
> /* 049 */     }
> /* 050 */
> /* 051 */
> ...


