Re: spark 2.0 readStream from a REST API
Why is writeStream needed to consume the data?

When I tried it I got this exception:

INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:65)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
  at .(:59)

2016-08-01 18:44 GMT+02:00 Amit Sela <amitsel...@gmail.com>:

> I think you're missing:
>
>     val query = wordCounts.writeStream
>       .outputMode("complete")
>       .format("console")
>       .start()
>
> Did it help?
>
> On Mon, Aug 1, 2016 at 2:44 PM Jacek Laskowski <ja...@japila.pl> wrote:
>
>> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
>> <benali.ayoub.i...@gmail.com> wrote:
>>
>> > the problem now is that when I consume the dataframe for example with
>> > count I get the stack trace below.
>>
>> Mind sharing the entire pipeline?
>>
>> > I followed the implementation of TextSocketSourceProvider to implement
>> > my data source and Text Socket source is used in the official
>> > documentation here.
>>
>> Right. Completely forgot about the provider. Thanks for reminding me
>> about it!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>>
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: spark 2.0 readStream from a REST API
Hello,

here is the code I am trying to run:
https://gist.github.com/ayoub-benali/a96163c711b4fce1bdddf16b911475f2

Thanks,
Ayoub.

2016-08-01 13:44 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
> <benali.ayoub.i...@gmail.com> wrote:
>
> > the problem now is that when I consume the dataframe for example with
> > count I get the stack trace below.
>
> Mind sharing the entire pipeline?
>
> > I followed the implementation of TextSocketSourceProvider to implement
> > my data source and Text Socket source is used in the official
> > documentation here.
>
> Right. Completely forgot about the provider. Thanks for reminding me
> about it!
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
Re: spark 2.0 readStream from a REST API
Hello,

using the full class name worked, thanks.

The problem now is that when I consume the dataframe, for example with count, I get the stack trace below. I followed the implementation of TextSocketSourceProvider <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala#L128> to implement my data source, and the Text Socket source is used in the official documentation here <https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example>.

Why does count work in the documentation example? Is there some other trait that needs to be implemented?

Thanks,
Ayoub.

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:31)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:31)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:59)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:70)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2541)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)

2016-07-31 21:56 GMT+02:00 Michael Armbrust <mich...@databricks.com>:

> You have to add a file in resources too (example
> <https://github.com/apache/spark/blob/master/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister>).
> Either that or give a full class name.
>
> On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali <benali.ayoub.i...@gmail.com> wrote:
>
>> Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
>> with DataSourceRegister
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
>> But now spark fails at loading the class when doing:
>>
>>     spark.readStream.format("mysource").load()
>>
>> I get:
>>
>>     java.lang.ClassNotFoundException: Failed to find data source: mysource.
>>     Please find packages at http://spark-packages.org
>>
>> Is there something I need to do in order to "load" the Stream source provider?
>>
>> Thanks,
>> Ayoub
>>
>> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:
>>
>>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>>> <benali.ayoub.i...@gmail.com> wrote:
>>>
>>> > I started playing with the Structured Streaming API in spark 2.0 and I
>>> > am looking for a way to create a streaming Dataset/Dataframe from a REST
>>> > HTTP endpoint but I am a bit stuck.
>>>
>>> What a great idea! Why did I myself not think about this?!?!
>>>
>>> > What would be the easiest way to hack around it ? Do
Re: spark 2.0 readStream from a REST API
Looks like the way to go in spark 2.0 is to implement StreamSourceProvider <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117> with DataSourceRegister <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>. But now spark fails at loading the class when doing:

    spark.readStream.format("mysource").load()

I get:

    java.lang.ClassNotFoundException: Failed to find data source: mysource.
    Please find packages at http://spark-packages.org

Is there something I need to do in order to "load" the Stream source provider?

Thanks,
Ayoub

2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
> <benali.ayoub.i...@gmail.com> wrote:
>
> > I started playing with the Structured Streaming API in spark 2.0 and I
> > am looking for a way to create a streaming Dataset/Dataframe from a REST
> > HTTP endpoint but I am a bit stuck.
>
> What a great idea! Why did I myself not think about this?!?!
>
> > What would be the easiest way to hack around it ? Do I need to implement
> > the Datasource API ?
>
> Yes and perhaps Hadoop API too, but not sure which one exactly since I
> haven't even thought about it (not even once).
>
> > Are there examples on how to create a DataSource from a REST endpoint ?
>
> Never heard of one.
>
> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
> it as a topic. Thanks a lot!
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
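The fix suggested elsewhere in the thread (adding a file in resources) relies on Java's ServiceLoader mechanism: Spark scans the classpath for DataSourceRegister implementations and matches format("mysource") against their shortName(). A sketch of the required provider-configuration file; the package and class name below are hypothetical:

```
# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
# (one fully qualified provider class name per line; this name is an example)
com.example.streaming.MySourceProvider
```

With this file on the classpath, and with the provider's shortName() returning "mysource", spark.readStream.format("mysource").load() should resolve without spelling out the fully qualified class name.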
spark 2.0 readStream from a REST API
Hello,

I started playing with the Structured Streaming API in spark 2.0 and I am looking for a way to create a streaming Dataset/Dataframe from a REST HTTP endpoint, but I am a bit stuck.

"readStream" in SparkSession has a json method, but this one is expecting a path (s3, hdfs, etc.) and I want to avoid having to save the data on s3 and then read it again.

What would be the easiest way to hack around it? Do I need to implement the Datasource API?

Are there examples on how to create a DataSource from a REST endpoint?

Best,
Ayoub
[spark1.5.1] HiveQl.parse throws org.apache.spark.sql.AnalysisException: null
Hello,

when upgrading to spark 1.5.1 from 1.4.1, the following code crashed at runtime. It is mainly used to parse HiveQL queries and check that they are valid.

    package org.apache.spark.sql.hive

    val sql = "CREATE EXTERNAL TABLE IF NOT EXISTS `t`(`id` STRING, `foo` INT) PARTITIONED BY (year INT, month INT, day INT) STORED AS PARQUET Location 'temp'"
    HiveQl.parseSql(sql)

org.apache.spark.sql.AnalysisException: null;
  at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
  at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
  at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
  at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
  at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
  at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
  at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
  at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
  at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
  at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
  at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
  at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
  at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
  at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
  at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
  at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
  at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
  at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
  at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:277)
  at org.apache.spark.sql.hive.SQLChecker$$anonfun$1.apply(SQLChecker.scala:9)
  at org.apache.spark.sql.hive.SQLChecker$$anonfun$1.apply(SQLChecker.scala:9)

Should that be done differently on spark 1.5.1?

Thanks,
Ayoub

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-5-1-HiveQl-parse-throws-org-apache-spark-sql-AnalysisException-null-tp25138.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD[Future[T]] = Future[RDD[T]]
do you mean something like this?

    val values = rdd.mapPartitions { i: Iterator[Future[T]] =>
      val future: Future[Iterator[T]] = Future.sequence(i)
      Await.result(future, someTimeout)
    }

Where is the blocking happening in this case? It seems to me that all the workers will be blocked until the future is completed, no?

2015-07-27 7:24 GMT+02:00 Nick Pentreath nick.pentre...@gmail.com:

You could use Iterator.single on the future[iterator]. However if you collect all the partitions I'm not sure if it will work across executor boundaries. Perhaps you may need to await the sequence of futures in each partition and return the resulting iterator.

—
Sent from Mailbox https://www.dropbox.com/mailbox

On Sun, Jul 26, 2015 at 10:43 PM, Ayoub Benali benali.ayoub.i...@gmail.com wrote:

It doesn't work because mapPartitions expects a function f: (Iterator[T]) ⇒ Iterator[U] while .sequence wraps the iterator in a Future

2015-07-26 22:25 GMT+02:00 Ignacio Blasco elnopin...@gmail.com:

Maybe using mapPartitions and .sequence inside it?

On 26/7/2015 10:22 p.m., Ayoub benali.ayoub.i...@gmail.com wrote:

Hello,

I am trying to convert the result I get after doing some async IO:

    val rdd: RDD[T] = // some rdd
    val result: RDD[Future[T]] = rdd.map(httpCall)

Is there a way to collect all futures once they are completed, in a *non blocking* (i.e. without scala.concurrent Await) and lazy way?

If the RDD was a standard scala collection then calling scala.concurrent.Future.sequence would have resolved the issue, but RDD is not a TraversableOnce (which is required by the method).

Is there a way to do this kind of transformation with an RDD[Future[T]]?

Thanks,
Ayoub.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-Future-T-Future-RDD-T-tp24005.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
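To make the per-partition idea above concrete, here is a minimal plain-Scala sketch (no Spark required) of the kind of function one would pass to rdd.mapPartitions: it blocks once per partition rather than once per element. The helper name awaitPartition and the timeout value are made up for illustration, and this is still blocking, which is exactly the concern raised above:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for the function passed to rdd.mapPartitions:
// wait (once for the whole partition) until every future completes,
// then hand back a plain iterator over the results, preserving order.
def awaitPartition[T](partition: Iterator[Future[T]],
                      timeout: Duration = 30.seconds): Iterator[T] =
  Await.result(Future.sequence(partition.toList), timeout).iterator

// Simulate one partition of an RDD[Future[Int]]
val futures: Iterator[Future[Int]] = Iterator(1, 2, 3).map(n => Future(n * 2))
val results = awaitPartition(futures).toList
println(results)  // List(2, 4, 6)
```

Note that Future.sequence preserves element order, so the partition's ordering survives the round-trip through the futures.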
Re: RDD[Future[T]] = Future[RDD[T]]
It doesn't work because mapPartitions expects a function f: (Iterator[T]) ⇒ Iterator[U] while .sequence wraps the iterator in a Future

2015-07-26 22:25 GMT+02:00 Ignacio Blasco elnopin...@gmail.com:

Maybe using mapPartitions and .sequence inside it?

On 26/7/2015 10:22 p.m., Ayoub benali.ayoub.i...@gmail.com wrote:

Hello,

I am trying to convert the result I get after doing some async IO:

    val rdd: RDD[T] = // some rdd
    val result: RDD[Future[T]] = rdd.map(httpCall)

Is there a way to collect all futures once they are completed, in a *non blocking* (i.e. without scala.concurrent Await) and lazy way?

If the RDD was a standard scala collection then calling scala.concurrent.Future.sequence would have resolved the issue, but RDD is not a TraversableOnce (which is required by the method).

Is there a way to do this kind of transformation with an RDD[Future[T]]?

Thanks,
Ayoub.
RDD[Future[T]] = Future[RDD[T]]
Hello,

I am trying to convert the result I get after doing some async IO:

    val rdd: RDD[T] = // some rdd
    val result: RDD[Future[T]] = rdd.map(httpCall)

Is there a way to collect all futures once they are completed, in a *non blocking* (i.e. without scala.concurrent Await) and lazy way?

If the RDD was a standard scala collection then calling scala.concurrent.Future.sequence would have resolved the issue, but RDD is not a TraversableOnce (which is required by the method).

Is there a way to do this kind of transformation with an RDD[Future[T]]?

Thanks,
Ayoub.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
[spark1.4] sparkContext.stop causes exception on Mesos
Hello Spark developers,

After upgrading to spark 1.4 on Mesos 0.22.1, existing code started to throw this exception when calling sparkContext.stop:

(SparkListenerBus) [ERROR - org.apache.spark.Logging$class.logError(Logging.scala:96)] Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
  at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:146)
  at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:146)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
  at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:190)
  at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54)
  at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
  at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
  at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
  at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
  at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
  at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
Caused by: java.io.IOException: Filesystem closed
  at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:730)
  at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1855)
  at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1816)
  at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
  ... 16 more

I0701 15:03:46.101809  1612 sched.cpp:1589] Asked to stop the driver
I0701 15:03:46.101971  1355 sched.cpp:831] Stopping framework '20150629-132734-1224736778-5050-6126-0028'

This problem happens only when the spark.eventLog.enabled flag is set to true. It happens also if sparkContext.stop is omitted in the code, I think because Spark shuts down the spark context indirectly.

Does anyone know what could cause this problem?

Thanks,
Ayoub.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-4-sparkContext-stop-causes-exception-on-Mesos-tp23605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: RDD to InputStream
In case it would interest other people, here is what I came up with, and it seems to work fine:

    case class RDDAsInputStream(private val rdd: RDD[String]) extends java.io.InputStream {
      private val bytes = rdd.flatMap(_.getBytes("UTF-8")).toLocalIterator

      def read(): Int = {
        // mask to an unsigned value: a raw Byte >= 0x80 is negative and
        // would otherwise be confused with the -1 end-of-stream marker
        if (bytes.hasNext) bytes.next() & 0xFF
        else -1
      }

      override def markSupported(): Boolean = false
    }

2015-03-13 13:56 GMT+01:00 Sean Owen so...@cloudera.com:

OK, then you do not want to collect() the RDD. You can get an iterator, yes. There is no such thing as making an Iterator into an InputStream. An Iterator is a sequence of arbitrary objects; an InputStream is a channel to a stream of bytes.

I think you can employ similar Guava / Commons utilities to turn an Iterator of Strings into a stream of Readers, join the Readers, and encode the result as bytes in an InputStream.

On Fri, Mar 13, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote:

Thanks Sean,

I forgot to mention that the data is too big to be collected on the driver. So yes, your proposition would work in theory, but in my case I cannot hold all the data in the driver memory, therefore it wouldn't work.

I guess the crucial point is to do the collect in a lazy way, and on that subject I noticed that we can get a local iterator from an RDD, but that raises two questions:
- does that involve an immediate collect just like with collect(), or is it a lazy process?
- how to go from an iterator to an InputStream?

2015-03-13 11:17 GMT+01:00 Sean Owen [hidden email]:

These are quite different creatures.

You have a distributed set of Strings, but want a local stream of bytes, which involves three conversions:
- collect data to driver
- concatenate strings in some way
- encode strings as bytes according to an encoding

Your approach is OK but might be faster to avoid disk, if you have enough memory:
- collect() to an Array[String] locally
- use Guava utilities to turn a bunch of Strings into a Reader
- use the Apache Commons ReaderInputStream to read it as encoded bytes

I might wonder if that's all really what you want to do though.

On Fri, Mar 13, 2015 at 9:54 AM, Ayoub [hidden email] wrote:

Hello,

I need to convert an RDD[String] to a java.io.InputStream but I didn't find an easy way to do it. Currently I am saving the RDD as a temporary file and then opening an inputstream on the file, but that is not really optimal.

Does anybody know a better way to do that?

Thanks,
Ayoub.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-to-InputStream-tp22121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
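Sean's Reader-joining suggestion can also be approximated with plain JDK classes: wrap each string lazily in a ByteArrayInputStream and chain them with java.io.SequenceInputStream. A sketch, assuming UTF-8 encoding and a plain Iterator[String] such as the one returned by toLocalIterator:

```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

// Lazily turn an iterator of strings into a single InputStream;
// each string is encoded only when the previous stream is exhausted.
def stringsAsStream(strings: Iterator[String]): InputStream =
  new SequenceInputStream(
    strings
      .map(s => new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8)): InputStream)
      .asJavaEnumeration)

val in = stringsAsStream(Iterator("foo", "bar"))
val text = scala.io.Source.fromInputStream(in, "UTF-8").mkString
println(text)  // foobar
```

Because both the iterator's map and SequenceInputStream are lazy, only one string's bytes are held at a time, which matches the "too big to collect" constraint discussed above.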
Re: RDD to InputStream
Thanks Sean,

I forgot to mention that the data is too big to be collected on the driver. So yes, your proposition would work in theory, but in my case I cannot hold all the data in the driver memory, therefore it wouldn't work.

I guess the crucial point is to do the collect in a lazy way, and on that subject I noticed that we can get a local iterator from an RDD, but that raises two questions:
- does that involve an immediate collect just like with collect(), or is it a lazy process?
- how to go from an iterator to an InputStream?

2015-03-13 11:17 GMT+01:00 Sean Owen so...@cloudera.com:

These are quite different creatures.

You have a distributed set of Strings, but want a local stream of bytes, which involves three conversions:
- collect data to driver
- concatenate strings in some way
- encode strings as bytes according to an encoding

Your approach is OK but might be faster to avoid disk, if you have enough memory:
- collect() to an Array[String] locally
- use Guava utilities to turn a bunch of Strings into a Reader
- use the Apache Commons ReaderInputStream to read it as encoded bytes

I might wonder if that's all really what you want to do though.

On Fri, Mar 13, 2015 at 9:54 AM, Ayoub benali.ayoub.i...@gmail.com wrote:

Hello,

I need to convert an RDD[String] to a java.io.InputStream but I didn't find an easy way to do it. Currently I am saving the RDD as a temporary file and then opening an inputstream on the file, but that is not really optimal.

Does anybody know a better way to do that?

Thanks,
Ayoub.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-to-InputStream-tp22032.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [hive context] Unable to query array once saved as parquet
Hi,

as I was trying to find a workaround until this bug is fixed, I discovered another bug, posted here: https://issues.apache.org/jira/browse/SPARK-5775

For those who might have the same issue: one could use the LOAD sql command in a hive context to load the parquet file into the table, as long as it is not partitioned. The queries work fine after that.

Best,
Ayoub.

2015-01-31 4:05 GMT+01:00 Cheng Lian lian.cs@gmail.com:

According to the Gist Ayoub provided, the schema is fine. I reproduced this issue locally; it should be a bug, but I don't think it's related to SPARK-5236. Will investigate this soon.

Ayoub - would you mind to help to file a JIRA for this issue? Thanks!

Cheng

On 1/30/15 11:28 AM, Michael Armbrust wrote:

Is it possible that your schema contains duplicate columns or columns with spaces in the name? The parquet library will often give confusing error messages in this case.

On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote:

Hello,

I have a problem when querying, with a hive context on spark 1.2.1-snapshot, a column in my table which is a nested data structure like an array of struct. The problem happens only on the table stored as parquet, while querying the Schema RDD saved as a temporary table doesn't lead to any exception.
my steps are:
1) reading JSON file
2) creating a schema RDD and saving it as a tmp table
3) creating an external table in hive meta store saved as parquet file
4) inserting the data from the tmp table to the persisted table
5) querying the persisted table leads to this exception:

    select data.field1 from persisted_table LATERAL VIEW explode(data_array) nestedStuff AS data

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://***/test_table/part-1
  at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
  at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
  at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
  at java.util.ArrayList.rangeCheck(ArrayList.java:635)
  at java.util.ArrayList.get(ArrayList.java:411)
  at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
  at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
  at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
  at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99)
  at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94)
  at parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:274)
  at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
  at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
  at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
  at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96
Re: Parquet compression codecs not applied
I was using a hive context and not a sql context, therefore (SET spark.sql.parquet.compression.codec=gzip) was ignored. Michael Armbrust pointed out that parquet.compression should be used instead, which solved the issue.

I am still wondering if this behavior is normal. It would be better if spark.sql.parquet.compression.codec were translated to parquet.compression in the case of a hive context; otherwise the documentation should be updated to be more precise.

2015-02-04 19:13 GMT+01:00 sahanbull sa...@skimlinks.com:

Hi Ayoub,

You could try using the sql format to set the compression type:

    sc = SparkContext()
    sqc = SQLContext(sc)
    sqc.sql("SET spark.sql.parquet.compression.codec=gzip")

You get a notification on screen while running the spark job when you set the compression codec like this. I haven't compared it with different compression methods. Please let the mailing list know if this works for you.

Best
Sahan

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Parquet-compression-codecs-not-applied-tp21499.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
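The distinction described above comes down to which configuration key the active context honors. A configuration sketch (assuming a Spark 1.x HiveContext named hiveContext and a SQLContext named sqlContext; exact behavior may vary across Spark versions):

```
-- With a HiveContext, the Hive/Parquet writer honors the Parquet key:
SET parquet.compression=GZIP

-- With a plain SQLContext, the Spark SQL key applies instead:
SET spark.sql.parquet.compression.codec=gzip
```

Either statement would be issued through the corresponding context, e.g. hiveContext.sql("SET parquet.compression=GZIP").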
Re: How to get Hive table schema using Spark SQL or otherwise
Given a hive context you could execute:

    hiveContext.sql("describe TABLE_NAME")

You would get the names of the fields and their types.

2015-02-04 21:47 GMT+01:00 nitinkak001 nitinkak...@gmail.com:

I want to get Hive table schema details into Spark. Specifically, I want to get column name and type information. Is it possible to do it, e.g. using JavaSchemaRDD or something else?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-How-to-get-Hive-table-schema-using-Spark-SQL-or-otherwise-tp21502.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [hive context] Unable to query array once saved as parquet
Hi, given the current open issue: https://issues.apache.org/jira/browse/SPARK-5508 I cannot use HiveQL to insert schemaRDD data into a table if one of the columns is an Array of Struct. Using the spark API, is it possible to insert a schema RDD into an existing and *partitioned* table? The insertInto method on schema RDD takes only the name of the table. Thanks, Ayoub. 2015-01-31 22:30 GMT+01:00 Ayoub Benali benali.ayoub.i...@gmail.com: Hello, as asked, I just filed this JIRA issue https://issues.apache.org/jira/browse/SPARK-5508. I will add another similar code example which leads to a GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow exception. Best, Ayoub. 2015-01-31 4:05 GMT+01:00 Cheng Lian lian.cs@gmail.com: According to the Gist Ayoub provided, the schema is fine. I reproduced this issue locally; it should be a bug, but I don't think it's related to SPARK-5236. Will investigate this soon. Ayoub - would you mind helping to file a JIRA for this issue? Thanks! Cheng On 1/30/15 11:28 AM, Michael Armbrust wrote: Is it possible that your schema contains duplicate columns or columns with spaces in the name? The parquet library will often give confusing error messages in this case. On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote: Hello, I have a problem when querying, with a hive context on spark 1.2.1-snapshot, a column in my table which is a nested data structure like an array of struct. The problem happens only on the table stored as parquet, while querying the SchemaRDD saved as a temporary table doesn't lead to any exception. 
my steps are: 1) reading JSON file 2) creating a schema RDD and saving it as a tmp table 3) creating an external table in hive meta store saved as parquet file 4) inserting the data from the tmp table to the persisted table 5) querying the persisted table leads to this exception: select data.field1 from persisted_table LATERAL VIEW explode(data_array) nestedStuff AS data parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://***/test_table/part-1 at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797) at 
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:635) at java.util.ArrayList.get(ArrayList.java:411) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99) at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94
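As a sketch of the API limitation asked about above (Spark 1.2 SchemaRDD API; the table name is a placeholder), insertInto accepts only a table name and an optional overwrite flag, with no way to target a specific partition; that is why a HiveQL dynamic-partition insert is the usual route for partitioned tables:

```scala
// Sketch only: both calls operate on the table as a whole,
// with no partition specification available from this API.
schemaRDD.insertInto("existing_table")                    // append
schemaRDD.insertInto("existing_table", overwrite = true)  // overwrite
```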
Re: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0
I am not personally aware of a repo for snapshot builds. In my use case, I had to build spark 1.2.1-snapshot; see https://spark.apache.org/docs/latest/building-spark.html 2015-01-30 17:11 GMT+01:00 Debajyoti Roy debajyoti@healthagen.com: Thanks Ayoub and Zhan, I am new to spark and wanted to make sure I am not trying something stupid or using a wrong API. Is there a repo where I can pull the snapshot or nightly builds for spark? On Fri, Jan 30, 2015 at 2:45 AM, Ayoub Benali benali.ayoub.i...@gmail.com wrote: Hello, I had the same issue, then I found this JIRA ticket https://issues.apache.org/jira/browse/SPARK-4825 so I switched to Spark 1.2.1-snapshot, which solved the problem. 2015-01-30 8:40 GMT+01:00 Zhan Zhang zzh...@hortonworks.com: I think it is expected. Refer to the comments in saveAsTable: Note that this currently only works with SchemaRDDs that are created from a HiveContext. If I understand correctly, here the SchemaRDD means those generated by HiveContext.sql, instead of applySchema. Thanks. Zhan Zhang On Jan 29, 2015, at 9:38 PM, matroyd debajyoti@healthagen.com wrote: Hi, I am trying saveAsTable on SchemaRDD created from HiveContext and it fails. This is on Spark 1.2.0. 
Following are details of the code, command and exceptions: http://stackoverflow.com/questions/28222496/how-to-enable-sql-on-schemardd-via-the-jdbc-interface-is-it-even-possible Thanks in advance for any guidance -- View this message in context: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0 Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Thanks, *Debajyoti Roy* debajyoti@healthagen.com (646) 561-0844, 350 Madison Ave., FL 16, New York, NY 10017. 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-HiveContext-created-SchemaRDD-s-saveAsTable-is-not-working-on-1-2-0-tp21442.html
[hive context] Unable to query array once saved as parquet
(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) The full code leading to this issue is available in this gist: https://gist.github.com/ayoub-benali/54d6f3b8635530e4e936 Could the problem come from the way I insert the data into the table? Is this problem related to this JIRA ticket https://issues.apache.org/jira/browse/SPARK-5236? I got a similar GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow exception with another table that also contains an array of struct. Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hive-context-Unable-to-query-array-once-saved-as-parquet-tp21446.html
Re: [hive context] Unable to query array once saved as parquet
No, it is not the case; here is the gist to reproduce the issue: https://gist.github.com/ayoub-benali/54d6f3b8635530e4e936 On Jan 30, 2015 8:29 PM, Michael Armbrust mich...@databricks.com wrote: Is it possible that your schema contains duplicate columns or columns with spaces in the name? The parquet library will often give confusing error messages in this case. On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote: Hello, I have a problem when querying, with a hive context on spark 1.2.1-snapshot, a column in my table which is a nested data structure like an array of struct. The problem happens only on the table stored as parquet, while querying the SchemaRDD saved as a temporary table doesn't lead to any exception. my steps are: 1) reading JSON file 2) creating a schema RDD and saving it as a tmp table 3) creating an external table in hive meta store saved as parquet file 4) inserting the data from the tmp table to the persisted table 5) querying the persisted table leads to this exception: select data.field1 from persisted_table LATERAL VIEW explode(data_array) nestedStuff AS data parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://***/test_table/part-1 at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797) at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:635) at java.util.ArrayList.get(ArrayList.java:411) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99) at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94) at parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:274) at 
parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131) at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96) at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193) ... 28 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203
Re: SQL query over (Long, JSON string) tuples
Hello, SQLContext and HiveContext have a jsonRDD method which accepts an RDD[String], where each string is a JSON string, and returns a SchemaRDD; it extends RDD[Row], which is the type you want. Afterwards you should be able to do a join to keep your tuple. Best, Ayoub. 2015-01-29 10:12 GMT+01:00 Tobias Pfeiffer t...@preferred.jp: Hi, I have data as RDD[(Long, String)], where the Long is a timestamp and the String is a JSON-encoded string. I want to infer the schema of the JSON and then do a SQL statement on the data (no aggregates, just column selection and UDF application), but still have the timestamp associated with each row of the result. I completely fail to see how that would be possible. Any suggestions? I can't even see how I would get an RDD[(Long, Row)] so that I *might* be able to add the timestamp to the row after schema inference. Is there *any* way other than string-manipulating the JSON string and adding the timestamp to it? Thanks Tobias -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-SQL-query-over-Long-JSON-string-tuples-tp21419.html
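A sketch of the suggested approach (Spark 1.2 SchemaRDD API). The inferred SchemaRDD does not preserve pairing with the original RDD, so this assumes each JSON record carries a unique "id" field to join back on; parseId is a hypothetical helper that extracts it from the raw string, and "field1" is a placeholder column:

```scala
import org.apache.spark.sql.SQLContext

// Sketch, assuming `sc` and `data: RDD[(Long, String)]` (timestamp, JSON string).
val sqlContext = new SQLContext(sc)

val events = sqlContext.jsonRDD(data.map(_._2))  // infer schema from the JSON
events.registerTempTable("events")

// Select the join key plus the columns of interest.
val resultById = sqlContext.sql("SELECT id, field1 FROM events")
  .map(row => (row.getLong(0), row(1)))

// Re-attach the timestamps via a join on the same key.
val timestampById = data.map { case (ts, json) => (parseId(json), ts) }
val joined = timestampById.join(resultById)      // RDD[(id, (timestamp, field1))]
```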
Re: SQL JSON array operations
You could try to use the hive context, which brings HiveQL; it would allow you to query nested structures using LATERAL VIEW explode... On Jan 15, 2015 4:03 PM, jvuillermet jeremy.vuiller...@gmail.com wrote: let's say my json file lines look like this {"user": "baz", "tags": ["foo", "bar"]} sqlContext.jsonFile("data.json") ... How could I query for users with "bar" tags using SQL sqlContext.sql("select user from users where tags ?contains? 'bar'") I could simplify the request and use the returned RDD to filter on tags, but I'm exploring an app where users can write their SQL queries -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164.html
Re: SQL JSON array operations
You could try to use the hive context, which brings HiveQL; it would allow you to query nested structures using LATERAL VIEW explode... see the doc here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164p21172.html
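A sketch of the LATERAL VIEW approach for the original question (Spark 1.2-era API; the file and column names follow the example above):

```scala
import org.apache.spark.sql.hive.HiveContext

// Sketch, assuming an existing SparkContext `sc`.
val hiveContext = new HiveContext(sc)
hiveContext.jsonFile("data.json").registerTempTable("users")

// explode(tags) yields one row per array element, so a plain WHERE works.
val withBar = hiveContext.sql(
  """SELECT DISTINCT user
    |FROM users LATERAL VIEW explode(tags) t AS tag
    |WHERE tag = 'bar'""".stripMargin)
```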
Re: Parquet compression codecs not applied
it worked, thanks. This doc page https://spark.apache.org/docs/1.2.0/sql-programming-guide.html recommends using spark.sql.parquet.compression.codec to set the compression codec, and I thought this setting would be forwarded to the hive context given that HiveContext extends SQLContext, but it was not. I am wondering if this behavior is normal; if not, I could open an issue with a potential fix so that spark.sql.parquet.compression.codec would be translated to parquet.compression in the hive context. Otherwise the documentation should be updated to mention that the compression codec is set differently with HiveContext. Ayoub. 2015-01-09 17:51 GMT+01:00 Michael Armbrust mich...@databricks.com: This is a little confusing, but that code path is actually going through hive. So the spark sql configuration does not help. Perhaps, try: set parquet.compression=GZIP; On Fri, Jan 9, 2015 at 2:41 AM, Ayoub benali.ayoub.i...@gmail.com wrote: Hello, I tried to save a table created via the hive context as a parquet file, but whatever compression codec (uncompressed, snappy, gzip or lzo) I set via setConf, like: setConf("spark.sql.parquet.compression.codec", "gzip"), the size of the generated files is always the same, so it seems like the spark context ignores the compression codec that I set. 
Here is a code sample applied via the spark shell: import org.apache.spark.sql.hive.HiveContext val hiveContext = new HiveContext(sc) hiveContext.sql("SET hive.exec.dynamic.partition = true") hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict") hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required to make data compatible with impala hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip") hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) partitioned by (year INT, month INT, day INT) STORED AS PARQUET LOCATION 'hdfs://path/data/foo'") hiveContext.sql("insert into table foo partition(year, month, day) select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, day(from_unixtime(ts)) as day from raw_foo") I tried that with spark 1.2 and a 1.3 snapshot against hive 0.13, and I also tried it with Impala on the same cluster, which correctly applied the compression codecs. Does anyone know what could be the problem? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
Parquet compression codecs not applied
Hello, I tried to save a table created via the hive context as a parquet file, but whatever compression codec (uncompressed, snappy, gzip or lzo) I set via setConf, like: setConf("spark.sql.parquet.compression.codec", "gzip"), the size of the generated files is always the same, so it seems like the spark context ignores the compression codec that I set. Here is a code sample applied via the spark shell: import org.apache.spark.sql.hive.HiveContext val hiveContext = new HiveContext(sc) hiveContext.sql("SET hive.exec.dynamic.partition = true") hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict") hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required to make data compatible with impala hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip") hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) partitioned by (year INT, month INT, day INT) STORED AS PARQUET LOCATION 'hdfs://path/data/foo'") hiveContext.sql("insert into table foo partition(year, month, day) select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, day(from_unixtime(ts)) as day from raw_foo") I tried that with spark 1.2 and a 1.3 snapshot against hive 0.13, and I also tried it with Impala on the same cluster, which correctly applied the compression codecs. Does anyone know what could be the problem? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
RDD lineage and broadcast variables
I'm still wrapping my head around the fact that the data backing an RDD is immutable, since an RDD may need to be reconstructed from its lineage at any point. In the context of clustering there are many iterations where an RDD may need to change (for instance cluster assignments, etc.) based on a broadcast variable holding a list of centroids, which are objects that in turn contain a list of features. So immutability is all well and good for the purposes of being able to replay a lineage. But now I'm wondering: during each iteration this RDD goes through many transformations, and it will be transforming based on that broadcast variable of centroids, which are mutable. How would it replay the lineage in this instance? Does a dependency on mutable variables mess up the whole lineage thing? Any help appreciated. Just trying to wrap my head around using Spark correctly. I will say it does seem like there is a common misconception that Spark RDDs are in-memory arrays, but perhaps this is for a reason. Perhaps in some cases an option for mutability, at the cost of fault tolerance, is exactly what is needed for a one-off algorithm that doesn't necessarily need resiliency. Just a thought.
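To make the replay concern concrete, here is a minimal plain-Java sketch (not Spark code; all names are invented) of why the assignment step must depend only on immutable snapshots: if the centroid list were mutated between the original run and a lineage replay, the recomputed partition would differ from the lost one. This is essentially why each iteration broadcasts a fresh, frozen centroid list rather than mutating a shared one.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of why per-iteration inputs must be immutable snapshots: a
// "lineage replay" re-running the assignment step must see the same
// centroids it saw the first time, so the step is a pure function of
// (points, centroids) and the centroid list is never mutated in place.
public class LineageSketch {
    // Pure assignment step: result depends only on its inputs.
    static List<Integer> assign(List<Double> points, List<Double> centroids) {
        List<Integer> out = new ArrayList<>();
        for (double p : points) {
            int best = 0;
            for (int c = 1; c < centroids.size(); c++) {
                if (Math.abs(p - centroids.get(c)) < Math.abs(p - centroids.get(best))) best = c;
            }
            out.add(best);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> points = List.of(1.0, 2.0, 9.0, 10.0);
        List<Double> centroids = List.of(1.5, 9.5);        // immutable snapshot
        List<Integer> first = assign(points, centroids);   // original run
        List<Integer> replay = assign(points, centroids);  // "lost partition" recomputed
        System.out.println(first.equals(replay));          // true: replay is identical
    }
}
```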
RE: Java RDD Union
With that said, and given the nature of iterative algorithms that Spark is advertised for, isn't this a bit of an unnecessary restriction? I don't see where the problem is. For instance, it is clear that when aggregating you need operations to be associative because of the way they are divided and combined. But since forEach works on an individual item, the same problem doesn't exist. As an example, during a k-means algorithm you have to continually update cluster assignments per data item, along with perhaps distance from the centroid. So if you can't update items in place, you have to literally create thousands upon thousands of RDDs. Does Spark have some kind of trick, like reuse behind the scenes, fully persistent data structures or whatever? How can it possibly be efficient for 'iterative' algorithms when it is creating so many RDDs as opposed to one? From: so...@cloudera.com Date: Fri, 5 Dec 2014 14:58:37 -0600 Subject: Re: Java RDD Union To: ronalday...@live.com; user@spark.apache.org foreach also creates a new RDD, and does not modify an existing RDD. However, in practice, nothing stops you from fiddling with the Java objects inside an RDD when you get a reference to them in a method like this. This is definitely a bad idea, as there is certainly no guarantee that any other operations will see any, some or all of these edits. On Fri, Dec 5, 2014 at 2:40 PM, Ron Ayoub ronalday...@live.com wrote: I tricked myself into thinking it was uniting things correctly. I see I'm wrong now. I have a question regarding your comment that RDDs are immutable. Can you change values in an RDD using forEach? Does that violate immutability? I've been using forEach to modify RDDs, but perhaps I've tricked myself once again into believing it is working. I hold object references, so perhaps it is working serendipitously in local mode, since the references are in fact not changing but the referents are, and somehow this will no longer work when clustering. Thanks for the comments.
From: so...@cloudera.com Date: Fri, 5 Dec 2014 14:22:38 -0600 Subject: Re: Java RDD Union To: ronalday...@live.com CC: user@spark.apache.org No, RDDs are immutable. union() creates a new RDD, and does not modify an existing RDD. Maybe this obviates the question. I'm not sure what you mean about releasing from memory. If you want to repartition the unioned RDD, you repartition the result of union(), not anything else. On Fri, Dec 5, 2014 at 1:27 PM, Ron Ayoub ronalday...@live.com wrote: I'm a bit confused regarding the expected behavior of unions. I'm running on 8 cores. I have an RDD that is used to collect cluster associations (cluster id, content id, distance) for internal clusters as well as leaf clusters, since I'm doing hierarchical k-means and need all distances for sorting documents appropriately upon examination. It appears that union simply adds the items in the argument to the RDD instance the method is called on, rather than just returning a new RDD. If I want to use union this way, as more of an add/append, should I be capturing the return value and releasing the original from memory? Need help clarifying the semantics here. Also, in another related thread someone mentioned coalesce after union. Would I need to do the same on the instance RDD I'm calling union on? Perhaps a method such as append would be useful and clearer.
Modifying an RDD in forEach
This is from a separate thread with a differently named title. Why can't you modify the actual contents of an RDD using forEach? It appears to be working for me. What I'm doing is changing cluster assignments and distances per data item for each iteration of the clustering algorithm. The clustering algorithm is massive and iterates thousands of times. As I understand it now, you are supposed to create new RDDs on each pass. This is a hierarchical k-means that I'm doing, and hence it consists of many small iterations rather than a few large ones. So I understand the restriction on why operations, when aggregating and reducing etc., need to be associative. However, forEach operates on a single item. So given that Spark is advertised as being great for iterative algorithms since it operates in-memory, how can it be good to create thousands upon thousands of RDDs during the course of an iterative algorithm? Does Spark have some kind of trick, like reuse behind the scenes, fully persistent data structures or whatever? How can it possibly be efficient for 'iterative' algorithms when it is creating so many RDDs as opposed to one? Or is the answer that I should keep doing what I'm doing because it is working, even though it is not theoretically sound and aligned with functional ideas? I personally just want it to be fast and to be able to operate on up to 500 million data items.
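As a plain-Java illustration of the pattern being questioned above (not Spark code; the bean and field names are invented): each iteration derives a new collection from the previous one instead of mutating in place. The per-iteration cost is the object copies and a thin record of "previous collection + function", not a full re-materialization of the data.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the functional per-iteration pattern: instead of mutating
// assignments in place with forEach, each iteration maps the previous
// collection to a new one. The input is never modified.
public class IterationSketch {
    record Doc(long id, double feature) {}
    record Assigned(long id, int cluster) {}

    static List<Assigned> iterate(List<Doc> docs, double split) {
        // Derive a new list; `docs` itself is untouched.
        return docs.stream()
                   .map(d -> new Assigned(d.id(), d.feature() < split ? 0 : 1))
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(new Doc(1, 0.2), new Doc(2, 0.9));
        System.out.println(iterate(docs, 0.5)); // new list each call; docs unchanged
    }
}
```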
RE: Java RDD Union
Hierarchical k-means requires a massive number of iterations, whereas flat k-means does not, but I've found flat clustering to be generally useless since in most UIs it is nice to be able to drill down into more and more specific clusters. If you have 100 million documents and your branching factor is 8 (8-secting k-means), then you will be picking a cluster to split and iterating thousands of times. So per split you iterate maybe 6 or 7 times to get new cluster assignments, and there are ultimately going to be 5,000 to 50,000 splits depending on the split criterion and cluster variances, etc. In this case fault tolerance doesn't matter. I've found that the distributed aspect of RDDs is what I'm looking for, and I don't care about or need the resilience part as much. It is a one-off algorithm that can just be run again if something goes wrong. Once the data is created, it is done with Spark. But anyway, that is the very thing Spark is advertised for. From: so...@cloudera.com Date: Sat, 6 Dec 2014 06:39:10 -0600 Subject: Re: Java RDD Union To: ronalday...@live.com CC: user@spark.apache.org I guess a major problem with this is that you lose fault tolerance. You have no way of recreating the local state of the mutable RDD if a partition is lost. Why would you need thousands of RDDs for k-means? It's a few per iteration. An RDD is more bookkeeping than data structure, itself. They don't inherently take up resources, unless you mark them to be persisted. You're paying the cost of copying objects to create one RDD from the next, but that's mostly it. On Sat, Dec 6, 2014 at 6:28 AM, Ron Ayoub ronalday...@live.com wrote: With that said, and given the nature of iterative algorithms that Spark is advertised for, isn't this a bit of an unnecessary restriction? I don't see where the problem is. For instance, it is clear that when aggregating you need operations to be associative because of the way they are divided and combined. But since forEach works on an individual item, the same problem doesn't exist.
As an example, during a k-means algorithm you have to continually update cluster assignments per data item, along with perhaps distance from the centroid. So if you can't update items in place, you have to literally create thousands upon thousands of RDDs. Does Spark have some kind of trick, like reuse behind the scenes, fully persistent data structures or whatever? How can it possibly be efficient for 'iterative' algorithms when it is creating so many RDDs as opposed to one?
RE: Modifying an RDD in forEach
These are very interesting comments. The vast majority of cases I'm working on are going to be in the 3 million range; 100 million was thrown out as something to shoot for, and I upped it to 500 million. But all things considered, if I can scale up to clustering 100 million documents on a single machine, I can probably translate what I have directly to the Java Streams API, run it on 32 cores in under an hour or two, and that would suit our needs. Up until this point I've been focused on the computational aspect; it is scaling out that changes things. I think in this hierarchical k-means case the lazy evaluation becomes almost useless and perhaps even an impediment. Part of the problem is that I've been a bit too focused on math/information retrieval and need to catch up a bit on the functional approach to programming so I can better utilize the tools. But it does appear that Spark may not be the best option for this need. I don't need resiliency or fault tolerance as much as I need to execute an algorithm on a large amount of data fast and then be done with it. I'm now thinking that in the 100 million document range I may be OK clustering feature vectors with no more than 25 features per doc on a single machine with 32 cores and a load of memory, in which case I might translate what I have directly to the Java 8 Streams API. There is also a question of proportion: perhaps what I have is not big enough to warrant or require scaling out. I may have other uses for Spark in traditional map-reduce algorithms, such as counting pairs of shared shingles for near-dupe detection, but to this point I've found that Oracle's parallel pipelined table functions, while not glamorous, are doing quite well in the DB.
I'm still a bit confused about why it is advertised as ideal for iterative algorithms, when iterative algorithms have that point per iteration where things do get evaluated and laziness is not terribly useful. Ideal for massive in-memory cluster computing, yes; but iterative...? Not sure. I have that book Functional Programming in Scala and I hope to read it someday and enrich my understanding here. Subject: Re: Modifying an RDD in forEach From: mohitja...@gmail.com Date: Sat, 6 Dec 2014 13:13:50 -0800 CC: ronalday...@live.com; user@spark.apache.org To: mayur.rust...@gmail.com Ron, "appears to be working" might be true when there are no failures. On large datasets being processed on a large number of machines, failures of several types (server, network, disk, etc.) can happen. At that point, Spark will not "know" that you changed the RDD in place and will use any version of any partition of the RDD for a retry. Retries require idempotency, and that is difficult without immutability. I believe this is one of the primary reasons for making RDDs immutable in Spark (mutable isn't even an option worth considering). In general, mutating something in a distributed system is a hard problem. It can be solved (e.g. in NoSQL or NewSQL databases), but Spark is not a transactional data store. If you are building an iterative machine learning algorithm, which usually has a "reduce" step at the end of every iteration, then the lazy evaluation is unlikely to be useful. On the other hand, if these intermediate RDDs stay in the young generation of the JVM heap [I am not sure if RDD cache management somehow changes this, so I could be wrong], they are garbage collected quickly and with very little overhead. This is the price of scaling out :-) Hope this helps, Mohit. On Dec 6, 2014, at 5:02 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: You'll benefit from viewing Matei's talk at Yahoo on Spark internals and how it optimizes execution of iterative jobs. The simple answer is: 1.
Spark doesn't materialize an RDD when you do an iteration, but lazily captures the transformation functions in the RDD (only the function and closure; no data operation actually happens). 2. When you finally execute and want to cause effects (save to disk, collect on the master, etc.), it views the DAG of execution and optimizes what it can reason about (eliminating intermediate states, performing multiple transformations in one task, and leveraging partitioning where available, among others). Bottom line: it doesn't matter how many RDDs you have in your DAG chain, as long as Spark can optimize the functions in that DAG to create minimal materialization on its way to the final output. Regards, Mayur. On 06-Dec-2014 6:12 pm, Ron Ayoub ronalday...@live.com wrote: This is from a separate thread with a differently named title. Why can't you modify the actual contents of an RDD using forEach? It appears to be working for me. What I'm doing is changing cluster assignments and distances per data item for each iteration of the clustering algorithm. The clustering algorithm is massive and iterates thousands of times. As I understand
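Java's own Stream pipelines make a handy small-scale analogy for the lazy DAG described above (a loose analogy only, not Spark code): chaining transformations records functions without running them, and the terminal operation then runs them in one fused pass over the data.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Laziness demo: building a pipeline of two maps executes nothing;
// the terminal collect() then runs both functions once per element.
public class LazySketch {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        Stream<Integer> pipeline = List.of(1, 2, 3).stream()
            .map(x -> { calls.incrementAndGet(); return x + 1; })
            .map(x -> { calls.incrementAndGet(); return x * 10; });

        System.out.println(calls.get()); // 0 -- functions captured, nothing ran yet

        pipeline.collect(Collectors.toList()); // terminal op forces one fused pass

        System.out.println(calls.get()); // 6 -- both maps ran, once per element
    }
}
```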
Java RDD Union
I'm a bit confused regarding the expected behavior of unions. I'm running on 8 cores. I have an RDD that is used to collect cluster associations (cluster id, content id, distance) for internal clusters as well as leaf clusters, since I'm doing hierarchical k-means and need all distances for sorting documents appropriately upon examination. It appears that union simply adds the items in the argument to the RDD instance the method is called on, rather than just returning a new RDD. If I want to use union this way, as more of an add/append, should I be capturing the return value and releasing the original from memory? Need help clarifying the semantics here. Also, in another related thread someone mentioned coalesce after union. Would I need to do the same on the instance RDD I'm calling union on? Perhaps a method such as append would be useful and clearer.
collecting fails - requirements for collecting (clone, hashCode etc?)
The following code is failing on the collect. If I don't do the collect and stay with a JavaRDD<Document> it works fine, except I really would like to collect. At first I was getting an error regarding JDI threads and an index being 0; then it just started locking up. I'm running the Spark context locally on 8 cores.

long count = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES).count();
List<Document> sampledDocuments = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES)
    .sample(false, samplingFraction(count)).collect();
RE: collecting fails - requirements for collecting (clone, hashCode etc?)
I didn't realize I get a nice stack trace if not running in debug mode. Basically, I believe Document has to be Serializable. But since the question has already been asked: are there other requirements for objects within an RDD that I should be aware of? Serializable is very understandable. How about clone, hashCode, etc.? From: ronalday...@live.com To: user@spark.apache.org Subject: collecting fails - requirements for collecting (clone, hashCode etc?) Date: Wed, 3 Dec 2014 07:48:53 -0600 The following code is failing on the collect. If I don't do the collect and stay with a JavaRDD<Document> it works fine, except I really would like to collect. At first I was getting an error regarding JDI threads and an index being 0; then it just started locking up. I'm running the Spark context locally on 8 cores.

long count = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES).count();
List<Document> sampledDocuments = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES)
    .sample(false, samplingFraction(count)).collect();
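A minimal sketch of what collect() demands of the element type under the default Java serialization (the field names here are invented, not taken from the original post): the class must implement java.io.Serializable. Spark does not require clone(); hashCode()/equals() matter mainly when the objects are used as keys, e.g. in the *ByKey operations.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// A Serializable bean plus a Java-serialization roundtrip, which is
// roughly what happens when executors ship rows back to the driver.
public class Document implements Serializable {
    private static final long serialVersionUID = 1L;
    private final long id;
    private final List<Double> features; // List.of(...) lists are serializable

    public Document(long id, List<Double> features) {
        this.id = id;
        this.features = features;
    }
    public long getId() { return id; }
    public List<Double> getFeatures() { return features; }

    public static Document roundtrip(Document d) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(d);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (Document) ois.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Document d = roundtrip(new Document(42L, List.of(0.1, 0.2)));
        System.out.println(d.getId()); // 42
    }
}
```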
Example of Fold
I want to fold an RDD into a smaller RDD with max elements. I have simple bean objects with 4 properties. I want to group by 3 of the properties and then select the max of the 4th. So I believe fold is the appropriate method for this. My question is: is there a good fold example out there? Additionally, what is the zero value used for as the first argument? Thanks.
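Strictly speaking, for "group by 3 properties, max of the 4th" a keyed reduction (mapToPair followed by reduceByKey in Spark) is usually a closer fit than fold; fold's zero value is the neutral starting element each partition's reduction begins from (0 for a sum, Double.NEGATIVE_INFINITY for a max). A plain-Java-streams sketch of the same group-then-max shape (the bean and field names are invented):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Group by (a, b, c), keep the row with the max score per group.
public class FoldSketch {
    record Bean(String a, String b, String c, double score) {}

    static Map<List<String>, Bean> maxPerGroup(List<Bean> beans) {
        return beans.stream().collect(Collectors.toMap(
            bn -> List.of(bn.a(), bn.b(), bn.c()),   // 3-property key
            bn -> bn,                                // value
            BinaryOperator.maxBy(Comparator.comparingDouble(Bean::score))));
    }

    public static void main(String[] args) {
        List<Bean> beans = List.of(
            new Bean("x", "y", "z", 1.0),
            new Bean("x", "y", "z", 5.0),
            new Bean("x", "y", "w", 2.0));
        Map<List<String>, Bean> best = maxPerGroup(beans);
        System.out.println(best.get(List.of("x", "y", "z")).score()); // 5.0
    }
}
```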
winutils
Apparently Spark does require Hadoop even if you do not intend to use Hadoop. Is there a workaround for the below error I get when creating the SparkContext in Scala? I will note that I didn't have this problem yesterday when creating the Spark context in Java as part of the getting started App. It could be because I was using Maven project to manage dependencies and that did something for me or else JavaSparkContext has some different code. I would say, in order for Spark to be general purpose this is a pretty big bug since now it appears Spark depends upon Hadoop. Could not locate executable null\bin\winutils.exe in the Hadoop binaries
RE: winutils
Well, I got past this problem, and the answer was in my own email. I did download the build with Hadoop, since it was among the only ones you don't have to compile from source, along with the CDH and MapR builds. It worked yesterday because I added 1.1.0 as a Maven dependency from the repository. I just did the same thing again and it worked perfectly. One peculiarity I will mention is that even with the Scala IDE installed in Eclipse, when I created the Maven project per instructions on the web and installed the connector, I still did not get the Scala perspective, nor could I right-click and add Scala types. This time around, I used the Scala IDE project wizard to create a simple non-Maven app and then converted it to Maven, and all features seem to work fine. I will also note that I'm learning Java, Scala, Eclipse, Spark and Maven all at the same time. Kind of overkill. But part of the frustration was following along with the Maven Scala project instructions using an archetype that is badly out of date. So now I think I found a good approach to getting up and running with Spark (1. Eclipse, 2. Scala IDE, 3. Scala wizard project, 4. convert to Maven, 5. add the Spark dependency). Date: Wed, 29 Oct 2014 11:38:23 -0700 Subject: Re: winutils From: denny.g@gmail.com To: ronalday...@live.com CC: user@spark.apache.org QQ - did you download the Spark 1.1 binaries that included the Hadoop one? Does this happen if you're using the Spark 1.1 binaries that do not include the Hadoop jars? On Wed, Oct 29, 2014 at 11:31 AM, Ron Ayoub ronalday...@live.com wrote: Apparently Spark does require Hadoop even if you do not intend to use Hadoop. Is there a workaround for the below error I get when creating the SparkContext in Scala? I will note that I didn't have this problem yesterday when creating the Spark context in Java as part of the getting started app. It could be because I was using a Maven project to manage dependencies and that did something for me, or else JavaSparkContext has some different code.
I would say, in order for Spark to be general purpose this is a pretty big bug since now it appears Spark depends upon Hadoop. Could not locate executable null\bin\winutils.exe in the Hadoop binaries
JdbcRDD in Java
The following line of code indicates that the constructor is not defined. The only examples of JdbcRDD usage I can find are Scala examples. Does this work in Java? Are there any examples? Thanks.

JdbcRDD<Integer> rdd = new JdbcRDD<Integer>(sp, () -> ods.getConnection(), sql, 1, 1783059, 10, (ResultSet row) -> row.getInt("FEATURE_ID"));
Is Spark in Java a bad idea?
I haven't learned Scala yet, so as you might imagine I'm having challenges working with Spark from the Java API. For one thing, it seems very limited in comparison to Scala. I ran into a problem really quickly. I need to hydrate an RDD from JDBC/Oracle, so I wanted to use the JdbcRDD. But that is part of the Spark (Scala) API, and I'm unable to get the compiler to accept the various parameters. I looked at the code and noticed that JdbcRDD doesn't add much value; it just implements compute and partition. I figured I could do that myself with better-looking JDBC code. So I created a class inheriting from RDD, which was heavily decorated with stuff I have never seen before. Next, I recalled that I have to use JavaRDD. Of course, that class doesn't have those methods that you can override. From where I'm standing right now, it appears that Spark doesn't really support Java, and that if you really want to use it you need to learn Scala. Is this a correct assessment?
RE: Is Spark in Java a bad idea?
I interpret this to mean you have to learn Scala in order to work with Spark in Scala (goes without saying) and also to work with Spark in Java (since you have to jump through some hoops for basic functionality). The best path here is to take this as a learning opportunity and sit down and learn Scala. Regarding RDD being an internal API: it has two methods that clearly allow you to override them, which JdbcRDD does, and it looks close to trivial, if I only knew Scala. Once I learn Scala, I would say the first thing I plan on doing is writing my own OracleRDD with my own flavor of JDBC code. Why would this not be advisable? Subject: Re: Is Spark in Java a bad idea? From: matei.zaha...@gmail.com Date: Tue, 28 Oct 2014 11:56:39 -0700 CC: u...@spark.incubator.apache.org To: isasmani@gmail.com A pretty large fraction of users use Java, but a few features are still not available in it. JdbcRDD is one of them -- this functionality will likely be superseded by Spark SQL when we add JDBC as a data source. In the meantime, to use it, I'd recommend writing a class in Scala that has Java-friendly methods and getting an RDD from that. Basically the two parameters that weren't friendly there were the ClassTag and the getConnection and mapRow functions. Subclassing RDD in Java is also not really supported, because that's an internal API. We don't expect users to be defining their own RDDs. Matei On Oct 28, 2014, at 11:47 AM, critikaled isasmani@gmail.com wrote: Hi Ron, whatever API you have in Scala you can possibly use from Java. Scala is interoperable with Java and vice versa. Scala, being both object-oriented and functional, will make your job easier on the JVM, and it is more concise than Java. Take it as an opportunity and start learning Scala ;). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-in-Java-a-bad-idea-tp17534p17538.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark to eliminate full-table scan latency
We have a table containing 25 features per item id, along with feature weights. A correlation matrix can be constructed for every feature pair based on co-occurrence. If a user inputs a feature, they can find the features correlated with it via a self-join requiring a single full table scan. This results in high latency on big data (10 seconds plus) due to the IO involved in the full table scan. My idea is that for this feature the data can be loaded into an RDD, and transformations and actions can be applied to find, per query, the correlated features. I'm pretty sure Spark can do this sort of thing. Since I'm new, what I'm not sure about is: is Spark appropriate as a server application? For instance, the driver application would have to load the RDD and then listen for requests and return results, perhaps using a socket? Are there any libraries to facilitate this sort of Spark server app? So I understand how Spark can be used to grab data, run algorithms, and put results back, but is it appropriate as the engine of a server app, and what are the general patterns involved?
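As a sketch of the server pattern being asked about (plain Java, no Spark; all names are invented): the long-lived driver process holds the in-memory state and answers queries over a socket. In a real Spark driver, the handler would run transformations/actions against the cached RDD instead of a map lookup.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

// Minimal line-oriented query server: one request per connection.
public class QueryServer {
    // Stand-in for the cached RDD / correlation data held by the driver.
    private final Map<String, String> correlated = new HashMap<>();

    public QueryServer() {
        correlated.put("featureA", "featureB,featureC"); // toy data
    }

    // Pure request handler, easy to test without opening sockets.
    public String handle(String feature) {
        return correlated.getOrDefault(feature, "");
    }

    // Blocking accept loop; a real driver would call
    // serve(new ServerSocket(port)) after loading and caching its data.
    public void serve(ServerSocket server) throws IOException {
        while (true) {
            try (Socket s = server.accept();
                 BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
                 PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                out.println(handle(in.readLine()));
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new QueryServer().handle("featureA")); // featureB,featureC
    }
}
```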
RE: Spark to eliminate full-table scan latency
This does look like it provides a good way to allow other processes to access the contents of an RDD from a separate app. Is there any other general-purpose mechanism for serving up RDD data? I understand that the driver app and workers are all app-specific and run in separate executors, but it would be cool if there were some general way to create a server app based on Spark. Perhaps Spark SQL is that general way, and I'll soon find out. Thanks. From: mich...@databricks.com Date: Mon, 27 Oct 2014 14:35:46 -0700 Subject: Re: Spark to eliminate full-table scan latency To: ronalday...@live.com CC: user@spark.apache.org You can access cached data in Spark through the JDBC server: http://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbc-server On Mon, Oct 27, 2014 at 1:47 PM, Ron Ayoub ronalday...@live.com wrote: We have a table containing 25 features per item id, along with feature weights. A correlation matrix can be constructed for every feature pair based on co-occurrence. If a user inputs a feature, they can find the features correlated with it via a self-join requiring a single full table scan. This results in high latency on big data (10 seconds plus) due to the IO involved in the full table scan. My idea is that for this feature the data can be loaded into an RDD, and transformations and actions can be applied to find, per query, the correlated features. I'm pretty sure Spark can do this sort of thing. Since I'm new, what I'm not sure about is: is Spark appropriate as a server application? For instance, the driver application would have to load the RDD and then listen for requests and return results, perhaps using a socket? Are there any libraries to facilitate this sort of Spark server app? So I understand how Spark can be used to grab data, run algorithms, and put results back, but is it appropriate as the engine of a server app, and what are the general patterns involved?