[jira] [Commented] (SPARK-28484) spark-submit uses wrong SPARK_HOME with deploy-mode "cluster"
[ https://issues.apache.org/jira/browse/SPARK-28484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16893448#comment-16893448 ] Kalle Jepsen commented on SPARK-28484: -- thank you > spark-submit uses wrong SPARK_HOME with deploy-mode "cluster" > - > > Key: SPARK-28484 > URL: https://issues.apache.org/jira/browse/SPARK-28484 > Project: Spark > Issue Type: Bug > Components: Deploy >Affects Versions: 2.4.3 >Reporter: Kalle Jepsen >Priority: Major > > When submitting an application jar to a remote Spark cluster with > spark-submit and deploy-mode = "cluster", the driver command that is issued > on one of the workers seems to be configured with the SPARK_HOME of the local > machine, from which spark-submit was called, not the one where the driver is > actually running. > > I.e. if I have spark installed locally under e.g. /opt/apache-spark and > hadoop under /usr/lib/hadoop-3.2.0, but the cluster administrator installs > spark under /usr/local/spark on the workers, the command that is issued on > the worker still looks sth like this: > > {{"/usr/lib/jvm/java/bin/java" "-cp" > "/opt/apache-spark/conf:/etc/hadoop:/usr/lib/hadoop-3.2.0/share/hadoop/common/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/common/*:/usr/lib/hadoop-3.2.0/share/hadoop/hdfs:/usr/lib/hadoop-3.2.0/share/hadoop/hdfs/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/hdfs/*:/usr/lib/hadoop-3.2.0/share/hadoop/mapreduce/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/mapreduce/*:/usr/lib/hadoop-3.2.0/share/hadoop/yarn:/usr/lib/hadoop-3.2.0/share/hadoop/yarn/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/yarn/*" > "-Xmx1024M" "-Dspark.jars=file:///some/application.jar" > "-Dspark.driver.supervise=false" "-Dspark.submit.deployMode=cluster" > "-Dspark.master=spark://:7077" "-Dspark.app.name=" > "-Dspark.rpc.askTimeout=10s" "org.apache.spark.deploy.worker.DriverWrapper" > "spark://Worker@:65000" "/some/application.jar" > "some.class.Name"}} > > Is this expected behavior and/or can I somehow control that? > > Steps to reproduce: > > 1. Install Spark locally (with a SPARK_HOME that's different on the cluster) > {{2. Run: spark-submit --deploy-mode "cluster" --master > "spark://spark.example.com:7077" --class "com.example.SparkApp" > "hdfs:/some/application.jar"}} > 3. Observe that the application fails because some spark and/or hadoop > classes cannot be found > > This applies to Spark Standalone, I haven't tried with YARN -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-28484) spark-submit uses wrong SPARK_HOME with deploy-mode "cluster"
Kalle Jepsen created SPARK-28484: Summary: spark-submit uses wrong SPARK_HOME with deploy-mode "cluster" Key: SPARK-28484 URL: https://issues.apache.org/jira/browse/SPARK-28484 Project: Spark Issue Type: Bug Components: Deploy Affects Versions: 2.4.3 Reporter: Kalle Jepsen When submitting an application jar to a remote Spark cluster with spark-submit and deploy-mode = "cluster", the driver command that is issued on one of the workers seems to be configured with the SPARK_HOME of the local machine from which spark-submit was called, not the one where the driver is actually running. I.e. if I have Spark installed locally under e.g. /opt/apache-spark and Hadoop under /usr/lib/hadoop-3.2.0, but the cluster administrator installs Spark under /usr/local/spark on the workers, the command that is issued on the worker still looks something like this: {{"/usr/lib/jvm/java/bin/java" "-cp" "/opt/apache-spark/conf:/etc/hadoop:/usr/lib/hadoop-3.2.0/share/hadoop/common/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/common/*:/usr/lib/hadoop-3.2.0/share/hadoop/hdfs:/usr/lib/hadoop-3.2.0/share/hadoop/hdfs/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/hdfs/*:/usr/lib/hadoop-3.2.0/share/hadoop/mapreduce/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/mapreduce/*:/usr/lib/hadoop-3.2.0/share/hadoop/yarn:/usr/lib/hadoop-3.2.0/share/hadoop/yarn/lib/*:/usr/lib/hadoop-3.2.0/share/hadoop/yarn/*" "-Xmx1024M" "-Dspark.jars=file:///some/application.jar" "-Dspark.driver.supervise=false" "-Dspark.submit.deployMode=cluster" "-Dspark.master=spark://:7077" "-Dspark.app.name=" "-Dspark.rpc.askTimeout=10s" "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@:65000" "/some/application.jar" "some.class.Name"}} Is this expected behavior, and can I somehow control it?
Steps to reproduce:
1. Install Spark locally (with a SPARK_HOME that's different on the cluster)
2. Run: {{spark-submit --deploy-mode "cluster" --master "spark://spark.example.com:7077" --class "com.example.SparkApp" "hdfs:/some/application.jar"}}
3. Observe that the application fails because some Spark and/or Hadoop classes cannot be found.
This applies to Spark Standalone; I haven't tried it with YARN. -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
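For reference, the steps above boil down to a submission like the sketch below; the hostname, jar location and class name are placeholders taken from the report, and keeping the Spark installation path (and thus SPARK_HOME) identical on the submitting machine and on every worker is mentioned only as a plausible mitigation, not something confirmed in this ticket.
{noformat}
# Reproduction sketch based on the steps above (paths and names are placeholders).
# Local installation:  /opt/apache-spark    Worker installation:  /usr/local/spark

spark-submit \
  --deploy-mode cluster \
  --master spark://spark.example.com:7077 \
  --class com.example.SparkApp \
  hdfs:/some/application.jar

# Observed: the DriverWrapper launched on a worker gets a classpath rooted at
# /opt/apache-spark/conf (the submitter's SPARK_HOME) instead of /usr/local/spark.
{noformat}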
[jira] [Created] (SPARK-17294) Caching invalidates data on mildly wide dataframes
Kalle Jepsen created SPARK-17294: Summary: Caching invalidates data on mildly wide dataframes Key: SPARK-17294 URL: https://issues.apache.org/jira/browse/SPARK-17294 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.0.0, 1.6.2 Reporter: Kalle Jepsen Caching a dataframe with > 200 columns causes the data within to simply vanish under certain circumstances. Consider the following code, where we create a one-row dataframe containing the numbers from 0 to 200.
{code}
n_cols = 201
rng = range(n_cols)
df = spark.createDataFrame(
    data=[rng]
)
last = df.columns[-1]
print(df.select(last).collect())
df.select(F.greatest(*df.columns).alias('greatest')).show()
{code}
Returns:
{noformat}
[Row(_201=200)]
+--------+
|greatest|
+--------+
|     200|
+--------+
{noformat}
As expected, column {{_201}} contains the number 200 and the greatest value within that single row is 200. Now if we introduce a {{.cache}} on {{df}}:
{code}
n_cols = 201
rng = range(n_cols)
df = spark.createDataFrame(
    data=[rng]
).cache()
last = df.columns[-1]
print(df.select(last).collect())
df.select(F.greatest(*df.columns).alias('greatest')).show()
{code}
Returns:
{noformat}
[Row(_201=200)]
+--------+
|greatest|
+--------+
|       0|
+--------+
{noformat}
The last column {{_201}} still seems to contain the correct value, but when I try to select the greatest value within the row, 0 is returned. When I issue {{.show()}} on the dataframe, all values will be zero. As soon as I limit the number of columns to < 200, everything looks fine again. When the number of columns is < 200 from the beginning, even the cache will not break things and everything works as expected. It doesn't matter whether the data is loaded from disk or created on the fly, and this happens in both Spark 1.6.2 and 2.0.0 (I haven't tested anything else). Can anyone confirm this? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
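For anyone reproducing this outside an interactive shell, here is a self-contained sketch of the cached variant above. It only adds the imports and session setup the snippet assumes (the application name is arbitrary) and materializes the range into a list so it also runs on Python 3.
{code:python}
# Self-contained sketch of the cached reproduction above (assumes Spark >= 2.0).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("spark-17294-repro").getOrCreate()

n_cols = 201                                   # one column past the ~200-column threshold
row = list(range(n_cols))                      # materialize the range for Python 3
df = spark.createDataFrame(data=[row]).cache()

last = df.columns[-1]
print(df.select(last).collect())               # [Row(_201=200)] -- still looks correct
df.select(F.greatest(*df.columns).alias('greatest')).show()   # reported to print 0 instead of 200
{code}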
[jira] [Updated] (SPARK-17217) Codegeneration fails for describe() on many columns
[ https://issues.apache.org/jira/browse/SPARK-17217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kalle Jepsen updated SPARK-17217: - Description: Consider the following minimal python script: {code:python} import pyspark from pyspark.sql import functions as F conf = pyspark.SparkConf() sc = pyspark.SparkContext(conf=conf) spark = pyspark.sql.SQLContext(sc) ncols = 510 nrows = 10 df = spark.range(0, nrows) s = df.select( [ F.randn(seed=i).alias('C%i' % i) for i in range(ncols) ] ).describe() {code} This fails with a traceback counting 3.6M (!) lines for {{ncols >= 510}}, saying something like {noformat} 16/08/24 16:50:57 ERROR CodeGenerator: failed to compile: java.io.EOFException /* 001 */ public java.lang.Object generate(Object[] references) { /* 002 */ return new SpecificMutableProjection(references); /* 003 */ } /* 004 */ /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { ... /* 7372 */ private boolean isNull_1969; /* 7373 */ private double value_1969; /* 7374 */ private boolean isNull_1970; ... /* 11035 */ double value14944 = -1.0; /* 11036 */ /* 11037 */ /* 11038 */ if (!evalExpr1052IsNull) { /* 11039 */ /* 11040 */ isNull14944 = false; // resultCode could change nullability. /* 11041 */ value14944 = evalExpr1326Value - evalExpr1052Value; /* 11042 */ ... /* 157621 */ apply1_6(i); /* 157622 */ return mutableRow; /* 157623 */ } /* 157624 */ } at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) ... 30 more Caused by: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1383) at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:555) at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:518) at org.codehaus.janino.util.ClassFile.(ClassFile.java:185) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:914) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:912) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:912) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:884) ... 35 more {noformat} I've seen something similar in an earlier Spark version ([reported in this issue|https://issues.apache.org/jira/browse/SPARK-14138]). 
My conclusion is that {{describe}} was never meant to be used non-interactively on very wide dataframes, am I right? was: Consider the following minimal python script: {code:python} import pyspark from pyspark.sql import functions as F conf = pyspark.SparkConf() sc = pyspark.SparkContext(conf=conf) spark = pyspark.sql.SQLContext(sc) ncols = 510 nrows = 10 df = spark.range(0, nrows) s = df.select( [ F.randn(seed=i).alias('C%i' % i) for i in range(ncols) ] ).describe() {code} This fails with a traceback counting 3.6M (!) lines for {{ncols >= 510}}, saying something in the likes of {noformat} 16/08/24 16:50:57 ERROR CodeGenerator: failed to compile: java.io.EOFException /* 001 */ public java.lang.Object generate(Object[] references) { /* 002 */ return new SpecificMutableProjection(references); /* 003 */ } /* 004 */ /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { ... /* 7372 */ private boolean isNull_1969; /* 7373 */ private double value_1969; /* 7374 */ private boolean isNull_1970; ... /* 11035 */ double value14944 = -1.0;
[jira] [Created] (SPARK-17217) Codegeneration fails for describe() on many columns
Kalle Jepsen created SPARK-17217: Summary: Codegeneration fails for describe() on many columns Key: SPARK-17217 URL: https://issues.apache.org/jira/browse/SPARK-17217 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.0.0 Reporter: Kalle Jepsen Consider the following minimal python script: {code:python} import pyspark from pyspark.sql import functions as F conf = pyspark.SparkConf() sc = pyspark.SparkContext(conf=conf) spark = pyspark.sql.SQLContext(sc) ncols = 510 nrows = 10 df = spark.range(0, nrows) s = df.select( [ F.randn(seed=i).alias('C%i' % i) for i in range(ncols) ] ).describe() {code} This fails with a traceback counting 3.6M (!) lines for {{ncols >= 510}}, saying something in the likes of {noformat} 16/08/24 16:50:57 ERROR CodeGenerator: failed to compile: java.io.EOFException /* 001 */ public java.lang.Object generate(Object[] references) { /* 002 */ return new SpecificMutableProjection(references); /* 003 */ } /* 004 */ /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { ... /* 7372 */ private boolean isNull_1969; /* 7373 */ private double value_1969; /* 7374 */ private boolean isNull_1970; ... /* 11035 */ double value14944 = -1.0; /* 11036 */ /* 11037 */ /* 11038 */ if (!evalExpr1052IsNull) { /* 11039 */ /* 11040 */ isNull14944 = false; // resultCode could change nullability. /* 11041 */ value14944 = evalExpr1326Value - evalExpr1052Value; /* 11042 */ ... /* 157621 */ apply1_6(i); /* 157622 */ return mutableRow; /* 157623 */ } /* 157624 */ } at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) ... 30 more Caused by: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1383) at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:555) at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:518) at org.codehaus.janino.util.ClassFile.(ClassFile.java:185) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:914) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:912) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:912) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:884) ... 
35 more {noformat} I've seen something similar in an earlier Spark version ([reported in this issue|https://issues.apache.org/jira/browse/SPARK-14138]). My conclusion is that {{describe}} was never meant to be used non-interactively on very wide dataframes, am I right? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
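One workaround sketch, not verified against this ticket: since the generated projection grows with the number of columns handled at once, the summary can be computed over smaller column batches and the results combined afterwards. The batch size of 100 below is an arbitrary assumption.
{code:python}
# Hypothetical workaround: run describe() over column batches instead of all
# 510 columns at once, so each generated projection stays comparatively small.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

ncols, nrows, batch_size = 510, 10, 100
df = spark.range(0, nrows).select(
    [F.randn(seed=i).alias('C%i' % i) for i in range(ncols)]
)

summaries = []
for start in range(0, len(df.columns), batch_size):
    cols = df.columns[start:start + batch_size]
    summaries.append(df.select(cols).describe())   # one small codegen unit per batch

for s in summaries:
    s.show()
{code}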
[jira] [Commented] (SPARK-14138) Generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames
[ https://issues.apache.org/jira/browse/SPARK-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329387#comment-15329387 ] Kalle Jepsen commented on SPARK-14138: -- I've now successfully compiled from commit c53c83c for Hadoop 2.6. The original exception is gone, but unfortunately a StackOverflowError is raised instead. Full traceback here: http://pastebin.com/0C6tk2SD. The error occurred when trying to persist a Dataframe with a little more than 3000 aggregation columns from Pyspark. > Generated SpecificColumnarIterator code can exceed JVM size limit for cached > DataFrames > --- > > Key: SPARK-14138 > URL: https://issues.apache.org/jira/browse/SPARK-14138 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Sven Krasser >Assignee: Kazuaki Ishizaki > Fix For: 2.0.0 > > > The generated {{SpecificColumnarIterator}} code for wide DataFrames can > exceed the JVM 64k limit under certain circumstances. This snippet reproduces > the error in spark-shell (with 5G driver memory) by creating a new DataFrame > with >2000 aggregation-based columns: > {code} > val df = sc.parallelize(1 to 10).toDF() > val aggr = {1 to 2260}.map(colnum => avg(df.col("_1")).as(s"col_$colnum")) > val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache() > res.show() // this will break > {code} > The following error is produced (pruned for brevity): > {noformat} > /* 001 */ > /* 002 */ import java.nio.ByteBuffer; > /* 003 */ import java.nio.ByteOrder; > /* 004 */ import scala.collection.Iterator; > /* 005 */ import org.apache.spark.sql.types.DataType; > /* 006 */ import > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > /* 007 */ import > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > /* 008 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > /* 009 */ > /* 010 */ public SpecificColumnarIterator > generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) { > /* 011 */ return new SpecificColumnarIterator(); > /* 012 */ } > /* 013 */ > ... > /* 9113 */ accessor2261.extractTo(mutableRow, 2261); > /* 9114 */ unsafeRow.pointTo(bufferHolder.buffer, 2262, > bufferHolder.totalSize()); > /* 9115 */ return unsafeRow; > /* 9116 */ } > /* 9117 */ } > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572) > at > org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > ... 
28 more > Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "()Z" > of class > "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator" > grows beyond 64 KB > at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941) > at org.codehaus.janino.CodeContext.write(CodeContext.java:836) > at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251) > at org.codehaus.janino.UnitCompiler.invoke(UnitCompiler.java:10050) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4008) > at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3927) > at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368) > at > org.codehaus.janino.UnitCompiler.invokeConstructor(UnitCompiler.java:6681) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4126) > at org.codehaus.janino.UnitCompiler.access$7600(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$10.visitNewClassIn
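The comment above mentions hitting the follow-up StackOverflowError from PySpark with a little more than 3000 aggregation columns. The snippet below is a rough PySpark translation of the Scala reproduction quoted in the issue; it is an assumption written against the newer Spark 2.x API, not code taken from the ticket, and the column count simply mirrors the Scala example.
{code:python}
# Rough PySpark analogue of the Scala reproduction quoted above: >2000
# aggregation-based columns on a cached DataFrame (assumes Spark >= 2.0 APIs).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.range(1, 11).withColumnRenamed("id", "_1")
aggr = [F.avg(df["_1"]).alias("col_%d" % i) for i in range(1, 2261)]

res = df.groupBy("_1").agg(F.count("_1"), *aggr).cache()
res.show()   # reported to fail once the generated method exceeds the JVM 64 KB limit
{code}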
[jira] [Commented] (SPARK-14138) Generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames
[ https://issues.apache.org/jira/browse/SPARK-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329358#comment-15329358 ] Kalle Jepsen commented on SPARK-14138: -- Will there be a 1.6.2 release any time soon? The current 1.6-branch cannot be compiled and there is no nightly for 1.6 at http://people.apache.org/~pwendell/spark-nightly/spark-branch-1.6-bin/spark-1.6.2-SNAPSHOT-2016_06_13_01_29-be3c41b-bin/. > Generated SpecificColumnarIterator code can exceed JVM size limit for cached > DataFrames > --- > > Key: SPARK-14138 > URL: https://issues.apache.org/jira/browse/SPARK-14138 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Sven Krasser >Assignee: Kazuaki Ishizaki > Fix For: 2.0.0 > > > The generated {{SpecificColumnarIterator}} code for wide DataFrames can > exceed the JVM 64k limit under certain circumstances. This snippet reproduces > the error in spark-shell (with 5G driver memory) by creating a new DataFrame > with >2000 aggregation-based columns: > {code} > val df = sc.parallelize(1 to 10).toDF() > val aggr = {1 to 2260}.map(colnum => avg(df.col("_1")).as(s"col_$colnum")) > val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache() > res.show() // this will break > {code} > The following error is produced (pruned for brevity): > {noformat} > /* 001 */ > /* 002 */ import java.nio.ByteBuffer; > /* 003 */ import java.nio.ByteOrder; > /* 004 */ import scala.collection.Iterator; > /* 005 */ import org.apache.spark.sql.types.DataType; > /* 006 */ import > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > /* 007 */ import > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > /* 008 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > /* 009 */ > /* 010 */ public SpecificColumnarIterator > generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) { > /* 011 */ return new SpecificColumnarIterator(); > /* 012 */ } > /* 013 */ > ... > /* 9113 */ accessor2261.extractTo(mutableRow, 2261); > /* 9114 */ unsafeRow.pointTo(bufferHolder.buffer, 2262, > bufferHolder.totalSize()); > /* 9115 */ return unsafeRow; > /* 9116 */ } > /* 9117 */ } > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572) > at > org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > ... 
28 more > Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "()Z" > of class > "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator" > grows beyond 64 KB > at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941) > at org.codehaus.janino.CodeContext.write(CodeContext.java:836) > at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251) > at org.codehaus.janino.UnitCompiler.invoke(UnitCompiler.java:10050) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4008) > at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3927) > at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368) > at > org.codehaus.janino.UnitCompiler.invokeConstructor(UnitCompiler.java:6681) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4126) > at org.codehaus.janino.UnitCompiler.access$7600(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$10.visitNewClassInstance(UnitCompiler.java:3275) > at org.codehaus.janino.Java$NewClassInstance
[jira] [Commented] (SPARK-12835) StackOverflowError when aggregating over column from window function
[ https://issues.apache.org/jira/browse/SPARK-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15102491#comment-15102491 ] Kalle Jepsen commented on SPARK-12835: -- The [traceback|http://pastebin.com/pRRCAben] really is ridiculously long. In my actual application I would have the window partitioned and the aggregation done in {{df.groupby(key).agg(avg_diff}}. Would that still be problematic with regard to performance? The error is the same there though, that's why I've chosen the more concise minimal example above. > StackOverflowError when aggregating over column from window function > > > Key: SPARK-12835 > URL: https://issues.apache.org/jira/browse/SPARK-12835 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.6.0 >Reporter: Kalle Jepsen > > I am encountering a StackoverflowError with a very long traceback, when I try > to directly aggregate on a column created by a window function. > E.g. I am trying to determine the average timespan between dates in a > Dataframe column by using a window-function: > {code} > from pyspark import SparkContext > from pyspark.sql import HiveContext, Window, functions > from datetime import datetime > sc = SparkContext() > sq = HiveContext(sc) > data = [ > [datetime(2014,1,1)], > [datetime(2014,2,1)], > [datetime(2014,3,1)], > [datetime(2014,3,6)], > [datetime(2014,8,23)], > [datetime(2014,10,1)], > ] > df = sq.createDataFrame(data, schema=['ts']) > ts = functions.col('ts') > > w = Window.orderBy(ts) > diff = functions.datediff( > ts, > functions.lag(ts, count=1).over(w) > ) > avg_diff = functions.avg(diff) > {code} > While {{df.select(diff.alias('diff')).show()}} correctly renders as > {noformat} > ++ > |diff| > ++ > |null| > | 31| > | 28| > | 5| > | 170| > | 39| > ++ > {noformat} > doing {code} > df.select(avg_diff).show() > {code} throws a {{java.lang.StackOverflowError}}. > When I say > {code} > df2 = df.select(diff.alias('diff')) > df2.select(functions.avg('diff')) > {code} > however, there's no error. > Am I wrong to assume that the above should work? > I've already described the same in [this question on > stackoverflow.com|http://stackoverflow.com/questions/34793999/averaging-over-window-function-leads-to-stackoverflowerror]. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
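The partitioned variant mentioned in the comment ({{df.groupby(key).agg(avg_diff)}}) would look roughly like the sketch below. The {{key}} column and its values are hypothetical; only the structure follows the comment, and per the comment it reportedly fails with the same error.
{code:python}
# Sketch of the partitioned/grouped variant described in the comment above.
# The 'key' column and its values are made up for illustration.
from pyspark import SparkContext
from pyspark.sql import HiveContext, Window, functions
from datetime import datetime

sc = SparkContext()
sq = HiveContext(sc)

data = [
    ['a', datetime(2014, 1, 1)],
    ['a', datetime(2014, 2, 1)],
    ['b', datetime(2014, 3, 1)],
    ['b', datetime(2014, 3, 6)],
]
df = sq.createDataFrame(data, schema=['key', 'ts'])
ts = functions.col('ts')

w = Window.partitionBy('key').orderBy(ts)
diff = functions.datediff(ts, functions.lag(ts, count=1).over(w))
avg_diff = functions.avg(diff)

df.groupBy('key').agg(avg_diff).show()   # reported above to hit the same StackOverflowError
{code}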
[jira] [Created] (SPARK-12835) StackOverflowError when aggregating over column from window function
Kalle Jepsen created SPARK-12835: Summary: StackOverflowError when aggregating over column from window function Key: SPARK-12835 URL: https://issues.apache.org/jira/browse/SPARK-12835 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 1.6.0 Reporter: Kalle Jepsen I am encountering a StackOverflowError with a very long traceback when I try to directly aggregate on a column created by a window function. E.g. I am trying to determine the average timespan between dates in a DataFrame column by using a window function:
{code}
from pyspark import SparkContext
from pyspark.sql import HiveContext, Window, functions
from datetime import datetime

sc = SparkContext()
sq = HiveContext(sc)

data = [
    [datetime(2014,1,1)],
    [datetime(2014,2,1)],
    [datetime(2014,3,1)],
    [datetime(2014,3,6)],
    [datetime(2014,8,23)],
    [datetime(2014,10,1)],
]
df = sq.createDataFrame(data, schema=['ts'])
ts = functions.col('ts')

w = Window.orderBy(ts)
diff = functions.datediff(
    ts,
    functions.lag(ts, count=1).over(w)
)
avg_diff = functions.avg(diff)
{code}
While {{df.select(diff.alias('diff')).show()}} correctly renders as
{noformat}
+----+
|diff|
+----+
|null|
|  31|
|  28|
|   5|
| 170|
|  39|
+----+
{noformat}
doing
{code}
df.select(avg_diff).show()
{code}
throws a {{java.lang.StackOverflowError}}. When I say
{code}
df2 = df.select(diff.alias('diff'))
df2.select(functions.avg('diff'))
{code}
however, there's no error. Am I wrong to assume that the above should work? I've already described the same issue in [this question on stackoverflow.com|http://stackoverflow.com/questions/34793999/averaging-over-window-function-leads-to-stackoverflowerror]. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4105) FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kalle Jepsen updated SPARK-4105: Affects Version/s: 1.4.1 > FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based > shuffle > - > > Key: SPARK-4105 > URL: https://issues.apache.org/jira/browse/SPARK-4105 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 1.2.0, 1.2.1, 1.3.0, 1.4.1 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Blocker > Attachments: JavaObjectToSerialize.java, > SparkFailedToUncompressGenerator.scala > > > We have seen non-deterministic {{FAILED_TO_UNCOMPRESS(5)}} errors during > shuffle read. Here's a sample stacktrace from an executor: > {code} > 14/10/23 18:34:11 ERROR Executor: Exception in task 1747.3 in stage 11.0 (TID > 33053) > java.io.IOException: FAILED_TO_UNCOMPRESS(5) > at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78) > at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) > at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391) > at org.xerial.snappy.Snappy.uncompress(Snappy.java:427) > at > org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127) > at > org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) > at org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58) > at > org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) > at > org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129) > at > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) > at > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > at > org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > at > org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) > at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:56) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} > Here's another occurrence of a similar error: > {code} > java.io.IOException: failed to rea
[jira] [Commented] (SPARK-7278) Inconsistent handling of dates in PySparks Row object
[ https://issues.apache.org/jira/browse/SPARK-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14538051#comment-14538051 ] Kalle Jepsen commented on SPARK-7278: - Shouldn't {{DateType}} at least find {{datetime.datetime}} acceptable? > Inconsistent handling of dates in PySparks Row object > - > > Key: SPARK-7278 > URL: https://issues.apache.org/jira/browse/SPARK-7278 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.3.1 >Reporter: Kalle Jepsen > > Consider the following Python code: > {code:none} > import datetime > rdd = sc.parallelize([[0, datetime.date(2014, 11, 11)], [1, > datetime.date(2015,6,4)]]) > df = rdd.toDF(schema=['rid', 'date']) > row = df.first() > {code} > Accessing the {{date}} column via {{\_\_getitem\_\_}} returns a > {{datetime.datetime}} instance > {code:none} > >>>row[1] > datetime.datetime(2014, 11, 11, 0, 0) > {code} > while access via {{getattr}} returns a {{datetime.date}} instance: > {code:none} > >>>row.date > datetime.date(2014, 11, 11) > {code} > The problem seems to be that that Java deserializes the {{datetime.date}} > objects to {{datetime.datetime}}. This is taken care of > [here|https://github.com/apache/spark/blob/master/python/pyspark/sql/_types.py#L1027] > when using {{getattr}}, but is overlooked when directly accessing the tuple > by index. > Is there an easy way to fix this? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7116) Intermediate RDD cached but never unpersisted
[ https://issues.apache.org/jira/browse/SPARK-7116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532146#comment-14532146 ] Kalle Jepsen commented on SPARK-7116: - Sure, thanks > Intermediate RDD cached but never unpersisted > - > > Key: SPARK-7116 > URL: https://issues.apache.org/jira/browse/SPARK-7116 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Affects Versions: 1.3.1 >Reporter: Kalle Jepsen > > In > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala#L233 > an intermediate RDD is cached, but never unpersisted. It shows up in the > 'Storage' section of the Web UI, but cannot be removed. There's already a > comment in the source, suggesting to 'clean up'. If that cleanup is more > involved than simply calling `unpersist`, it probably exceeds my current > Scala skills. > Why that is a problem: > I'm adding a constant column to a DataFrame of about 20M records resulting > from an inner join with {{df.withColumn(colname, ud_func())}} , where > {{ud_func}} is simply a wrapped {{lambda: 1}}. Before and after applying the > UDF the DataFrame takes up ~430MB in the cache. The cached intermediate RDD > however takes up ~10GB(!) of storage, and I know of no way to uncache it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7116) Intermediate RDD cached but never unpersisted
[ https://issues.apache.org/jira/browse/SPARK-7116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14530580#comment-14530580 ] Kalle Jepsen commented on SPARK-7116: - [~marmbrus] Do you remember why that {{cache()}} was necessary? I've boldly commented it out and the only thing that seems to have changed is that everything runs a lot faster... > Intermediate RDD cached but never unpersisted > - > > Key: SPARK-7116 > URL: https://issues.apache.org/jira/browse/SPARK-7116 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Affects Versions: 1.3.1 >Reporter: Kalle Jepsen > > In > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala#L233 > an intermediate RDD is cached, but never unpersisted. It shows up in the > 'Storage' section of the Web UI, but cannot be removed. There's already a > comment in the source, suggesting to 'clean up'. If that cleanup is more > involved than simply calling `unpersist`, it probably exceeds my current > Scala skills. > Why that is a problem: > I'm adding a constant column to a DataFrame of about 20M records resulting > from an inner join with {{df.withColumn(colname, ud_func())}} , where > {{ud_func}} is simply a wrapped {{lambda: 1}}. Before and after applying the > UDF the DataFrame takes up ~430MB in the cache. The cached intermediate RDD > however takes up ~10GB(!) of storage, and I know of no way to uncache it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7278) Inconsistent handling of dates in PySparks Row object
[ https://issues.apache.org/jira/browse/SPARK-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521753#comment-14521753 ] Kalle Jepsen commented on SPARK-7278: - This is probably a duplicate of [this issue|https://issues.apache.org/jira/browse/SPARK-6289]. Is there anything dangerous about simply adopting {{datetime.datetime}} as PySparks generic date type? > Inconsistent handling of dates in PySparks Row object > - > > Key: SPARK-7278 > URL: https://issues.apache.org/jira/browse/SPARK-7278 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.3.1 >Reporter: Kalle Jepsen > > Consider the following Python code: > {code:none} > import datetime > rdd = sc.parallelize([[0, datetime.date(2014, 11, 11)], [1, > datetime.date(2015,6,4)]]) > df = rdd.toDF(schema=['rid', 'date']) > row = df.first() > {code} > Accessing the {{date}} column via {{\_\_getitem\_\_}} returns a > {{datetime.datetime}} instance > {code:none} > >>>row[1] > datetime.datetime(2014, 11, 11, 0, 0) > {code} > while access via {{getattr}} returns a {{datetime.date}} instance: > {code:none} > >>>row.date > datetime.date(2014, 11, 11) > {code} > The problem seems to be that that Java deserializes the {{datetime.date}} > objects to {{datetime.datetime}}. This is taken care of > [here|https://github.com/apache/spark/blob/master/python/pyspark/sql/_types.py#L1027] > when using {{getattr}}, but is overlooked when directly accessing the tuple > by index. > Is there an easy way to fix this? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-7278) Inconsistent handling of dates in PySparks Row object
Kalle Jepsen created SPARK-7278: --- Summary: Inconsistent handling of dates in PySparks Row object Key: SPARK-7278 URL: https://issues.apache.org/jira/browse/SPARK-7278 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 1.3.1 Reporter: Kalle Jepsen Consider the following Python code:
{code:none}
import datetime
rdd = sc.parallelize([[0, datetime.date(2014, 11, 11)], [1, datetime.date(2015,6,4)]])
df = rdd.toDF(schema=['rid', 'date'])
row = df.first()
{code}
Accessing the {{date}} column via {{\_\_getitem\_\_}} returns a {{datetime.datetime}} instance:
{code:none}
>>>row[1]
datetime.datetime(2014, 11, 11, 0, 0)
{code}
while access via {{getattr}} returns a {{datetime.date}} instance:
{code:none}
>>>row.date
datetime.date(2014, 11, 11)
{code}
The problem seems to be that Java deserializes the {{datetime.date}} objects to {{datetime.datetime}}. This is taken care of [here|https://github.com/apache/spark/blob/master/python/pyspark/sql/_types.py#L1027] when using {{getattr}}, but is overlooked when directly accessing the tuple by index. Is there an easy way to fix this? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
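Until the deserialization is made consistent, a caller-side normalization like the following hedged sketch (not part of any fix in this ticket) yields the same {{datetime.date}} regardless of which accessor was used:
{code:python}
# Caller-side normalization sketch: coerce whatever the Row hands back to a
# datetime.date, so row[1] and row.date compare equal for the example above.
import datetime

def as_date(value):
    if isinstance(value, datetime.datetime):
        return value.date()
    return value

# as_date(row[1]) == as_date(row.date)  ->  datetime.date(2014, 11, 11) on both paths
{code}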
[jira] [Commented] (SPARK-7035) Drop __getattr__ on pyspark.sql.DataFrame
[ https://issues.apache.org/jira/browse/SPARK-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518925#comment-14518925 ] Kalle Jepsen commented on SPARK-7035: - I've created a PR to fix the error message in https://github.com/apache/spark/pull/5771. I didn't deem it necessary to open a JIRA for a minor change like this and hope that was the right thing to do. > Drop __getattr__ on pyspark.sql.DataFrame > - > > Key: SPARK-7035 > URL: https://issues.apache.org/jira/browse/SPARK-7035 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 1.4.0 >Reporter: Kalle Jepsen > > I think the {{\_\_getattr\_\_}} method on the DataFrame should be removed. > There is no point in having the possibility to address the DataFrames columns > as {{df.column}}, other than the questionable goal to please R developers. > And it seems R people can use Spark from their native API in the future. > I see the following problems with {{\_\_getattr\_\_}} for column selection: > * It's un-pythonic: There should only be one obvious way to solve a problem, > and we can already address columns on a DataFrame via the {{\_\_getitem\_\_}} > method, which in my opinion is by far superior and a lot more intuitive. > * It leads to confusing Exceptions. When we mistype a method-name the > {{AttributeError}} will say 'No such column ... '. > * And most importantly: we cannot load DataFrames that have columns with the > same name as any attribute on the DataFrame-object. Imagine having a > DataFrame with a column named {{cache}} or {{filter}}. Calling {{df.cache()}} > will be ambiguous and lead to broken code. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7035) Drop __getattr__ on pyspark.sql.DataFrame
[ https://issues.apache.org/jira/browse/SPARK-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513633#comment-14513633 ] Kalle Jepsen commented on SPARK-7035: - [~rxin] Sure, I'll try to prepare one today. > Drop __getattr__ on pyspark.sql.DataFrame > - > > Key: SPARK-7035 > URL: https://issues.apache.org/jira/browse/SPARK-7035 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 1.4.0 >Reporter: Kalle Jepsen > > I think the {{\_\_getattr\_\_}} method on the DataFrame should be removed. > There is no point in having the possibility to address the DataFrames columns > as {{df.column}}, other than the questionable goal to please R developers. > And it seems R people can use Spark from their native API in the future. > I see the following problems with {{\_\_getattr\_\_}} for column selection: > * It's un-pythonic: There should only be one obvious way to solve a problem, > and we can already address columns on a DataFrame via the {{\_\_getitem\_\_}} > method, which in my opinion is by far superior and a lot more intuitive. > * It leads to confusing Exceptions. When we mistype a method-name the > {{AttributeError}} will say 'No such column ... '. > * And most importantly: we cannot load DataFrames that have columns with the > same name as any attribute on the DataFrame-object. Imagine having a > DataFrame with a column named {{cache}} or {{filter}}. Calling {{df.cache()}} > will be ambiguous and lead to broken code. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-7116) Intermediate RDD cached but never unpersisted
Kalle Jepsen created SPARK-7116: --- Summary: Intermediate RDD cached but never unpersisted Key: SPARK-7116 URL: https://issues.apache.org/jira/browse/SPARK-7116 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 1.3.1 Reporter: Kalle Jepsen In https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala#L233 an intermediate RDD is cached, but never unpersisted. It shows up in the 'Storage' section of the Web UI, but cannot be removed. There's already a comment in the source, suggesting to 'clean up'. If that cleanup is more involved than simply calling `unpersist`, it probably exceeds my current Scala skills. Why that is a problem: I'm adding a constant column to a DataFrame of about 20M records resulting from an inner join with {{df.withColumn(colname, ud_func())}} , where {{ud_func}} is simply a wrapped {{lambda: 1}}. Before and after applying the UDF the DataFrame takes up ~430MB in the cache. The cached intermediate RDD however takes up ~10GB(!) of storage, and I know of no way to uncache it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
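The scenario from the description corresponds roughly to the sketch below; the DataFrame, the column name and the {{F.lit}} alternative are assumptions for illustration, written against the newer SparkSession API for brevity. Only the Python-UDF version goes through the {{pythonUdfs.scala}} path linked above, which is where the intermediate RDD gets cached; a literal column avoids that path altogether.
{code:python}
# Sketch of the constant-column scenario described above (names are made up).
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 1000000)                      # stand-in for the ~20M-row join result

ud_func = F.udf(lambda: 1, IntegerType())
with_udf = df.withColumn('constant', ud_func())   # Python UDF -> cached intermediate RDD

# A constant column does not need a Python UDF at all; lit() stays on the JVM side.
with_lit = df.withColumn('constant', F.lit(1))
{code}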
[jira] [Commented] (SPARK-7035) Drop __getattr__ on pyspark.sql.DataFrame
[ https://issues.apache.org/jira/browse/SPARK-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506671#comment-14506671 ] Kalle Jepsen commented on SPARK-7035: - 1. Well, the interface being un-pythonic is the weakest point of my three. Still I believe that Pandas is not exactly an authority on good Python style, so the argument that it's pythonic because Pandas supports it does not hold. 2. I agree on the Exception part 3. The problem with collisions between column names and attributes names remains. It's inconsistent and will break code. Why should some columns be accessible by `df.columnname` but others not? Imagine having code that relies on a column named `something` by using `some_func(df.something)` and it's working perfectly fine. Now at some point in the future an attribute `something` is added to the DataFrame API. The column will no longer be accessible like that, your application will break. Using `some_func(df['something'])` instead is robust. One might argue that it's up to the user to choose between the two, but that's exactly what I meant when I said it's unpythonic: There's more than one obvious way to do it and one of them is dangerous, the other perfectly fine. There's an easy fix, it's still early enough to make such API changes and we won't lose anything. In fact I think we gain a whole lot more by reducing the size of the code and the risk of errors. > Drop __getattr__ on pyspark.sql.DataFrame > - > > Key: SPARK-7035 > URL: https://issues.apache.org/jira/browse/SPARK-7035 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 1.4.0 >Reporter: Kalle Jepsen > > I think the {{\_\_getattr\_\_}} method on the DataFrame should be removed. > There is no point in having the possibility to address the DataFrames columns > as {{df.column}}, other than the questionable goal to please R developers. > And it seems R people can use Spark from their native API in the future. > I see the following problems with {{\_\_getattr\_\_}} for column selection: > * It's un-pythonic: There should only be one obvious way to solve a problem, > and we can already address columns on a DataFrame via the {{\_\_getitem\_\_}} > method, which in my opinion is by far superior and a lot more intuitive. > * It leads to confusing Exceptions. When we mistype a method-name the > {{AttributeError}} will say 'No such column ... '. > * And most importantly: we cannot load DataFrames that have columns with the > same name as any attribute on the DataFrame-object. Imagine having a > DataFrame with a column named {{cache}} or {{filter}}. Calling {{df.cache()}} > will be ambiguous and lead to broken code. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6231) Join on two tables (generated from same one) is broken
[ https://issues.apache.org/jira/browse/SPARK-6231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506611#comment-14506611 ] Kalle Jepsen commented on SPARK-6231: - The workaround proposed by [~marmbrus] is not possible in Python, because 'as' is not (and cannot be) an attribute on DataFrame. > Join on two tables (generated from same one) is broken > -- > > Key: SPARK-6231 > URL: https://issues.apache.org/jira/browse/SPARK-6231 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.3.0 >Reporter: Davies Liu >Priority: Critical > Labels: DataFrame > > If the two column used in joinExpr come from the same table, they have the > same id, then the joniExpr is explained in wrong way. > {code} > val df = sqlContext.load(path, "parquet") > val txns = df.groupBy("cust_id").agg($"cust_id", > countDistinct($"day_num").as("txns")) > val spend = df.groupBy("cust_id").agg($"cust_id", > sum($"extended_price").as("spend")) > val rmJoin = txns.join(spend, txns("cust_id") === spend("cust_id"), "inner") > scala> rmJoin.explain > == Physical Plan == > CartesianProduct > Filter (cust_id#0 = cust_id#0) > Aggregate false, [cust_id#0], [cust_id#0,CombineAndCount(partialSets#25) AS > txns#7L] >Exchange (HashPartitioning [cust_id#0], 200) > Aggregate true, [cust_id#0], [cust_id#0,AddToHashSet(day_num#2L) AS > partialSets#25] > PhysicalRDD [cust_id#0,day_num#2L], MapPartitionsRDD[1] at map at > newParquet.scala:542 > Aggregate false, [cust_id#17], [cust_id#17,SUM(PartialSum#38) AS spend#8] > Exchange (HashPartitioning [cust_id#17], 200) >Aggregate true, [cust_id#17], [cust_id#17,SUM(extended_price#20) AS > PartialSum#38] > PhysicalRDD [cust_id#17,extended_price#20], MapPartitionsRDD[3] at map at > newParquet.scala:542 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
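Since {{as}} is a reserved word in Python, the closest PySpark analogue to the quoted Scala workaround would presumably be {{DataFrame.alias}}. The sketch below is an untested assumption (it uses the newer DataFrameReader API and a placeholder {{path}}), and whether it actually avoids the mis-planned join is not verified here.
{code:python}
# Hedged PySpark analogue of the Scala `.as(...)` workaround quoted above.
# `path` is a placeholder; whether aliasing resolves the column-identity
# problem on the Python side is not verified in this ticket.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(path)

txns = df.groupBy('cust_id').agg(F.countDistinct('day_num').alias('txns'))
spend = df.groupBy('cust_id').agg(F.sum('extended_price').alias('spend'))

rm_join = txns.alias('t').join(
    spend.alias('s'),
    F.col('t.cust_id') == F.col('s.cust_id'),
    'inner'
)
{code}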
[jira] [Commented] (SPARK-7035) Drop __getattr__ on pyspark.sql.DataFrame
[ https://issues.apache.org/jira/browse/SPARK-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506547#comment-14506547 ] Kalle Jepsen commented on SPARK-7035: - I think the same applies to the Pandas Dataframe, but that doesn't make it less of a problem. > Drop __getattr__ on pyspark.sql.DataFrame > - > > Key: SPARK-7035 > URL: https://issues.apache.org/jira/browse/SPARK-7035 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 1.4.0 >Reporter: Kalle Jepsen > > I think the {{\_\_getattr\_\_}} method on the DataFrame should be removed. > There is no point in having the possibility to address the DataFrames columns > as {{df.column}}, other than the questionable goal to please R developers. > And it seems R people can use Spark from their native API in the future. > I see the following problems with {{\_\_getattr\_\_}} for column selection: > * It's un-pythonic: There should only be one obvious way to solve a problem, > and we can already address columns on a DataFrame via the {{\_\_getitem\_\_}} > method, which in my opinion is by far superior and a lot more intuitive. > * It leads to confusing Exceptions. When we mistype a method-name the > {{AttributeError}} will say 'No such column ... '. > * And most importantly: we cannot load DataFrames that have columns with the > same name as any attribute on the DataFrame-object. Imagine having a > DataFrame with a column named {{cache}} or {{filter}}. Calling {{df.cache()}} > will be ambiguous and lead to broken code. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-7035) Drop __getattr__ on pyspark.sql.DataFrame
Kalle Jepsen created SPARK-7035: --- Summary: Drop __getattr__ on pyspark.sql.DataFrame Key: SPARK-7035 URL: https://issues.apache.org/jira/browse/SPARK-7035 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 1.4.0 Reporter: Kalle Jepsen I think the {{\_\_getattr\_\_}} method on the DataFrame should be removed. There is no point in having the possibility to address the DataFrame's columns as {{df.column}}, other than the questionable goal of pleasing R developers. And it seems R developers will be able to use Spark from their native API in the future. I see the following problems with {{\_\_getattr\_\_}} for column selection:
* It's un-pythonic: there should only be one obvious way to solve a problem, and we can already address columns on a DataFrame via the {{\_\_getitem\_\_}} method, which in my opinion is by far superior and a lot more intuitive.
* It leads to confusing exceptions. When we mistype a method name, the {{AttributeError}} will say 'No such column ... '.
* And most importantly: we cannot load DataFrames that have columns with the same name as any attribute on the DataFrame object. Imagine having a DataFrame with a column named {{cache}} or {{filter}}. Calling {{df.cache()}} will be ambiguous and lead to broken code.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
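The collision described in the last bullet can be seen with a sketch like the following (hypothetical column names). Because normal attribute lookup wins over {{\_\_getattr\_\_}}, a column named {{filter}} is silently shadowed by the method, while {{\_\_getitem\_\_}} access keeps working:
{code:python}
# Hypothetical illustration of the name collision from the last bullet above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 'a')], schema=['filter', 'value'])

print(df.filter)       # bound DataFrame.filter method -- the column is shadowed
print(df['filter'])    # the 'filter' Column -- unambiguous access still works
{code}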
[jira] [Commented] (SPARK-6189) Pandas to DataFrame conversion should check field names for periods
[ https://issues.apache.org/jira/browse/SPARK-6189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386625#comment-14386625 ] Kalle Jepsen commented on SPARK-6189: - I do not really understand why the column names have to be accessible directly as attributes anyway. What advantage does this yield above indexing? This basically restricts us on the ASCII character set for column names, doesn't it? Data in the wild may have all kinds of weird field names, including special characters, umlauts, accents and whatnot. Automatic renaming isn't very nice too, for the very reason already pointed out by mgdadv. Also, we cannot simply replace all illegal characters by underscores. The fields {{'ä.ö'}} and {{'ä.ü'}} would both be renamed to {{'___'}}. Besides, leading underscores have a somewhat special meaning in Python, potentially resulting in further confusion. I think {{df\['a.b'\]}} should definitely work, even if the columns contain non-ASCII characters and a warning should be issued when creating the DataFrame, informing the user that direct column access via attribute name will not work with the given column names. > Pandas to DataFrame conversion should check field names for periods > --- > > Key: SPARK-6189 > URL: https://issues.apache.org/jira/browse/SPARK-6189 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.3.0 >Reporter: Joseph K. Bradley >Priority: Minor > > Issue I ran into: I imported an R dataset in CSV format into a Pandas > DataFrame and then use toDF() to convert that into a Spark DataFrame. The R > dataset had a column with a period in it (column "GNP.deflator" in the > "longley" dataset). When I tried to select it using the Spark DataFrame DSL, > I could not because the DSL thought the period was selecting a field within > GNP. > Also, since "GNP" is another field's name, it gives an error which could be > obscure to users, complaining: > {code} > org.apache.spark.sql.AnalysisException: GetField is not valid on fields of > type DoubleType; > {code} > We should either handle periods in column names or check during loading and > warn/fail gracefully. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
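If a user does decide to rename on their side before calling {{toDF()}}, the collision pointed out above ({{'ä.ö'}} and {{'ä.ü'}} both mapping to {{'___'}}) can be avoided by deduplicating the sanitized names. The sketch below is only a user-side illustration of that point (column names are made up), not a proposal for what Spark itself should do.
{code:python}
# User-side sketch: sanitize pandas column names to ASCII identifiers before
# toDF(), but keep them unique instead of letting 'ä.ö' and 'ä.ü' collide.
import re
import pandas as pd

pdf = pd.DataFrame({'ä.ö': [1], 'ä.ü': [2], 'GNP.deflator': [3]})

def sanitize(columns):
    seen, result = set(), []
    for i, name in enumerate(columns):
        clean = re.sub(r'[^0-9a-zA-Z_]', '_', name) or 'col'
        if clean in seen:                 # e.g. both umlaut columns become '___'
            clean = '%s_%d' % (clean, i)  # append the position to keep names unique
        seen.add(clean)
        result.append(clean)
    return result

pdf.columns = sanitize(pdf.columns)   # ['___', '____1', 'GNP_deflator']
{code}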
[jira] [Created] (SPARK-6553) Support for functools.partial as UserDefinedFunction
Kalle Jepsen created SPARK-6553: --- Summary: Support for functools.partial as UserDefinedFunction Key: SPARK-6553 URL: https://issues.apache.org/jira/browse/SPARK-6553 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 1.3.0 Reporter: Kalle Jepsen Currently, {{functools.partial}} objects cannot be used as {{UserDefinedFunction}} s for {{DataFrame}} s, as the {{\_\_name\_\_}} attribute does not exist. Passing a {{functools.partial}} object will raise an Exception at https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L126. {{functools.partial}} is very widely used and should probably be supported, despite its lack of a {{\_\_name\_\_}}. My suggestion is to use {{f.\_\_repr\_\_()}} instead, or to check with {{hasattr(f, '\_\_name\_\_')}} and fall back to {{\_\_class\_\_}} if that returns {{False}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
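The suggested fallback could look roughly like this; it is a sketch of the idea only, not the actual change to {{pyspark/sql/functions.py}}:
{code:python}
# Sketch of the fallback suggested above: prefer __name__, fall back to the
# class name (which is 'partial' for functools.partial objects).
import functools

def udf_name(f):
    if hasattr(f, '__name__'):
        return f.__name__
    return f.__class__.__name__

add = lambda x, y: x + y
add_one = functools.partial(add, 1)

print(udf_name(add))       # '<lambda>'
print(udf_name(add_one))   # 'partial'
{code}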