Re: spark 2.0 readStream from a REST API

2016-08-02 Thread Ayoub Benali
Why is writeStream needed to consume the data?

When I tried it I got this exception:

INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
> org.apache.spark.sql.AnalysisException: Complete output mode not supported
> when there are no streaming aggregations on streaming DataFrames/Datasets;
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:65)
> at
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
> at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
> at .(:59)
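
The error itself points at the cause: "complete" output mode is only accepted when
the query contains a streaming aggregation. A minimal sketch of a pass-through query
using "append" instead (assuming `stream` is the Dataset produced by the custom
source; the console sink is just for testing):

    stream.writeStream
      .outputMode("append")
      .format("console")
      .start()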




2016-08-01 18:44 GMT+02:00 Amit Sela <amitsel...@gmail.com>:

> I think you're missing:
>
> val query = wordCounts.writeStream
>
>   .outputMode("complete")
>   .format("console")
>   .start()
>
> Did it help?
>
> On Mon, Aug 1, 2016 at 2:44 PM Jacek Laskowski <ja...@japila.pl> wrote:
>
>> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
>> <benali.ayoub.i...@gmail.com> wrote:
>>
>> > the problem now is that when I consume the dataframe for example with
>> count
>> > I get the stack trace below.
>>
>> Mind sharing the entire pipeline?
>>
>> > I followed the implementation of TextSocketSourceProvider to implement
>> my
>> > data source and Text Socket source is used in the official documentation
>> > here.
>>
>> Right. Completely forgot about the provider. Thanks for reminding me
>> about it!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: spark 2.0 readStream from a REST API

2016-08-02 Thread Ayoub Benali
Hello,

here is the code I am trying to run:


https://gist.github.com/ayoub-benali/a96163c711b4fce1bdddf16b911475f2

Thanks,
Ayoub.

2016-08-01 13:44 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
> <benali.ayoub.i...@gmail.com> wrote:
>
> > the problem now is that when I consume the dataframe for example with
> count
> > I get the stack trace below.
>
> Mind sharing the entire pipeline?
>
> > I followed the implementation of TextSocketSourceProvider to implement my
> > data source and Text Socket source is used in the official documentation
> > here.
>
> Right. Completely forgot about the provider. Thanks for reminding me about
> it!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


Re: spark 2.0 readStream from a REST API

2016-08-01 Thread Ayoub Benali
Hello,

using the full class name worked, thanks.

The problem now is that when I consume the dataframe, for example with count,
I get the stack trace below.

I followed the implementation of TextSocketSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala#L128>
to implement my data source, and the Text Socket source is used in the official
documentation here
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example>.

Why does count work in the documentation example? Is there some other
trait that needs to be implemented?

Thanks,
Ayoub.


org.apache.spark.sql.AnalysisException: Queries with streaming sources must
> be executed with writeStream.start();
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:31)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:31)
> at
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:59)
> at
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:70)
> at
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:68)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
> at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2541)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
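
For reference, the exception above is raised because a streaming Dataset cannot be
consumed eagerly like a batch one; a count has to be expressed as a streaming
aggregation and started as a query. A rough sketch, assuming `stream` is the Dataset
returned by the custom source and using the console sink only for testing:

    val query = stream
      .groupBy()        // global streaming aggregation
      .count()
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()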







2016-07-31 21:56 GMT+02:00 Michael Armbrust <mich...@databricks.com>:

> You have to add a file in resource too (example
> <https://github.com/apache/spark/blob/master/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister>).
> Either that or give a full class name.
>
> On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali <benali.ayoub.i...@gmail.com
> > wrote:
>
>> Looks like the way to go in spark 2.0 is to implement
>> StreamSourceProvider
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
>>  with DataSourceRegister
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
>> But now spark fails at loading the class when doing:
>>
>> spark.readStream.format("mysource").load()
>>
>> I get :
>>
>> java.lang.ClassNotFoundException: Failed to find data source: mysource.
>> Please find packages at http://spark-packages.org
>>
>> Is there something I need to do in order to "load" the Stream source
>> provider ?
>>
>> Thanks,
>> Ayoub
>>
>> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:
>>
>>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>>> <benali.ayoub.i...@gmail.com> wrote:
>>>
>>> > I started playing with the Structured Streaming API in spark 2.0 and I
>>> am
>>> > looking for a way to create streaming Dataset/Dataframe from a rest
>>> HTTP
>>> > endpoint but I am bit stuck.
>>>
>>> What a great idea! Why did I myself not think about this?!?!
>>>
>>> > What would be the easiest way to hack around it ? Do

Re: spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
 with DataSourceRegister
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
But now spark fails at loading the class when doing:

spark.readStream.format("mysource").load()

I get :

java.lang.ClassNotFoundException: Failed to find data source: mysource.
Please find packages at http://spark-packages.org

Is there something I need to do in order to "load" the Stream source
provider?

Thanks,
Ayoub
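
For reference, a minimal sketch of the two pieces discussed here. The package and
class names are illustrative and the method bodies are elided; the signatures follow
the Spark 2.0 interfaces linked above:

    package org.example

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.Source
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    import org.apache.spark.sql.types.StructType

    class MySourceProvider extends StreamSourceProvider with DataSourceRegister {

      // the short name used in spark.readStream.format("mysource")
      override def shortName(): String = "mysource"

      override def sourceSchema(
          sqlContext: SQLContext,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): (String, StructType) = ???

      override def createSource(
          sqlContext: SQLContext,
          metadataPath: String,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): Source = ???
    }

For the short name to resolve, the jar also needs a service registration file
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister containing the
single line org.example.MySourceProvider. Without that file, passing the fully
qualified class name to format(...) works as well, which is what ended up resolving
the question in this thread.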

2016-07-31 17:19 GMT+02:00 Jacek Laskowski <ja...@japila.pl>:

> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
> <benali.ayoub.i...@gmail.com> wrote:
>
> > I started playing with the Structured Streaming API in spark 2.0 and I am
> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
> > endpoint but I am bit stuck.
>
> What a great idea! Why did I myself not think about this?!?!
>
> > What would be the easiest way to hack around it ? Do I need to implement
> the
> > Datasource API ?
>
> Yes and perhaps Hadoop API too, but not sure which one exactly since I
> haven't even thought about it (not even once).
>
> > Are there examples on how to create a DataSource from a REST endpoint ?
>
> Never heard of one.
>
> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
> it as a topic. Thanks a lot!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Hello,

I started playing with the Structured Streaming API in Spark 2.0 and I am
looking for a way to create a streaming Dataset/DataFrame from a REST HTTP
endpoint, but I am a bit stuck.

"readStream" in SparkSession has a json method but this one is expecting a
path (s3, hdfs, etc) and I want to avoid having to save the data on s3 and
then read again.

What would be the easiest way to hack around it? Do I need to implement
the DataSource API?

Are there examples on how to create a DataSource from a REST endpoint ?

Best,
Ayoub


[spark1.5.1] HiveQl.parse throws org.apache.spark.sql.AnalysisException: null

2015-10-20 Thread Ayoub
Hello,

when upgrading to Spark 1.5.1 from 1.4.1, the following code started crashing
at runtime. It is mainly used to parse HiveQL queries and check that they are
valid.

package org.apache.spark.sql.hive

val sql = "CREATE EXTERNAL TABLE IF NOT EXISTS `t`(`id` STRING, `foo` INT) PARTITIONED BY (year INT, month INT, day INT) STORED AS PARQUET Location 'temp'"

HiveQl.parseSql(sql)

org.apache.spark.sql.AnalysisException: null;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:277)
at
org.apache.spark.sql.hive.SQLChecker$$anonfun$1.apply(SQLChecker.scala:9)
at
org.apache.spark.sql.hive.SQLChecker$$anonfun$1.apply(SQLChecker.scala:9)

Should that be done differently on Spark 1.5.1?

Thanks,
Ayoub





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-5-1-HiveQl-parse-throws-org-apache-spark-sql-AnalysisException-null-tp25138.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD[Future[T]] => Future[RDD[T]]

2015-07-27 Thread Ayoub
Do you mean something like this?

val values = rdd.mapPartitions{ i: Iterator[Future[T]] =>
   val future: Future[Iterator[T]] = Future sequence i
   Await result (future, someTimeout)
 }


Where is the blocking happening in this case? It seems to me that all the
workers will be blocked until the future is completed, no ?

2015-07-27 7:24 GMT+02:00 Nick Pentreath nick.pentre...@gmail.com:

 You could use Iterator.single on the future[iterator].

 However if you collect all the partitions I'm not sure if it will work
 across executor boundaries. Perhaps you may need to await the sequence of
 futures in each partition and return the resulting iterator.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Sun, Jul 26, 2015 at 10:43 PM, Ayoub Benali 
 benali.ayoub.i...@gmail.com wrote:

 It doesn't work because mapPartitions expects a function f:(Iterator[T])
 ⇒ Iterator[U] while .sequence wraps the iterator in a Future

 2015-07-26 22:25 GMT+02:00 Ignacio Blasco elnopin...@gmail.com:

 Maybe using mapPartitions and .sequence inside it?
 El 26/7/2015 10:22 p. m., Ayoub benali.ayoub.i...@gmail.com
 escribió:

 Hello,

 I am trying to convert the result I get after doing some async IO :

 val rdd: RDD[T] = // some rdd

 val result: RDD[Future[T]] = rdd.map(httpCall)

 Is there a way collect all futures once they are completed in a *non
 blocking* (i.e. without scala.concurrent
 Await) and lazy way?

 If the RDD was a standard scala collection then calling
 scala.concurrent.Future.sequence would have resolved the issue but
 RDD is
 not a TraversableOnce (which is required by the method).

 Is there a way to do this kind of transformation with an RDD[Future[T]]
 ?

 Thanks,
 Ayoub.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-Future-T-Future-RDD-T-tp24005.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: RDD[Future[T]] => Future[RDD[T]]

2015-07-26 Thread Ayoub Benali
It doesn't work because mapPartitions expects a function f:(Iterator[T]) ⇒
Iterator[U] while .sequence wraps the iterator in a Future

2015-07-26 22:25 GMT+02:00 Ignacio Blasco elnopin...@gmail.com:

 Maybe using mapPartitions and .sequence inside it?
 El 26/7/2015 10:22 p. m., Ayoub benali.ayoub.i...@gmail.com escribió:

 Hello,

 I am trying to convert the result I get after doing some async IO :

 val rdd: RDD[T] = // some rdd

 val result: RDD[Future[T]] = rdd.map(httpCall)

 Is there a way collect all futures once they are completed in a *non
 blocking* (i.e. without scala.concurrent
 Await) and lazy way?

 If the RDD was a standard scala collection then calling
 scala.concurrent.Future.sequence would have resolved the issue but RDD
 is
 not a TraversableOnce (which is required by the method).

 Is there a way to do this kind of transformation with an RDD[Future[T]] ?

 Thanks,
 Ayoub.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RDD[Future[T]] => Future[RDD[T]]

2015-07-26 Thread Ayoub
Hello,

I am trying to convert the result I get after doing some async IO :

val rdd: RDD[T] = // some rdd 

val result: RDD[Future[T]] = rdd.map(httpCall)

Is there a way to collect all futures once they are completed in a
*non-blocking* (i.e. without scala.concurrent.Await) and lazy way?

If the RDD was a standard scala collection then calling
scala.concurrent.Future.sequence would have resolved the issue but RDD is
not a TraversableOnce (which is required by the method).

Is there a way to do this kind of transformation with an RDD[Future[T]] ?

Thanks,
Ayoub.
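
A sketch of the per-partition approach discussed elsewhere in this thread, written
with a concrete String payload and a placeholder timeout (both are assumptions, not
part of the original code). Note that the wait happens inside each task rather than
on the driver, and each partition is materialized in memory before its iterator is
returned:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import org.apache.spark.rdd.RDD

    def collectFutures(result: RDD[Future[String]]): RDD[String] =
      result.mapPartitions { futures =>
        // sequence this partition's futures and block the task until they complete
        val all: Future[List[String]] = Future.sequence(futures.toList)
        Await.result(all, 10.minutes).iterator
      }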



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[spark1.4] sparkContext.stop causes exception on Mesos

2015-07-03 Thread Ayoub
Hello Spark developers, 

After upgrading to Spark 1.4 on Mesos 0.22.1, existing code started to throw
this exception when calling sparkContext.stop:

(SparkListenerBus) [ERROR -
org.apache.spark.Logging$class.logError(Logging.scala:96)] Listener
EventLoggingListener threw an exception 
java.lang.reflect.InvocationTargetException 
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) 
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at
org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:146)
 
at
org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:146)
 
at scala.Option.foreach(Option.scala:236) 
at
org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
 
at
org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:190)
 
at
org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54)
 
at
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
 
at
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
 
at
org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56) 
at
org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
 
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
 
at
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215) 
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
 
Caused by: java.io.IOException: Filesystem closed 
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:730) 
at
org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1855) 
at
org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1816) 
at
org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130) 
... 16 more 
I0701 15:03:46.101809  1612 sched.cpp:1589] Asked to stop the driver 
I0701 15:03:46.101971  1355 sched.cpp:831] Stopping framework
'20150629-132734-1224736778-5050-6126-0028'


This problem happens only when the spark.eventLog.enabled flag is set to true;
it also happens if sparkContext.stop is omitted in the code, I think because
Spark then shuts down the Spark context indirectly.

Does anyone know what could cause this problem?

Thanks,
Ayoub.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-4-sparkContext-stop-causes-exception-on-Mesos-tp23605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD to InputStream

2015-03-18 Thread Ayoub
In case it interests other people, here is what I came up with, and it seems
to work fine:

  case class RDDAsInputStream(private val rdd: RDD[String]) extends java.io.InputStream {
    // toLocalIterator pulls one partition at a time to the driver, so the whole
    // RDD never has to be held in driver memory at once
    private var bytes = rdd.flatMap(_.getBytes("UTF-8")).toLocalIterator

    def read(): Int = {
      // mask to 0..255 so negative byte values are not confused with EOF (-1)
      if (bytes.hasNext) bytes.next() & 0xff
      else -1
    }

    override def markSupported(): Boolean = false
  }
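
For contrast, a sketch of the driver-side route Sean suggests below, for the case
where the data does fit in driver memory. It uses mkString instead of Guava's Reader
utilities and assumes commons-io is on the classpath:

    import java.io.{InputStream, StringReader}
    import java.nio.charset.StandardCharsets
    import org.apache.commons.io.input.ReaderInputStream
    import org.apache.spark.rdd.RDD

    def asDriverSideStream(rdd: RDD[String]): InputStream =
      new ReaderInputStream(new StringReader(rdd.collect().mkString), StandardCharsets.UTF_8)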


2015-03-13 13:56 GMT+01:00 Sean Owen so...@cloudera.com:

 OK, then you do not want to collect() the RDD. You can get an iterator,
 yes.
 There is no such thing as making an Iterator into an InputStream. An
 Iterator is a sequence of arbitrary objects; an InputStream is a
 channel to a stream of bytes.
 I think you can employ similar Guava / Commons utilities to make an
 Iterator of Streams in a stream of Readers, join the Readers, and
 encode the result as bytes in an InputStream.

 On Fri, Mar 13, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com
 wrote:
  Thanks Sean,
 
  I forgot to mention that the data is too big to be collected on the
 driver.
 
  So yes your proposition would work in theory but in my case I cannot hold
  all the data in the driver memory, therefore it wouldn't work.
 
  I guess the crucial point is to do the collect in a lazy way, and on that
  subject I noticed that we can get a local iterator from an RDD, but that
  raises two questions:
 
  - does that involve an immediate collect just like collect(), or is it a
  lazy process?
  - how to go from an iterator to an InputStream?
 
 
  2015-03-13 11:17 GMT+01:00 Sean Owen [hidden email]:
 
  These are quite different creatures. You have a distributed set of
  Strings, but want a local stream of bytes, which involves three
  conversions:
 
  - collect data to driver
  - concatenate strings in some way
  - encode strings as bytes according to an encoding
 
  Your approach is OK but might be faster to avoid disk, if you have
  enough memory:
 
  - collect() to a Array[String] locally
  - use Guava utilities to turn a bunch of Strings into a Reader
  - Use the Apache Commons ReaderInputStream to read it as encoded bytes
 
  I might wonder if that's all really what you want to do though.
 
 
  On Fri, Mar 13, 2015 at 9:54 AM, Ayoub [hidden email] wrote:
   Hello,
  
    I need to convert an RDD[String] to a java.io.InputStream but I didn't
    find an easy way to do it.
    Currently I am saving the RDD as a temporary file and then opening an
    input stream on the file, but that is not really optimal.
  
   Does anybody know a better way to do that ?
  
   Thanks,
   Ayoub.
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-InputStream-tp22031.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: [hidden email]
   For additional commands, e-mail: [hidden email]
  
 
 
 
  
  View this message in context: Re: RDD to InputStream
 
  Sent from the Apache Spark User List mailing list archive at Nabble.com.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-to-InputStream-tp22121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: RDD to InputStream

2015-03-13 Thread Ayoub
Thanks Sean,

I forgot to mention that the data is too big to be collected on the driver.

So yes your proposition would work in theory but in my case I cannot hold
all the data in the driver memory, therefore it wouldn't work.

I guess the crucial point is to do the collect in a lazy way, and on that
subject I noticed that we can get a local iterator from an RDD, but that
raises two questions:

- does that involve an immediate collect just like collect(), or is it a
lazy process?
- how to go from an iterator to an InputStream?


2015-03-13 11:17 GMT+01:00 Sean Owen so...@cloudera.com:

 These are quite different creatures. You have a distributed set of
 Strings, but want a local stream of bytes, which involves three
 conversions:

 - collect data to driver
 - concatenate strings in some way
 - encode strings as bytes according to an encoding

 Your approach is OK but might be faster to avoid disk, if you have
 enough memory:

 - collect() to a Array[String] locally
 - use Guava utilities to turn a bunch of Strings into a Reader
 - Use the Apache Commons ReaderInputStream to read it as encoded bytes

 I might wonder if that's all really what you want to do though.


 On Fri, Mar 13, 2015 at 9:54 AM, Ayoub benali.ayoub.i...@gmail.com
 wrote:
  Hello,
 
  I need to convert an RDD[String] to a java.io.InputStream but I didn't
  find an easy way to do it.
  Currently I am saving the RDD as a temporary file and then opening an
  input stream on the file, but that is not really optimal.
 
  Does anybody know a better way to do that ?
 
  Thanks,
  Ayoub.
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-InputStream-tp22031.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-to-InputStream-tp22032.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: [hive context] Unable to query array once saved as parquet

2015-02-12 Thread Ayoub
Hi,

As I was trying to find a workaround until this bug is fixed, I
discovered another bug, posted here:
https://issues.apache.org/jira/browse/SPARK-5775

For those who might have the same issue, one could use the LOAD SQL
command in a hive context to load the parquet file into the table, as long
as it is not partitioned. The queries work fine after that.
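
A minimal sketch of that workaround (the path and table name are placeholders, and
the target table must not be partitioned):

    hiveContext.sql("LOAD DATA INPATH 'hdfs:///tmp/test_table.parquet' INTO TABLE test_table")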

Best,
Ayoub.

2015-01-31 4:05 GMT+01:00 Cheng Lian lian.cs@gmail.com:

  According to the Gist Ayoub provided, the schema is fine. I reproduced
 this issue locally, it should be bug, but I don't think it's related to
 SPARK-5236. Will investigate this soon.

 Ayoub - would you mind to help to file a JIRA for this issue? Thanks!

 Cheng


 On 1/30/15 11:28 AM, Michael Armbrust wrote:

 Is it possible that your schema contains duplicate columns or column with
 spaces in the name?  The parquet library will often give confusing error
 messages in this case.

 On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com
 wrote:

  Hello,

 I have a problem when querying, with a hive context on spark
 1.2.1-snapshot, a column in my table which is a nested data structure, like
 an array of struct.
 The problem happens only on the table stored as parquet; querying the
 SchemaRDD saved as a temporary table doesn't lead to any exception.

 my steps are:
 1) reading JSON file
 2) creating a schema RDD and saving it as a tmp table
 3) creating an external table in hive meta store saved as parquet file
 4) inserting the data from the tmp table to the persisted table
 5) querying the persisted table leads to this exception:

 select data.field1 from persisted_table LATERAL VIEW explode(data_array)
 nestedStuff AS data

 parquet.io.ParquetDecodingException: Can not read value at 0 in block -1
 in file hdfs://***/test_table/part-1
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at http://scala.collection.AbstractIterator.to
 scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
 at java.util.ArrayList.rangeCheck(ArrayList.java:635)
 at java.util.ArrayList.get(ArrayList.java:411)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:274)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at
 parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96

Re: Parquet compression codecs not applied

2015-02-04 Thread Ayoub
I was using a hive context and not a sql context, therefore (SET
spark.sql.parquet.compression.codec=gzip) was ignored.

Michael Armbrust pointed out that parquet.compression should be used
instead, which solved the issue.

I am still wondering if this behavior is normal; it would be better if
spark.sql.parquet.compression.codec were translated to
parquet.compression in the case of a hive context.
Otherwise the documentation should be updated to be more precise.
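
For reference, a short sketch of the two settings as they came up in this thread
(Spark 1.2-era API):

    // through a plain SQLContext the Spark setting is honored:
    sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")

    // through a HiveContext the write goes via Hive, so the Parquet property applies:
    hiveContext.sql("SET parquet.compression=GZIP")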



2015-02-04 19:13 GMT+01:00 sahanbull sa...@skimlinks.com:

 Hi Ayoub,

 You could try using the sql format to set the compression type:

 sc = SparkContext()
 sqc = SQLContext(sc)
 sqc.sql("SET spark.sql.parquet.compression.codec=gzip")

 You get a notification on screen while running the spark job when you set
 the compression codec like this. I haven't compared it with different
 compression methods. Please let the mailing list know if this works for
 you.

 Best
 Sahan



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058p21498.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Parquet-compression-codecs-not-applied-tp21499.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to get Hive table schema using Spark SQL or otherwise

2015-02-04 Thread Ayoub
Given a hive context, you could execute hiveContext.sql("DESCRIBE TABLE_NAME");
you would get the names of the fields and their types.
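
A small sketch of that (the table name is a placeholder):

    val columns = hiveContext.sql("DESCRIBE my_table").collect()
    // each row holds the column name, its type and an optional comment
    columns.foreach(row => println(s"${row.getString(0)}: ${row.getString(1)}"))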

2015-02-04 21:47 GMT+01:00 nitinkak001 nitinkak...@gmail.com:

 I want to get a Hive table schema details into Spark. Specifically, I want
 to
 get column name and type information. Is it possible to do it e.g using
 JavaSchemaRDD or something else?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-Hive-table-schema-using-Spark-SQL-or-otherwise-tp21501.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-How-to-get-Hive-table-schema-using-Spark-SQL-or-otherwise-tp21502.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: [hive context] Unable to query array once saved as parquet

2015-02-02 Thread Ayoub
Hi,

given the currently open issue
https://issues.apache.org/jira/browse/SPARK-5508, I cannot use HiveQL to
insert SchemaRDD data into a table if one of the columns is an array of
struct.

Using the Spark API, is it possible to insert a SchemaRDD into an existing
and *partitioned* table? The insertInto method on SchemaRDD only takes the
name of the table.

Thanks,
Ayoub.

2015-01-31 22:30 GMT+01:00 Ayoub Benali benali.ayoub.i...@gmail.com:

 Hello,

 as asked, I just filled this JIRA issue
 https://issues.apache.org/jira/browse/SPARK-5508.

 I will add an other similar code example which lead to GenericRow cannot
 be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 Exception.

 Best,
 Ayoub.


 2015-01-31 4:05 GMT+01:00 Cheng Lian lian.cs@gmail.com:

  According to the Gist Ayoub provided, the schema is fine. I reproduced
 this issue locally, it should be bug, but I don't think it's related to
 SPARK-5236. Will investigate this soon.

 Ayoub - would you mind to help to file a JIRA for this issue? Thanks!

 Cheng

 On 1/30/15 11:28 AM, Michael Armbrust wrote:

 Is it possible that your schema contains duplicate columns or column with
 spaces in the name?  The parquet library will often give confusing error
 messages in this case.

 On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com
 wrote:

  Hello,

 I have a problem when querying, with a hive context on spark
 1.2.1-snapshot, a column in my table which is nested data structure like an
 array of struct.
 The problem happens only on the table stored as parquet; querying the
 SchemaRDD saved as a temporary table doesn't lead to any exception.

 my steps are:
 1) reading JSON file
 2) creating a schema RDD and saving it as a tmp table
 3) creating an external table in hive meta store saved as parquet file
 4) inserting the data from the tmp table to the persisted table
 5) querying the persisted table leads to this exception:

 select data.field1 from persisted_table LATERAL VIEW
 explode(data_array) nestedStuff AS data

 parquet.io.ParquetDecodingException: Can not read value at 0 in block -1
 in file hdfs://***/test_table/part-1
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at http://scala.collection.AbstractIterator.to
 scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
 at java.util.ArrayList.rangeCheck(ArrayList.java:635)
 at java.util.ArrayList.get(ArrayList.java:411)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94

Re: [hive context] Unable to query array once saved as parquet

2015-01-31 Thread Ayoub
Hello,

As asked, I just filed this JIRA issue:
https://issues.apache.org/jira/browse/SPARK-5508.

I will add another similar code example which leads to a "GenericRow cannot
be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow"
exception.

Best,
Ayoub.

2015-01-31 4:05 GMT+01:00 Cheng Lian lian.cs@gmail.com:

  According to the Gist Ayoub provided, the schema is fine. I reproduced
 this issue locally, it should be bug, but I don't think it's related to
 SPARK-5236. Will investigate this soon.

 Ayoub - would you mind to help to file a JIRA for this issue? Thanks!

 Cheng

 On 1/30/15 11:28 AM, Michael Armbrust wrote:

 Is it possible that your schema contains duplicate columns or column with
 spaces in the name?  The parquet library will often give confusing error
 messages in this case.

 On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com
 wrote:

  Hello,

 I have a problem when querying, with a hive context on spark
 1.2.1-snapshot, a column in my table which is nested data structure like an
 array of struct.
 The problem happens only on the table stored as parquet; querying the
 SchemaRDD saved as a temporary table doesn't lead to any exception.

 my steps are:
 1) reading JSON file
 2) creating a schema RDD and saving it as a tmp table
 3) creating an external table in hive meta store saved as parquet file
 4) inserting the data from the tmp table to the persisted table
 5) querying the persisted table leads to this exception:

 select data.field1 from persisted_table LATERAL VIEW explode(data_array)
 nestedStuff AS data

 parquet.io.ParquetDecodingException: Can not read value at 0 in block -1
 in file hdfs://***/test_table/part-1
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at http://scala.collection.AbstractIterator.to
 scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
 at java.util.ArrayList.rangeCheck(ArrayList.java:635)
 at java.util.ArrayList.get(ArrayList.java:411)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:274)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at
 parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126

Re: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0

2015-01-30 Thread Ayoub
I am not personally aware of a repo for snapshot builds.
In my use case, I had to build Spark 1.2.1-SNAPSHOT myself;
see https://spark.apache.org/docs/latest/building-spark.html

2015-01-30 17:11 GMT+01:00 Debajyoti Roy debajyoti@healthagen.com:

 Thanks Ayoub and Zhan,
 I am new to Spark and wanted to make sure I am not trying something stupid
 or using a wrong API.

 Is there a repo where I can pull the snapshot or nightly builds for Spark?



 On Fri, Jan 30, 2015 at 2:45 AM, Ayoub Benali benali.ayoub.i...@gmail.com
  wrote:

 Hello,

 I had the same issue then I found this JIRA ticket
 https://issues.apache.org/jira/browse/SPARK-4825
 So I switched to Spark 1.2.1-snapshot, which solved the problem.



 2015-01-30 8:40 GMT+01:00 Zhan Zhang zzh...@hortonworks.com:

  I think it is expected. Refer to the comments in saveAsTable Note that
 this currently only works with SchemaRDDs that are created from a
 HiveContext. If I understand correctly, here the SchemaRDD means those
 generated by HiveContext.sql, instead of applySchema.

  Thanks.

  Zhan Zhang



  On Jan 29, 2015, at 9:38 PM, matroyd debajyoti@healthagen.com
 wrote:

 Hi, I am trying saveAsTable on SchemaRDD created from HiveContext and it
 fails. This is on Spark 1.2.0. Following are details of the code, command
 and exceptions:
 http://stackoverflow.com/questions/28222496/how-to-enable-sql-on-schemardd-via-the-jdbc-interface-is-it-even-possible
 Thanks in advance for any guidance
 --
 View this message in context: HiveContext created SchemaRDD's
 saveAsTable is not working on 1.2.0
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






 --
 Thanks,

 *Debajyoti Roy*
 debajyoti@healthagen.com
 (646)561-0844 646-561-0844
 350 Madison Ave., FL 16,
 New York, NY 10017.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-HiveContext-created-SchemaRDD-s-saveAsTable-is-not-working-on-1-2-0-tp21442.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

[hive context] Unable to query array once saved as parquet

2015-01-30 Thread Ayoub
(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The full code leading to this issue is available here: gist
https://gist.github.com/ayoub-benali/54d6f3b8635530e4e936

Could the problem come from the way I insert the data into the table?

Is this problem related to this JIRA ticket
https://issues.apache.org/jira/browse/SPARK-5236 ?

Because I got a similar exception, "GenericRow cannot be cast to
org.apache.spark.sql.catalyst.expressions.SpecificMutableRow", with another
table that also contains an array of struct.

Thanks,
Ayoub.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/hive-context-Unable-to-query-array-once-saved-as-parquet-tp21446.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: [hive context] Unable to query array once saved as parquet

2015-01-30 Thread Ayoub
No, it is not the case; here is the gist to reproduce the issue:
https://gist.github.com/ayoub-benali/54d6f3b8635530e4e936
On Jan 30, 2015 8:29 PM, Michael Armbrust mich...@databricks.com wrote:

 Is it possible that your schema contains duplicate columns or column with
 spaces in the name?  The parquet library will often give confusing error
 messages in this case.

 On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com
 wrote:

 Hello,

 I have a problem when querying, with a hive context on spark
 1.2.1-snapshot, a column in my table which is nested data structure like an
 array of struct.
 The problem happens only on the table stored as parquet; querying the
 SchemaRDD saved as a temporary table doesn't lead to any exception.

 my steps are:
 1) reading JSON file
 2) creating a schema RDD and saving it as a tmp table
 3) creating an external table in hive meta store saved as parquet file
 4) inserting the data from the tmp table to the persisted table
 5) querying the persisted table leads to this exception:

 select data.field1 from persisted_table LATERAL VIEW explode(data_array)
 nestedStuff AS data

 parquet.io.ParquetDecodingException: Can not read value at 0 in block -1
 in file hdfs://***/test_table/part-1
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at http://scala.collection.AbstractIterator.to
 scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
 at java.util.ArrayList.rangeCheck(ArrayList.java:635)
 at java.util.ArrayList.get(ArrayList.java:411)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99)
 at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:274)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at
 parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
 ... 28 more

 Driver stacktrace:
 at http://org.apache.spark.scheduler.DAGScheduler.org
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203

Re: SQL query over (Long, JSON string) tuples

2015-01-29 Thread Ayoub
Hello,

SQLContext and HiveContext have a jsonRDD method which accepts an
RDD[String], where each string is a JSON string, and returns a SchemaRDD; it
extends RDD[Row], which is the type you want.

Afterwards you should be able to do a join to keep your tuple.
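
A rough sketch of the first step (Spark 1.2-era API; the field names in the SQL are
placeholders, and reattaching the timestamp is only outlined):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    def query(sqlContext: SQLContext, data: RDD[(Long, String)]) = {
      val schemaRdd = sqlContext.jsonRDD(data.map(_._2))  // schema inference, extends RDD[Row]
      schemaRdd.registerTempTable("events")
      val selected = sqlContext.sql("SELECT id, someField FROM events")
      // to keep the timestamp, join `selected` back to `data` on a key that is
      // present in the JSON itself (e.g. the id field above)
      selected
    }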

Best,
Ayoub.

2015-01-29 10:12 GMT+01:00 Tobias Pfeiffer t...@preferred.jp:

 Hi,

 I have data as RDD[(Long, String)], where the Long is a timestamp and the
 String is a JSON-encoded string. I want to infer the schema of the JSON and
 then do a SQL statement on the data (no aggregates, just column selection
 and UDF application), but still have the timestamp associated with each row
 of the result. I completely fail to see how that would be possible. Any
 suggestions?

 I can't even see how I would get an RDD[(Long, Row)] so that I *might* be
 able to add the timestamp to the row after schema inference. Is there *any*
 way other than string-manipulating the JSON string and adding the timestamp
 to it?

 Thanks
 Tobias





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-SQL-query-over-Long-JSON-string-tuples-tp21419.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Fwd: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0

2015-01-29 Thread Ayoub
Hello,

I had the same issue; then I found this JIRA ticket:
https://issues.apache.org/jira/browse/SPARK-4825
So I switched to Spark 1.2.1-SNAPSHOT, which solved the problem.



2015-01-30 8:40 GMT+01:00 Zhan Zhang zzh...@hortonworks.com:

  I think it is expected. Refer to the comments in saveAsTable Note that
 this currently only works with SchemaRDDs that are created from a
 HiveContext. If I understand correctly, here the SchemaRDD means those
 generated by HiveContext.sql, instead of applySchema.

  Thanks.

  Zhan Zhang



  On Jan 29, 2015, at 9:38 PM, matroyd debajyoti@healthagen.com
 wrote:

 Hi, I am trying saveAsTable on SchemaRDD created from HiveContext and it
 fails. This is on Spark 1.2.0. Following are details of the code, command
 and exceptions:
 http://stackoverflow.com/questions/28222496/how-to-enable-sql-on-schemardd-via-the-jdbc-interface-is-it-even-possible
 Thanks in advance for any guidance
 --
 View this message in context: HiveContext created SchemaRDD's saveAsTable
 is not working on 1.2.0
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-HiveContext-created-SchemaRDD-s-saveAsTable-is-not-working-on-1-2-0-tp21436.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: SQL JSON array operations

2015-01-15 Thread Ayoub Benali
You could try to use the hive context, which brings HiveQL; it would allow you to
query nested structures using LATERAL VIEW explode...
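
For example, a minimal sketch of that (Spark 1.2-era API; it assumes a
SparkContext named sc, as in the spark shell, and a JSON file shaped like the
example quoted below):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val users = hiveContext.jsonFile("data.json")   // lines like {"user": "baz", "tags": ["foo", "bar"]}
users.registerTempTable("users")

// Explode the tags array, then filter on the exploded value
val barUsers = hiveContext.sql(
  """SELECT user
    |FROM users
    |LATERAL VIEW explode(tags) t AS tag
    |WHERE tag = 'bar'""".stripMargin)

barUsers.collect().foreach(println)
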
On Jan 15, 2015 4:03 PM, jvuillermet jeremy.vuiller...@gmail.com wrote:

 let's say my json file lines look like this

 {"user": "baz", "tags": ["foo", "bar"]}


 sqlContext.jsonFile("data.json")
 ...
 How could I query for users with the 'bar' tag using SQL?

 sqlContext.sql("select user from users where tags ?contains? 'bar'")

 I could simplify the request and use the returned RDD to filter on tags but
 I'm exploring an app where users can write their SQL queries




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SQL JSON array operations

2015-01-15 Thread Ayoub
You could try to use the hive context, which brings HiveQL; it would allow you to
query nested structures using LATERAL VIEW explode...

see the LanguageManual LateralView doc here:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164p21172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parquet compression codecs not applied

2015-01-10 Thread Ayoub Benali
it worked thanks.

this doc page
https://spark.apache.org/docs/1.2.0/sql-programming-guide.html recommends
using spark.sql.parquet.compression.codec to set the compression codec,
and I thought this setting would be forwarded to the hive context given
that HiveContext extends SQLContext, but it was not.

I am wondering if this behavior is normal; if not, I could open an issue
with a potential fix so that spark.sql.parquet.compression.codec would be
translated to parquet.compression in the hive context?

Or the documentation should be updated to mention that the compression
codec is set differently with HiveContext.
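
For reference, a minimal sketch of the setting that ended up working here,
versus the one that is ignored on this code path (Spark 1.2-era API):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// picked up by the Hive write path used by "insert into table ... STORED AS PARQUET"
hiveContext.sql("SET parquet.compression=GZIP")

// NOT forwarded to that path in 1.2.0, as discussed in this thread:
// hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")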

Ayoub.



2015-01-09 17:51 GMT+01:00 Michael Armbrust mich...@databricks.com:

 This is a little confusing, but that code path is actually going through
 hive.  So the spark sql configuration does not help.

 Perhaps, try:
 set parquet.compression=GZIP;

 On Fri, Jan 9, 2015 at 2:41 AM, Ayoub benali.ayoub.i...@gmail.com wrote:

 Hello,

 I tried to save a table created via the hive context as a parquet file but
 whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
 setConf like:

 setConf("spark.sql.parquet.compression.codec", "gzip")

 the size of the generated files is always the same, so it seems like
 the spark context ignores the compression codec that I set.

 Here is a code sample applied via the spark shell:

 import org.apache.spark.sql.hive.HiveContext
 val hiveContext = new HiveContext(sc)

 hiveContext.sql("SET hive.exec.dynamic.partition = true")
 hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
 // required to make data compatible with impala
 hiveContext.setConf("spark.sql.parquet.binaryAsString", "true")
 hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

 hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) " +
   "Partitioned by (year INT, month INT, day INT) STORED AS PARQUET " +
   "Location 'hdfs://path/data/foo'")

 hiveContext.sql("insert into table foo partition(year, month, day) select *, " +
   "year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, " +
   "day(from_unixtime(ts)) as day from raw_foo")

 I tried that with spark 1.2 and 1.3 snapshot against hive 0.13
 and I also tried that with Impala on the same cluster which applied
 correctly the compression codecs.

 Does anyone know what could be the problem ?

 Thanks,
 Ayoub.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Parquet compression codecs not applied

2015-01-09 Thread Ayoub
Hello, 

I tried to save a table created via the hive context as a parquet file but
whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
setConf like: 

setConf("spark.sql.parquet.compression.codec", "gzip")

the size of the generated files is always the same, so it seems like
the spark context ignores the compression codec that I set.

Here is a code sample applied via the spark shell: 

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)

hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
// required to make data compatible with impala
hiveContext.setConf("spark.sql.parquet.binaryAsString", "true")
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) " +
  "Partitioned by (year INT, month INT, day INT) STORED AS PARQUET " +
  "Location 'hdfs://path/data/foo'")

hiveContext.sql("insert into table foo partition(year, month, day) select *, " +
  "year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, " +
  "day(from_unixtime(ts)) as day from raw_foo")

I tried that with spark 1.2 and 1.3 snapshot against hive 0.13 
and I also tried that with Impala on the same cluster which applied
correctly the compression codecs. 

Does anyone know what could be the problem ? 

Thanks, 
Ayoub.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Parquet compression codecs not applied

2015-01-08 Thread Ayoub Benali
Hello,

I tried to save a table created via the hive context as a parquet file but
whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
setConf like:

setConf("spark.sql.parquet.compression.codec", "gzip")

the size of the generated files is always the same, so it seems like
the spark context ignores the compression codec that I set.

Here is a code sample applied via the spark shell:

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)

hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
// required to make data compatible with impala
hiveContext.setConf("spark.sql.parquet.binaryAsString", "true")
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) " +
  "Partitioned by (year INT, month INT, day INT) STORED AS PARQUET " +
  "Location 'hdfs://path/data/foo'")

hiveContext.sql("insert into table foo partition(year, month, day) select *, " +
  "year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, " +
  "day(from_unixtime(ts)) as day from raw_foo")

I tried that with spark 1.2 and 1.3 snapshot against hive 0.13
and I also tried that with Impala on the same cluster which applied
correctly the compression codecs.

Does anyone know what could be the problem ?

Thanks,
Ayoub.


Parquet compression codecs not applied

2015-01-08 Thread Ayoub
Hello,

I tried to save a table created via the hive context as a parquet file but
whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
setConf like:

setConf("spark.sql.parquet.compression.codec", "gzip")

the size of the generated files is always the same, so it seems like
the spark context ignores the compression codec that I set.

Here is a code sample applied via the spark shell:

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)

hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
// required to make data compatible with impala
hiveContext.setConf("spark.sql.parquet.binaryAsString", "true")
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) " +
  "Partitioned by (year INT, month INT, day INT) STORED AS PARQUET " +
  "Location 'hdfs://path/data/foo'")

hiveContext.sql("insert into table foo partition(year, month, day) select *, " +
  "year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, " +
  "day(from_unixtime(ts)) as day from raw_foo")

I tried that with spark 1.2 and 1.3 snapshot against hive 0.13
and I also tried that with Impala on the same cluster which applied
correctly the compression codecs.

Does anyone know what could be the problem ?

Thanks,
Ayoub.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21033.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RDD lineage and broadcast variables

2014-12-12 Thread Ron Ayoub
I'm still wrapping my head around the fact that the data backing an RDD is 
immutable since an RDD may need to be reconstructed from its lineage at any 
point. In the context of clustering there are many iterations where an RDD may 
need to change (for instance cluster assignments, etc) based on a broadcast 
variable of a list of centroids which are objects that in turn contain a list 
of features. So immutability is all well and good for the purposes of being 
able to replay a lineage. But now I'm wondering, during each iterations in 
which this RDD goes through many transformations it will be transforming based 
on that broadcast variable of centroids that are mutable. How would it replay 
the lineage in this instance? Does a dependency on mutable variables mess up 
the whole lineage thing?
Any help appreciated. Just trying to wrap my head around using Spark correctly. 
I will say it does seem like there is a common misconception that Spark RDDs 
are in-memory arrays - but perhaps this is for a reason. Perhaps in some cases 
an option for mutability and failure exceptions is exactly what is needed for a 
one-off algorithm that doesn't necessarily need resiliency. Just a thought. 
 

RE: Java RDD Union

2014-12-06 Thread Ron Ayoub
With that said, and the nature of iterative algorithms that Spark is advertised 
for, isn't this a bit of an unnecessary restriction since I don't see where the 
problem is. For instance, it is clear that when aggregating you need operations 
to be associative because of the way they are divided and combined. But since 
forEach works on an individual item the same problem doesn't exist. 
As an example, during a k-means algorithm you have to continually update 
cluster assignments per data item along with perhaps distance from centroid.  
So if you can't update items in place you have to literally create thousands 
upon thousands of RDDs. Does Spark have some kind of trick like reuse behind 
the scenes - fully persistent data objects or whatever. How can it possibly be 
efficient for 'iterative' algorithms when it is creating so many RDDs as 
opposed to one? 

 From: so...@cloudera.com
 Date: Fri, 5 Dec 2014 14:58:37 -0600
 Subject: Re: Java RDD Union
 To: ronalday...@live.com; user@spark.apache.org
 
 foreach also creates a new RDD, and does not modify an existing RDD.
 However, in practice, nothing stops you from fiddling with the Java
 objects inside an RDD when you get a reference to them in a method
 like this. This is definitely a bad idea, as there is certainly no
 guarantee that any other operations will see any, some or all of these
 edits.
 
 On Fri, Dec 5, 2014 at 2:40 PM, Ron Ayoub ronalday...@live.com wrote:
  I tricked myself into thinking it was uniting things correctly. I see I'm
  wrong now.
 
  I have a question regarding your comment that RDD are immutable. Can you
  change values in an RDD using forEach. Does that violate immutability. I've
  been using forEach to modify RDD but perhaps I've tricked myself once again
  into believing it is working. I have object reference so perhaps it is
  working serendipitously in local mode since the references are in fact not
  changing but there are referents are and somehow this will no longer work
  when clustering.
 
  Thanks for comments.
 
  From: so...@cloudera.com
  Date: Fri, 5 Dec 2014 14:22:38 -0600
  Subject: Re: Java RDD Union
  To: ronalday...@live.com
  CC: user@spark.apache.org
 
 
  No, RDDs are immutable. union() creates a new RDD, and does not modify
  an existing RDD. Maybe this obviates the question. I'm not sure what
  you mean about releasing from memory. If you want to repartition the
  unioned RDD, you repartition the result of union(), not anything else.
 
  On Fri, Dec 5, 2014 at 1:27 PM, Ron Ayoub ronalday...@live.com wrote:
   I'm a bit confused regarding expected behavior of unions. I'm running on
   8
   cores. I have an RDD that is used to collect cluster associations
   (cluster
   id, content id, distance) for internal clusters as well as leaf clusters
   since I'm doing hierarchical k-means and need all distances for sorting
   documents appropriately upon examination.
  
   It appears that Union simply adds items in the argument to the RDD
   instance
   the method is called on rather than just returning a new RDD. If I want
   to
   do Union this way as more of an add/append, should I be capturing the
   return
   value and releasing it from memory. Need help clarifying the semantics
   here.
  
   Also, in another related thread someone mentioned coalesce after union.
   Would I need to do the same on the instance RDD I'm calling Union on.
  
   Perhaps a method such as append would be useful and clearer.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
  

Modifying an RDD in forEach

2014-12-06 Thread Ron Ayoub
This is from a separate thread with a differently named title. 
Why can't you modify the actual contents of an RDD using forEach? It appears to 
be working for me. What I'm doing is changing cluster assignments and distances 
per data item for each iteration of the clustering algorithm. The clustering 
algorithm is massive and iterates thousands of times. As I understand it now, 
you are supposed to create new RDDs on each pass. This is a hierarchical k-means 
that I'm doing and hence it consists of many small iterations rather than a few 
large ones.
So I understand the restriction of why operations, when aggregating and reducing 
etc., need to be associative. However, forEach operates on a single item. So 
being that Spark is advertised as being great for iterative algorithms since it 
operates in-memory, how can it be good to create thousands upon thousands of 
RDDs during the course of an iterative algorithm?  Does Spark have some kind of 
trick like reuse behind the scenes - fully persistent data objects or whatever? 
How can it possibly be efficient for 'iterative' algorithms when it is creating 
so many RDDs as opposed to one? 
Or is the answer that I should keep doing what I'm doing because it is working 
even though it is not theoretically sound and aligned with functional ideas. I 
personally just want it to be fast and be able to operate on up to 500 million 
data items.  
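
For what it's worth, a minimal, hedged sketch of the immutable pattern being
discussed (all data and names invented): each iteration derives a new RDD of
assignments from a broadcast of the current centroids instead of mutating rows
in place, and only the small centroid array lives as mutable state on the driver.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions in the 1.x API
import scala.util.Random

object ImmutableKMeansSketch {
  def dist(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("kmeans-sketch").setMaster("local[*]"))
    val dim = 4
    val docs = sc.parallelize(Seq.fill(1000)(Array.fill(dim)(Random.nextDouble()))).cache()

    var centroids = docs.takeSample(withReplacement = false, num = 3)

    for (_ <- 1 to 10) {
      val bc = sc.broadcast(centroids)
      // a new RDD of (clusterId, (featureVector, 1)) per iteration; nothing is modified in place
      val assigned = docs.map { d =>
        val j = bc.value.indices.minBy(i => dist(d, bc.value(i)))
        (j, (d, 1L))
      }
      // recompute the (small) centroids on the driver
      centroids = assigned
        .reduceByKey { case ((s1, n1), (s2, n2)) => (s1.zip(s2).map(p => p._1 + p._2), n1 + n2) }
        .collect()
        .map { case (_, (sum, n)) => sum.map(_ / n) }
    }

    centroids.foreach(c => println(c.mkString(", ")))
    sc.stop()
  }
}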

RE: Java RDD Union

2014-12-06 Thread Ron Ayoub
Hierarchical K-means requires a massive amount of iterations whereas flat K-means 
does not but I've found flat to be generally useless since in most UIs it is 
nice to be able to drill down into more and more specific clusters. If you have 
100 million documents and your branching factor is 8 (8-secting k-means) then 
you will be picking a cluster to split and iterating thousands of times. So per 
split you iterate maybe 6 or 7 times to get new cluster assignments and there 
are ultimately going to be 5,000 to 50,000 splits depending on split criterion 
and cluster variances etc... 
In this case fault tolerance doesn't matter. I've found that the distributed 
aspect of RDD is what I'm looking for and don't care or need the resilience 
part as much. It is a one off algorithm and that can just be run again if 
something goes wrong. Once the data is created it is done with Spark. 
But anyway, that is the very thing Spark is advertised for. 

 From: so...@cloudera.com
 Date: Sat, 6 Dec 2014 06:39:10 -0600
 Subject: Re: Java RDD Union
 To: ronalday...@live.com
 CC: user@spark.apache.org
 
 I guess a major problem with this is that you lose fault tolerance.
 You have no way of recreating the local state of the mutable RDD if a
 partition is lost.
 
 Why would you need thousands of RDDs for kmeans? it's a few per iteration.
 
 An RDD is more bookkeeping than data structure, itself. They don't
 inherently take up resource, unless you mark them to be persisted.
 You're paying the cost of copying objects to create one RDD from next,
 but that's mostly it.
 
 On Sat, Dec 6, 2014 at 6:28 AM, Ron Ayoub ronalday...@live.com wrote:
  With that said, and the nature of iterative algorithms that Spark is
  advertised for, isn't this a bit of an unnecessary restriction since I don't
  see where the problem is. For instance, it is clear that when aggregating
  you need operations to be associative because of the way they are divided
  and combined. But since forEach works on an individual item the same problem
  doesn't exist.
 
  As an example, during a k-means algorithm you have to continually update
  cluster assignments per data item along with perhaps distance from centroid.
  So if you can't update items in place you have to literally create thousands
  upon thousands of RDDs. Does Spark have some kind of trick like reuse behind
  the scenes - fully persistent data objects or whatever. How can it possibly
  be efficient for 'iterative' algorithms when it is creating so many RDDs as
  opposed to one?
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
  

RE: Modifying an RDD in forEach

2014-12-06 Thread Ron Ayoub
These are very interesting comments. The vast majority of cases I'm working on 
are going to be in the 3 million range and 100 million was thrown out as 
something to shoot for. I upped it to 500 million. But all things considered, 
I believe I may be able to directly translate what I have to the Java Streams API 
and run 100 million docs on 32 cores in under an hour or two, which would suit 
our needs. Up until this point I've been focused on the computational aspect. 
If I can scale up to clustering 100 million documents on a single machine, I can 
probably directly translate what I have to the Java Streams API and be faster; it 
is the scaling out that changes things. I 
think in this hierarchical k-means case the lazy evaluation becomes almost 
useless and perhaps even an impediment. Part of the problem is that I've been a 
bit too focused on math/information retrieval and have to catch up a bit on the 
functional approach to programming so I can better utilize the tools. But it 
does appear that Spark may not be the best option for this need. I don't need 
resiliency or fault tolerance as much as I need to be able to execute an 
algorithm on a large amount of data fast and then be done with it. I'm now 
thinking that in the 100 million document range I may be ok clustering feature 
vectors with no more than 25 features per doc on a single machine with 32 cores 
and a load of memory. I might directly translate what I have to Java 8 Streams 
API. 
There is also questions of proportion. Perhaps what I have is not big enough to 
warrant or require scaling out. I may have other uses for Spark in traditional 
map-reduce algorithms such as counting pairs of shared shingles for near dupe 
detection, but to this point I've found Oracle's parallel-pipelined table 
functions, while not glamorous, are doing quite well in the DB. 
I'm just a bit confused still on why it is advertised as ideal for iterative 
algorithms when iterative algorithms have that point per iteration where things 
do get evaluated and laziness is not terribly useful. Ideal for massive 
in-memory cluster computing, yes - but iterative... ? I'm not sure. I have that book 
Functional Programming in Scala and I hope to read it someday and enrich my 
understanding here. 

Subject: Re: Modifying an RDD in forEach
From: mohitja...@gmail.com
Date: Sat, 6 Dec 2014 13:13:50 -0800
CC: ronalday...@live.com; user@spark.apache.org
To: mayur.rust...@gmail.com

Ron, “appears to be working” might be true when there are no failures. On large 
datasets being processed on a large number of machines, failures of several 
types (server, network, disk etc.) can happen. At that time, Spark will not 
“know” that you changed the RDD in place and will use any version of any 
partition of the RDD to be retried. Retries require idempotency, and that is 
difficult without immutability. I believe this is one of the primary reasons 
for making RDDs immutable in Spark (mutable isn't even an option worth 
considering). In general, mutating something in a distributed system is a hard 
problem. It can be solved (e.g. in NoSQL or NewSQL databases) but Spark is not 
a transactional data store.

If you are building an iterative machine learning algorithm, which usually has 
a “reduce” step at the end of every iteration, then the lazy evaluation is 
unlikely to be useful. On the other hand, if these intermediate RDDs stay in 
the young generation of the JVM heap [I am not sure if RDD cache management 
somehow changes this, so I could be wrong] they are garbage collected quickly 
and with very little overhead.

This is the price of scaling out :-)

Hope this helps,
Mohit.
On Dec 6, 2014, at 5:02 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

You'll benefit by viewing Matei's talk at Yahoo on Spark internals and 
how it optimizes execution of iterative jobs.

The simple answer is: 

1. Spark doesn't materialize an RDD when you do an iteration but lazily captures 
the transformation functions in the RDD (only the function and closure are 
captured; no data operation actually happens).

2. When you finally execute and want to cause effects (save to disk, collect 
on master, etc.) it views the DAG of execution and optimizes what it can reason 
about (eliminating intermediate states, performing multiple transformations in one 
task, leveraging partitioning where available, among others).

Bottom line: it doesn't matter how many RDDs you have in your DAG chain as long 
as Spark can optimize the functions in that DAG to create minimal 
materialization on its way to the final output. 
Regards

Mayur


On 06-Dec-2014 6:12 pm, Ron Ayoub ronalday...@live.com wrote:



This is from a separate thread with a differently named title. 
Why can't you modify the actual contents of an RDD using forEach? It appears to 
be working for me. What I'm doing is changing cluster assignments and distances 
per data item for each iteration of the clustering algorithm. The clustering 
algorithm is massive and iterates thousands of times. As I understand

Java RDD Union

2014-12-05 Thread Ron Ayoub
I'm a bit confused regarding expected behavior of unions. I'm running on 8 
cores. I have an RDD that is used to collect cluster associations (cluster id, 
content id, distance) for internal clusters as well as leaf clusters since I'm 
doing hierarchical k-means and need all distances for sorting documents 
appropriately upon examination. 
It appears that Union simply adds items in the argument to the RDD instance the 
method is called on rather than just returning a new RDD. If I want to do Union 
this way as more of an add/append, should I be capturing the return value and 
releasing it from memory. Need help clarifying the semantics here. 
Also, in another related thread someone mentioned coalesce after union. Would I 
need to do the same on the instance RDD I'm calling Union on. 
Perhaps a method such as append would be useful and clearer.
  

collecting fails - requirements for collecting (clone, hashCode etc?)

2014-12-03 Thread Ron Ayoub
The following code is failing on the collect. If I don't do the collect and go 
with a JavaRDD<Document> it works fine. Except I really would like to collect. 
At first I was getting an error regarding JDI threads and an index being 0. 
Then it just started locking up. I'm running the spark context locally on 8 
cores. 

long count = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES).count();
List<Document> sampledDocuments = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES)
    .sample(false, samplingFraction(count)).collect();


  

RE: collecting fails - requirements for collecting (clone, hashCode etc?)

2014-12-03 Thread Ron Ayoub
I didn't realize I do get a nice stack trace if not running in debug mode. 
Basically, I believe Document has to be serializable. 
But since the question has already been asked, are there other requirements for 
objects within an RDD that I should be aware of? Serializable is very 
understandable. How about clone, hashCode, etc...

From: ronalday...@live.com
To: user@spark.apache.org
Subject: collecting fails - requirements for collecting (clone, hashCode etc?)
Date: Wed, 3 Dec 2014 07:48:53 -0600




The following code is failing on the collect. If I don't do the collect and go 
with a JavaRDD<Document> it works fine. Except I really would like to collect. 
At first I was getting an error regarding JDI threads and an index being 0. 
Then it just started locking up. I'm running the spark context locally on 8 
cores. 

long count = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES).count();
List<Document> sampledDocuments = documents.filter(d -> d.getFeatures().size() > Parameters.MIN_CENTROID_FEATURES)
    .sample(false, samplingFraction(count)).collect();



  

Example of Fold

2014-10-31 Thread Ron Ayoub
I want to fold an RDD into a smaller RDD with max elements. I have simple 
bean objects with 4 properties. I want to group by 3 of the properties and then 
select the max of the 4th. So I believe fold is the appropriate method for 
this. My question is, is there a good fold example out there? Additionally, 
what is the zero value used for as the first argument? Thanks.  
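
For reference, a hedged sketch of that pattern (bean and field names invented;
it assumes a SparkContext named sc, as in the spark shell): key by the three
grouping properties and keep the max of the fourth with foldByKey. The zero
value is the neutral element each per-partition fold starts from; for a max it
is the smallest possible score.

import org.apache.spark.SparkContext._   // pair-RDD functions in the 1.x API

case class Bean(a: String, b: String, c: String, score: Double)

val beans = sc.parallelize(Seq(
  Bean("x", "y", "z", 1.0),
  Bean("x", "y", "z", 3.0),
  Bean("x", "q", "z", 2.0)))

val maxPerGroup = beans
  .map(bn => ((bn.a, bn.b, bn.c), bn.score))
  .foldByKey(Double.NegativeInfinity)(math.max)   // zero value = identity element for max

maxPerGroup.collect().foreach(println)            // ((x,y,z),3.0) and ((x,q,z),2.0)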
   

winutils

2014-10-29 Thread Ron Ayoub
Apparently Spark does require Hadoop even if you do not intend to use Hadoop. 
Is there a workaround for the below error I get when creating the SparkContext 
in Scala?
I will note that I didn't have this problem yesterday when creating the Spark 
context in Java as part of the getting started App. It could be because I was 
using Maven project to manage dependencies and that did something for me or 
else JavaSparkContext has some different code. 
I would say, in order for Spark to be general purpose this is a pretty big bug 
since now it appears Spark depends upon Hadoop. 
Could not locate executable null\bin\winutils.exe in the Hadoop binaries

  

RE: winutils

2014-10-29 Thread Ron Ayoub
Well, I got past this problem, and the answer was in my own email. I did 
download the one with Hadoop since it was among the only ones you don't have to 
compile from source, along with CDH and MapR. It worked yesterday because I added 
1.1.0 as a maven dependency from the repository. I just did the same thing 
again and it worked perfectly. 
One peculiarity I will mention is that even with Scala IDE installed in Eclipse 
when I created the Maven project per instructions on the web and installed the 
connector I still did not get the Scala perspective nor right clicking and 
being able to add Scala types. This time around, I used the Scala IDE project 
wizard to create a simple non-Maven app and then converted it to Maven and all 
features seem to work fine.
I will also note that I'm learning Java, Scala, Eclipse, Spark, Maven all at 
the same time. Kind of overkill. But part of the frustration was following 
along with the Maven Scala project instructions using an archetype badly out of 
date. So now I think I found a good approach to getting up and running with 
Spark (1. Eclipse, 2. Scala IDE, 3. Scala Wizard Project, 4. Convert to Maven, 
5. Add Spark dependency). 

Date: Wed, 29 Oct 2014 11:38:23 -0700
Subject: Re: winutils
From: denny.g@gmail.com
To: ronalday...@live.com
CC: user@spark.apache.org

QQ - did you download the Spark 1.1 binaries that included the Hadoop one?  
Does this happen if you're using the Spark 1.1 binaries that do not include the 
Hadoop jars?
On Wed, Oct 29, 2014 at 11:31 AM, Ron Ayoub ronalday...@live.com wrote:



Apparently Spark does require Hadoop even if you do not intend to use Hadoop. 
Is there a workaround for the below error I get when creating the SparkContext 
in Scala?
I will note that I didn't have this problem yesterday when creating the Spark 
context in Java as part of the getting started App. It could be because I was 
using Maven project to manage dependencies and that did something for me or 
else JavaSparkContext has some different code. 
I would say, in order for Spark to be general purpose this is a pretty big bug 
since now it appears Spark depends upon Hadoop. 
Could not locate executable null\bin\winutils.exe in the Hadoop binaries

  

  


JdbcRDD in Java

2014-10-28 Thread Ron Ayoub
The following line of code indicates that the constructor is not defined. The 
only examples I can find of JdbcRDD usage are Scala examples. Does this work 
in Java? Are there any examples? Thanks. 
JdbcRDD<Integer> rdd = new JdbcRDD<Integer>(sp, () ->
ods.getConnection(), sql, 1, 1783059, 10, (ResultSet row) ->
row.getInt("FEATURE_ID"));
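
For comparison, a hedged sketch of the same call from the Scala side (Spark
1.1-era JdbcRDD API; the JDBC URL, credentials, table and query here are
invented, and the SQL must contain the two '?' placeholders for the partition
bounds):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val sql = "SELECT feature_id FROM features WHERE feature_id >= ? AND feature_id <= ?"

val rdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:oracle:thin:@//dbhost:1521/service", "user", "password"),
  sql,
  1L, 1783059L,   // lower/upper bounds substituted into the two '?' placeholders
  10,             // number of partitions
  (row: ResultSet) => row.getInt("FEATURE_ID"))

println(rdd.count())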

Is Spark in Java a bad idea?

2014-10-28 Thread Ron Ayoub
I haven't learned Scala yet so as you might imagine I'm having challenges 
working with Spark from the Java API. For one thing, it seems very limited in 
comparison to Scala. I ran into a problem really quickly. I need to hydrate an 
RDD from JDBC/Oracle and so I wanted to use the JdbcRDD. But that is part of 
the Spark API and I'm unable to get the compiler to accept various parameters. 
I looked at the code and I noticed that JdbcRDD doesn't add much value and just 
implements compute and partition. I figured I can do that myself with better 
looking JDBC code. So I created a class inheriting from RDD that was heavily 
decorated with stuff I have never seen before. Next, I recalled that I have to 
use the JavaRDD. Of course, that class doesn't have those methods that you can 
override. 
From where I'm standing right now, it really appears that Spark doesn't really 
support Java and that if you really want to use it you need to learn Scala. Is 
this a correct assessment? 

  

RE: Is Spark in Java a bad idea?

2014-10-28 Thread Ron Ayoub
I interpret this to mean you have to learn Scala in order to work with Spark in 
Scala (goes without saying) and also to work with Spark in Java (since you have 
to jump through some hoops for basic functionality).
The best path here is to take this as a learning opportunity and sit down and 
learn Scala. 
Regarding RDD being an internal API, it has two methods that clearly allow you 
to override them, which the JdbcRDD does, and it looks close to trivial - if only 
I knew Scala. Once I learn Scala, I would say the first thing I plan on doing 
is writing my own OracleRDD with my own flavor of Jdbc code. Why would this not 
be advisable? 
 Subject: Re: Is Spark in Java a bad idea?
 From: matei.zaha...@gmail.com
 Date: Tue, 28 Oct 2014 11:56:39 -0700
 CC: u...@spark.incubator.apache.org
 To: isasmani@gmail.com
 
 A pretty large fraction of users use Java, but a few features are still not 
 available in it. JdbcRDD is one of them -- this functionality will likely be 
 superseded by Spark SQL when we add JDBC as a data source. In the meantime, 
 to use it, I'd recommend writing a class in Scala that has Java-friendly 
 methods and getting an RDD to it from that. Basically the two parameters that 
 weren't friendly there were the ClassTag and the getConnection and mapRow 
 functions.
 
 Subclassing RDD in Java is also not really supported, because that's an 
 internal API. We don't expect users to be defining their own RDDs.
 
 Matei
 
  On Oct 28, 2014, at 11:47 AM, critikaled isasmani@gmail.com wrote:
  
  Hi Ron,
  whatever API you have in Scala you can possibly use from Java. Scala is
  inter-operable with Java and vice versa. Scala, being both object oriented
  and functional, will make your job easier on the JVM and it is more concise than
  Java. Take it as an opportunity and start learning Scala ;).
  
  
  
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-in-Java-a-bad-idea-tp17534p17538.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
  
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
  
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
  

Spark to eliminate full-table scan latency

2014-10-27 Thread Ron Ayoub
We have a table containing 25 features per item id along with feature weights. 
A correlation matrix can be constructed for every feature pair based on 
co-occurrence. If a user inputs a feature they can find out the features that 
are correlated with a self-join requiring a single full table scan. This 
results in high latency for big data (10 seconds +) due to the IO involved in 
the full table scan. My idea is for this feature the data can be loaded into an 
RDD and transformations and actions can be applied to find out per query what 
are the correlated features. 
I'm pretty sure Spark can do this sort of thing. Since I'm new, what I'm not 
sure about is, is Spark appropriate as a server application? For instance, the 
driver application would have to load the RDD and then listen for requests and 
return results, perhaps using a socket?  Are there any libraries to facilitate 
this sort of Spark server app? So I understand how Spark can be used to grab 
data, run algorithms, and put results back but is it appropriate as the engine 
of a server app and what are the general patterns involved?
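
For what it's worth, a hedged sketch of the in-memory idea described above
(file path, schema and names are invented; it assumes a long-lived
SparkContext named sc inside whatever process ends up serving the queries):

import org.apache.spark.SparkContext._   // pair-RDD functions in the 1.x API

case class FeatureRow(itemId: Long, feature: String, weight: Double)

// load once and keep in memory
val rows = sc.textFile("hdfs:///path/features.tsv")
  .map(_.split("\t"))
  .map(a => FeatureRow(a(0).toLong, a(1), a(2).toDouble))

val byItem = rows.map(r => (r.itemId, r.feature)).cache()
byItem.count()   // force the cache

// answer each query from the cached RDD instead of a full table scan
def correlatedWith(query: String) = {
  val matching = byItem.filter { case (_, f) => f == query }
  matching.join(byItem)                                  // co-occurrence via a self-join on item id
    .map { case (_, (_, other)) => (other, 1L) }
    .filter { case (f, _) => f != query }
    .reduceByKey(_ + _)                                  // co-occurrence counts per feature
    .sortBy(_._2, ascending = false)
}

correlatedWith("some_feature").take(10).foreach(println)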

  

RE: Spark to eliminate full-table scan latency

2014-10-27 Thread Ron Ayoub
This does look like it provides a good way to allow other processes to access the 
contents of an RDD in a separate app. Is there any other general purpose 
mechanism for serving up RDD data? I understand that the driver app and workers 
all are app specific and run in separate executors but would be cool if there 
was some general way to create a server app based on Spark. Perhaps Spark SQL 
is that general way and I'll soon find out. Thanks. 

From: mich...@databricks.com
Date: Mon, 27 Oct 2014 14:35:46 -0700
Subject: Re: Spark to eliminate full-table scan latency
To: ronalday...@live.com
CC: user@spark.apache.org

You can access cached data in spark through the JDBC server:
http://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbc-server

On Mon, Oct 27, 2014 at 1:47 PM, Ron Ayoub ronalday...@live.com wrote:



We have a table containing 25 features per item id along with feature weights. 
A correlation matrix can be constructed for every feature pair based on 
co-occurrence. If a user inputs a feature they can find out the features that 
are correlated with a self-join requiring a single full table scan. This 
results in high latency for big data (10 seconds +) due to the IO involved in 
the full table scan. My idea is for this feature the data can be loaded into an 
RDD and transformations and actions can be applied to find out per query what 
are the correlated features. 
I'm pretty sure Spark can do this sort of thing. Since I'm new, what I'm not 
sure about is, is Spark appropriate as a server application? For instance, the 
driver application would have to load the RDD and then listen for requests and 
return results, perhaps using a socket?  Are there any libraries to facilitate 
this sort of Spark server app? So I understand how Spark can be used to grab 
data, run algorithms, and put results back but is it appropriate as the engine 
of a server app and what are the general patterns involved?