Re: SparkSQL 1.3.0 (RC3) failed to read parquet file generated by 1.1.1
Hi all,

My team has the same issue. It looks like Spark 1.3's Spark SQL cannot read parquet files generated by Spark 1.1. This will cost us a lot of migration work when we want to upgrade to Spark 1.3. Can anyone help?

Thanks,
Wisely Chen

On Tue, Mar 10, 2015 at 5:06 PM, Pei-Lun Lee pl...@appier.com wrote:

Hi,

I found that if I try to read a parquet file generated by Spark 1.1.1 using 1.3.0-rc3 with default settings, I get this error:

  com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'StructType': was expecting ('true', 'false' or 'null')
   at [Source: StructType(List(StructField(a,IntegerType,false))); line: 1, column: 11]
   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419)
   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508)
   at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2300)
   at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1459)
   at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683)
   at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105)
   at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051)
   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2161)
   at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:19)
   at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:44)
   at org.apache.spark.sql.types.DataType$.fromJson(dataTypes.scala:41)
   at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)
   at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)

This is how I saved the parquet file with 1.1.1:

  sql("select 1 as a").saveAsParquetFile("/tmp/foo")

and this is the metadata of the 1.1.1 parquet file:

  creator: parquet-mr version 1.4.3
  extra:   org.apache.spark.sql.parquet.row.metadata = StructType(List(StructField(a,IntegerType,false)))

By comparison, this is the 1.3.0 metadata:

  creator: parquet-mr version 1.6.0rc3
  extra:   org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"a","type":"integer","nullable":t [more]...

It looks like ParquetRelation2 is now used to load parquet files by default, and it only recognizes the JSON schema format, whereas the 1.1.1 schema was written in the case-class string format. Setting spark.sql.parquet.useDataSourceApi to false fixes it, but I don't know the differences between the two code paths. Is this considered a bug? We have a lot of parquet files from 1.1.1; should we disable the data source API in order to read them if we want to upgrade to 1.3?

Thanks,
--
Pei-Lun
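For anyone hitting the same error, a minimal sketch of the workaround Pei-Lun describes, setting the property on the SQLContext (it can equally go into spark-defaults.conf); the app name is a placeholder, the path is the one from the example above:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  val sc = new SparkContext(new SparkConf().setAppName("read-1.1-parquet"))
  val sqlContext = new SQLContext(sc)

  // Fall back to the pre-1.3 parquet code path, which accepts the old
  // case-class-string schema stored in the footer metadata.
  sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

  val df = sqlContext.parquetFile("/tmp/foo")
  df.printSchema()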
Profiling Spark: MemoryStore
Hi,

I am working on artificial neural networks for Spark. The model is trained with gradient descent, so on each step the data is read, the sum of gradients is computed for each data partition (on each worker), aggregated (on the driver), and broadcast back. I noticed that the gradient computation takes several times less time than the whole step. To narrow down my observation, I ran the gradient on a single machine with a single partition of data of size 100MB that I persist (data.persist). This should at least minimize the aggregation overhead, but the gradient computation still takes much less time than the whole step. Just in case: the data is loaded by MLUtils.loadLibSVMFile into an RDD[LabeledPoint]. This is my code:

  val conf = new SparkConf().setAppName("myApp").setMaster("local[2]")
  val train = MLUtils.loadLibSVMFile(new SparkContext(conf), "/data/mnist/mnist.scale").repartition(1).persist()
  val model = ANN2Classifier.train(train, 1000, Array[Int](32), 10, 1e-4)
  // training data, batch size, hidden layer size, iterations, LBFGS tolerance

The profiler shows that there are two threads: one is doing the gradient and the other I don't know what. The gradient takes 10% of its thread; almost all of the remaining time is spent in MemoryStore. Below is the screenshot of the first thread:
https://drive.google.com/file/d/0BzYMzvDiCep5bGp2S2F6eE9TRlk/view?usp=sharing
Second thread:
https://drive.google.com/file/d/0BzYMzvDiCep5OHA0WUtQbXd3WmM/view?usp=sharing

Could the Spark developers please elaborate on what's going on in MemoryStore? It seems to do some string operations (parsing the libsvm file? why on every step?) and a lot of InputStream reading. The overall time seems to depend on the size of the data batch (or the size of the vector) I am processing, but it does not look linear to me. Also, I would like to know how to speed these operations up.

Best regards,
Alexander
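Not an answer, but one thing that might be worth ruling out (an assumption on my part, not something visible in the screenshots): persist() is lazy, so the cache is only populated by the first action, and if the block is evicted or never fully cached, every iteration has to re-read and re-parse the libsvm input. A small sketch of forcing materialization and checking the storage level before calling ANN2Classifier.train, using the same loading code as above:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.storage.StorageLevel

  val conf = new SparkConf().setAppName("annProfiling").setMaster("local[2]")
  val sc = new SparkContext(conf)

  // Cache as deserialized objects and force the cache to fill now, so the
  // first training iteration is not the one paying the text-parsing cost.
  val train = MLUtils.loadLibSVMFile(sc, "/data/mnist/mnist.scale")
    .repartition(1)
    .persist(StorageLevel.MEMORY_ONLY)
  println(s"cached ${train.count()} examples at level ${train.getStorageLevel}")
  // Also check the Storage tab of the Spark UI: the RDD should be 100% in memory.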
Spilling when not expected
Hi all,

I'm running the teraSort benchmark with a relatively small input set: 5GB. During profiling, I can see I am using a total of 68GB. I have a terabyte of memory in my system and set:

  spark.executor.memory  900g
  spark.driver.memory    900g

I use the defaults for spark.shuffle.memoryFraction and spark.storage.memoryFraction, so I believe I now have 0.2*900 = 180GB for shuffle and 0.6*900 = 540GB for storage.

I noticed a lot of variation in runtime (under the same load) and tracked it down to this function in core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala:

  private def spillToPartitionFiles(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
    spillToPartitionFiles(collection.iterator)
  }

In a slow run it loops through this function 12000 times, in a fast run only 700 times, even though the settings in both runs are the same and there are no other users on the system. When I look at the calling function (insertAll, also in ExternalSorter), I see that spillToPartitionFiles is only called 700 times in both fast and slow runs, meaning that the function recursively calls itself very often. Because of the function name, I assume the system is spilling to disk. As I have sufficient memory, I assume I forgot to set a certain memory setting. Does anybody have an idea which other setting I need to set in order to not spill data in this scenario?

Thanks,
Tom
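For what it's worth, a sketch of the knobs that governed spilling in the 1.x shuffle path. The values are illustrative, not a confirmed fix for the variation Tom sees: spilling starts when the shuffle's share of the heap (spark.shuffle.memoryFraction) is exhausted, independently of the storage fraction, so raising it, or disabling spilling outright, is the usual first thing to try:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("terasort")
    .set("spark.shuffle.memoryFraction", "0.5")  // give the shuffle more of the heap (default 0.2)
    .set("spark.storage.memoryFraction", "0.3")  // shrink storage to compensate (default 0.6)
    .set("spark.shuffle.spill", "false")         // or disable spilling entirely (risks OOM)
  val sc = new SparkContext(conf)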
Re: Is this a bug in MLlib.stat.test ? About the mapPartitions API used in Chi-Squared test
The checks against maxCategories are not for statistical purposes; they are there to make sure communication does not blow up. There are currently no checks to make sure there are enough entries for statistically significant results; that is up to the user. I do like the idea of adding a warning. A reasonable fix for now might be to print a logWarning message and add a note to the documentation. On the JIRA, we could also discuss whether the result should be set to some value that indicates a meaningless test (e.g., a very bad fixed pValue). I made a JIRA to track this issue: SPARK-6312

Joseph

On Thu, Mar 12, 2015 at 12:13 AM, Chunnan Yao yaochun...@gmail.com wrote:

Hi everyone!

I am currently digging into MLlib in Spark 1.2.1. While reading the code of MLlib.stat.test, in ChiSqTest.scala under spark/mllib/src/main/scala/org/apache/spark/mllib/stat/test, I was confused by the usage of the mapPartitions API in the function

  def chiSquaredFeatures(data: RDD[LabeledPoint], methodName: String = PEARSON.name): Array[ChiSqTestResult]

According to my statistical testing knowledge, the chi-squared test requires sufficiently large counts (at least 5 in at least 80% of the entries) in its contingency matrix for the approximation to be good (http://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test). Thus the number of feature/label categories cannot be too large, because otherwise there would be too few items in each category, which violates the conditions for using the chi-squared test. I do see that in the function above Spark throws exceptions when distinctLabels.size and distinctFeatures.size exceed maxCategories, but the two HashSets distinctLabels and distinctFeatures are initialized inside mapPartitions, which means Spark is only sensitive to the number of feature/label categories within one partition. The reduced result (the contingency matrix) can therefore still have an excessive number of categories and thus small matrix entries, which makes the chi-squared test inaccurate. I've made a unit test on this function, which demonstrates the case. Maybe I am just trapped in a misunderstanding. Could anyone please give me a hint on this issue?

-
Feel the sparking Spark!
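To make the logWarning proposal concrete, here is a rough sketch of a global, cross-partition check. This is a hypothetical helper, not the code in ChiSqTest.scala, and it uses observed cell counts as a cheap proxy for the expected-count rule of thumb:

  import org.apache.spark.Logging
  import org.apache.spark.rdd.RDD
  import org.apache.spark.mllib.regression.LabeledPoint

  // Hypothetical helper: builds the global contingency counts for one feature
  // column and warns when many cells have fewer than 5 observations.
  object ChiSqSanityCheck extends Logging {
    def warnIfSparse(data: RDD[LabeledPoint], featureIndex: Int): Unit = {
      // Counting over the whole RDD, not per partition, so every category is seen.
      val cellCounts: Map[(Double, Double), Long] =
        data.map(p => (p.label, p.features(featureIndex))).countByValue().toMap
      val small = cellCounts.count { case (_, n) => n < 5 }
      if (small > 0.2 * cellCounts.size) {
        logWarning(s"Feature $featureIndex: $small of ${cellCounts.size} " +
          "contingency cells have fewer than 5 observations; the chi-squared " +
          "approximation may be unreliable.")
      }
    }
  }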