Re: Spark 1.1.0 with Hadoop 2.5.0
That's a Hive version issue, not a Hadoop version issue. On Tue, Oct 7, 2014 at 7:21 AM, Li HM hmx...@gmail.com wrote: Thanks for the reply. Please refer to my other post, entitled "How to make ./bin/spark-sql work with hive"; it has all the errors/exceptions I am getting. If I understand you correctly, I can build the package with mvn -Phive,hadoop-2.4 -Dhadoop.version=2.5.0 clean package. This is what I actually tried.
HiveServer1 and SparkSQL
Hi, Shark supported both the HiveServer1 and HiveServer2 thrift interfaces (using $ bin/shark -service sharkserver[1 or 2]). SparkSQL seems to support only HiveServer2. I was wondering what is involved in adding support for HiveServer1. Is this something straightforward that I could embark on myself? I have some legacy clients (and users using TOAD for Cloud) that only work with the HiveServer1 interface. Deenar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HiveServer1-and-SparkSQL-tp15832.html
Re: Spark Streaming saveAsNewAPIHadoopFiles
Hi All, Continuing on this discussion... Is there a good reason why the def of saveAsNewAPIHadoopFiles in org/apache/spark/streaming/api/java/JavaPairDStream.scala is defined like this - def saveAsNewAPIHadoopFiles( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = new Configuration) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } As pointed out earlier, due to type erasure on the Java side we have to add this code to keep the compiler quiet: @SuppressWarnings("unchecked") Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class; It works fine, but adds a layer of confusion and inconsistency when compared to its counterpart for the regular RDD, saveAsNewAPIHadoopFile, as defined in spark / core / src / main / scala / org / apache / spark / api / java / JavaPairRDD.scala /** Output the RDD to any Hadoop-supported file system. */ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]]( path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: Configuration) { rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) } So, is it possible to change the code for saveAsNewAPIHadoopFiles in org/apache/spark/streaming/api/java/JavaPairDStream.scala to the following - def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]]( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: Configuration = new Configuration) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } Less confusion, more readability and better consistency... -abe On Mon, Oct 6, 2014 at 1:51 PM, Abraham Jacob abe.jac...@gmail.com wrote: Sean, Thanks a ton Sean... This is exactly what I was looking for. As mentioned in the code - // This horrible, separate declaration is necessary to appease the compiler @SuppressWarnings("unchecked") Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class; writableDStream.saveAsNewAPIHadoopFiles(dataDirString + "/oryx", "data", keyWritableClass, messageWritableClass, outputFormatClass, streamingContext.sparkContext().hadoopConfiguration()); I was just having a hard time with the OutputFormatClass parameter. The scala code in org/apache/spark/streaming/api/java/JavaPairDStream.scala defines saveAsNewAPIHadoopFiles as the following - /** * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: prefix-TIME_IN_MS.suffix. */ def saveAsNewAPIHadoopFiles( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = new Configuration) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } The problem is that Class[_ <: NewOutputFormat[_, _]] in scala cannot be assigned as the following (say you are using TextOutputFormat with Text as KeyClass and IntWritable as ValueClass) TextOutputFormat<Text, IntWritable>.class in java due to type erasure. The parameterized types lose their type arguments when they are translated to byte code during compilation. Looks like adding this works - @SuppressWarnings("unchecked") Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class; Thanks again Sean... On Mon, Oct 6, 2014 at 12:23 PM, Sean Owen so...@cloudera.com wrote: Here's an example: https://github.com/OryxProject/oryx/blob/master/oryx-lambda/src/main/java/com/cloudera/oryx/lambda/BatchLayer.java#L131 On Mon, Oct 6, 2014 at 7:39 PM, Abraham Jacob abe.jac...@gmail.com wrote: Hi All, Would really appreciate it if anyone from the community has implemented the saveAsNewAPIHadoopFiles method in Java found in org.apache.spark.streaming.api.java.JavaPairDStream<K,V>. Any code snippet or online link would be greatly appreciated. Regards, Jacob
Re: Strategies for reading large numbers of files
Hi Landon, I had a problem very similar to yours, where we had to process around 5 million relatively small files on NFS. After trying various options, we did something similar to what Matei suggested. 1) Take the original path, find the subdirectories under that path, and then parallelize the resulting list. You can configure the depth you want to go down to before sending the paths across the cluster. def getFileList(srcDir: File, depth: Int): List[File] = { var list: ListBuffer[File] = new ListBuffer[File]() if (srcDir.isDirectory()) { srcDir.listFiles().foreach((file: File) => if (file.isFile()) { list += file } else { if (depth > 0) { list ++= getFileList(file, depth - 1) } else if (depth < 0) { list ++= getFileList(file, depth) } else { list += file } }) } else { list += srcDir } list.toList } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strategies-for-reading-large-numbers-of-files-tp15644p15835.html
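A minimal sketch (not from the original thread) of how the resulting path list might then be distributed and read on the executors; the partition count and the plain java.io read are illustrative assumptions, to be adapted to the actual file format:

```scala
import java.io.File
import scala.io.Source
import org.apache.spark.SparkContext

// Hypothetical follow-up: distribute the collected paths and read each file on the executors.
def processFiles(sc: SparkContext, srcDir: File, depth: Int): Unit = {
  val paths = getFileList(srcDir, depth).map(_.getAbsolutePath)
  val contents = sc.parallelize(paths, numSlices = 1000) // tune the slice count to the cluster size
    .map { p =>
      val src = Source.fromFile(p)
      try { (p, src.mkString) } finally { src.close() }   // (path, file contents) pairs
    }
  println(contents.count())
}
```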
Parsing one big multiple line .xml loaded in RDD using Python
Hi, I have already unsuccessfully asked a quite similar question on Stack Overflow, particularly here: http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim. I've also tried a workaround, unsuccessfully; the workaround problem can be found at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html. In particular, what I'm trying to do: I have an .xml dump of Wikipedia as the input. The .xml is quite big and it spreads across multiple lines. You can check it out at http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2. My goal is to parse this .xml in the same way as gensim.corpora.wikicorpus.extract_pages does; the implementation is at https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py. Unfortunately this method does not work, because RDD.flatMap() processes the RDD line by line as strings. Does anyone have a suggestion for how to parse the Wikipedia-like .xml loaded in an RDD using Python? Thank you in advance for any suggestions, advice or hints.
Re: Relation between worker memory and executor memory in standalone mode
Try to set --total-executor-cores to limit how many total cores it can use. Thanks Regards, Meethu M On Thursday, 2 October 2014 2:39 AM, Akshat Aranya aara...@gmail.com wrote: I guess one way to do so would be to run 1 worker per node, like say, instead of running 1 worker and giving it 8 cores, you can run 4 workers with 2 cores each. Then, you get 4 executors with 2 cores each. On Wed, Oct 1, 2014 at 1:06 PM, Boromir Widas vcsub...@gmail.com wrote: I have not found a way to control the cores yet. This effectively limits the cluster to a single application at a time. A subsequent application shows in the 'WAITING' State on the dashboard. On Wed, Oct 1, 2014 at 2:49 PM, Akshat Aranya aara...@gmail.com wrote: On Wed, Oct 1, 2014 at 11:33 AM, Akshat Aranya aara...@gmail.com wrote: On Wed, Oct 1, 2014 at 11:00 AM, Boromir Widas vcsub...@gmail.com wrote: 1. worker memory caps executor. 2. With default config, every job gets one executor per worker. This executor runs with all cores available to the worker. By the job do you mean one SparkContext or one stage execution within a program? Does that also mean that two concurrent jobs will get one executor each at the same time? Experimenting with this some more, I figured out that an executor takes away spark.executor.memory amount of memory from the configured worker memory. It also takes up all the cores, so even if there is still some memory left, there are no cores left for starting another executor. Is my assessment correct? Is there no way to configure the number of cores that an executor can use? On Wed, Oct 1, 2014 at 11:04 AM, Akshat Aranya aara...@gmail.com wrote: Hi, What's the relationship between Spark worker and executor memory settings in standalone mode? Do they work independently or does the worker cap executor memory? Also, is the number of concurrent executors per worker capped by the number of CPU cores configured for the worker?
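A minimal standalone-mode sketch (not from the thread) of the two knobs being discussed: spark.cores.max, which is presumably the property behind the --total-executor-cores flag and caps the total cores one application may take (so a second application is not stuck WAITING), and spark.executor.memory, which is the amount carved out of each worker's memory pool. The master URL and values are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical settings for sharing a standalone cluster between applications.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")   // placeholder master URL
  .setAppName("CappedApp")
  .set("spark.cores.max", "4")             // total cores across the cluster for this app
  .set("spark.executor.memory", "2g")      // memory taken from each worker's pool
val sc = new SparkContext(conf)
```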
Re: Hive Parquet Serde from Spark
I have found related PRs in the parquet-mr project: https://github.com/Parquet/parquet-mr/issues/324, however using that version of the bundle doesn't solve the issue. The issue seems to be related to package scope across separate class loaders. I am busy looking for a workaround. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hive-Parquet-Serde-from-Spark-tp15787p15838.html
Cannot read from s3 using sc.textFile
Hello, I'm trying to read from s3 using a simple spark java app: - SparkConf sparkConf = new SparkConf().setAppName("TestApp"); sparkConf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); sc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "XX"); sc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "XX"); String path = "s3://bucket/test/testdata"; JavaRDD<String> textFile = sc.textFile(path); System.out.println(textFile.count()); - But getting this error: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://bucket/test/testdata at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:175) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097) at org.apache.spark.rdd.RDD.count(RDD.scala:861) at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:365) at org.apache.spark.api.java.JavaRDD.count(JavaRDD.scala:29) Looking at the debug log I see that org.jets3t.service.impl.rest.httpclient.RestS3Service returned a 404 error trying to locate the file. Using a simple java program with com.amazonaws.services.s3.AmazonS3Client works just fine. Any idea? Thanks, Tomer
Same code --works in spark 1.0.2-- but not in spark 1.1.0
Hi all, My code was working fine in spark 1.0.2, but after upgrading to 1.1.0, it's throwing exceptions and tasks are getting failed. The code contains some map and filter transformations followed by groupByKey (reduceByKey in another code). What I could find out is that the code works fine until groupByKey or reduceByKey in both versions. But after that, the following errors show up in Spark 1.1.0: java.io.FileNotFoundException: /tmp/spark-local-20141006173014-4178/35/shuffle_6_0_5161 (Too many open files) java.io.FileOutputStream.openAppend(Native Method) java.io.FileOutputStream.<init>(FileOutputStream.java:210) org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:701) I cleaned my /tmp directory, changed my local directory to another folder; but nothing helped. Can anyone say what could be the reason? Thanks Regards, Meethu M
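The stack trace shows the hash-based shuffle writer hitting the open-file limit (it opens one file per map task per reducer). A hedged mitigation sketch, not a confirmed fix for this particular job: raise the OS ulimit on the workers and/or reduce the number of simultaneously open shuffle files via configuration (both properties exist in Spark 1.1):

```scala
import org.apache.spark.SparkConf

// Hedged options for the "Too many open files" shuffle failure.
val conf = new SparkConf()
  .set("spark.shuffle.consolidateFiles", "true") // merge per-map shuffle files in the hash shuffle
  .set("spark.shuffle.manager", "sort")          // alternative in 1.1: sort-based shuffle, far fewer files
```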
Re: GraphX: Types for the Nodes and Edges
Hi again, Thank you for your suggestion :) I've tried to implement this method but I'm stuck trying to union the payload before creating the graph. Below is a really simplified snippet of what has worked so far. //Reading the articles given in json format val articles = sqlContext.jsonFile(path) articles.registerTempTable("TblArticles") //Creating two abstract vertex types class Vertices() case class Paper(pid: Long, ptitle: String, aid: String) extends Vertices case class Author(aid: String, aname: String) extends Vertices //Using the subclasses as payload with only the fields I need val filteredPapers = sqlContext.sql("SELECT id, title, authorid FROM TblArticles") val vP = filteredPapers.map(line => new Paper(line.getLong(0), line.getString(1), line.getString(2))).cache val filteredAuthors = sqlContext.sql("SELECT authorid, name FROM TblArticles") val vA = filteredAuthors.map(line => new Author(line.getString(0), line.getString(1))).cache //Let's assume for now there's only one edge type - AUTHORS val Authedges: RDD[Edge[String]] = vP.map { t => Edge(t.aid.toLong, t.pid, "AUTHORS") } 1. vP and vA are RDDs, how do I convert them to vertexRDDs and perform the union? 2. Should the graph then be created as val graph = Graph[Vertices, Authedges, String]? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Types-for-the-Nodes-and-Edges-tp15486p15842.html
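A minimal sketch (not from the thread) of one way to answer both questions: map each payload type up to the common supertype, give each a VertexId, and pass the unioned RDD straight to Graph(). It assumes the author ids are numeric strings (to match the Edge construction above); otherwise a stable String-to-Long mapping would be needed:

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Lift both case classes to the common Vertices type and assign vertex ids.
val paperVertices:  RDD[(VertexId, Vertices)] = vP.map(p => (p.pid, p: Vertices))
val authorVertices: RDD[(VertexId, Vertices)] = vA.map(a => (a.aid.toLong, a: Vertices))

// Graph() accepts plain RDD[(VertexId, VD)]; an explicit VertexRDD is not required.
val vertices: RDD[(VertexId, Vertices)] = paperVertices.union(authorVertices)
val graph: Graph[Vertices, String] = Graph(vertices, Authedges)
```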
Re: Unable to ship external Python libraries in PYSPARK
Hi David, Thanks for the reply and the effort you put into explaining the concepts. Thanks for the example; it worked. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-ship-external-Python-libraries-in-PYSPARK-tp14074p15844.html
Re: Spark 1.1.0 with Hadoop 2.5.0
The build command should be correct. What exact error did you encounter when trying Spark 1.1 + Hive 0.12 + Hadoop 2.5.0? On 10/7/14 2:21 PM, Li HM wrote: Thanks for the replied. Please refer to my another post entitled How to make ./bin/spark-sql work with hive. It has all the error/exceptions I am getting. If I understand you correctly, I can build the package with mvn -Phive,hadoop-2.4 -Dhadoop.version=2.5.0 clean package This is what I actually tried. On Mon, Oct 6, 2014 at 11:03 PM, Sean Owen so...@cloudera.com wrote: The hadoop-2.4 profile is really intended to be Hadoop 2.4+. It should compile and run fine with Hadoop 2.5 as far as I know. CDH 5.2 is Hadoop 2.5 + Spark 1.1, so there is evidence it works. You didn't say what doesn't work. On Tue, Oct 7, 2014 at 6:07 AM, hmxxyy hmx...@gmail.com wrote: Does Spark 1.1.0 work with Hadoop 2.5.0? The maven build instruction only has command options up to hadoop 2.4. Anybody ever made it work? I am trying to run spark-sql with hive 0.12 on top of hadoop 2.5.0 but can't make it work. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-0-with-Hadoop-2-5-0-tp15827.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL -- more than two tables for join
Hi, the same problem happens when I try several joins together, such as 'SELECT * FROM sales INNER JOIN magasin ON sales.STO_KEY = magasin.STO_KEY INNER JOIN eans ON (sales.BARC_KEY = eans.BARC_KEY and magasin.FORM_KEY = eans.FORM_KEY)' The error information is as follows: py4j.protocol.Py4JJavaError: An error occurred while calling o1229.sql. : java.lang.RuntimeException: [1.269] failure: ``UNION'' expected but `INNER' found SELECT sales.Date AS Date, sales.ID_FOYER AS ID_FOYER, Sales.STO_KEY AS STO_KEY, sales.Quantite AS Quantite, sales.Prix AS Prix, sales.Total AS Total, magasin.FORM_KEY AS FORM_KEY, eans.UB_KEY AS UB_KEY FROM sales INNER JOIN magasin ON sales.STO_KEY = magasin.STO_KEY INNER JOIN eans ON (sales.BARC_KEY = eans.BARC_KEY and magasin.FORM_KEY = eans.FORM_KEY) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) I have an impression that sparksql doesn't support more than two joins. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-more-than-two-tables-for-join-tp13865p15847.html
Re: Spark SQL -- more than two tables for join
Hi, in fact, the same problem happens when I try several joins together: SELECT * FROM sales INNER JOIN magasin ON sales.STO_KEY = magasin.STO_KEY INNER JOIN eans ON (sales.BARC_KEY = eans.BARC_KEY and magasin.FORM_KEY = eans.FORM_KEY) py4j.protocol.Py4JJavaError: An error occurred while calling o1229.sql. : java.lang.RuntimeException: [1.269] failure: ``UNION'' expected but `INNER' found SELECT sales.Date AS Date, sales.ID_FOYER AS ID_FOYER, Sales.STO_KEY AS STO_KEY,sales.Quantite AS Quantite, sales.Prix AS Prix, sales.Total AS Total, magasin.FORM_KEY AS FORM_KEY, eans.UB_KEY AS UB_KEY FROM sales INNER JOIN magasin ON sales.STO_KEY = magasin.STO_KEY INNER JOIN eans ON (sales.BARC_KEY = eans.BARC_KEY and magasin.FORM_KEY = eans.FORM_KEY) at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) I use spark 1.1.0, so I have an impression that sparksql doesn't support several joins together. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-more-than-two-tables-for-join-tp13865p15848.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
akka.remote.transport.netty.NettyTransport
Hi All, I have one master and one worker on AWS (amazon web service) and am running the spark map reduce code provided at the link https://spark.apache.org/examples.html We are using Spark version 1.0.2 Word Count val file = spark.textFile("hdfs://...") val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) counts.saveAsTextFile("hdfs://...") To get this working I have compiled a jar file. spark-submit --master spark://ip-172-31-24-183.ec2.internal:7077 --class Sample /home/ec2-user/scalatest/target/scala-2.11/test_big_2.11-0.1.jar When I run the spark job it gives me the error: Connection refused - 1. There is no result shown on the master 2. Error on worker is as below 14/10/07 10:16:16 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-172-31-27-51.ec2.internal:39142] - [akka.tcp://sparkExecutor@ip-172-31-27-51.ec2.internal:38752]: Error [Association failed with [akka.tcp://sparkExecutor@ip-172-31-27-51.ec2.internal:38752]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@ip-172-31-27-51.ec2.internal:38752] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-172-31-27-51.ec2.internal/172.31.27.51:38752 ] Could someone help clarify if this is a * spark issue * issue with the jar file * or if the slave cannot connect to the master or itself on some port? Please help Thanks Jacob
Re: Parsing one big multiple line .xml loaded in RDD using Python
Maybe sc.wholeTextFile() is what you want, you can get the whole text and parse it by yourself. On Tue, Oct 7, 2014 at 1:06 AM, jan.zi...@centrum.cz wrote: Hi, I have already unsucesfully asked quiet simmilar question at stackoverflow, particularly here: http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim. I've also unsucessfully tryied some workaround, but unsucessfuly, workaround problem can be found at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html. Particularly what I'm trying to do, I have .xml dump of wikipedia as the input. The .xml is quite big and it spreads across multiple lines. You can check it out at http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2. My goal is to parse this .xml in a same way as gensim.corpora.wikicorpus.extract_pages do, implementation is at https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py. Unfortunately this method does not work, because RDD.flatMap() process the RDD line by line as strings. Does anyone has some suggestion of how to possibly parse the wikipedia like .xml loaded in RDD using Python? Thank you in advance for any suggestions, advices or hints. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
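The method referred to is presumably SparkContext.wholeTextFiles(), which returns (fileName, fullContent) pairs so each file's XML can be parsed in one piece rather than line by line. A hedged Scala sketch of the idea (the path, and the naive regex-based page extraction, are placeholders; PySpark exposes the same call as sc.wholeTextFiles()):

```scala
// Each element is a (fileName, fullContent) pair; parse the whole document yourself.
val pages = sc.wholeTextFiles("hdfs:///data/wiki/")   // placeholder path
  .flatMap { case (_, xml) =>
    // illustrative only: pull out <page>...</page> blocks from the full text
    """(?s)<page>.*?</page>""".r.findAllIn(xml)
  }
```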
Re: Cannot read from s3 using sc.textFile
Not sure if it's supposed to work. Can you try newAPIHadoopFile() passing in the required configuration object. On Tue, Oct 7, 2014 at 4:20 AM, Tomer Benyamini tomer@gmail.com wrote: Hello, I'm trying to read from s3 using a simple spark java app: - SparkConf sparkConf = new SparkConf().setAppName(TestApp); sparkConf.setMaster(local); JavaSparkContext sc = new JavaSparkContext(sparkConf); sc.hadoopConfiguration().set(fs.s3.awsAccessKeyId, XX); sc.hadoopConfiguration().set(fs.s3.awsSecretAccessKey, XX); String path = s3://bucket/test/testdata; JavaRDDString textFile = sc.textFile(path); System.out.println(textFile.count()); - But getting this error: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://bucket/test/testdata at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:175) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097) at org.apache.spark.rdd.RDD.count(RDD.scala:861) at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:365) at org.apache.spark.api.java.JavaRDD.count(JavaRDD.scala:29) Looking at the debug log I see that org.jets3t.service.impl.rest.httpclient.RestS3Service returned 404 error trying to locate the file. Using a simple java program with com.amazonaws.services.s3.AmazonS3Client works just fine. Any idea? Thanks, Tomer - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming Fault Tolerance (?)
Reading the Spark Streaming Programming Guide I found a couple of interesting points. First of all, while talking about receivers, it says: *If the number of cores allocated to the application is less than or equal to the number of input DStreams / receivers, then the system will receive data, but not be able to process them.* Then, when talking about fault tolerance it says: *“However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data.”* So I asked myself: what happens if I have 2 workers with 2 cores each and one worker dies? From what I've reported above my answer would be: the receiver of the dead worker will be moved to the other worker, so there will be 2 receivers on the same worker. But that worker has 2 cores, so it won't be able to process batches anymore. *Is this possible? * Well, I actually tried: I had 2 workers receiving from Kafka and processing RDDs properly. I killed one of the workers and observed the behaviour in the Spark web UI (port 4040). In the Streaming tab there still are 2 active receivers, both allocated to the only living worker. But the Processed batches number is stuck, as the evidence that no batches have been processed after the worker died. Also, given that the receivers are still active, they are updating Kafka offsets in Zookeeper, meaning that now those messages are lost, unless you replay them resetting the offsets properly (but where to start from?). Right, this was my test. I still hope I'm wrong, but does this mean that your number of workers needs to be decided at the beginning (based on the number of cores available) without a choice to scale the cluster if needed? I mean, I could use 2 workers with 3 cores each, but what if I want to add a new worker after a while? Looking forward to hear your feedback, I suppose this is a pretty important topic to get right. Thanks a lot, Max -- Massimiliano Tomassi web: http://about.me/maxtomassi e-mail: max.toma...@gmail.com
Re: Kafka-HDFS to store as Parquet format
Currently I am not doing anything, if anything change start from scratch. In general I doubt there are many options to account for schema changes. If you are reading files using impala, then it may allow if the schema changes are append only. Otherwise existing Parquet files have to be migrated to new schema. - Original Message - From: Buntu Dev buntu...@gmail.com To: Soumitra Kumar kumar.soumi...@gmail.com Cc: u...@spark.incubator.apache.org Sent: Tuesday, October 7, 2014 10:18:16 AM Subject: Re: Kafka-HDFS to store as Parquet format Thanks for the info Soumitra.. its a good start for me. Just wanted to know how you are managing schema changes/evolution as parquetSchema is provided to setSchema in the above sample code. On Tue, Oct 7, 2014 at 10:09 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: I have used it to write Parquet files as: val job = new Job val conf = job.getConfiguration conf.set (ParquetOutputFormat.COMPRESSION, CompressionCodecName.SNAPPY.name ()) ExampleOutputFormat.setSchema (job, MessageTypeParser.parseMessageType (parquetSchema)) rdd saveAsNewAPIHadoopFile (rddToFileName (outputDir, em, time), classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf) - Original Message - From: bdev buntu...@gmail.com To: u...@spark.incubator.apache.org Sent: Tuesday, October 7, 2014 9:51:40 AM Subject: Re: Kafka-HDFS to store as Parquet format After a bit of looking around, I found saveAsNewAPIHadoopFile could be used to specify the ParquetOutputFormat. Has anyone used it to convert JSON to Parquet format or any pointers are welcome, thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-HDFS-to-store-as-Parquet-format-tp15768p15852.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka-HDFS to store as Parquet format
Thanks for the input Soumitra. On Tue, Oct 7, 2014 at 10:24 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: Currently I am not doing anything, if anything change start from scratch. In general I doubt there are many options to account for schema changes. If you are reading files using impala, then it may allow if the schema changes are append only. Otherwise existing Parquet files have to be migrated to new schema. - Original Message - From: Buntu Dev buntu...@gmail.com To: Soumitra Kumar kumar.soumi...@gmail.com Cc: u...@spark.incubator.apache.org Sent: Tuesday, October 7, 2014 10:18:16 AM Subject: Re: Kafka-HDFS to store as Parquet format Thanks for the info Soumitra.. its a good start for me. Just wanted to know how you are managing schema changes/evolution as parquetSchema is provided to setSchema in the above sample code. On Tue, Oct 7, 2014 at 10:09 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: I have used it to write Parquet files as: val job = new Job val conf = job.getConfiguration conf.set (ParquetOutputFormat.COMPRESSION, CompressionCodecName.SNAPPY.name ()) ExampleOutputFormat.setSchema (job, MessageTypeParser.parseMessageType (parquetSchema)) rdd saveAsNewAPIHadoopFile (rddToFileName (outputDir, em, time), classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf) - Original Message - From: bdev buntu...@gmail.com To: u...@spark.incubator.apache.org Sent: Tuesday, October 7, 2014 9:51:40 AM Subject: Re: Kafka-HDFS to store as Parquet format After a bit of looking around, I found saveAsNewAPIHadoopFile could be used to specify the ParquetOutputFormat. Has anyone used it to convert JSON to Parquet format or any pointers are welcome, thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-HDFS-to-store-as-Parquet-format-tp15768p15852.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Cannot read from s3 using sc.textFile
Try using s3n:// instead of s3 (for the credential configuration as well). On Tue, Oct 7, 2014 at 9:51 AM, Sunny Khatri sunny.k...@gmail.com wrote: Not sure if it's supposed to work. Can you try newAPIHadoopFile() passing in the required configuration object. On Tue, Oct 7, 2014 at 4:20 AM, Tomer Benyamini tomer@gmail.com wrote: Hello, I'm trying to read from s3 using a simple spark java app: - SparkConf sparkConf = new SparkConf().setAppName(TestApp); sparkConf.setMaster(local); JavaSparkContext sc = new JavaSparkContext(sparkConf); sc.hadoopConfiguration().set(fs.s3.awsAccessKeyId, XX); sc.hadoopConfiguration().set(fs.s3.awsSecretAccessKey, XX); String path = s3://bucket/test/testdata; JavaRDDString textFile = sc.textFile(path); System.out.println(textFile.count()); - But getting this error: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://bucket/test/testdata at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:175) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097) at org.apache.spark.rdd.RDD.count(RDD.scala:861) at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:365) at org.apache.spark.api.java.JavaRDD.count(JavaRDD.scala:29) Looking at the debug log I see that org.jets3t.service.impl.rest.httpclient.RestS3Service returned 404 error trying to locate the file. Using a simple java program with com.amazonaws.services.s3.AmazonS3Client works just fine. Any idea? Thanks, Tomer - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
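A sketch of the suggested s3n:// variant of the original snippet, written in Scala for brevity (bucket, path, and keys are placeholders); note that the Hadoop credential property names change together with the scheme:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch: same read as above, but through the s3n filesystem.
val sc = new SparkContext(new SparkConf().setAppName("TestApp").setMaster("local"))
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "XX")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "XX")
val textFile = sc.textFile("s3n://bucket/test/testdata")
println(textFile.count())
```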
Re: return probability \ confidence instead of actual class
Not familiar with the scikit SVM implementation (and I assume you are using LinearSVC). To figure out an optimal decision boundary based on the scores obtained, you can use an ROC curve, varying your thresholds. On Tue, Oct 7, 2014 at 12:08 AM, Adamantios Corais adamantios.cor...@gmail.com wrote: Well, apparently, the above Python set-up is wrong. Please consider the following set-up which DOES use a 'linear' kernel... And the question remains the same: how to interpret Spark results (or why Spark results are NOT bounded between -1 and 1)? On Mon, Oct 6, 2014 at 8:35 PM, Sunny Khatri sunny.k...@gmail.com wrote: One diff I can find is you may have different kernel functions for your training. In Spark, you end up using a linear kernel whereas for scikit you are using the rbf kernel. That can explain the difference in the coefficients you are getting. On Mon, Oct 6, 2014 at 10:15 AM, Adamantios Corais adamantios.cor...@gmail.com wrote: Hi again, Finally, I found the time to play around with your suggestions. Unfortunately, I noticed some unusual behavior in the MLlib results, which is more obvious when I compare them against their scikit-learn equivalent. Note that I am currently using spark 0.9.2. Long story short: I find it difficult to interpret the results: scikit-learn SVM always returns a value between 0 and 1, which makes it easy for me to set up a threshold in order to keep only the most significant classifications (this is the case for both short and long input vectors). On the other hand, Spark MLlib makes it impossible to interpret the results; results are hardly ever bounded between -1 and +1 and hence it is impossible to choose a good cut-off value - the results are of no practical use. And here is the strangest thing ever: although - it seems that - MLlib does NOT generate the right weights and intercept, when I feed MLlib with the weights and intercept from scikit-learn the results become pretty accurate. Any ideas about what is happening? Any suggestion is highly appreciated. PS: to make things easier I have quoted both of my implementations as well as the results, below.
// SPARK (short input): training_error: Double = 0.0 res2: Array[Double] = Array(-1.4420684459128205E-19, -1.4420684459128205E-19, -1.4420684459128205E-19, 0.3749, 0.7498, 0.7498, 0.7498) SPARK (long input): training_error: Double = 0.0 res2: Array[Double] = Array(-0.782207630902241, -0.782207630902241, -0.782207630902241, 0.9522394329769612, 2.6866864968561632, 2.6866864968561632, 2.6866864968561632) PYTHON (short input): array([[-1.0001], [-1.0001], [-1.0001], [-0.], [ 1.0001], [ 1.0001], [ 1.0001]]) PYTHON (long input): array([[-1.0001], [-1.0001], [-1.0001], [-0.], [ 1.0001], [ 1.0001], [ 1.0001]]) // import analytics.MSC import java.util.Calendar import java.text.SimpleDateFormat import scala.collection.mutable import scala.collection.JavaConversions._ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.optimization.L1Updater import com.datastax.bdp.spark.connector.CassandraConnector import com.datastax.bdp.spark.SparkContextCassandraFunctions._ val sc = MSC.sc val lg = MSC.logger //val s_users_double_2 = Seq( // (0.0,Seq(0.0, 0.0, 0.0)), // (0.0,Seq(0.0, 0.0, 0.0)), // (0.0,Seq(0.0, 0.0, 0.0)), // (1.0,Seq(1.0, 1.0, 1.0)), // (1.0,Seq(1.0, 1.0, 1.0)), // (1.0,Seq(1.0, 1.0, 1.0)) //) val s_users_double_2 = Seq( (0.0,Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0)), (0.0,Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0)), (0.0,Seq(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0)), (1.0,Seq(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0)), (1.0,Seq(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0,
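For the thread's original question (getting a score/confidence rather than a hard 0/1 class), a hedged sketch for MLlib 1.0+ follows; the poster is on 0.9.2, where clearThreshold() may not be available. Clearing the threshold makes predict() return the raw margin, which can then be cut at whatever operating point an ROC sweep (as suggested above) indicates:

```scala
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Sketch, assuming MLlib 1.0+: return (raw margin, true label) pairs instead of class labels.
def rawScores(training: RDD[LabeledPoint], test: RDD[LabeledPoint]): RDD[(Double, Double)] = {
  val model = SVMWithSGD.train(training, numIterations = 100)
  model.clearThreshold()                      // predict() now emits the margin, not 0/1
  test.map(p => (model.predict(p.features), p.label))
}
```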
Re: Spark SQL -- more than two tables for join
The issue is that you're using SQLContext instead of HiveContext. SQLContext implements a smaller subset of the SQL language and so you're getting a SQL parse error because it doesn't support the syntax you have. Look at how you'd write this in HiveQL, and then try doing that with HiveContext. On Oct 7, 2014, at 7:20 AM, Gen gen.tan...@gmail.com wrote: Hi, in fact, the same problem happens when I try several joins together: SELECT * FROM sales INNER JOIN magasin ON sales.STO_KEY = magasin.STO_KEY INNER JOIN eans ON (sales.BARC_KEY = eans.BARC_KEY and magasin.FORM_KEY = eans.FORM_KEY) py4j.protocol.Py4JJavaError: An error occurred while calling o1229.sql. : java.lang.RuntimeException: [1.269] failure: ``UNION'' expected but `INNER' found SELECT sales.Date AS Date, sales.ID_FOYER AS ID_FOYER, Sales.STO_KEY AS STO_KEY,sales.Quantite AS Quantite, sales.Prix AS Prix, sales.Total AS Total, magasin.FORM_KEY AS FORM_KEY, eans.UB_KEY AS UB_KEY FROM sales INNER JOIN magasin ON sales.STO_KEY = magasin.STO_KEY INNER JOIN eans ON (sales.BARC_KEY = eans.BARC_KEY and magasin.FORM_KEY = eans.FORM_KEY) at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) I use spark 1.1.0, so I have an impression that sparksql doesn't support several joins together. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-more-than-two-tables-for-join-tp13865p15848.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
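A minimal sketch of the suggested switch, in Scala (the PySpark HiveContext works the same way); the table names come from the thread, and registering the input SchemaRDDs as temp tables is assumed to happen as in the original code. HiveContext's sql() goes through the HiveQL parser, which accepts chained INNER JOINs:

```scala
import org.apache.spark.sql.hive.HiveContext

// Hedged sketch: run the multi-join query through HiveContext instead of SQLContext.
val hiveContext = new HiveContext(sc)
val joined = hiveContext.sql(
  """SELECT * FROM sales
    |INNER JOIN magasin ON sales.STO_KEY = magasin.STO_KEY
    |INNER JOIN eans ON (sales.BARC_KEY = eans.BARC_KEY AND magasin.FORM_KEY = eans.FORM_KEY)
  """.stripMargin)
```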
Re: Stupid Spark question
You can create a new Configuration object in something like a mapPartitions method and use that. It will pick up local Hadoop configuration from the node, but presumably the Spark workers and HDFS data nodes are colocated in this case, so the machines have the correct Hadoop config locally. On Tue, Oct 7, 2014 at 7:01 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am porting a Hadoop job to Spark - One issue is that the workers need to read files from hdfs reading a different file based on the key or in some cases reading an object that is expensive to serialize. This is easy if the worker has access to the JavaSparkContext (I am working in Java) but this cannot be serialized - how can a worker read from a Path - assume hdfs - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
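A minimal sketch of the suggestion above (not code from the thread): build a Configuration inside mapPartitions so each worker opens HDFS itself. The keysRdd name and the path-per-key scheme are made-up illustrations:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Hypothetical: for each key, read a corresponding HDFS file on the executor side.
val results = keysRdd.mapPartitions { keys =>
  val conf = new Configuration()              // picks up the node-local Hadoop config
  val fs = FileSystem.get(conf)
  keys.map { key =>
    val in = fs.open(new Path(s"/data/lookup/$key"))   // placeholder path scheme
    try { (key, Source.fromInputStream(in).mkString) } finally { in.close() }
  }
}
```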
Re: Spark 1.1.0 with Hadoop 2.5.0
Thanks Cheng. Here is the error message after a fresh build. $ mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0 -Phive -DskipTests clean package [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM .. SUCCESS [19.117s] [INFO] Spark Project Core SUCCESS [11:24.009s] [INFO] Spark Project Bagel ... SUCCESS [1:09.498s] [INFO] Spark Project GraphX .. SUCCESS [3:41.113s] [INFO] Spark Project Streaming ... SUCCESS [4:25.378s] [INFO] Spark Project ML Library .. SUCCESS [5:43.323s] [INFO] Spark Project Tools ... SUCCESS [44.647s] [INFO] Spark Project Catalyst SUCCESS [4:48.658s] [INFO] Spark Project SQL . SUCCESS [4:56.966s] [INFO] Spark Project Hive SUCCESS [3:45.269s] [INFO] Spark Project REPL SUCCESS [2:11.617s] [INFO] Spark Project YARN Parent POM . SUCCESS [6.723s] [INFO] Spark Project YARN Stable API . SUCCESS [2:20.860s] [INFO] Spark Project Hive Thrift Server .. SUCCESS [1:15.231s] [INFO] Spark Project Assembly SUCCESS [1:41.245s] [INFO] Spark Project External Twitter SUCCESS [50.839s] [INFO] Spark Project External Kafka .. SUCCESS [1:15.888s] [INFO] Spark Project External Flume Sink . SUCCESS [57.807s] [INFO] Spark Project External Flume .. SUCCESS [1:26.589s] [INFO] Spark Project External ZeroMQ . SUCCESS [54.361s] [INFO] Spark Project External MQTT ... SUCCESS [53.901s] [INFO] Spark Project Examples SUCCESS [2:39.407s] [INFO] [INFO] BUILD SUCCESS [INFO] spark-sql use mydb; FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:302) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:58) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:291) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) On Tue, Oct 7, 2014 at 6:19 AM, Cheng Lian lian.cs@gmail.com wrote: The build command should be correct. What exact error did you encounter when trying Spark 1.1 + Hive 0.12 + Hadoop 2.5.0? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
MLLib Linear regression
Hi All, I have the following classes of features: class A: 15000 features, class B: 170 features, class C: 900 features, class D: 6000 features. I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes: 1. A + B + C, 2. B + C + D, 3. A + B, 4. A + C, 5. B + D, 6. C + D, 7. D. Unfortunately, when I use A + B + C + D (all the features) I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get MSE around 5 or 6. Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow not able to scale up to 22K features? (I highly doubt that).
RE: MLLib Linear regression
BTW, one detail: When the number of iterations is 100, all weights are zero or below and the indices are only from set A. When the number of iterations is 150, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets; however MSE is high (5.xxx) and the result does not match the domain knowledge. When the number of iterations is 400, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets; however MSE is high (6.xxx) and the result does not match the domain knowledge. Any help will be highly appreciated. From: ssti...@live.com To: user@spark.apache.org Subject: MLLib Linear regression Date: Tue, 7 Oct 2014 13:41:03 -0700 Hi All, I have the following classes of features: class A: 15000 features, class B: 170 features, class C: 900 features, class D: 6000 features. I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes: 1. A + B + C, 2. B + C + D, 3. A + B, 4. A + C, 5. B + D, 6. C + D, 7. D. Unfortunately, when I use A + B + C + D (all the features) I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get MSE around 5 or 6. Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow not able to scale up to 22K features? (I highly doubt that).
Re: Shuffle files
- We set ulimit to 50. But I still get the same too many open files warning. - I tried setting consolidateFiles to True, but that did not help either. I am using a Mesos cluster. Does Mesos have any limit on the number of open files? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
anyone else seeing something like https://issues.apache.org/jira/browse/SPARK-3637
java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) My spark application running on Windows 8 keeps crashing with this error and I find no work around
Re: anyone else seeing something like https://issues.apache.org/jira/browse/SPARK-3637
Hi Steve, what Spark version are you running? 2014-10-07 14:45 GMT-07:00 Steve Lewis lordjoe2...@gmail.com: java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) My spark application running on Windows 8 keeps crashing with this error and I find no work around
Re: MLLib Linear regression
Did you test different regularization parameters and step sizes? In the combination that works, I don't see A + D. Did you test that combination? Are there any linear dependency between A's columns and D's columns? -Xiangrui On Tue, Oct 7, 2014 at 1:56 PM, Sameer Tilak ssti...@live.com wrote: BTW, one detail: When number of iterations is 100 all weights are zero or below and the indices are only from set A. When number of iterations is 150 I see 30+ non-zero weights (when sorted by weight) and indices are distributed across al sets. however MSE is high (5.xxx) and the result does not match the domain knowledge. When number of iterations is 400 I see 30+ non-zero weights (when sorted by weight) and indices are distributed across al sets. however MSE is high (6.xxx) and the result does not match the domain knowledge. Any help will be highly appreciated. From: ssti...@live.com To: user@spark.apache.org Subject: MLLib Linear regression Date: Tue, 7 Oct 2014 13:41:03 -0700 Hi All, I have following classes of features: class A: 15000 features class B: 170 features class C: 900 features Class D: 6000 features. I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes: 1. A + B + C 2. B + C + D 3. A + B 4. A + C 5. B + D 6. C + D 7. D Unfortunately, when I use A + B + C + D (all the features) I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get MSE as (5/ 6). Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow it is not able to scale up to 22K features? (I highly doubt that). - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
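A hedged sketch (not from the thread) of the knobs being suggested; the values are placeholders to sweep over, not recommendations. LassoWithSGD is used so that the regularization parameter actually takes effect (plain LinearRegressionWithSGD uses no regularizer), and L1 regularization also gives a form of feature selection for the 22K-feature case:

```scala
import org.apache.spark.mllib.regression.{LabeledPoint, LassoWithSGD}
import org.apache.spark.rdd.RDD

// Sweep step size and regularization strength for an L1-regularized linear model.
def fit(data: RDD[LabeledPoint], step: Double, regParam: Double) = {
  val lasso = new LassoWithSGD()
  lasso.optimizer
    .setNumIterations(200)
    .setStepSize(step)        // e.g. try 0.01, 0.1, 1.0
    .setRegParam(regParam)    // e.g. try 0.0, 0.01, 0.1
  lasso.run(data)
}
```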
Spark / Kafka connector - CDH5 distribution
Hi All, Does anyone know if CDH5.1.2 packages the spark streaming kafka connector under the spark externals project? -- ~
Storing shuffle files on a Tachyon
Is it possible to store Spark shuffle files on Tachyon?
SparkStreaming program does not start
I'm probably doing something obviously wrong, but I'm not seeing it. I have the program below (in a file try1.scala), which is similar but not identical to the examples. import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ println(Point 0) val appName = try1.scala val master = local[5] val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(10)) println(Point 1) val lines = ssc.textFileStream(/Users/spr/Documents/big_data/RSA2014/) println(Point 2) println(lines=+lines) println(Point 3) ssc.start() println(Point 4) ssc.awaitTermination() println(Point 5) I start the program via $S/bin/spark-shell --master local[5] try1.scala The messages I get are mbp-spr:cyber spr$ $S/bin/spark-shell --master local[5] try1.scala 14/10/07 17:36:58 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/10/07 17:36:58 INFO SecurityManager: Changing view acls to: spr 14/10/07 17:36:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spr) 14/10/07 17:36:58 INFO HttpServer: Starting HTTP Server Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.0.2 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65) Type in expressions to have them evaluated. Type :help for more information. 14/10/07 17:37:01 INFO SecurityManager: Changing view acls to: spr 14/10/07 17:37:01 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spr) 14/10/07 17:37:01 INFO Slf4jLogger: Slf4jLogger started 14/10/07 17:37:01 INFO Remoting: Starting remoting 14/10/07 17:37:02 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.0.3:58351] 14/10/07 17:37:02 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.0.3:58351] 14/10/07 17:37:02 INFO SparkEnv: Registering MapOutputTracker 14/10/07 17:37:02 INFO SparkEnv: Registering BlockManagerMaster 14/10/07 17:37:02 INFO DiskBlockManager: Created local directory at /var/folders/pk/2bm2rq8n0rv499w5s9_c_w6r3b/T/spark-local-20141007173702-054c 14/10/07 17:37:02 INFO MemoryStore: MemoryStore started with capacity 303.4 MB. 14/10/07 17:37:02 INFO ConnectionManager: Bound socket to port 58352 with id = ConnectionManagerId(192.168.0.3,58352) 14/10/07 17:37:02 INFO BlockManagerMaster: Trying to register BlockManager 14/10/07 17:37:02 INFO BlockManagerInfo: Registering block manager 192.168.0.3:58352 with 303.4 MB RAM 14/10/07 17:37:02 INFO BlockManagerMaster: Registered BlockManager 14/10/07 17:37:02 INFO HttpServer: Starting HTTP Server 14/10/07 17:37:02 INFO HttpBroadcast: Broadcast server started at http://192.168.0.3:58353 14/10/07 17:37:02 INFO HttpFileServer: HTTP File server directory is /var/folders/pk/2bm2rq8n0rv499w5s9_c_w6r3b/T/spark-0950f667-aa04-4f6e-9d2e-5a9fab30806c 14/10/07 17:37:02 INFO HttpServer: Starting HTTP Server 14/10/07 17:37:02 INFO SparkUI: Started SparkUI at http://192.168.0.3:4040 2014-10-07 17:37:02.428 java[27725:1607] Unable to load realm mapping info from SCDynamicStore 14/10/07 17:37:02 INFO Executor: Using REPL class URI: http://192.168.0.3:58350 14/10/07 17:37:02 INFO SparkILoop: Created spark context.. Spark context available as sc. Note no messages from any of my println() statements. 
I could understand that I'm possibly screwing up something in the code, but why am I getting no print-out at all? Suggestions?
Re: Spark / Kafka connector - CDH5 distribution
Thanks Sean. Sorry, in my earlier question I meant to type CDH5.1.3, not CDH5.1.2. I presume it's included in spark-streaming_2.10-1.0.0-cdh5.1.3, but for some reason Eclipse complains that the import org.apache.spark.streaming.kafka cannot be resolved, even though I have included the spark-streaming_2.10-1.0.0-cdh5.1.3.jar file in the project. Where can I find it in the CDH5.1.3 Spark distribution? On Tue, Oct 7, 2014 at 3:40 PM, Sean Owen so...@cloudera.com wrote: Yes, it is the entire Spark distribution. [...] -- ~
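In case it helps anyone hitting the same unresolved import: in upstream Spark the Kafka connector is published as its own artifact (spark-streaming-kafka_2.10) rather than inside spark-streaming. A build.sbt sketch follows; the CDH version string and repository URL are assumptions to be checked against the local CDH install, not verified coordinates:

// build.sbt sketch -- pulls the Kafka connector as a separate dependency (version and repo are assumed)
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10"       % "1.0.0-cdh5.1.3" % "provided",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0-cdh5.1.3"
)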
RE: Shuffle files
Are you sure the new ulimit has taken effect? How many cores are you using? How many reducers? In general, if a node in your cluster has C assigned cores and you run a job with X reducers, then Spark will open C*X files in parallel and start writing. Shuffle consolidation will help decrease the total number of files created, but the number of file handles open at any time doesn't change, so it won't help the ulimit problem. Quoted from Patrick at: http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html Thanks, Todd
-Original Message- From: SK [mailto:skrishna...@gmail.com] Sent: Tuesday, October 7, 2014 2:12 PM To: u...@spark.incubator.apache.org Subject: Re: Shuffle files [...]
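For reference, a minimal sketch of the two knobs discussed above (both are standard Spark 1.x settings); the application name, input path and reducer count are placeholders, not values from the original job:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-file-tuning")
  .set("spark.shuffle.consolidateFiles", "true")   // fewer shuffle files created in total
val sc = new SparkContext(conf)

// With C cores per node and X reducers, roughly C*X files are open at once on a node,
// so lowering the reducer count (the second argument) directly lowers the open-handle pressure.
val pairs = sc.textFile("hdfs:///tmp/input").map(line => (line, 1))   // placeholder input
val counts = pairs.reduceByKey(_ + _, 64)                             // 64 reducers instead of the default
counts.count()
sc.stop()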
Re: SparkStreaming program does not start
Try using spark-submit instead of spark-shell. On Tue, Oct 7, 2014 at 3:47 PM, spr s...@yarcdata.com wrote: [...]
Re: Shuffle files
@SK: Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit -a. Also make sure you have proper kernel parameters set in /etc/sysctl.conf (MacOSX) On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd todd.lison...@intel.com wrote: [...]
Re: SparkStreaming program does not start
|| Try using spark-submit instead of spark-shell Two questions: - What does spark-submit do differently from spark-shell that makes you think that may be the cause of my difficulty? - When I try spark-submit it complains about Error: Cannot load main class from JAR: file:/Users/spr/.../try1.scala. My program is not structured as a main class. Does it have to be to run with Spark Streaming? Or with spark-submit? Thanks much.
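For what it's worth, spark-submit does expect a jar with an entry point, and as far as I can tell a .scala file passed as a positional argument to spark-shell is simply ignored (inside the shell, :load try1.scala would run it), which would also explain the missing println output. Below is a minimal sketch of the same program wrapped in an object with main(), using the directory from the original post; the object and application names are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Try1 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("try1").setMaster("local[5]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("/Users/spr/Documents/big_data/RSA2014/")
    lines.print()   // an output operation: each 10-second batch prints its first few records
    ssc.start()
    ssc.awaitTermination()
  }
}

Packaged with sbt package, it could then be launched with something like $S/bin/spark-submit --class Try1 --master "local[5]" target/scala-2.10/try1_2.10-0.1.jar (the jar name is whatever the build produces).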
Re: Spark / Kafka connector - CDH5 distribution
Never mind... my bad... made a typo. looks good. Thanks, On Tue, Oct 7, 2014 at 3:57 PM, Abraham Jacob abe.jac...@gmail.com wrote: [...] -- ~
bug with IPython notebook?
Hi, I think I found a bug in the iPython notebook integration. I am not sure how to report it. I am running spark-1.1.0-bin-hadoop2.4 on an AWS EC2 cluster. I start the cluster using the launch script provided by Spark. I start the iPython notebook on my cluster master as follows and use an ssh tunnel to open the notebook in a browser running on my local computer:
ec2-user@ip-172-31-20-107 ~]$ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" /root/spark/bin/pyspark
Below is the code my notebook executes. Bug list: 1. Why do I need to create a SparkContext? If I run pyspark interactively, the context is created automatically for me. 2. The print statement causes the output to be displayed in the terminal I started pyspark from, not in the notebook's output. Any comments or suggestions would be greatly appreciated. Thanks Andy

import sys
from operator import add
from pyspark import SparkContext

# only stand-alone jobs should create a SparkContext
sc = SparkContext(appName="pyStreamingSparkRDDPipe")

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

def echo(data):
    print "python received: %s" % (data)

# output winds up in the shell console in my cluster (i.e. the machine I launched pyspark from)
rdd.foreach(echo)
print "we are done"
spark fold question
Hi, I am using fold(zeroValue)(t1, t2) on the RDD. I noticed that it runs in parallel on all the partitions and then aggregates the results from the partitions. My data object is not aggregate-able, so I was wondering if there's any way to run the fold sequentially. [I am looking to do a foldLeft kind of Scala operation.] Here's what I want: run_partition1 - get_t1_and_send_to_next_partition - run_partition_2 - get_t1_and_send_to_next_partition .. I tried setting coalesce(1, true) on the parent RDD, but since I have a lot of data (30G) it was trying to shuffle all the data to one node, which took forever, so that's not really an option. Thanks, -C
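A hedged sketch of one way to get a sequential, foldLeft-style pass without coalescing to one partition: toLocalIterator streams one partition at a time to the driver, so elements are folded in partition order and only one partition's data needs to fit on the driver at any moment. The RDD, zero value and combine function below are stand-ins for the real ones:

import org.apache.spark.{SparkConf, SparkContext}

object SequentialFoldSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("sequential-fold-sketch"))
    val rdd = sc.parallelize(1 to 1000000, 100)          // stand-in for the real 30G RDD
    // foldLeft on the local iterator runs on the driver, partition by partition, in order.
    val result = rdd.toLocalIterator.foldLeft(0L)(_ + _) // replace with the real zeroValue and combine
    println(result)
    sc.stop()
  }
}

The trade-off is that the combine step itself is no longer distributed; only the partition-at-a-time transfer to the driver is parallel on the cluster side.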
Spark-Shell: OOM: GC overhead limit exceeded
Hi I am new to Spark and trying to develop an application that loads data from Hive. Here is my setup: * Spark-1.1.0 (built using -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive) * Executing Spark-shell on a box with 16 GB RAM * 4 Cores Single Processor * OpenCSV library (SerDe) * Hive table has 100K records While trying to execute a query that does a group-by (select ... group by ...) on a hive table, I get an OOM error. I tried setting the following parameters, but they don't seem to help: spark.executor.memory 2g spark.shuffle.memoryFraction 0.8 spark.storage.memoryFraction 0.1 spark.default.parallelism 24 Any help is appreciated. The stack trace of the error is given below. - Ranga == Stack trace == java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:569) at java.lang.StringBuffer.append(StringBuffer.java:369) at java.io.BufferedReader.readLine(BufferedReader.java:370) at java.io.BufferedReader.readLine(BufferedReader.java:389) at au.com.bytecode.opencsv.CSVReader.getNextLine(CSVReader.java:266) at au.com.bytecode.opencsv.CSVReader.readNext(CSVReader.java:233) at com.bizo.hive.serde.csv.CSVSerde.deserialize(CSVSerde.java:129) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$1.apply(TableReader.scala:279) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$1.apply(TableReader.scala:278) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:157) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Shell-OOM-GC-overhead-limit-exceeded-tp15890.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
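One thing that may be worth checking here (an assumption about this setup, not a confirmed diagnosis): when spark-shell runs in local mode the executor lives inside the driver JVM, so spark.executor.memory has no effect there and the heap has to be sized when the shell is launched, e.g. ./bin/spark-shell --driver-memory 8g. A quick way to confirm from inside the shell that the setting actually took effect:

// Prints the maximum heap the shell JVM was started with; if this still shows the
// default (well under the amount requested), the flag did not reach the JVM.
println(Runtime.getRuntime.maxMemory / (1024 * 1024) + " MB max heap")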
Re: Record-at-a-time model for Spark Streaming
Jianneng, On Wed, Oct 8, 2014 at 8:44 AM, Jianneng Li jiannen...@berkeley.edu wrote: I understand that Spark Streaming uses micro-batches to implement streaming, while traditional streaming systems use the record-at-a-time processing model. The performance benefit of the former is throughput, and the latter is latency. I'm wondering what it would take to implement record-at-a-time for Spark Streaming? Would it be something that is feasible to prototype in one or two months? I think this is so much against the fundamental design concept of Spark Streaming that there would be nothing left of Spark Streaming when you are done with it. Spark is fundamentally based on the idea of an RDD, that is, distributed storage of data, and Spark Streaming is basically a wrapper that stores incoming data as an RDD and then processes it as a batch. One item at a time does not match this model. Even if you *were* able to prototype something, I think performance would be abysmal. Tobias
Re: dynamic sliding window duration
Hi, On Wed, Oct 8, 2014 at 4:50 AM, Josh J joshjd...@gmail.com wrote: I have a source which fluctuates in the frequency of streaming tuples. I would like to process certain batch counts, rather than batch window durations. Is it possible to either 1) define batch window sizes Cf. http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-td2085.html 2) dynamically adjust the duration of the sliding window? That's not possible AFAIK, because you can't change anything in the processing pipeline after StreamingContext has been started. Tobias
Re: Spark 1.1.0 with Hadoop 2.5.0
Here is the hive-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- Hive Execution Parameters -->
<property><name>hive.metastore.local</name><value>false</value><description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description></property>
<property><name>hive.metastore.uris</name><value>thrift://*:2513</value><description>Remote location of the metastore server</description></property>
<property><name>hive.metastore.warehouse.dir</name><value>/projects/hcatalog-warehouse</value><description>location of default database for the warehouse</description></property>
<property><name>hive.metastore.sasl.enabled</name><value>true</value><description>If true, the metastore thrift interface will be secured with SASL. Clients must authenticate with Kerberos.</description></property>
<property><name>hive.metastore.kerberos.principal</name><value>hcat/*.com@.COM</value><description>The service principal for the metastore thrift server. The special string _HOST will be replaced automatically with the correct host name.</description></property>
<property><name>hive.metastore.client.socket.timeout</name><value>200</value><description>MetaStore Client socket timeout in seconds</description></property>
<property><name>hive.exec.mode.local.auto</name><value>false</value><description>Let hive determine whether to run in local mode automatically</description></property>
<property><name>hive.hadoop.supports.splittable.combineinputformat</name><value>true</value><description>Hive internal, should be set to true as MAPREDUCE-1597 is present in Hadoop</description></property>
<property><name>hive.exec.scratchdir</name><value>/tmp</value><description>HDFS Scratch space for Hive jobs</description></property>
<property><name>hive.querylog.location</name><value>${user.home}/hivelogs</value><description>Local Directory where structured hive query logs are created. One file per session is created in this directory. If this variable set to empty string structured log will not be created.</description></property>
<property><name>mapreduce.job.queuename</name><value>default</value><description>Set a default queue name for execution of the Hive queries</description></property>
<property><name>hadoop.clientside.fs.operations</name><value>true</value><description>FS operations related to DDL operations are owned by Hive client</description></property>
<property><name>hive.exec.compress.output</name><value>true</value><description>This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress*</description></property>
<property><name>hive.exec.compress.intermediate</name><value>true</value><description>This controls whether intermediate files produced by hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress*</description></property>
<property><name>hive.auto.convert.join</name><value>false</value><description>This controls whether intermediate files produced by hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress*</description></property>
<property><name>hive.optimize.partition.prune.metadata</name><value>true</value><description>This controls whether metadata optimizations are applied during partition pruning</description></property>
<property><name>hive.mapred.mode</name><value>nonstrict</value><description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description></property>
<property><name>io.seqfile.compression.type</name><value>BLOCK</value><description>Determines how the compression is performed. Can take NONE, RECORD or BLOCK</description></property>
<property><name>hive.input.format</name><value>org.apache.hadoop.hive.ql.io.CombineHiveInputFormat</value><description>Determines the input format. Can take org.apache.hadoop.hive.ql.io.HiveInputFormat or org.apache.hadoop.hive.ql.io.CombineHiveInputFormat (default)</description></property>
<property><name>mapreduce.input.fileinputformat.split.minsize</name><value>134217728</value><description>Size of the minimum split for CombineFileInputFormat (128MB recommended)</description></property>
<property><name>mapreduce.input.fileinputformat.split.maxsize</name><value>1073741824</value><description>Size of maximum split for CombineFileInputFormat (1GB recommended)</description></property>
<property><name>mapreduce.input.fileinputformat.split.minsize.per.rack</name><value>134217728</value><description>Size of minimum split size per rack (128MB recommended)</description></property>
<property><name>mapreduce.input.fileinputformat.split.minsize.per.node</name><value>134217728</value><description>Size of minimum split size per node (128MB
Re: Reading from HBase is too slow
I found the reason why reading HBase is too slow. Although each regionserver serves multiple regions for the table I'm reading, the number of Spark workers allocated by Yarn is too low. Actually, I could see that the table has dozens of regions spread over about 20 regionservers, but only two Spark workers are allocated by Yarn. What is worse, the two workers run one after one. So, the Spark job lost parallelism. *So now the question is : Why are only 2 workers allocated? * The following is the log info in ApplicationMaster Log UI and we can see that only 2 workers are allocated on two nodes (*a04.jsepc.com http://a04.jsepc.com* and *b06 jsepc.com http://jsepc.com*) Showing 4096 bytes. Click here for full log erLauncher: ApplicationAttemptId: appattempt_1412731028648_0157_01 14/10/08 09:55:16 INFO yarn.WorkerLauncher: Registering the ApplicationMaster 14/10/08 09:55:16 INFO yarn.WorkerLauncher: Waiting for Spark driver to be reachable. 14/10/08 09:55:16 INFO yarn.WorkerLauncher: Driver now available: a04.jsepc.com:56888 14/10/08 09:55:16 INFO yarn.WorkerLauncher: Listen to driver: akka.tcp:// sp...@a04.jsepc.com:56888/user/CoarseGrainedScheduler 14/10/08 09:55:16 INFO yarn.WorkerLauncher: *Allocating 2 workers*. 14/10/08 09:55:16 INFO yarn.YarnAllocationHandler: *Will Allocate 2 worker containers, each with 1408 memory* 14/10/08 09:55:16 INFO yarn.YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:1408, vCores:1 14/10/08 09:55:16 INFO yarn.YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:1408, vCores:1 14/10/08 09:55:20 INFO util.RackResolver: *Resolved a04.jsepc.com http://a04.jsepc.com to /rack1* 14/10/08 09:55:20 INFO util.RackResolver: *Resolved b06.jsepc.com http://b06.jsepc.com to /rack2* 14/10/08 09:55:20 INFO yarn.YarnAllocationHandler: Launching container container_1412731028648_0157_01_02 for on host a04.jsepc.com 14/10/08 09:55:20 INFO yarn.YarnAllocationHandler: Launching WorkerRunnable. driverUrl: akka.tcp:// sp...@a04.jsepc.com:56888/user/CoarseGrainedScheduler, workerHostname: a04.jsepc.com 14/10/08 09:55:21 INFO yarn.YarnAllocationHandler: Launching container container_1412731028648_0157_01_03 for on host b06.jsepc.com 14/10/08 09:55:21 INFO yarn.YarnAllocationHandler: Launching WorkerRunnable. driverUrl: akka.tcp:// sp...@a04.jsepc.com:56888/user/CoarseGrainedScheduler, workerHostname: b06.jsepc.com 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Starting Worker Container 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Starting Worker Container 14/10/08 09:55:21 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500 14/10/08 09:55:21 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Setting up ContainerLaunchContext 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Setting up ContainerLaunchContext 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Preparing Local resources 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Preparing Local resources 14/10/08 09:55:21 INFO yarn.WorkerLauncher: All workers have launched. 
14/10/08 09:55:21 INFO yarn.WorkerLauncher: Started progress reporter thread - sleep time : 5000 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Prepared Local resources Map(spark.jar - resource { scheme: hdfs host: jsepc-ns port: -1 file: /user/root/.sparkStaging/application_1412731028648_0157/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar } size: 75288668 timestamp: 1412733307395 type: FILE visibility: PRIVATE) 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Prepared Local resources Map(spark.jar - resource { scheme: hdfs host: jsepc-ns port: -1 file: /user/root/.sparkStaging/application_1412731028648_0157/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar } size: 75288668 timestamp: 1412733307395 type: FILE visibility: PRIVATE) 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Setting up worker with commands: List($JAVA_HOME/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms1024m -Xmx1024m -Djava.io.tmpdir=$PWD/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp:// sp...@a04.jsepc.com:56888/user/CoarseGrainedScheduler 2 b06.jsepc.com 1 1 LOG_DIR/stdout 2 LOG_DIR/stderr) 14/10/08 09:55:21 INFO yarn.WorkerRunnable: Setting up worker with commands: List($JAVA_HOME/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms1024m -Xmx1024m -Djava.io.tmpdir=$PWD/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp:// sp...@a04.jsepc.com:56888/user/CoarseGrainedScheduler 1 a04.jsepc.com 1 1 LOG_DIR/stdout 2 LOG_DIR/stderr) 14/10/08 09:55:21 INFO impl.ContainerManagementProtocolProxy: *Opening proxy : a04.jsepc.com:8041 http://a04.jsepc.com:8041* 14/10/08 09:55:21 INFO impl.ContainerManagementProtocolProxy: *Opening proxy : b06.jsepc.com:8041 http://b06.jsepc.com:8041* Here http://pastebin.com/VhfmHPQeis the log printed on console while the Spark job is running.
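For what it's worth, a quick way to separate the two questions (scan-side parallelism vs. how many executors YARN granted) is to check how many partitions the HBase scan actually produces; newAPIHadoopRDD with TableInputFormat gives one partition per region. A minimal sketch, with the table name as a placeholder and sc assumed to be the existing SparkContext:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")   // placeholder table name

val scanRdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// One partition per region: if this is in the dozens as expected, the scan side is fine
// and the bottleneck is the two containers granted by YARN.
println("partitions = " + scanRdd.partitions.size)

If the partition count looks right, the remaining fix is to ask YARN for more workers at submit time; if I recall the 0.9-era YARN options correctly, that is the --num-workers / SPARK_WORKER_INSTANCES setting, which defaults to 2.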
Re: Shuffle files
You will need to restart your Mesos workers to pick up the new limits as well. On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri sunny.k...@gmail.com wrote: [...]
Re: Same code --works in spark 1.0.2-- but not in spark 1.1.0
Hi Meethu, I believe you may be hitting a regression in https://issues.apache.org/jira/browse/SPARK-3633 If you are able, could you please try running a patched version of Spark 1.1.0 that has commit 4fde28c reverted and see if the errors go away? Posting your results on that bug would be useful, especially if the issues disappear after the revert. Thanks! Andrew On Tue, Oct 7, 2014 at 8:27 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, My code was working fine in spark 1.0.2 ,but after upgrading to 1.1.0, its throwing exceptions and tasks are getting failed. The code contains some map and filter transformations followed by groupByKey (reduceByKey in another code ). What I could find out is that the code works fine until groupByKey or reduceByKey in both versions.But after that the following errors show up in Spark 1.1.0 java.io.FileNotFoundException: /tmp/spark-local-20141006173014-4178/35/shuffle_6_0_5161 (Too many open files) java.io.FileOutputStream.openAppend(Native Method) java.io.FileOutputStream.init(FileOutputStream.java:210) org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:701) I cleaned my /tmp directory,changed my local directory to another folder ; but nothing helped. Can anyone say what could be the reason .? Thanks Regards, Meethu M
Support for Parquet V2 in ParquetTableSupport?
Hello, I was interested in testing Parquet V2 with Spark SQL, but noticed after some investigation that the parquet writer that Spark SQL uses is fixed at V1 here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala#L350. Any particular reason Spark SQL is hard-coded to write Parquet V1? Should I expect trouble if I write Parquet V2? Cheers, Michael