Re: spark 2.0 readStream from a REST API
Why is writeStream needed to consume the data? When I tried it I got this exception:

INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:65)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
  at .(:59)

2016-08-01 18:44 GMT+02:00 Amit Sela <amitsel...@gmail.com>:

> I think you're missing:
>
>     val query = wordCounts.writeStream
>       .outputMode("complete")
>       .format("console")
>       .start()
>
> Did it help?
>
> On Mon, Aug 1, 2016 at 2:44 PM Jacek Laskowski <ja...@japila.pl> wrote:
>
>> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali <benali.ayoub.i...@gmail.com> wrote:
>>
>> > the problem now is that when I consume the dataframe for example with count
>> > I get the stack trace below.
>>
>> Mind sharing the entire pipeline?
>>
>> > I followed the implementation of TextSocketSourceProvider to implement my
>> > data source and Text Socket source is used in the official documentation here.
>>
>> Right. Completely forgot about the provider. Thanks for reminding me about it!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>>
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
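The exception above is about the output mode rather than writeStream itself: "complete" mode is only allowed when the query contains a streaming aggregation. A rough sketch of both ways out, assuming the custom source is registered under the name "mysource" and exposes a string "value" column (both assumptions, not confirmed in the thread):

    // `spark` is the SparkSession (predefined in spark-shell)
    val stream = spark.readStream.format("mysource").load()

    // Option 1: no aggregation in the query, so use "append" instead of "complete"
    val appendQuery = stream.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // Option 2: keep "complete" by adding a streaming aggregation first
    val counts = stream.groupBy("value").count()
    val completeQuery = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    completeQuery.awaitTermination()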
Re: spark 2.0 readStream from a REST API
Hello,

here is the code I am trying to run:
https://gist.github.com/ayoub-benali/a96163c711b4fce1bdddf16b911475f2

Thanks,
Ayoub.

2016-08-01 13:44 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali <benali.ayoub.i...@gmail.com> wrote:
>
> > the problem now is that when I consume the dataframe for example with count
> > I get the stack trace below.
>
> Mind sharing the entire pipeline?
>
> > I followed the implementation of TextSocketSourceProvider to implement my
> > data source and Text Socket source is used in the official documentation here.
>
> Right. Completely forgot about the provider. Thanks for reminding me about it!
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
Re: spark 2.0 readStream from a REST API
Hello,

using the full class name worked, thanks.

The problem now is that when I consume the dataframe, for example with count, I get the stack trace below.

I followed the implementation of TextSocketSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala#L128>
to implement my data source, and the Text Socket source is used in the official documentation here
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example>.

Why does count work in the documentation example? Is there some other trait that needs to be implemented?

Thanks,
Ayoub.

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:31)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:31)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:59)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:70)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2541)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)

2016-07-31 21:56 GMT+02:00 Michael Armbrust <mich...@databricks.com>:

> You have to add a file in resources too (example
> <https://github.com/apache/spark/blob/master/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister>).
> Either that or give a full class name.
>
> On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali <benali.ayoub.i...@gmail.com> wrote:
>
>> Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
>> with DataSourceRegister
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
>> But now spark fails at loading the class when doing:
>>
>>     spark.readStream.format("mysource").load()
>>
>> I get:
>>
>>     java.lang.ClassNotFoundException: Failed to find data source: mysource.
>>     Please find packages at http://spark-packages.org
>>
>> Is there something I need to do in order to "load" the Stream source provider?
>>
>> Thanks,
>> Ayoub
>>
>> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:
>>
>>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali <benali.ayoub.i...@gmail.com> wrote:
>>>
>>> > I started playing with the Structured Streaming API in spark 2.0 and I am
>>> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
>>> > endpoint but I am bit stuck.
>>>
>>> What a great idea! Why did I myself not think about this?!?!
>>>
>>> > What would be the easiest way to hack around it ? Do
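As a side note on the count question above: in the quick example of the documentation, the "count" is the groupBy(...).count() transformation, which produces another streaming Dataset, not the eager Dataset.count() action that trips the batch check quoted in the stack trace. A sketch of the difference, with the same assumed source name:

    // `spark` is the SparkSession (predefined in spark-shell)
    val stream = spark.readStream.format("mysource").load()

    // Eager action on a streaming Dataset: throws the AnalysisException quoted above
    // stream.count()

    // The documentation's count: a lazy streaming aggregation, started via writeStream
    val counts = stream.groupBy("value").count()
    counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()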
Re: spark 2.0 readStream from a REST API
Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
with DataSourceRegister
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
But now spark fails at loading the class when doing:

    spark.readStream.format("mysource").load()

I get:

    java.lang.ClassNotFoundException: Failed to find data source: mysource.
    Please find packages at http://spark-packages.org

Is there something I need to do in order to "load" the Stream source provider?

Thanks,
Ayoub

2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali <benali.ayoub.i...@gmail.com> wrote:
>
> > I started playing with the Structured Streaming API in spark 2.0 and I am
> > looking for a way to create a streaming Dataset/Dataframe from a REST HTTP
> > endpoint but I am a bit stuck.
>
> What a great idea! Why did I myself not think about this?!?!
>
> > What would be the easiest way to hack around it? Do I need to implement the
> > Datasource API?
>
> Yes, and perhaps the Hadoop API too, but I'm not sure which one exactly since I
> haven't even thought about it (not even once).
>
> > Are there examples on how to create a DataSource from a REST endpoint?
>
> Never heard of one.
>
> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
> it as a topic. Thanks a lot!
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
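A minimal sketch of what such a provider could look like (package and class names are made up for illustration; the registration mechanism is the point). For format("mysource") to resolve, the provider has to be listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath, or be referenced by its fully qualified class name:

    // src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    // should contain one line with the provider's fully qualified name, e.g.:
    //   com.example.RestSourceProvider

    package com.example

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.Source
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    class RestSourceProvider extends StreamSourceProvider with DataSourceRegister {

      // short name usable as spark.readStream.format("mysource")
      override def shortName(): String = "mysource"

      private val defaultSchema = StructType(StructField("value", StringType) :: Nil)

      override def sourceSchema(
          sqlContext: SQLContext,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): (String, StructType) =
        (shortName(), schema.getOrElse(defaultSchema))

      override def createSource(
          sqlContext: SQLContext,
          metadataPath: String,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): Source = {
        // the REST-polling Source implementation itself would go here
        ???
      }
    }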
spark 2.0 readStream from a REST API
Hello,

I started playing with the Structured Streaming API in spark 2.0 and I am looking for a way to create a streaming Dataset/Dataframe from a REST HTTP endpoint, but I am a bit stuck.

"readStream" in SparkSession has a json method, but it expects a path (s3, hdfs, etc.) and I want to avoid having to save the data on s3 and then read it again.

What would be the easiest way to hack around it? Do I need to implement the Datasource API?

Are there examples on how to create a DataSource from a REST endpoint?

Best,
Ayoub
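For context, a sketch of the mismatch described above (paths and schema are illustrative assumptions): the built-in streaming readers such as json monitor a directory of files, so using them for a REST API would mean landing the data on storage first.

    // `spark` is the SparkSession (predefined in spark-shell)
    import org.apache.spark.sql.types.{StringType, StructType}

    val eventSchema = new StructType().add("payload", StringType)

    // Works, but only against a path that new files get written to:
    val fromFiles = spark.readStream
      .schema(eventSchema)                // file streams need an explicit schema
      .json("s3://some-bucket/events/")

    // What is wanted here, conceptually (no such built-in source exists):
    // val fromRest = spark.readStream.format("http")
    //   .option("url", "https://api.example.com/events").load()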
Re: RDD[Future[T]] = Future[RDD[T]]
It doesn't work because mapPartitions expects a function f: (Iterator[T]) ⇒ Iterator[U] while .sequence wraps the iterator in a Future.

2015-07-26 22:25 GMT+02:00 Ignacio Blasco <elnopin...@gmail.com>:

> Maybe using mapPartitions and .sequence inside it?
>
> On 26/7/2015 at 10:22 p.m., Ayoub <benali.ayoub.i...@gmail.com> wrote:
>
>> Hello,
>>
>> I am trying to convert the result I get after doing some async IO:
>>
>>     val rdd: RDD[T] = // some rdd
>>     val result: RDD[Future[T]] = rdd.map(httpCall)
>>
>> Is there a way to collect all futures once they are completed, in a *non blocking*
>> (i.e. without scala.concurrent Await) and lazy way?
>>
>> If the RDD was a standard scala collection, calling scala.concurrent.Future.sequence
>> would have resolved the issue, but RDD is not a TraversableOnce (which is required
>> by the method).
>>
>> Is there a way to do this kind of transformation with an RDD[Future[T]]?
>>
>> Thanks,
>> Ayoub.
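To make the type mismatch concrete, and to show the usual fallback, here is a rough sketch; note the fallback blocks inside each partition with Await, which is exactly what the question wanted to avoid, so it is a workaround rather than an answer:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.spark.rdd.RDD

    // placeholder standing in for the real async HTTP call
    def httpCall(x: String): Future[String] = Future(x)

    // `sc` is the SparkContext (predefined in spark-shell)
    val rdd: RDD[String] = sc.parallelize(Seq("a", "b", "c"))

    // Does not compile: the lambda returns Future[Iterator[String]],
    // but mapPartitions needs Iterator[String] => Iterator[U]
    // rdd.mapPartitions(it => Future.sequence(it.map(httpCall)))

    // Blocking fallback: start all calls in a partition, then await them together
    val resolved: RDD[String] = rdd.mapPartitions { it =>
      val futures = it.map(httpCall).toVector   // materialize so every call is fired
      Await.result(Future.sequence(futures), 10.minutes).iterator
    }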
Re: SQL JSON array operations
You could try to use the hive context, which brings HiveQL; it would allow you to query nested structures using LATERAL VIEW explode...

On Jan 15, 2015 4:03 PM, jvuillermet <jeremy.vuiller...@gmail.com> wrote:

> let's say my json file lines look like this:
>
>     {"user": "baz", "tags": ["foo", "bar"]}
>
>     sqlContext.jsonFile("data.json")
>     ...
>
> How could I query for users with "bar" tags using SQL?
>
>     sqlContext.sql("select user from users where tags ?contains? 'bar'")
>
> I could simplify the request and use the returned RDD to filter on tags, but I'm
> exploring an app where users can write their SQL queries.
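A rough sketch of what that could look like with a HiveContext (table and column names follow the example below; everything else is an assumption): explode turns each element of the tags array into its own row, which can then be filtered with an ordinary WHERE clause.

    // `sc` is the SparkContext (e.g. in spark-shell)
    import org.apache.spark.sql.hive.HiveContext
    val hiveContext = new HiveContext(sc)

    hiveContext.jsonFile("data.json").registerTempTable("users")

    // LATERAL VIEW explode flattens the tags array, one row per tag
    hiveContext.sql(
      "SELECT DISTINCT user FROM users LATERAL VIEW explode(tags) t AS tag WHERE tag = 'bar'"
    ).collect()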
Re: Parquet compression codecs not applied
It worked, thanks.

This doc page <https://spark.apache.org/docs/1.2.0/sql-programming-guide.html> recommends using spark.sql.parquet.compression.codec to set the compression codec, and I thought this setting would be forwarded to the hive context given that HiveContext extends SQLContext, but it was not.

I am wondering if this behavior is normal. If not, I could open an issue with a potential fix so that spark.sql.parquet.compression.codec is translated to parquet.compression in the hive context. Otherwise the documentation should be updated to mention that the compression codec is set differently with HiveContext.

Ayoub.

2015-01-09 17:51 GMT+01:00 Michael Armbrust <mich...@databricks.com>:

> This is a little confusing, but that code path is actually going through hive.
> So the spark sql configuration does not help. Perhaps try:
>
>     set parquet.compression=GZIP;
>
> On Fri, Jan 9, 2015 at 2:41 AM, Ayoub <benali.ayoub.i...@gmail.com> wrote:
>
>> Hello,
>>
>> I tried to save a table created via the hive context as a parquet file, but
>> whatever compression codec (uncompressed, snappy, gzip or lzo) I set via setConf, like:
>>
>>     setConf("spark.sql.parquet.compression.codec", "gzip")
>>
>> the size of the generated files is always the same, so it seems like the spark
>> context ignores the compression codec that I set.
>>
>> Here is a code sample applied via the spark shell:
>>
>>     import org.apache.spark.sql.hive.HiveContext
>>     val hiveContext = new HiveContext(sc)
>>     hiveContext.sql("SET hive.exec.dynamic.partition = true")
>>     hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
>>     hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required to make data compatible with impala
>>     hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")
>>
>>     hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET Location 'hdfs://path/data/foo'")
>>
>>     hiveContext.sql("insert into table foo partition(year, month, day) select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, day(from_unixtime(ts)) as day from raw_foo")
>>
>> I tried that with spark 1.2 and a 1.3 snapshot against hive 0.13, and I also
>> tried it with Impala on the same cluster, which applied the compression codecs correctly.
>>
>> Does anyone know what could be the problem?
>>
>> Thanks,
>> Ayoub.
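A sketch of how that suggestion could be applied to the code sample quoted above, assuming everything else stays as in the example: the Hive write path picks the Parquet codec up from the Hive/Parquet property parquet.compression rather than from spark.sql.parquet.compression.codec.

    // `sc` is the SparkContext (e.g. in spark-shell)
    import org.apache.spark.sql.hive.HiveContext
    val hiveContext = new HiveContext(sc)

    hiveContext.sql("SET hive.exec.dynamic.partition = true")
    hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

    // Set the codec through the Hive/Parquet property instead of the Spark SQL one
    hiveContext.sql("SET parquet.compression=GZIP")

    hiveContext.sql("insert into table foo partition(year, month, day) " +
      "select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, " +
      "day(from_unixtime(ts)) as day from raw_foo")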
Parquet compression codecs not applied
Hello,

I tried to save a table created via the hive context as a parquet file, but whatever compression codec (uncompressed, snappy, gzip or lzo) I set via setConf, like:

    setConf("spark.sql.parquet.compression.codec", "gzip")

the size of the generated files is always the same, so it seems like the spark context ignores the compression codec that I set.

Here is a code sample applied via the spark shell:

    import org.apache.spark.sql.hive.HiveContext
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SET hive.exec.dynamic.partition = true")
    hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required to make data compatible with impala
    hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

    hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET Location 'hdfs://path/data/foo'")

    hiveContext.sql("insert into table foo partition(year, month, day) select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, day(from_unixtime(ts)) as day from raw_foo")

I tried that with spark 1.2 and a 1.3 snapshot against hive 0.13, and I also tried it with Impala on the same cluster, which applied the compression codecs correctly.

Does anyone know what could be the problem?

Thanks,
Ayoub.