Zoltan Toth created SPARK-10487:
-----------------------------------

             Summary: MLlib model fitting causes DataFrame write to break with 
OutOfMemory exception
                 Key: SPARK-10487
                 URL: https://issues.apache.org/jira/browse/SPARK-10487
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.5.0, 1.5.1
         Environment: Tried on a CentOS-based single-node YARN cluster in Docker and on a real-world CDH5 cluster

Spark 1.5.0-SNAPSHOT built for Hadoop 2.6.0 (I'm working with the latest 
nightly build)
Build flags: -Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn 
-DzincPort=3034

I'm using the default resource setup:
15/09/07 08:49:04 INFO yarn.YarnAllocator: Will request 2 executor containers, 
each with 1 cores and 1408 MB memory including 384 MB overhead
15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: <memory:1408, vCores:1>)
15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: <memory:1408, vCores:1>)

            Reporter: Zoltan Toth


After fitting a _spark.ml_ or _mllib_ model in *cluster* deploy mode, no DataFrame can be written to HDFS. The driver receives an OutOfMemory exception during the write. It seems, however, that the file itself gets written successfully.


 * This happens both in SparkR and PySpark
 * It only happens in cluster deploy mode
 * The write fails regardless of the size of the dataframe and whether or not the dataframe is associated with the ML model.

REPRO:
{code}
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

conf = SparkConf().setAppName("LogRegTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

training = sc.parallelize((
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
df = training.toDF()

reg = LogisticRegression().setMaxIter(10).setRegParam(0.01)
model = reg.fit(df)

# Note that this is a brand new dataframe:
one_df = sc.parallelize((
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF()

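# In cluster deploy mode this write fails with an OutOfMemory exception on the driver
# (the parquet file nonetheless appears to be written successfully):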
one_df.write.mode("overwrite").parquet("/tmp/df.parquet")
{code}
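
For reference, the failure reproduces when the script above is submitted in *cluster* deploy mode, roughly like this (the script filename is just a placeholder); in client deploy mode the same write succeeds:
{code}
spark-submit --master yarn-cluster log_reg_test.py
{code}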


