Re: [Spark SQL & Core]: RDD to Dataset 1500 columns data with createDataFrame() throw exception of grows beyond 64 KB
Hi Eyal, You can also try to call repartition(1) before calling the "monotonically_increasing_id" function , it will probably have some performance degradation (depends on the size of the files), but might be not as bad as the other workaround after adding the ID you can repartition again in order to get better parallelism Example: spark .read .csv("data") .repartition(1) .withColumn("rowid", monotonically_increasing_id()) .repartition(20) .write.csv("output") Regards Eyal Zituny On Sat, Mar 18, 2017 at 11:58 AM, Kazuaki Ishizaki wrote: > Hi > There is the latest status for code generation. > > When we use the master that will be Spark 2.2, the following exception > occurs. The latest version fixed 64KB errors in this case. However, we meet > another JVM limit, the number of the constant pool entry. > > Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for > class > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection > has grown past JVM limit of 0x > at org.codehaus.janino.util.ClassFile.addToConstantPool( > ClassFile.java:499) > at org.codehaus.janino.util.ClassFile.addConstantNameAndTypeInfo( > ClassFile.java:439) > at org.codehaus.janino.util.ClassFile.addConstantMethodrefInfo( > ClassFile.java:358) > ... > > While this PR https://github.com/apache/spark/pull/16648addresses the > number of the constant pool issue, it has not been merged yet. > > Regards, > Kazuaki Ishizaki > > > > From:elevy > To:user@spark.apache.org > Date:2017/03/18 17:14 > Subject:[Spark SQL & Core]: RDD to Dataset 1500 columns data with > createDataFrame() throw exception of grows beyond 64 KB > -- > > > > Hello all, > I am using the Spark 2.1.0 release, > I am trying to load BigTable CSV file with more than 1500 columns into our > system > > Our flow of doing that is: > > • First, read the data as an RDD > • generate continuing record id using the zipWithIndex() >(this operation exist only in RDD API, > in the Dataset there is similar option which is > monotonically_increasing_id() > but this method work in partitioning and create id which is not > sequentially – and it is not what we need ☹) > • converting the RDD to Dataset using the createDataFrame() of > sparkSession > • this last operation generate code that exceeded the JVM object size > limitation of 64KB > > I search the web for some solution or even similar Use Case, > found few issues that talked about the 64KB error but all of the cases was > dealing with 100 column and solved in Spark 2.1.0 version by shrinking the > generated code, > but none of them reach the JVM limitation > > *Any Idea from this forum of expert will be very appreciated * > there could be 2 type of solution we are looking for: > *1.* How should I overcome the size of the code generation > *OR* > *2.* How can I generate sequential ID directly on the Dataset > > Our Temporal Solution: > > • reading the DS as RDD > • generate sequential id > • write the new data as text file > • reading the data as Dataset > this solution cause us 30% of performance degradation :( > > *Code That reproduced the issue * > > /import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.apache.spark.sql.*; > import org.apache.spark.sql.types.DataTypes; > import org.apache.spark.sql.types.Metadata; > import org.apache.spark.sql.types.StructField; > import org.apache.spark.sql.types.StructType; > import poc.commons.SparkSessionInitializer; > > import java.util.Arrays; > import java.util.Collections; > import java.util.List; > import java.util.stream.IntStream; > > public class RDDConverter { >private static final int FIELD_COUNT = 1900; > >private Dataset createBigSchema(SparkSession sparkSession , int > startColName, int fieldNumber) { >JavaSparkContext jsc = new > JavaSparkContext(sparkSession.sparkContext()); >SQLContext sqlContext = new SQLContext(sparkSession. > sparkContext()); > >String[] row = IntStream.range(startColName, > fieldNumber).mapToObj(String::valueOf).toArray(String[]::new); >List data = Collections.singletonList(row); >JavaRDD rdd = jsc.parallelize(data).map(RowFactory::create); > >StructField[] structFields = IntStream.range(startColName, > fieldNumber) >.mapToObj(i -> new StructField(String.valueOf(i), > DataTypes.StringType, true, Metadata.empty())) >.toArray(StructField[]::new); >StructType schema = DataTypes.createStructType(structFields); > >Dataset dataSet = sqlContext.createDataFrame(rdd, schema); >dataSet.show(); >return dataSet; >} > >public static void main(String[] args) { >SparkSessionInitializer sparkSessionInitializer = new > SparkSessionInitializer(); >SparkSession sparkSession = sparkSessionInitializer.init(); > >
Re: [Spark SQL & Core]: RDD to Dataset 1500 columns data with createDataFrame() throw exception of grows beyond 64 KB
Hi There is the latest status for code generation. When we use the master that will be Spark 2.2, the following exception occurs. The latest version fixed 64KB errors in this case. However, we meet another JVM limit, the number of the constant pool entry. Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0x at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499) at org.codehaus.janino.util.ClassFile.addConstantNameAndTypeInfo(ClassFile.java:439) at org.codehaus.janino.util.ClassFile.addConstantMethodrefInfo(ClassFile.java:358) ... While this PR https://github.com/apache/spark/pull/16648 addresses the number of the constant pool issue, it has not been merged yet. Regards, Kazuaki Ishizaki From: elevy To: user@spark.apache.org Date: 2017/03/18 17:14 Subject:[Spark SQL & Core]: RDD to Dataset 1500 columns data with createDataFrame() throw exception of grows beyond 64 KB Hello all, I am using the Spark 2.1.0 release, I am trying to load BigTable CSV file with more than 1500 columns into our system Our flow of doing that is: • First, read the data as an RDD • generate continuing record id using the zipWithIndex() (this operation exist only in RDD API, in the Dataset there is similar option which is monotonically_increasing_id() but this method work in partitioning and create id which is not sequentially – and it is not what we need ☹) • converting the RDD to Dataset using the createDataFrame() of sparkSession • this last operation generate code that exceeded the JVM object size limitation of 64KB I search the web for some solution or even similar Use Case, found few issues that talked about the 64KB error but all of the cases was dealing with 100 column and solved in Spark 2.1.0 version by shrinking the generated code, but none of them reach the JVM limitation *Any Idea from this forum of expert will be very appreciated * there could be 2 type of solution we are looking for: *1.* How should I overcome the size of the code generation *OR* *2.* How can I generate sequential ID directly on the Dataset Our Temporal Solution: • reading the DS as RDD • generate sequential id • write the new data as text file • reading the data as Dataset this solution cause us 30% of performance degradation :( *Code That reproduced the issue * /import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import poc.commons.SparkSessionInitializer; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.IntStream; public class RDDConverter { private static final int FIELD_COUNT = 1900; private Dataset createBigSchema(SparkSession sparkSession , int startColName, int fieldNumber) { JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); SQLContext sqlContext = new SQLContext(sparkSession.sparkContext()); String[] row = IntStream.range(startColName, fieldNumber).mapToObj(String::valueOf).toArray(String[]::new); List data = Collections.singletonList(row); JavaRDD rdd = jsc.parallelize(data).map(RowFactory::create); StructField[] structFields = IntStream.range(startColName, fieldNumber) .mapToObj(i -> new StructField(String.valueOf(i), DataTypes.StringType, true, Metadata.empty())) .toArray(StructField[]::new); StructType schema = DataTypes.createStructType(structFields); Dataset dataSet = sqlContext.createDataFrame(rdd, schema); dataSet.show(); return dataSet; } public static void main(String[] args) { SparkSessionInitializer sparkSessionInitializer = new SparkSessionInitializer(); SparkSession sparkSession = sparkSessionInitializer.init(); RDDConverter rddConverter = new RDDConverter(); rddConverter.createBigSchema(sparkSession, 0, FIELD_COUNT); } }/ The Exception we are getting : org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) ... 39 common frames omitted *Caused by: org.codehaus.janino.Ja