Zoltan Toth created SPARK-10487:
-----------------------------------

             Summary: MLlib model fitting causes DataFrame write to break with OutOfMemory exception
                 Key: SPARK-10487
                 URL: https://issues.apache.org/jira/browse/SPARK-10487
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.5.0, 1.5.1
         Environment: Tried in a CentOS-based single-node YARN cluster in Docker and on a real-world CDH5 cluster.
Spark 1.5.0-SNAPSHOT built for Hadoop 2.6.0 (I'm working with the latest nightly build).
Build flags: -Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034

I'm using the default resource setup:
15/09/07 08:49:04 INFO yarn.YarnAllocator: Will request 2 executor containers, each with 1 cores and 1408 MB memory including 384 MB overhead
15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1>)
15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1>)
            Reporter: Zoltan Toth


After fitting a _spark.ml_ or _mllib_ model in *cluster* deploy mode, no DataFrame can be written to HDFS. The driver receives an OutOfMemory exception during the write. It seems, however, that the file gets written successfully.

* This happens both in SparkR and pyspark
* It only happens in cluster deploy mode
* The write fails regardless of the size of the DataFrame, and regardless of whether the DataFrame is associated with the ML model

REPRO:

{code}
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vector, Vectors

conf = SparkConf().setAppName("LogRegTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

training = sc.parallelize((
    LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
df = training.toDF()

reg = LogisticRegression().setMaxIter(10).setRegParam(0.01)
model = reg.fit(df)

# Note that this is a brand new DataFrame, not associated with the model:
one_df = sc.parallelize((
    LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF()

# The driver throws the OutOfMemory exception during this write:
one_df.write.mode("overwrite").parquet("/tmp/df.parquet")
{code}
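Since the failure only appears in cluster deploy mode, the script above has to be submitted with something like {{spark-submit --master yarn --deploy-mode cluster <script>}}. For comparison, the following control sketch is the repro with the {{reg.fit(df)}} step removed (the {{LogRegTestControl}} app name and {{/tmp/df_control.parquet}} path are illustrative, and the expected outcome is inferred from the report, not separately verified): since the report ties the broken write to the preceding model fit, this write should succeed even in cluster mode, isolating the fit call as the trigger.

{code}
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

conf = SparkConf().setAppName("LogRegTestControl")  # illustrative app name
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Identical DataFrame to one_df in the repro, but no LogisticRegression
# model has been fit in this application.
one_df = sc.parallelize((
    LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF()

# Per the report, the write only breaks after a model fit, so this control
# write is expected to complete normally even in cluster deploy mode.
one_df.write.mode("overwrite").parquet("/tmp/df_control.parquet")
{code}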