Re: spark 2.0 readStream from a REST API

2016-08-02 Thread Ayoub Benali
Why is writeStream needed to consume the data?

When I tried it, I got this exception:

INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
> org.apache.spark.sql.AnalysisException: Complete output mode not supported
> when there are no streaming aggregations on streaming DataFrames/Datasets;
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:65)
> at
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
> at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
> at .(:59)
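
For anyone hitting the same error: "complete" output mode is only allowed when
the query contains a streaming aggregation, so either switch to "append" mode
or aggregate before writing. A minimal sketch of both variants, using the
built-in socket source as a stand-in for the custom REST source (host and port
are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate()

// the built-in socket source, standing in for the custom source
val stream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// append mode needs no aggregation
val appendQuery = stream.writeStream
  .outputMode("append")
  .format("console")
  .start()

// complete mode is only valid once the query contains a streaming aggregation
val completeQuery = stream.groupBy("value").count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()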




2016-08-01 18:44 GMT+02:00 Amit Sela <amitsel...@gmail.com>:

> I think you're missing:
>
> val query = wordCounts.writeStream
>
>   .outputMode("complete")
>   .format("console")
>   .start()
>
> Did it help?
>
> On Mon, Aug 1, 2016 at 2:44 PM Jacek Laskowski <ja...@japila.pl> wrote:
>
>> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
>> <benali.ayoub.i...@gmail.com> wrote:
>>
>> > the problem now is that when I consume the dataframe for example with
>> count
>> > I get the stack trace below.
>>
>> Mind sharing the entire pipeline?
>>
>> > I followed the implementation of TextSocketSourceProvider to implement
>> my
>> > data source and Text Socket source is used in the official documentation
>> > here.
>>
>> Right. Completely forgot about the provider. Thanks for reminding me
>> about it!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: spark 2.0 readStream from a REST API

2016-08-02 Thread Ayoub Benali
Hello,

here is the code I am trying to run:


https://gist.github.com/ayoub-benali/a96163c711b4fce1bdddf16b911475f2

Thanks,
Ayoub.

2016-08-01 13:44 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
> <benali.ayoub.i...@gmail.com> wrote:
>
> > the problem now is that when I consume the dataframe for example with
> count
> > I get the stack trace below.
>
> Mind sharing the entire pipeline?
>
> > I followed the implementation of TextSocketSourceProvider to implement my
> > data source and Text Socket source is used in the official documentation
> > here.
>
> Right. Completely forgot about the provider. Thanks for reminding me about
> it!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


Re: spark 2.0 readStream from a REST API

2016-08-01 Thread Ayoub Benali
Hello,

using the full class name worked, thanks.

The problem now is that when I consume the dataframe, for example with count,
I get the stack trace below.

I followed the implementation of TextSocketSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala#L128>
to implement my data source, and the Text Socket source is used in the official
documentation here
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example>.

Why does count work in the documentation example? Is there some other trait
that needs to be implemented?

Thanks,
Ayoub.


org.apache.spark.sql.AnalysisException: Queries with streaming sources must
> be executed with writeStream.start();
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:31)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:31)
> at
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:59)
> at
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:70)
> at
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:68)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
> at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2541)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
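
For reference, count works in the quick example because there it is a streaming
aggregation inside a query started with writeStream.start(), not a batch action
called directly on the streaming Dataset. A rough sketch of the difference, with
the socket source standing in for the custom one:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// lines.count()  // batch action on a streaming Dataset -> the AnalysisException above

// the streaming equivalent: a running count maintained by a started query
val query = lines.groupBy().count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()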







2016-07-31 21:56 GMT+02:00 Michael Armbrust <mich...@databricks.com>:

> You have to add a file in resource too (example
> <https://github.com/apache/spark/blob/master/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister>).
> Either that or give a full class name.
>
> On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali <benali.ayoub.i...@gmail.com
> > wrote:
>
>> Looks like the way to go in spark 2.0 is to implement
>> StreamSourceProvider
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
>>  with DataSourceRegister
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
>> But now spark fails at loading the class when doing:
>>
>> spark.readStream.format("mysource").load()
>>
>> I get :
>>
>> java.lang.ClassNotFoundException: Failed to find data source: mysource.
>> Please find packages at http://spark-packages.org
>>
>> Is there something I need to do in order to "load" the Stream source
>> provider ?
>>
>> Thanks,
>> Ayoub
>>
>> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:
>>
>>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>>> <benali.ayoub.i...@gmail.com> wrote:
>>>
>>> > I started playing with the Structured Streaming API in spark 2.0 and I
>>> am
>>> > looking for a way to create streaming Dataset/Dataframe from a rest
>>> HTTP
>>> > endpoint but I am bit stuck.
>>>
>>> What a great idea! Why did I myself not think about this?!?!
>>>
>>> > What would be the easiest way to hack around it ? Do

Re: spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
 with DataSourceRegister
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
But now spark fails at loading the class when doing:

spark.readStream.format("mysource").load()

I get :

java.lang.ClassNotFoundException: Failed to find data source: mysource.
Please find packages at http://spark-packages.org

Is there something I need to do in order to "load" the Stream source
provider?
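
For reference, a minimal skeleton of the provider being described here (the
package, class name and schema are placeholders, and the Source implementation
itself is left out):

package com.example.streaming  // placeholder package

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MySourceProvider extends StreamSourceProvider with DataSourceRegister {

  // the name that spark.readStream.format("mysource") should resolve to
  override def shortName(): String = "mysource"

  private val defaultSchema = StructType(StructField("value", StringType) :: Nil)

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(defaultSchema))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    // build and return an org.apache.spark.sql.execution.streaming.Source here
    ???
  }
}

As Michael's reply in this thread points out, the short name is only picked up
if the jar ships a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
file listing the fully qualified class name (com.example.streaming.MySourceProvider
in this sketch); otherwise format(...) has to be given the full class name.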

Thanks,
Ayoub

2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
> <benali.ayoub.i...@gmail.com> wrote:
>
> > I started playing with the Structured Streaming API in spark 2.0 and I am
> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
> > endpoint but I am bit stuck.
>
> What a great idea! Why did I myself not think about this?!?!
>
> > What would be the easiest way to hack around it ? Do I need to implement
> the
> > Datasource API ?
>
> Yes and perhaps Hadoop API too, but not sure which one exactly since I
> haven't even thought about it (not even once).
>
> > Are there examples on how to create a DataSource from a REST endpoint ?
>
> Never heard of one.
>
> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
> it as a topic. Thanks a lot!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Hello,

I started playing with the Structured Streaming API in Spark 2.0 and I am
looking for a way to create a streaming Dataset/Dataframe from a REST HTTP
endpoint, but I am a bit stuck.

"readStream" in SparkSession has a json method, but it expects a path (S3,
HDFS, etc.), and I want to avoid having to save the data to S3 and then read
it again.
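
To make the gap concrete, here is a sketch of both sides. The "rest" format
name and its "url" option are hypothetical; nothing like that ships with Spark,
it would only exist once a custom streaming source is implemented and registered:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate()

// what exists today: file-based streaming sources, which need a path and a schema
val eventSchema = StructType(StructField("user", StringType) :: StructField("tag", StringType) :: Nil)
val fromFiles = spark.readStream.schema(eventSchema).json("s3://bucket/events/")  // placeholder path

// what this question is about, conceptually: a custom source behind format(...)
val fromRest = spark.readStream
  .format("rest")                                   // hypothetical format name
  .option("url", "https://example.com/api/events")  // hypothetical option
  .load()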

What would be the easiest way to hack around it? Do I need to implement
the Datasource API?

Are there examples of how to create a DataSource from a REST endpoint?

Best,
Ayoub


Re: RDD[Future[T]] => Future[RDD[T]]

2015-07-26 Thread Ayoub Benali
It doesn't work because mapPartitions expects a function f: (Iterator[T]) ⇒
Iterator[U], while .sequence wraps the iterator in a Future.
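
To spell that out: Future.sequence turns a collection of futures into a future
of a collection, so inside mapPartitions the body ends up being a Future rather
than the Iterator the signature requires. Making it compile means waiting on
that future inside each partition, which is exactly the blocking the question
wanted to avoid. A sketch (httpCall and the timeout are placeholders):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// does not typecheck: the body is a Future[List[R]], not an Iterator[R]
// rdd.mapPartitions(it => Future.sequence(it.toList.map(httpCall)))

// compiles, but blocks each task thread until its partition's futures complete
def callAll[T, R: ClassTag](rdd: RDD[T])(httpCall: T => Future[R]): RDD[R] =
  rdd.mapPartitions { it =>
    val batch = Future.sequence(it.map(httpCall).toList)
    Await.result(batch, 10.minutes).iterator
  }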

2015-07-26 22:25 GMT+02:00 Ignacio Blasco elnopin...@gmail.com:

 Maybe using mapPartitions and .sequence inside it?
 On 26/7/2015 10:22 PM, Ayoub benali.ayoub.i...@gmail.com wrote:

 Hello,

 I am trying to convert the result I get after doing some async IO :

 val rdd: RDD[T] = // some rdd

 val result: RDD[Future[T]] = rdd.map(httpCall)

 Is there a way to collect all futures once they are completed in a *non
 blocking* (i.e. without scala.concurrent.Await) and lazy way?

 If the RDD were a standard Scala collection, then calling
 scala.concurrent.Future.sequence would have resolved the issue, but RDD is
 not a TraversableOnce (which is required by the method).

 Is there a way to do this kind of transformation with an RDD[Future[T]] ?

 Thanks,
 Ayoub.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SQL JSON array operations

2015-01-15 Thread Ayoub Benali
You could try to use the hive context, which brings HiveQL; it would allow you
to query nested structures using LATERAL VIEW explode...
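
A rough sketch of what that could look like against the example data from the
question (untested, written against the 1.2-era API):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val users = hiveContext.jsonFile("data.json")  // lines like {"user": "baz", "tags": ["foo", "bar"]}
users.registerTempTable("users")

// LATERAL VIEW explode flattens the tags array so it can be filtered in plain SQL
hiveContext.sql(
  "SELECT DISTINCT user FROM users LATERAL VIEW explode(tags) t AS tag WHERE tag = 'bar'"
).collect()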
On Jan 15, 2015 4:03 PM, jvuillermet jeremy.vuiller...@gmail.com wrote:

 let's say my json file lines look like this

 {"user": "baz", "tags": ["foo", "bar"]}

 sqlContext.jsonFile("data.json")
 ...
 How could I query for users with the "bar" tag using SQL?

 sqlContext.sql("select user from users where tags ?contains? 'bar'")

 I could simplify the request and use the returned RDD to filter on tags but
 I'm exploring an app where users can write their SQL queries




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Parquet compression codecs not applied

2015-01-10 Thread Ayoub Benali
It worked, thanks.

This doc page
https://spark.apache.org/docs/1.2.0/sql-programming-guide.html recommends
using spark.sql.parquet.compression.codec to set the compression codec,
and I thought this setting would be forwarded to the hive context given
that HiveContext extends SQLContext, but it was not.

I am wondering if this behavior is normal; if not, I could open an issue
with a potential fix so that spark.sql.parquet.compression.codec would be
translated to parquet.compression in the hive context.

Or the documentation should be updated to mention that the compression
codec is set differently with HiveContext.
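
To summarize the workaround for anyone finding this thread (property names as
in Michael's reply; only the Hive-level property takes effect on this write
path):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// honored when the table is written through Hive
hiveContext.sql("SET parquet.compression=GZIP")

// only affects Spark SQL's own parquet writer, so it was ignored on this path
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")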

Ayoub.



2015-01-09 17:51 GMT+01:00 Michael Armbrust mich...@databricks.com:

 This is a little confusing, but that code path is actually going through
 hive.  So the spark sql configuration does not help.

 Perhaps, try:
 set parquet.compression=GZIP;

 On Fri, Jan 9, 2015 at 2:41 AM, Ayoub benali.ayoub.i...@gmail.com wrote:

 Hello,

 I tried to save a table created via the hive context as a parquet file, but
 whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
 setConf, like:

 setConf("spark.sql.parquet.compression.codec", "gzip")

 the size of the generated files is always the same, so it seems like the
 spark context ignores the compression codec that I set.

 Here is a code sample applied via the spark shell:

 import org.apache.spark.sql.hive.HiveContext
 val hiveContext = new HiveContext(sc)

 hiveContext.sql("SET hive.exec.dynamic.partition = true")
 hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
 // required to make the data compatible with Impala
 hiveContext.setConf("spark.sql.parquet.binaryAsString", "true")
 hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

 hiveContext.sql("""create external table if not exists foo (bar STRING, ts INT)
   Partitioned by (year INT, month INT, day INT) STORED AS PARQUET
   Location 'hdfs://path/data/foo'""")

 hiveContext.sql("""insert into table foo partition(year, month, day) select *,
   year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month,
   day(from_unixtime(ts)) as day from raw_foo""")

 I tried that with Spark 1.2 and a 1.3 snapshot against Hive 0.13,
 and I also tried it with Impala on the same cluster, which applied
 the compression codecs correctly.

 Does anyone know what could be the problem?

 Thanks,
 Ayoub.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Parquet compression codecs not applied

2015-01-08 Thread Ayoub Benali
Hello,

I tried to save a table created via the hive context as a parquet file, but
whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
setConf, like:

setConf("spark.sql.parquet.compression.codec", "gzip")

the size of the generated files is always the same, so it seems like the
spark context ignores the compression codec that I set.

Here is a code sample applied via the spark shell:

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)

hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
// required to make the data compatible with Impala
hiveContext.setConf("spark.sql.parquet.binaryAsString", "true")
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

hiveContext.sql("""create external table if not exists foo (bar STRING, ts INT)
  Partitioned by (year INT, month INT, day INT) STORED AS PARQUET
  Location 'hdfs://path/data/foo'""")

hiveContext.sql("""insert into table foo partition(year, month, day) select *,
  year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month,
  day(from_unixtime(ts)) as day from raw_foo""")

I tried that with Spark 1.2 and a 1.3 snapshot against Hive 0.13,
and I also tried it with Impala on the same cluster, which applied
the compression codecs correctly.

Does anyone know what could be the problem?

Thanks,
Ayoub.