Re: Dataset -- Schema for type scala.collection.Set[scala.Int] is not supported

2019-08-09 Thread Mohit Jaggi
switched to immutable.Set and it works. this is weird as the code in
ScalaReflection.scala seems to support scala.collection.Set

cc: dev list, in case this is a bug
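
For reference, a minimal sketch of the working variant (assuming spark.implicits._ is in
scope, e.g. in spark-shell or a notebook session):

import scala.collection.immutable

case class A(ps: immutable.Set[Int], x: Int)

val az = Seq(A(immutable.Set(1, 2), 1), A(immutable.Set(2), 2))
az.toDS.show()   // works once the field type is the immutable Set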

On Thu, Aug 8, 2019 at 8:41 PM Mohit Jaggi  wrote:

> Is this not supported? I found this diff
> <https://github.com/apache/spark/pull/18416/files> and wonder if this is
> a bug or am I doing something wrong?
>
>  see below =
>
> import scala.collection.Set
> case class A(ps: Set[Int], x: Int)
>
> val az = Seq(A(Set(1, 2), 1), A(Set(2), 2))
>
> az.toDS
> java.lang.UnsupportedOperationException: Schema for type
> scala.collection.Set[scala.Int] is not supported
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
> at
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:388)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:387)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:387)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:157)
> at
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:157)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:145)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
> at org.apache.spark.sql.Encoders$.product(Encoders.scala:276)
> at
> org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
> at
> org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:6)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:82)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:84)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:86)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:88)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:90)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:92)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:94)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:96)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:98)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:100)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:102)
> at
> line9f6df40864bf4b

Dataset -- Schema for type scala.collection.Set[scala.Int] is not supported

2019-08-08 Thread Mohit Jaggi
Is this not supported? I found this diff
 and wonder if this is a
bug or am I doing something wrong?

 see below =

import scala.collection.Set
case class A(ps: Set[Int], x: Int)

val az = Seq(A(Set(1, 2), 1), A(Set(2), 2))

az.toDS
java.lang.UnsupportedOperationException: Schema for type
scala.collection.Set[scala.Int] is not supported
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
at
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:388)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:387)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:387)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:157)
at
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:157)
at
org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:145)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:276)
at
org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at
org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:6)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:82)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:84)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:86)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:88)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:90)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:92)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:94)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:96)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:98)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:100)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:102)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw.(command-661233094182065:104)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw.(command-661233094182065:106)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw.(command-661233094182065:108)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw.(command-661233094182065:110)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read.(command-661233094182065:112)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$.(command-661233094182065:116)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$.(command-661233094182065)
at
line9f6df40864bf4b14acca9f5c334e0286112.$eval$.$print$lzycompute(:7)
at 

Re: dataset best practice question

2019-01-18 Thread Mohit Jaggi
Thanks! I wanted to avoid repeating f1, f2, f3 in class B. I wonder whether
the encoders/decoders work if I use mixins
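
To make the mixin idea concrete, a rough, untested sketch of what I mean (names
hypothetical); note the constructor parameters still have to be repeated, the trait only
shares the field contract:

trait HasBase {
  def f1: Int
  def f2: Int
  def f3: Int
}

case class A(f1: Int, f2: Int, f3: Int) extends HasBase
case class B(f1: Int, f2: Int, f3: Int, f4: Double, f5: Double, f6: Double) extends HasBase

// Product encoders are derived from the case class constructor itself, so extending a
// trait should not change how .as[B] resolves columns -- but that is an assumption I
// have not verified.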

On Tue, Jan 15, 2019 at 7:57 PM  wrote:

> Hi Mohit,
>
>
>
> I’m not sure that there is a “correct” answer here, but I tend to use
> classes whenever the input or output data represents something meaningful
> (such as a domain model object). I would recommend against creating many
> temporary classes for each and every transformation step as that may be
> difficult to maintain over time.
>
>
>
> Using *withColumn* statements will continue to work, and you don’t need
> to cast to your output class until you’ve set up all transformations.
> Therefore, you can do things like:
>
>
>
> case class A (f1, f2, f3)
> case class B (f1, f2, f3, f4, f5, f6)
>
> ds_a = spark.read.csv("path").as[A]
> ds_b = ds_a
>   .withColumn("f4", someUdf)
>   .withColumn("f5", someUdf)
>   .withColumn("f6", someUdf)
>   .as[B]
>
>
>
> Kevin
>
>
>
> *From:* Mohit Jaggi 
> *Sent:* Tuesday, January 15, 2019 1:31 PM
> *To:* user 
> *Subject:* dataset best practice question
>
>
>
> Fellow Spark Coders,
>
> I am trying to move from using Dataframes to Datasets for a reasonably
> large code base. Today the code looks like this:
>
>
>
> df_a= read_csv
>
> df_b = df.withColumn ( some_transform_that_adds_more_columns )
>
> //repeat the above several times
>
>
>
> With datasets, this will require defining
>
>
>
> case class A { f1, f2, f3 } //fields from csv file
>
> case class B { f1, f2, f3, f4 } //union of A and new field added by
> some_transform_that_adds_more_columns
>
> //repeat this 10 times
>
>
>
> Is there a better way?
>
>
>
> Mohit.
>


dataset best practice question

2019-01-15 Thread Mohit Jaggi
Fellow Spark Coders,
I am trying to move from using Dataframes to Datasets for a reasonably
large code base. Today the code looks like this:

df_a= read_csv
df_b = df.withColumn ( some_transform_that_adds_more_columns )
//repeat the above several times

With datasets, this will require defining

case class A { f1, f2, f3 } //fields from csv file
case class B { f1, f2, f3, f4 } //union of A and new field added by
some_transform_that_adds_more_columns
//repeat this 10 times

Is there a better way?

Mohit.


Re: Pyspark access to scala/java libraries

2018-07-17 Thread Mohit Jaggi
Thanks 0xF0F0F0 and Ashutosh for the pointers.

Holden,
I am trying to look into sparklingml...what am I looking for? Also which
chapter/page of your book should I look at?

Mohit.

On Sun, Jul 15, 2018 at 3:02 AM Holden Karau  wrote:

> If you want to see some examples in a library shows a way to do it -
> https://github.com/sparklingpandas/sparklingml and high performance spark
> also talks about it.
>
> On Sun, Jul 15, 2018, 11:57 AM <0xf0f...@protonmail.com.invalid> wrote:
>
>> Check
>> https://stackoverflow.com/questions/31684842/calling-java-scala-function-from-a-task
>>
>> ​Sent with ProtonMail Secure Email.​
>>
>> ‐‐‐ Original Message ‐‐‐
>>
>> On July 15, 2018 8:01 AM, Mohit Jaggi  wrote:
>>
>> > Trying again…anyone know how to make this work?
>> >
>> > > On Jul 9, 2018, at 3:45 PM, Mohit Jaggi mohitja...@gmail.com wrote:
>> > >
>> > > Folks,
>> > >
>> > > I am writing some Scala/Java code and want it to be usable from
>> pyspark.
>> > >
>> > > For example:
>> > >
>> > > class MyStuff(addend: Int) {
>> > >
>> > > def myMapFunction(x: Int) = x + addend
>> > >
>> > > }
>> > >
>> > > I want to call it from pyspark as:
>> > >
>> > > df = ...
>> > >
>> > > mystuff = sc._jvm.MyStuff(5)
>> > >
>> > > df[‘x’].map(lambda x: mystuff.myMapFunction(x))
>> > >
>> > > How can I do this?
>> > >
>> > > Mohit.
>> >
>> > --
>> >
>> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>


Re: Pyspark access to scala/java libraries

2018-07-15 Thread Mohit Jaggi
Trying again…anyone know how to make this work?

> On Jul 9, 2018, at 3:45 PM, Mohit Jaggi  wrote:
> 
> Folks,
> I am writing some Scala/Java code and want it to be usable from pyspark.
> 
> For example:
> class MyStuff(addend: Int)  {
>   def myMapFunction(x: Int) = x + addend
> }
> 
> I want to call it from pyspark as:
> 
> df = ...
> mystuff = sc._jvm.MyStuff(5)
> df[‘x’].map(lambda x: mystuff.myMapFunction(x))
> 
> How can I do this?
> 
> Mohit.
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Pyspark access to scala/java libraries

2018-07-09 Thread Mohit Jaggi
Folks,
I am writing some Scala/Java code and want it to be usable from pyspark.

For example:
class MyStuff(addend: Int)  {
def myMapFunction(x: Int) = x + addend
}

I want to call it from pyspark as:

df = ...
mystuff = sc._jvm.MyStuff(5)
df[‘x’].map(lambda x: mystuff.myMapFunction(x))

How can I do this?

Mohit.
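
One pattern that should work (a hedged sketch, all names hypothetical, assuming Spark 2.x
and the compiled jar shipped with --jars): expose the Scala logic as a registered UDF so
PySpark only touches the JVM on the driver, instead of calling JVM objects from inside a
Python lambda (which runs on executors where the py4j gateway is not available).

package com.example   // hypothetical package

import org.apache.spark.sql.SparkSession

object MyStuff {
  // register a plain Scala function as a SQL UDF so it is callable from PySpark
  def registerUdf(spark: SparkSession, addend: Int): Unit = {
    spark.udf.register("addConst", (x: Int) => x + addend)
  }
}

// From PySpark (driver side), roughly:
//   spark._jvm.com.example.MyStuff.registerUdf(spark._jsparkSession, 5)
//   df.selectExpr("addConst(x)").show()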



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SparkILoop doesn't run

2016-11-28 Thread Mohit Jaggi
Thanks, I will look into the classpaths and check.
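
For my own notes, a minimal sketch of the sbt settings Jakob mentions below (assuming
sbt 0.13-style syntax); forking gives each test JVM a proper classpath and lets me pass
a master:

// build.sbt
fork in Test := true
javaOptions in Test += "-Dspark.master=local[*]"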

On Mon, Nov 21, 2016 at 3:28 PM, Jakob Odersky <ja...@odersky.com> wrote:

> The issue I was having had to do with missing classpath settings; in
> sbt it can be solved by setting `fork:=true` to run tests in new jvms
> with appropriate classpaths.
>
> Mohit, from the looks of the error message, it also appears to be some
> classpath issue. This typically happens when there are libraries of
> multiple scala versions on the same classpath. You mention that it
> worked before, can you recall what libraries you upgraded before it
> broke?
>
> --Jakob
>
> On Mon, Nov 21, 2016 at 2:34 PM, Jakob Odersky <ja...@odersky.com> wrote:
> > Trying it out locally gave me an NPE. I'll look into it in more
> > detail, however the SparkILoop.run() method is dead code. It's used
> > nowhere in spark and can be removed without any issues.
> >
> > On Thu, Nov 17, 2016 at 11:16 AM, Mohit Jaggi <mohitja...@gmail.com>
> wrote:
> >> Thanks Holden. I did post to the user list but since this is not a
> common
> >> case, I am trying the developer list as well. Yes there is a reason: I
> get
> >> code from somewhere e.g. a notebook. This type of code did work for me
> >> before.
> >>
> >> Mohit Jaggi
> >> Founder,
> >> Data Orchard LLC
> >> www.dataorchardllc.com
> >>
> >>
> >>
> >>
> >> On Nov 17, 2016, at 8:53 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
> >>
> >> Moving to user list
> >>
> >> So this might be a better question for the user list - but is there a
> reason
> >> you are trying to use the SparkILoop for tests?
> >>
> >> On Thu, Nov 17, 2016 at 5:47 PM Mohit Jaggi <mohitja...@gmail.com>
> wrote:
> >>>
> >>>
> >>>
> >>> I am trying to use SparkILoop to write some tests(shown below) but the
> >>> test hangs with the following stack trace. Any idea what is going on?
> >>>
> >>>
> >>> import org.apache.log4j.{Level, LogManager}
> >>> import org.apache.spark.repl.SparkILoop
> >>> import org.scalatest.{BeforeAndAfterAll, FunSuite}
> >>>
> >>> class SparkReplSpec extends FunSuite with BeforeAndAfterAll {
> >>>
> >>>   override def beforeAll(): Unit = {
> >>>   }
> >>>
> >>>   override def afterAll(): Unit = {
> >>>   }
> >>>
> >>>   test("yay!") {
> >>> val rootLogger = LogManager.getRootLogger
> >>> val logLevel = rootLogger.getLevel
> >>> rootLogger.setLevel(Level.ERROR)
> >>>
> >>> val output = SparkILoop.run(
> >>>   """
> >>> |println("hello")
> >>>   """.stripMargin)
> >>>
> >>> println(s" $output ")
> >>>
> >>>   }
> >>> }
> >>>
> >>>
> >>> /Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/bin/java
> >>> -Dspark.master=local[*] -Didea.launcher.port=7532
> >>> "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA
> CE.app/Contents/bin"
> >>> -Dfile.encoding=UTF-8 -classpath "/Users/mohit/Library/Application
> >>> Support/IdeaIC2016.2/Scala/lib/scala-plugin-runners.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.
> 8.0_66.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/
> Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/
> lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.
> 8.0_66.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/
> Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/
> lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.
> 8.0_66.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/
> Java/JavaVirtualMachines/jdk1.8.0_66.

SparkILoop doesn't run

2016-11-16 Thread Mohit Jaggi
scala.tools.nsc.interpreter.ReplGlobal$$anon$1$$anon$2.typed(ReplGlobal.scala:36)
at scala.tools.nsc.typechecker.Typers$Typer.typed(Typers.scala:5448)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3.apply(Analyzer.scala:102)
at 
scala.tools.nsc.Global$GlobalPhase$$anonfun$applyPhase$1.apply$mcV$sp(Global.scala:440)
at scala.tools.nsc.Global$GlobalPhase.withCurrentUnit(Global.scala:431)
at scala.tools.nsc.Global$GlobalPhase.applyPhase(Global.scala:440)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3$$anonfun$run$1.apply(Analyzer.scala:94)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3$$anonfun$run$1.apply(Analyzer.scala:93)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3.run(Analyzer.scala:93)
at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1501)
at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1486)
at scala.tools.nsc.Global$Run.compileSources(Global.scala:1481)
at 
scala.tools.nsc.interpreter.IMain.compileSourcesKeepingRun(IMain.scala:435)
at 
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.compileAndSaveRun(IMain.scala:855)
at 
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.compile(IMain.scala:813)
at scala.tools.nsc.interpreter.IMain.bind(IMain.scala:675)
at scala.tools.nsc.interpreter.IMain.bind(IMain.scala:712)
at 
scala.tools.nsc.interpreter.IMain$$anonfun$quietBind$1.apply(IMain.scala:711)
at 
scala.tools.nsc.interpreter.IMain$$anonfun$quietBind$1.apply(IMain.scala:711)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
at scala.tools.nsc.interpreter.IMain.quietBind(IMain.scala:711)
at 
scala.tools.nsc.interpreter.ILoop.scala$tools$nsc$interpreter$ILoop$$loopPostInit(ILoop.scala:891)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$1.apply$mcZ$sp(ILoop.scala:917)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$1.apply(ILoop.scala:915)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$1.apply(ILoop.scala:915)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at 
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Process finished with exit code 137 (interrupted by signal 9: SIGKILL)



Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com






Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-03 Thread Mohit Jaggi
For linear regression, it should be fairly easy. Just sort the co-efficients :)
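
A minimal sketch of what I mean (assuming a trained mllib LinearRegressionModel, and
that the features were standardized so the magnitudes are comparable):

import org.apache.spark.mllib.regression.LinearRegressionModel

def rankByWeight(model: LinearRegressionModel): Array[(Int, Double)] =
  model.weights.toArray.zipWithIndex
    .map { case (w, i) => (i, w) }
    .sortBy { case (_, w) => -math.abs(w) }   // largest absolute coefficient first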

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca <carlo.allo...@open.ac.uk> wrote:
> 
> Hi All,
> 
> I am using SPARK and in particular the MLib library.
> 
> import org.apache.spark.mllib.regression.LabeledPoint;
> import org.apache.spark.mllib.regression.LinearRegressionModel;
> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
> 
> For my problem I am using the LinearRegressionWithSGD and I would like to 
> perform a “Rank Features By Importance”.
> 
> I checked the documentation and it seems that does not provide such methods.
> 
> Am I missing anything?  Please, could you provide any help on this?
> Should I change the approach?
> 
> Many Thanks in advance,
> 
> Best Regards,
> Carlo
> 
> 
> -- The Open University is incorporated by Royal Charter (RC 000391), an 
> exempt charity in England & Wales and a charity registered in Scotland (SC 
> 038302). The Open University is authorised and regulated by the Financial 
> Conduct Authority.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



using SparkILoop.run

2016-09-26 Thread Mohit Jaggi
I want to use the following API: SparkILoop.run(...). I am writing a test
case that passes some scala code to the spark interpreter and receives the
result as a string.

I couldn't figure out how to pass the right settings into the run() method.
I get an error about "master" not being set.

object SparkILoop {

  /**
   * Creates an interpreter loop with default settings and feeds
   * the given code to it as input.
   */
  def run(code: String, sets: Settings = new Settings): String = {
    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }

    stringFromStream { ostream =>
      Console.withOut(ostream) {
        val input = new BufferedReader(new StringReader(code))
        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
        val repl = new SparkILoop(input, output)

        if (sets.classpath.isDefault) {
          sets.classpath.value = sys.props("java.class.path")
        }
        repl process sets
      }
    }
  }

  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}
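
One way to supply the missing master (a hedged sketch, not verified end to end): SparkConf
loads any spark.* JVM system properties by default, so setting them before calling run()
should get past the "master must be set" error without building a Settings object by hand.

System.setProperty("spark.master", "local[*]")
System.setProperty("spark.app.name", "iloop-test")   // hypothetical app name

val output = SparkILoop.run(
  """
    |println("hello")
  """.stripMargin)
println(output)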


Re: Model abstract class in spark ml

2016-08-31 Thread Mohit Jaggi
Thanks Cody. That was a good explanation!

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 31, 2016, at 7:32 AM, Cody Koeninger <c...@koeninger.org> wrote:
> 
> http://blog.originate.com/blog/2014/02/27/types-inside-types-in-scala/
> 
> On Wed, Aug 31, 2016 at 2:19 AM, Sean Owen <so...@cloudera.com> wrote:
>> Weird, I recompiled Spark with a similar change to Model and it seemed
>> to work but maybe I missed a step in there.
>> 
>> On Wed, Aug 31, 2016 at 6:33 AM, Mohit Jaggi <mohitja...@gmail.com> wrote:
>>> I think I figured it out. There is indeed "something deeper in Scala” :-)
>>> 
>>> abstract class A {
>>>  def a: this.type
>>> }
>>> 
>>> class AA(i: Int) extends A {
>>>  def a = this
>>> }
>>> 
>>> the above works ok. But if you return anything other than “this”, you will
>>> get a compile error.
>>> 
>>> abstract class A {
>>>  def a: this.type
>>> }
>>> 
>>> class AA(i: Int) extends A {
>>>  def a = new AA(1)
>>> }
>>> 
>>> Error:(33, 11) type mismatch;
>>> found   : com.dataorchard.datagears.AA
>>> required: AA.this.type
>>>  def a = new AA(1)
>>>  ^
>>> 
>>> So you have to do:
>>> 
>>> abstract class A[T <: A[T]]  {
>>>  def a: T
>>> }
>>> 
>>> class AA(i: Int) extends A[AA] {
>>>  def a = new AA(1)
>>> }
>>> 
>>> 
>>> 
>>> Mohit Jaggi
>>> Founder,
>>> Data Orchard LLC
>>> www.dataorchardllc.com
>>> 
>>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
I think I figured it out. There is indeed "something deeper in Scala” :-)

abstract class A {
  def a: this.type
}

class AA(i: Int) extends A {
  def a = this
}
the above works ok. But if you return anything other than “this”, you will get 
a compile error.

abstract class A {
  def a: this.type
}

class AA(i: Int) extends A {
  def a = new AA(1)
}
Error:(33, 11) type mismatch;
 found   : com.dataorchard.datagears.AA
 required: AA.this.type
  def a = new AA(1)
  ^

So you have to do:

abstract class A[T <: A[T]]  {
  def a: T
}

class AA(i: Int) extends A[AA] {
  def a = new AA(1)
}


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 30, 2016, at 9:51 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:
> 
> thanks Sean. I am cross posting on dev to see why the code was written that 
> way. Perhaps, this.type doesn’t do what is needed.
> 
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com <http://www.dataorchardllc.com/>
> 
> 
> 
> 
>> On Aug 30, 2016, at 2:08 PM, Sean Owen <so...@cloudera.com 
>> <mailto:so...@cloudera.com>> wrote:
>> 
>> I think it's imitating, for example, how Enum is declared in Java:
>> 
>> abstract class Enum<E extends Enum<E>>
>> 
>> this is done so that Enum can refer to the actual type of the derived
>> enum class when declaring things like public final int compareTo(E o)
>> to implement Comparable. The type is redundant in a sense, because
>> you effectively have MyEnum extending Enum<MyEnum>.
>> 
>> Java allows this self-referential definition. However Scala has
>> "this.type" for this purpose and (unless I'm about to learn something
>> deeper about Scala) it would have been the better way to express this
>> so that Model methods can for example state that copy() returns a
>> Model of the same concrete type.
>> 
>> I don't know if it can be changed now without breaking compatibility
>> but you're welcome to give it a shot with MiMa to see. It does
>> compile, using this.type.
>> 
>> 
>> On Tue, Aug 30, 2016 at 9:47 PM, Mohit Jaggi <mohitja...@gmail.com 
>> <mailto:mohitja...@gmail.com>> wrote:
>>> Folks,
>>> I am having a bit of trouble understanding the following:
>>> 
>>> abstract class Model[M <: Model[M]]
>>> 
>>> Why is M <: Model[M]?
>>> 
>>> Cheers,
>>> Mohit.
>>> 
> 



Re: Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
thanks Sean. I am cross posting on dev to see why the code was written that
way. Perhaps, this.type doesn’t do what is needed.

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




On Aug 30, 2016, at 2:08 PM, Sean Owen <so...@cloudera.com> wrote:

I think it's imitating, for example, how Enum is declared in Java:

abstract class Enum<E extends Enum<E>>

this is done so that Enum can refer to the actual type of the derived
enum class when declaring things like public final int compareTo(E o)
to implement Comparable. The type is redundant in a sense, because
you effectively have MyEnum extending Enum<MyEnum>.

Java allows this self-referential definition. However Scala has
"this.type" for this purpose and (unless I'm about to learn something
deeper about Scala) it would have been the better way to express this
so that Model methods can for example state that copy() returns a
Model of the same concrete type.

I don't know if it can be changed now without breaking compatibility
but you're welcome to give it a shot with MiMa to see. It does
compile, using this.type.


On Tue, Aug 30, 2016 at 9:47 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:

Folks,
I am having a bit of trouble understanding the following:

abstract class Model[M <: Model[M]]

Why is M <: Model[M]?

Cheers,
Mohit.


Re: Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
thanks Sean. I am cross posting on dev to see why the code was written that 
way. Perhaps, this.type doesn’t do what is needed.

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 30, 2016, at 2:08 PM, Sean Owen <so...@cloudera.com> wrote:
> 
> I think it's imitating, for example, how Enum is declared in Java:
> 
> abstract class Enum<E extends Enum<E>>
> 
> this is done so that Enum can refer to the actual type of the derived
> enum class when declaring things like public final int compareTo(E o)
> to implement Comparable. The type is redundant in a sense, because
> you effectively have MyEnum extending Enum<MyEnum>.
> 
> Java allows this self-referential definition. However Scala has
> "this.type" for this purpose and (unless I'm about to learn something
> deeper about Scala) it would have been the better way to express this
> so that Model methods can for example state that copy() returns a
> Model of the same concrete type.
> 
> I don't know if it can be changed now without breaking compatibility
> but you're welcome to give it a shot with MiMa to see. It does
> compile, using this.type.
> 
> 
> On Tue, Aug 30, 2016 at 9:47 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:
>> Folks,
>> I am having a bit of trouble understanding the following:
>> 
>> abstract class Model[M <: Model[M]]
>> 
>> Why is M <: Model[M]?
>> 
>> Cheers,
>> Mohit.
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
Folks,
I am having a bit of trouble understanding the following:

abstract class Model[M <: Model[M]]

Why is M <: Model[M]?

Cheers,
Mohit.


Re: How Spark HA works

2016-08-23 Thread Mohit Jaggi
what did you mean by “link” ? an HTTP URL to the spark monitoring UI? AFAIK, it 
is not directly supported. i typically go to both masters and check which one 
is active :-)

did you check if the failover actually happened in other ways (i don’t know 
what the znode should say)? you can try sending a spark job and if you used the 
right master config in your code, it should go to the new master. that will 
confirm that failover worked.



Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 19, 2016, at 8:56 PM, Charles Nnamdi Akalugwu <cprenzb...@gmail.com> 
> wrote:
> 
> I am experiencing this exact issue. Does anyone know what's going on with the 
> zookeeper setup?
> 
> On Jul 5, 2016 10:34 AM, "Akmal Abbasov" <akmal.abba...@icloud.com 
> <mailto:akmal.abba...@icloud.com>> wrote:
> >
> > Hi, 
> > I'm trying to understand how Spark HA works. I'm using Spark 1.6.1 and 
> > Zookeeper 3.4.6.
> > I've add the following line to $SPARK_HOME/conf/spark-env.sh
> > export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
> > -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 
> > -Dspark.deploy.zookeeper.dir=/spark
> > It's working so far.
> > I'd like to setup a link which will always go to active master UI(I'm using 
> > Spark in Standalone).  
> > I've checked the znode /spark, and it contains 
> > [leader_election, master_status]
> > I'm assuming that master_status znode will contain ip address of the 
> > current active master, is it true? Because in my case this znode isn't 
> > updated after failover.
> > And how /spark/leader_election works, because it doesn't contain any data.
> > Thank you.
> >
> > Regards,
> > Akmal
> >
> >
> 



Re: Using spark to distribute jobs to standalone servers

2016-08-23 Thread Mohit Jaggi
It is a bit hacky but possible. A lot depends on what kind of queries etc you 
want to run. You could write a data source that reads your data and keeps it 
partitioned the way you want, then use mapPartitions() to execute your code…
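
A rough sketch of the shape of it (all names hypothetical; runAggregateQuery stands in for
whatever client library talks to an individual server, and only aggregated rows come back
to the driver):

def runAggregateQuery(endpoint: String, sql: String): Iterator[(String, Long)] = ???  // stub

val servers = Seq("server1:5432", "server2:5432", "server3:5432")
val perServer = sc.parallelize(servers, numSlices = servers.size)   // one endpoint per partition

val aggregates = perServer.mapPartitions { endpoints =>
  endpoints.flatMap { endpoint =>
    runAggregateQuery(endpoint, "SELECT region, count(*) FROM events GROUP BY region")
  }
}
aggregates.collect()   // only the per-server aggregates reach the controller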


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 22, 2016, at 7:59 AM, Larry White <ljw1...@gmail.com> wrote:
> 
> Hi,
> 
> I have a bit of an unusual use-case and would greatly appreciate some 
> feedback as to whether it is a good fit for spark.
> 
> I have a network of compute/data servers configured as a tree as shown below
> controller
> server 1
> server 2
> server 3
> etc.
> There are ~20 servers, but the number is increasing to ~100. 
> 
> Each server contains a different dataset, all in the same format. Each is 
> hosted by a different organization, and the data on every individual server 
> is unique to that organization.
> 
> Data cannot be replicated across servers using RDDs or any other means, for 
> privacy/ownership reasons.
> 
> Data cannot be retrieved to the controller, except in aggregate form, as the 
> result of a query, for example. 
> 
> Because of this, there are currently no operations that treats the data as if 
> it were a single data set: We could run a classifier on each site 
> individually, but cannot for legal reasons, pull all the data into a single 
> physical dataframe to run the classifier on all of it together. 
> 
> The servers are located across a wide geographic region (1,000s of miles)
> 
> We would like to send jobs from the controller to be executed in parallel on 
> all the servers, and retrieve the results to the controller. The jobs would 
> consist of SQL-Heavy Java code for 'production' queries, and python or R code 
> for ad-hoc queries and predictive modeling. 
> 
> Spark seems to have the capability to meet many of the individual 
> requirements, but is it a reasonable platform overall for building this 
> application?
> 
> Thank you very much for your assistance. 
> 
> Larry 
>  



Re: Spark with Parquet

2016-08-23 Thread Mohit Jaggi
something like this should work….

val df = sparkSession.read.csv("myfile.csv") // you may have to provide a schema
                                             // if the guessed schema is not accurate
df.write.parquet("myfile.parquet")


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Apr 27, 2014, at 11:41 PM, Sai Prasanna <ansaiprasa...@gmail.com> wrote:
> 
> Hi All,
> 
> I want to store a csv-text file in Parquet format in HDFS and then do some 
> processing in Spark.
> 
> Somehow my search to find the way to do was futile. More help was available 
> for parquet with impala. 
> 
> Any guidance here? Thanks !!
> 



Re: SAS_TO_SPARK_SQL_(Could be a Bug?)

2016-06-12 Thread Mohit Jaggi
Looks like a bug in the code generating the SQL query…why would it be specific 
to SAS, I can’t guess. Did you try the same with another database? As a 
workaround you can write the select statement yourself instead of just 
providing the table name.
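
Concretely, something along these lines (a sketch, untested against SAS; the JDBC
"dbtable" option accepts a parenthesized subquery, so you control exactly how the columns
are named):

val sasJdbcUrl = "jdbc:sharenet://sas-host:8551"   // hypothetical; whatever URL your pure-Java app uses
val df = sqlContext.read
  .format("jdbc")
  .option("url", sasJdbcUrl)
  .option("dbtable", "(SELECT col1, col2 FROM mylib.mytable) t")   // write the SELECT yourself
  .option("driver", "com.sas.rio.MVADriver")   // assumption: same driver class as the Java app
  .load()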

> On Jun 11, 2016, at 6:27 PM, Ajay Chander  wrote:
> 
> I tried implementing the same functionality through Scala as well. But no 
> luck so far. Just wondering if anyone here tried using Spark SQL to read SAS 
> dataset? Thank you
> 
> Regards,
> Ajay
> 
> On Friday, June 10, 2016, Ajay Chander  > wrote:
> Mich, I completely agree with you. I built another Spark SQL application 
> which reads data from MySQL and SQL server and writes the data into 
> Hive(parquet+snappy format). I have this problem only when I read directly 
> from remote SAS system. The interesting part is I am using same driver to 
> read data through pure Java app and spark app. It works fine in Java app, so 
> I cannot blame SAS driver here. Trying to understand where the problem could 
> be. Thanks for sharing this with me.
> 
> On Friday, June 10, 2016, Mich Talebzadeh  > wrote:
> I personally use Scala to do something similar. For example here I extract 
> data from an Oracle table and store in ORC table in Hive. This is compiled 
> via sbt as run with SparkSubmit.
> 
> It is similar to your code but in Scala. Note that I do not enclose my column 
> names in double quotes.  
> 
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions._
> 
> object ETL_scratchpad_dummy {
>   def main(args: Array[String]) {
>   val conf = new SparkConf().
>setAppName("ETL_scratchpad_dummy").
>set("spark.driver.allowMultipleContexts", "true")
>   val sc = new SparkContext(conf)
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   println ("\nStarted at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
>   HiveContext.sql("use oraclehadoop")
>   var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
>   var _username : String = "scratchpad"
>   var _password : String = ""
> 
>   // Get data from Oracle table scratchpad.dummy
>   val d = HiveContext.load("jdbc",
>   Map("url" -> _ORACLEserver,
>   "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS CLUSTERED, 
> to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS RANDOMISED, 
> RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>   "user" -> _username,
>   "password" -> _password))
>  
>d.registerTempTable("tmp")
>   //
>   // Need to create and populate target ORC table oraclehadoop.dummy
>   //
>   HiveContext.sql("use oraclehadoop")
>   //
>   // Drop and create table dummy
>   //
>   HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.dummy")
>   var sqltext : String = ""
>   sqltext = """
>   CREATE TABLE oraclehadoop.dummy (
>  ID INT
>, CLUSTERED INT
>, SCATTERED INT
>, RANDOMISED INT
>, RANDOM_STRING VARCHAR(50)
>, SMALL_VC VARCHAR(10)
>, PADDING  VARCHAR(10)
>   )
>   CLUSTERED BY (ID) INTO 256 BUCKETS
>   STORED AS ORC
>   TBLPROPERTIES (
>   "orc.create.index"="true",
>   "orc.bloom.filter.columns"="ID",
>   "orc.bloom.filter.fpp"="0.05",
>   "orc.compress"="SNAPPY",
>   "orc.stripe.size"="16777216",
>   "orc.row.index.stride"="1" )
>   """
>HiveContext.sql(sqltext)
>   //
>   // Put data in Hive table. Clean up is already done
>   //
>   sqltext = """
>   INSERT INTO TABLE oraclehadoop.dummy
>   SELECT
>   ID
> , CLUSTERED
> , SCATTERED
> , RANDOMISED
> , RANDOM_STRING
> , SMALL_VC
> , PADDING
>   FROM tmp
>   """
>HiveContext.sql(sqltext)
>   println ("\nFinished at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
>   sys.exit()
>  }
> }
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 10 June 2016 at 23:38, Ajay Chander > wrote:
> Hi Mich,
> 
> Thanks for the response. If you look at my programs, I am not writings my 
> queries to include column names in a pair of "". My driver in spark program 
> is generating such query with column names in "" which 

Re: Calling Python code from Scala

2016-04-18 Thread Mohit Jaggi
When faced with this issue I followed the approach taken by pyspark and used 
py4j. You have to:
- ensure your code is Java compatible
- use py4j to call the java (scala) code from python
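
A minimal sketch of the pipe-based alternative Holden mentions below: stream each
partition through an external Python script (hypothetical path) and read its stdout back
as an RDD of strings; you handle the line-level (de)serialization yourself.

val input = sc.parallelize(Seq(1, 2, 3, 4))
val piped = input.map(_.toString).pipe("python3 ./my_udf.py")  // script reads stdin, writes stdout
piped.collect().foreach(println)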


> On Apr 18, 2016, at 10:29 AM, Holden Karau  wrote:
> 
> So if there is just a few python functions your interested in accessing you 
> can also use the pipe interface (you'll have to manually serialize your data 
> on both ends in ways that Python and Scala can respectively parse) - but its 
> a very generic approach and can work with many different languages.
> 
> On Mon, Apr 18, 2016 at 10:23 AM, Ndjido Ardo BAR  > wrote:
> Hi Didier,
> 
> I think with PySpark you can wrap your legacy Python functions into UDFs and 
> use it in your DataFrames. But you have to use DataFrames instead of RDD. 
> 
> cheers,
> Ardo
> 
> On Mon, Apr 18, 2016 at 7:13 PM, didmar  > wrote:
> Hi,
> 
> I have a Spark project in Scala and I would like to call some Python
> functions from within the program.
> Both parts are quite big, so re-coding everything in one language is not
> really an option.
> 
> The workflow would be:
> - Creating a RDD with Scala code
> - Mapping a Python function over this RDD
> - Using the result directly in Scala
> 
> I've read about PySpark internals, but that didn't help much.
> Is it possible to do so, and preferably in an efficent manner ?
> 
> Cheers,
> Didier
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Calling-Python-code-from-Scala-tp26798.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau 


spark dataframe gc

2015-07-23 Thread Mohit Jaggi
Hi There,
I am testing Spark DataFrame and haven't been able to get my code to finish
due to what I suspect are GC issues. My guess is that GC interferes with
heartbeating and executors are detected as failed. The data is ~50 numeric
columns, ~100million rows in a CSV file.
We are doing a groupBy using one of the columns and trying to calculate the
average of each of the other columns. The groupBy key has about 250k unique
values.
It seems that Spark is creating a lot of temp objects (see jmap output
below) while calculating the average which I am surprised to see. Why
doesn't it use the same temp variable? Am I missing something? Do I need to
specify a config flag to enable code generation and not do this?
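
For what it's worth, the flag I had in mind (an assumption on my part that it applies
here; in the 1.x line the SQL codegen option is reportedly off by default):

sqlContext.setConf("spark.sql.codegen", "true")   // enable runtime code generation for expression evaluation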


Mohit.

[x app-20150723142604-0002]$ jmap -histo 12209


 num     #instances         #bytes  class name
----------------------------------------------
   1:     258615458     8275694656  scala.collection.immutable.$colon$colon
   2:     103435856     7447381632  org.apache.spark.sql.catalyst.expressions.Cast
   3:     103435856     4964921088  org.apache.spark.sql.catalyst.expressions.Coalesce
   4:       1158643     4257400112  [B
   5:      51717929     4137434320  org.apache.spark.sql.catalyst.expressions.SumFunction
   6:      51717928     3723690816  org.apache.spark.sql.catalyst.expressions.Add
   7:      51717929     2896204024  org.apache.spark.sql.catalyst.expressions.CountFunction
   8:      51717928     2896203968  org.apache.spark.sql.catalyst.expressions.MutableLiteral
   9:      51717928     2482460544  org.apache.spark.sql.catalyst.expressions.Literal
  10:      51803728     1243289472  java.lang.Double
  11:      51717755     1241226120  org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5
  12:        975810      850906320  [Lorg.apache.spark.sql.catalyst.expressions.AggregateFunction;
  13:      51717754      827484064  org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$org$apache$spark$sql$catalyst$expressions$Cast$$cast$1
  14:        982451       47157648  java.util.HashMap$Entry
  15:        981132       34981720  [Ljava.lang.Object;
  16:       1049984       25199616  org.apache.spark.sql.types.UTF8String
  17:        978296       23479104  org.apache.spark.sql.catalyst.expressions.GenericRow
  18:        117166       15944560  methodKlass
  19:        117166       14986224  constMethodKlass
  20:          1567       12891952  [Ljava.util.HashMap$Entry;
  21:          9103       10249728  constantPoolKlass
  22:          9103        9278592  instanceKlassKlass
  23:          5072        5691320  [I
  24:          7281        5335040  constantPoolCacheKlass
  25:         46420        4769600  [C
  26:        105984        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry


Re: Grouping runs of elements in a RDD

2015-07-02 Thread Mohit Jaggi
if you are joining successive lines together based on a predicate, then you
are doing a flatMap not an aggregate. you are on the right track with a
multi-pass solution. i had the same challenge when i needed a sliding
window over an RDD(see below).

[ i had suggested that the sliding window API be moved to spark-core. not
sure if that happened ]

- previous posts ---

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions

 On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi mohitja...@gmail.com
 wrote:


 http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E

 you can use the MLLib function or do the following (which is what I had
 done):

 - in first pass over the data, using mapPartitionsWithIndex, gather the
 first item in each partition. you can use collect (or an aggregator) for this.
 “key” them by the partition index. at the end, you will have a map
 (partition index) -> first item
 - in the second pass over the data, using mapPartitionsWithIndex again,
 look at two (or in the general case N) items at a time (you can use scala’s
 sliding iterator) and check the time difference (or any
 sliding window computation). To this mapPartitions call, pass the map created in
 the previous step. You will need it to check the last item in this
 partition.

 If you can tolerate a few inaccuracies then you can just do the second
 step. You will miss the “boundaries” of the partitions but it might be
 acceptable for your use case.


On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling rnowl...@gmail.com wrote:

 That's an interesting idea!  I hadn't considered that.  However, looking
 at the Partitioner interface, I would need to know from looking at a single
 key which doesn't fit my case, unfortunately.  For my case, I need to
 compare successive pairs of keys.  (I'm trying to re-join lines that were
 split prematurely.)

 On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:

 could you use a custom partitioner to preserve boundaries such that all
 related tuples end up on the same partition?

 On Jun 30, 2015, at 12:00 PM, RJ Nowling rnowl...@gmail.com wrote:

 Thanks, Reynold.  I still need to handle incomplete groups that fall
 between partition boundaries. So, I need a two-pass approach. I came up
 with a somewhat hacky way to handle those using the partition indices and
 key-value pairs as a second pass after the first.

 OCaml's std library provides a function called group() that takes a break
 function that operators on pairs of successive elements.  It seems a
 similar approach could be used in Spark and would be more efficient than my
 approach with key-value pairs since you know the ordering of the partitions.

 Has this need been expressed by others?

 On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin r...@databricks.com wrote:

 Try mapPartitions, which gives you an iterator, and you can produce an
 iterator back.


 On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling rnowl...@gmail.com wrote:

 Hi all,

 I have a problem where I have a RDD of elements:

 Item1 Item2 Item3 Item4 Item5 Item6 ...

 and I want to run a function over them to decide which runs of elements
 to group together:

 [Item1 Item2] [Item3] [Item4 Item5 Item6] ...

 Technically, I could use aggregate to do this, but I would have to use
 a List of List of T which would produce a very large collection in memory.

 Is there an easy way to accomplish this?  e.g.,, it would be nice to
 have a version of aggregate where the combination function can return a
 complete group that is added to the new RDD and an incomplete group which
 is passed to the next call of the reduce function.

 Thanks,
 RJ








Re: Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Mohit Jaggi
I have used VoltDB and Spark. The use cases for the two are quite different. 
VoltDB is intended for transactions and also supports queries on the 
same(custom to voltdb) store. Spark(SQL) is NOT suitable for transactions; it 
is designed for querying immutable data (which may exist in several different 
forms of stores).

 On May 28, 2015, at 7:48 AM, Ashish Mukherjee ashish.mukher...@gmail.com 
 wrote:
 
 Hello,
 
 I was wondering if there is any documented comparison of SparkSQL with 
 MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. too allow queries 
 to be run in a clustered environment. What is  the major differentiation?
 
 Regards,
 Ashish


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parsing CSV files in Spark

2015-02-06 Thread Mohit Jaggi
As Sean said, this is just a few lines of code. You can see an example here:

https://github.com/AyasdiOpenSource/bigdf/blob/master/src/main/scala/com/ayasdi/bigdf/DF.scala#L660
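
And for the spark-csv package Sean mentions below, reading a whole folder with headers
looks roughly like this (a sketch assuming Spark 1.4+ with the spark-csv artifact on the
classpath; path and column pattern are hypothetical):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")         // first line of each file is a header
  .option("inferSchema", "true")
  .load("hdfs:///data/csv/*.csv")   // glob picks up every file in the folder

val selected = df.select(df.columns.filter(_.startsWith("sensor_")).map(df.col): _*)  // keep matching columns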


 On Feb 6, 2015, at 7:29 AM, Charles Feduke charles.fed...@gmail.com wrote:
 
 I've been doing a bunch of work with CSVs in Spark, mostly saving them as a 
 merged CSV (instead of the various part-n files). You might find the 
 following links useful:
 
 - This article is about combining the part files and outputting a header as 
 the first line in the merged results:
 
 http://java.dzone.com/articles/spark-write-csv-file-header
 
 - This was my take on the previous author's original article, but it doesn't 
 yet handle the header row:
 
 http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/
 
 spark-csv helps with reading CSV data and mapping a schema for Spark SQL, but 
 as of now doesn't save CSV data.
 
 On Fri Feb 06 2015 at 9:49:06 AM Sean Owen so...@cloudera.com 
 mailto:so...@cloudera.com wrote:
 You can do this manually without much trouble: get your files on a
 distributed store like HDFS, read them with textFile, filter out
 headers, parse with a CSV library like Commons CSV, select columns,
 format and store the result. That's tens of lines of code.
 
 However you probably want to start by looking at
 https://github.com/databricks/spark-csv which may make it even easier
 than that and give you a richer query syntax.
 
 On Fri, Feb 6, 2015 at 8:37 AM, Spico Florin spicoflo...@gmail.com 
 mailto:spicoflo...@gmail.com wrote:
  Hi!
I'm new to Spark. I have a case study that where the data is store in CSV
  files. These files have headers with morte than 1000 columns. I would like
  to know what are the best practice to parsing them and in special the
  following points:
  1. Getting and parsing all the files from a folder
  2. What CSV parser do you use?
  3. I would like to select just some columns whose names matches a pattern
  and then pass the selected columns values (plus the column names) to the
  processing and save the output to a CSV (preserving the selected columns).
 
  If you have any experience with some points above, it will be really helpful
  (for me and for the others that will encounter the same cases) if you can
  share your thoughts.
  Thanks.
Regards,
   Florin
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 



Re: spark with cdh 5.2.1

2015-02-04 Thread Mohit Jaggi
yep...it was unnecessary to create a 2.5 profile. i struggled a bit because
it wasn't clear that i *need* to select a profile using -P option. i didn't
have to do that for earlier hadoop versions.
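
For the archives, the invocation this boils down to (a sketch; exact flags may vary by
Spark version and which components you need):

# build against CDH 5.2.1's Hadoop using the hadoop-2.4 profile
mvn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.2.1 -DskipTests clean package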

On Fri, Jan 30, 2015 at 12:11 AM, Sean Owen so...@cloudera.com wrote:

 There is no need for a 2.5 profile. The hadoop-2.4 profile is for
 Hadoop 2.4 and beyond. You can set the particular version you want
 with -Dhadoop.version=

 You do not need to make any new profile to compile vs 2.5.0-cdh5.2.1.
 Again, the hadoop-2.4 profile is what you need.

 On Thu, Jan 29, 2015 at 11:33 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:
  Hi All,
  I noticed in pom.xml that there is no entry for Hadoop 2.5. Has anyone
 tried Spark with 2.5.0-cdh5.2.1? Will replicating the 2.4 entry be
 sufficient to make this work?
 
  Mohit.
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: RDD.combineBy without intermediate (k,v) pair allocation

2015-01-29 Thread Mohit Jaggi
Francois,
RDD.aggregate() does not support aggregation by key. But, indeed, that is the 
kind of implementation I am looking for, one that does not allocate 
intermediate space for storing (K,V) pairs. When working with large datasets 
this type of intermediate memory allocation wreaks havoc with garbage 
collection, not to mention unnecessarily increases the working memory 
requirement of the program.

I wonder if someone has already noticed this and there is an effort underway to 
optimize this. If not, I will take a shot at adding this functionality.
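
In the meantime, a hedged sketch of how aggregate() can emulate a keyed combine without
the intermediate RDD of pairs, at the cost of holding a map per partition (and on the
driver):

import scala.collection.mutable

val words = sc.parallelize(Seq("apple", "avocado", "banana", "cherry"))

// count by first letter without first materializing an RDD[(Char, Long)]
val counts = words.aggregate(mutable.Map.empty[Char, Long])(
  (acc, w) => { acc(w.head) = acc.getOrElse(w.head, 0L) + 1L; acc },                   // per-partition fold
  (m1, m2) => { m2.foreach { case (k, v) => m1(k) = m1.getOrElse(k, 0L) + v }; m1 }    // merge partitions
)
// counts: Map(a -> 2, b -> 1, c -> 1)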

Mohit.

 On Jan 27, 2015, at 1:52 PM, francois.garil...@typesafe.com wrote:
 
 Have you looked at the `aggregate` function in the RDD API ? 
 
 If your way of extracting the “key” (identifier) and “value” (payload) parts 
 of the RDD elements is uniform (a function), it’s unclear to me how this 
 would be more efficient that extracting key and value and then using combine, 
 however.
 
 —
 FG
 
 
 On Tue, Jan 27, 2015 at 10:17 PM, Mohit Jaggi mohitja...@gmail.com 
 mailto:mohitja...@gmail.com wrote:
 
 Hi All, 
 I have a use case where I have an RDD (not a k,v pair) where I want to do a 
 combineByKey() operation. I can do that by creating an intermediate RDD of 
 k,v pairs and using PairRDDFunctions.combineByKey(). However, I believe it 
 will be more efficient if I can avoid this intermediate RDD. Is there a way I 
 can do this by passing in a function that extracts the key, like in 
 RDD.groupBy()? [oops, RDD.groupBy seems to create the intermediate RDD 
 anyway, maybe a better implementation is possible for that too?] 
 If not, is it worth adding to the Spark API? 
 
 Mohit. 
 - 
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 For additional commands, e-mail: user-h...@spark.apache.org 
 
 
 



spark with cdh 5.2.1

2015-01-29 Thread Mohit Jaggi
Hi All,
I noticed in pom.xml that there is no entry for Hadoop 2.5. Has anyone tried 
Spark with 2.5.0-cdh5.2.1? Will replicating the 2.4 entry be sufficient to make 
this work?

Mohit.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark challenge: zip with next???

2015-01-29 Thread Mohit Jaggi
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E

you can use the MLLib function or do the following (which is what I had done):

- in first pass over the data, using mapPartitionsWithIndex, gather the first 
item in each partition. you can use collect (or an aggregator) for this. “key” 
them by the partition index. at the end, you will have a map
   (partition index) -> first item
- in the second pass over the data, using mapPartitionsWithIndex again, look at 
two (or in the general case N) items at a time (you can use scala’s sliding 
iterator) and check the time difference (or any sliding window computation). To 
this mapPartitions call, pass the map created in the previous step. You will 
need it to check the last item in this partition.

If you can tolerate a few inaccuracies then you can just do the second step. 
You will miss the “boundaries” of the partitions but it might be acceptable for 
your use case.
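
A condensed sketch of the two-pass approach (assuming an RDD of timestamps as Longs; the
example data and numbers are illustrative only):

val ts = sc.parallelize(Seq(1L, 3L, 7L, 8L, 20L, 21L), numSlices = 3)

// pass 1: (partition index) -> first element of that partition
val firstByPartition: Map[Int, Long] = ts
  .mapPartitionsWithIndex { (idx, it) => if (it.hasNext) Iterator((idx, it.next())) else Iterator.empty }
  .collect()
  .toMap

// pass 2: differences between successive elements, including across partition boundaries
val diffs = ts.mapPartitionsWithIndex { (idx, it) =>
  val boundary = firstByPartition.get(idx + 1).iterator   // first element of the next partition, if any
  (it ++ boundary).sliding(2).collect { case Seq(a, b) => b - a }
}
diffs.collect()   // Array(2, 4, 1, 12, 1)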



 On Jan 29, 2015, at 4:36 PM, Tobias Pfeiffer t...@preferred.jp wrote:
 
 Hi,
 
 On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya ilya.gane...@capitalone.com 
 mailto:ilya.gane...@capitalone.com wrote:
 Make a copy of your RDD with an extra entry in the beginning to offset. Then 
 you can zip the two RDDs and run a map to generate an RDD of differences.
 
 Does that work? I recently tried something to compute differences between 
 each entry and the next, so I did
   val rdd1 = ... // null element + rdd
   val rdd2 = ... // rdd + null element
 but got an error message about zip requiring data sizes in each partition to 
 match.
 
 Tobias
 



RDD.combineBy

2015-01-27 Thread Mohit Jaggi
Hi All,
I have a use case where I have an RDD (not a k,v pair) where I want to do a 
combineByKey() operation. I can do that by creating an intermediate RDD of k,v 
pairs and using PairRDDFunctions.combineByKey(). However, I believe it will be 
more efficient if I can avoid this intermediate RDD. Is there a way I can do 
this by passing in a function that extracts the key, like in RDD.groupBy()? 
[oops, RDD.groupBy seems to create the intermediate RDD anyway, maybe a better 
implementation is possible for that too?]
If not, is it worth adding to the Spark API?

Mohit.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD Moving Average

2015-01-09 Thread Mohit Jaggi
Read this:
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
 
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E





Re: Modifying an RDD in forEach

2014-12-06 Thread Mohit Jaggi
Ron,
“appears to be working” might be true when there are no failures. On large 
datasets being processed on a large number of machines, failures of several 
types (server, network, disk, etc.) can happen. When they do, Spark will not 
“know” that you changed the RDD in-place and will reuse whatever version of a 
partition it has when it retries a task. Retries require idempotency and that is 
difficult without immutability. I believe this is one of the primary reasons 
for making RDDs immutable in Spark (mutable isn't even an option worth 
considering). In general, mutating something in a distributed system is a hard 
problem. It can be solved (e.g. in NoSQL or NewSQL databases) but Spark is not 
a transactional data store.

If you are building an iterative machine learning algorithm, which usually has 
a “reduce” step at the end of every iteration, then lazy evaluation is 
unlikely to be useful. On the other hand, if these intermediate RDDs stay in 
the young generation of the JVM heap [I am not sure if RDD cache management 
somehow changes this, so I could be wrong] they are garbage collected quickly 
and with very little overhead.

This is the price of scaling out :-)

Hope this helps,
Mohit.
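
For reference, a minimal sketch of the immutable alternative, i.e. deriving a 
new RDD per iteration instead of mutating in place. The types and the update 
rule are made up for illustration; only the cache/unpersist pattern is the point.

import org.apache.spark.rdd.RDD

case class Point(features: Array[Double], cluster: Int)

def iterate(points: RDD[Point], reassign: Point => Int, iterations: Int): RDD[Point] = {
  var current = points
  for (_ <- 1 to iterations) {
    val next = current.map(p => p.copy(cluster = reassign(p))).cache()
    next.count()          // materialize the new assignments before dropping the old RDD
    current.unpersist()   // a no-op if 'current' was never cached
    current = next
  }
  current
}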

 On Dec 6, 2014, at 5:02 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:
 
 You'll benefit by viewing Matei's talk at Yahoo on Spark internals and how it 
 optimizes execution of iterative jobs.
 The simple answer is: 
 1. Spark doesn't materialize an RDD when you do an iteration but lazily captures 
 the transformation functions in the RDD (only the function and closure are 
 captured; no data operation actually happens).
 2. When you finally execute and want to cause effects (save to disk, collect 
 on the master, etc.) it views the DAG of execution and optimizes what it can 
 reason about (eliminating intermediate states, performing multiple 
 transformations in one task, leveraging partitioning where available, among others).
 Bottom line: it doesn't matter how many RDDs you have in your DAG chain as long 
 as Spark can optimize the functions in that DAG to create minimal 
 materialization on its way to the final output. 
 
 Regards
 Mayur
 On 06-Dec-2014 6:12 pm, Ron Ayoub ronalday...@live.com 
 mailto:ronalday...@live.com wrote:
 This is from a separate thread with a differently named title. 
 
 Why can't you modify the actual contents of an RDD using forEach? It appears 
 to be working for me. What I'm doing is changing cluster assignments and 
 distances per data item for each iteration of the clustering algorithm. The 
 clustering algorithm is massive and iterates thousands of times. As I 
 understand it now, you are supposed to create new RDDs on each pass. This is 
 a hierarchical k-means that I'm doing and hence it consists of many 
 iterations rather than a few large ones.
 
 So I understand the restriction of why operations, when aggregating and 
 reducing etc., need to be associative. However, forEach operates on a single 
 item. So being that Spark is advertised as being great for iterative 
 algorithms since it operates in-memory, how can it be good to create 
 thousands upon thousands of RDDs during the course of an iterative algorithm? 
  Does Spark have some kind of trick like reuse behind the scenes - fully 
 persistent data objects or whatever? How can it possibly be efficient for 
 'iterative' algorithms when it is creating so many RDDs as opposed to one? 
 
 Or is the answer that I should keep doing what I'm doing because it is 
 working even though it is not theoretically sound and aligned with functional 
 ideas. I personally just want it to be fast and be able to operate on up to 
 500 million data items. 



Re: Bug in Accumulators...

2014-11-22 Thread Mohit Jaggi
perhaps the closure ends up including the main object which is not
defined as serializable...try making it a case object or object main
extends Serializable.

On Sat, Nov 22, 2014 at 4:16 PM, lordjoe lordjoe2...@gmail.com wrote:

 I posted several examples in java at http://lordjoesoftware.blogspot.com/

 Generally code like this works and I show how to accumulate more complex
 values.

  // Make two accumulators using Statistics
   final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
   JavaRDD<String> lines = ...

   JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(final String s) throws Exception {
           // Handle accumulator here
           totalLetters.add(s.length()); // count all letters
           // ... (rest of the method elided in the original post)
       }
   });

   Long numberCalls = totalCounts.value();

 I believe the mistake is to pass the accumulator to the function rather
 than
 letting the function find the accumulator - I do this in this case by using
 a final local variable



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




MEMORY_ONLY_SER question

2014-11-04 Thread Mohit Jaggi
Folks,
If I have an RDD persisted in MEMORY_ONLY_SER mode and then it is needed
for a transformation/action later, is the whole partition of the RDD
deserialized into Java objects first before my transform/action code works
on it? Or is it deserialized in a streaming manner as the iterator moves
over the partition? Is this behavior customizable? I generally use the Kryo
serializer.

Mohit.
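
For context, a minimal sketch in Scala of the setup this question assumes 
(sizes and data made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("memory-only-ser-question")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 1000000)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)   // stored as serialized bytes
rdd.map(_ * 2).count()                      // deserialization happens when this action reads the cached blocks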


Re: how to run a dev spark project without fully rebuilding the fat jar ?

2014-10-22 Thread Mohit Jaggi
i think you can give a list of jars - not just one - to spark-submit, so
build only the one that has changed source code.

On Wed, Oct 22, 2014 at 10:29 PM, Yang tedd...@gmail.com wrote:

 during tests, I often modify my code a little bit  and want to see the
 result.
 but spark-submit requires the full fat-jar, which takes quite a lot of
 time to build.

 I just need to run in --master local mode. is there a way to run it
 without rebuilding the fat jar?

 thanks
 Yang



Re: scala 2.11?

2014-09-16 Thread Mohit Jaggi
Can I load that plugin in spark-shell? Or perhaps due the 2-phase
compilation quasiquotes won't work in shell?

On Mon, Sep 15, 2014 at 7:15 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 Okay, that's consistent with what I was expecting.  Thanks, Matei.

 On Mon, Sep 15, 2014 at 5:20 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 I think the current plan is to put it in 1.2.0, so that's what I meant by
 soon. It might be possible to backport it too, but I'd be hesitant to do
 that as a maintenance release on 1.1.x and 1.0.x since it would require
 nontrivial changes to the build that could break things on Scala 2.10.

 Matei

 On September 15, 2014 at 12:19:04 PM, Mark Hamstra (
 m...@clearstorydata.com) wrote:

 Are we going to put 2.11 support into 1.1 or 1.0?  Else will be in
 soon applies to the master development branch, but actually in the Spark
 1.2.0 release won't occur until the second half of November at the earliest.

 On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

  Scala 2.11 work is under way in open pull requests though, so
 hopefully it will be in soon.

  Matei

 On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com)
 wrote:

  ah...thanks!

 On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com
 wrote:

 No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

 On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Folks,
 I understand Spark SQL uses quasiquotes. Does that mean Spark has now
 moved to Scala 2.11?

 Mohit.








scala 2.11?

2014-09-15 Thread Mohit Jaggi
Folks,
I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved
to Scala 2.11?

Mohit.


Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
Is this code running in an executor? You need to make sure the file is
accessible on ALL executors. One way to do that is to use a distributed
filesystem like HDFS or GlusterFS.

On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Hi

 I am trying to perform some read/write file operations in spark. Somehow I
 am neither able to write to a file nor read.

 import java.io._

    val writer = new PrintWriter(new File("test.txt"))

    writer.write("Hello Scala")


 Can someone please tell me how to perform file I/O in spark.




Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
But the above APIs are not for HDFS.

On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I
 see that the file gets created on the master node. But there won't be any
 data written to it.


 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to perform some read/write file operations in spark. Somehow
 I am neither able to write to a file nor read.

 import java.io._

    val writer = new PrintWriter(new File("test.txt"))

    writer.write("Hello Scala")


 Can someone please tell me how to perform file I/O in spark.






Re: scala 2.11?

2014-09-15 Thread Mohit Jaggi
ah...thanks!

On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra m...@clearstorydata.com
wrote:

 No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.

 On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi mohitja...@gmail.com wrote:

 Folks,
 I understand Spark SQL uses quasiquotes. Does that mean Spark has now
 moved to Scala 2.11?

 Mohit.





Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
If your underlying filesystem is HDFS, you need to use the HDFS APIs. A Google
search brought up this link, which appears reasonable:

http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample

If you want to use java.io APIs, you have to make sure your filesystem is
accessible from all nodes in your cluster. You did not mention what errors
you get with your code; they may point to the cause.
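
For example, a minimal sketch of writing through the Hadoop FileSystem API 
instead of java.io (the path and contents are made up; the Hadoop configuration 
files must be on the classpath):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()              // picks up core-site.xml / hdfs-site.xml from the classpath
val fs = FileSystem.get(conf)
val out = fs.create(new Path("/tmp/test.txt"))
out.writeBytes("Hello Scala\n")             // FSDataOutputStream extends DataOutputStream
out.close()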


On Mon, Sep 15, 2014 at 9:51 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Can you please direct me to the right way of doing this.

 On Mon, Sep 15, 2014 at 10:18 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 I came across these APIs in one the scala tutorials over the net.

 On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 But the above APIs are not for HDFS.

 On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Yes. I have HDFS. My cluster has 5 nodes. When I run the above
  commands, I see that the file gets created on the master node. But there
  won't be any data written to it.


 On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 Is this code running in an executor? You need to make sure the file is
 accessible on ALL executors. One way to do that is to use a distributed
 filesystem like HDFS or GlusterFS.

 On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to perform some read/write file operations in spark.
 Somehow I am neither able to write to a file nor read.

 import java.io._

    val writer = new PrintWriter(new File("test.txt"))

    writer.write("Hello Scala")


 Can someone please tell me how to perform file I/O in spark.









Re: sc.textFile problem due to newlines within a CSV record

2014-09-13 Thread Mohit Jaggi
Thanks Xiangrui. This file already exists w/o escapes. I could probably try
to preprocess it and add the escaping.

On Fri, Sep 12, 2014 at 9:38 PM, Xiangrui Meng men...@gmail.com wrote:

 I wrote an input format for Redshift tables unloaded via UNLOAD with the
 ESCAPE option: https://github.com/mengxr/redshift-input-format , which
 can recognize multi-line records.

 Redshift puts a backslash before any in-record `\\`, `\r`, `\n`, and
 the delimiter character. You can apply the same escaping before
 calling saveAsTextFile, then use the input format to load them back.

 Xiangrui

 On Fri, Sep 12, 2014 at 7:43 PM, Mohit Jaggi mohitja...@gmail.com wrote:
  Folks,
  I think this might be due to the default TextInputFormat in Hadoop. Any
  pointers to solutions much appreciated.
 
  More powerfully, you can define your own InputFormat implementations to
  format the input to your programs however you want. For example, the
 default
  TextInputFormat reads lines of text files. The key it emits for each
 record
  is the byte offset of the line read (as a LongWritable), and the value is
  the contents of the line up to the terminating '\n' character (as a Text
  object). If you have multi-line records each separated by a '$' character,
 you
  could write your own InputFormat that parses files into records split on
  this character instead.
 
 
  Thanks,
  Mohit



slides from df talk at global big data conference

2014-09-12 Thread Mohit Jaggi
http://engineering.ayasdi.com/2014/09/11/df-dataframes-on-spark/


Re: efficient zipping of lots of RDDs

2014-09-11 Thread Mohit Jaggi
filed  jira SPARK-3489  https://issues.apache.org/jira/browse/SPARK-3489

On Thu, Sep 4, 2014 at 9:36 AM, Mohit Jaggi mohitja...@gmail.com wrote:

 Folks,
 I sent an email announcing
 https://github.com/AyasdiOpenSource/df

 This dataframe is basically a map of RDDs of columns(along with DSL
 sugar), as column based operations seem to be most common. But row
 operations are not uncommon. To get rows out of columns right now I zip the
 column RDDs together. I use RDD.zip then flatten the tuples I get. I
 realize that RDD.zipPartitions might be faster. However, I believe an even
 better approach should be possible. Surely we can have a zip method that
 can combine a large variable number of RDDs? Can that be added to
 Spark-core? Or is there an alternative equally good or better approach?

 Cheers,
 Mohit.



pandas-like dataframe in spark

2014-09-04 Thread Mohit Jaggi
Folks,
I have been working on a pandas-like dataframe DSL on top of spark. It is
written in Scala and can be used from spark-shell. The APIs have the look
and feel of pandas which is a wildly popular piece of software data
scientists use. The goal is to let people familiar with pandas scale their
efforts to larger datasets by using spark but not having to go through a
steep learning curve for Spark and Scala.
It is open sourced with Apache License and can be found here:
https://github.com/AyasdiOpenSource/df

I welcome your comments, suggestions and feedback. Any help in developing
it further is much appreciated. I have the following items on the roadmap
(and happy to change this based on your comments)
- Python wrappers most likely in the same way as MLLib
- Sliding window aggregations
- Row indexing
- Graphing/charting
- Efficient row-based operations
- Pretty printing of output on the spark-shell
- Unit test completeness and automated nightly runs

Mohit.

P.S.: Thanks to my awesome employer Ayasdi http://www.ayasdi.com for open
sourcing this software

P.P.S.: I need some design advice on making row operations efficient and
I'll start a new thread for that


efficient zipping of lots of RDDs

2014-09-04 Thread Mohit Jaggi
Folks,
I sent an email announcing
https://github.com/AyasdiOpenSource/df

This dataframe is basically a map of RDDs of columns(along with DSL sugar),
as column based operations seem to be most common. But row operations are
not uncommon. To get rows out of columns right now I zip the column RDDs
together. I use RDD.zip then flatten the tuples I get. I realize that
RDD.zipPartitions might be faster. However, I believe an even better
approach should be possible. Surely we can have a zip method that can
combine a large variable number of RDDs? Can that be added to Spark-core?
Or is there an alternative equally good or better approach?

Cheers,
Mohit.
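
For reference, a minimal sketch of the pairwise zip-and-flatten described 
above, folding over an arbitrary number of column RDDs. It assumes all columns 
are identically partitioned (which RDD.zip requires); a native n-ary zip or a 
zipPartitions variant would avoid building a tuple at every step.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def zipColumns[T: ClassTag](columns: Seq[RDD[T]]): RDD[Seq[T]] = {
  val start: RDD[Seq[T]] = columns.head.map(Seq(_))
  columns.tail.foldLeft(start) { (rows, col) =>
    rows.zip(col).map { case (row, v) => row :+ v }   // one intermediate tuple per element per column
  }
}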


Re: advice sought on spark/cassandra input development - scala or python?

2014-09-04 Thread Mohit Jaggi
Johnny,
Without knowing the domain of the problem it is hard to choose a
programming language. I would suggest you ask yourself the following
questions:
- What if your project depends on a lot of python libraries that don't have
Scala/Java counterparts? It is unlikely but possible.
- What if Python programmers are in good supply and Scala ones not as much?
- Do you need to rewrite a lot of code, is that feasible?
- Is the rest of your team willing to learn Scala?
- If you are processing streams in a long lived process, how does Python
perform?

Mohit.
P.S.: I end up choosing Scala more often than Python.


On Thu, Sep 4, 2014 at 8:03 AM, Johnny Kelsey jkkel...@semblent.com wrote:

 Hi guys,

 We're testing out a spark/cassandra cluster,  we're very impressed with
 what we've seen so far. However, I'd very much like some advice from the
 shiny brains on the mailing list.

 We have a large collection of python code that we're in the process of
 adapting to move into spark/cassandra,  I have some misgivings on using
 python for any further development.

 As a concrete example, we have a python class (part of a fairly large
 class library) which, as part of its constructor, also creates a record of
 itself in the cassandra key space. So we get an initialised class  a row
 in a table on the cluster. My problem is this: should we even be doing this?

 By this I mean, we could be facing an increasing number of transactions,
 which we (naturally) would like to process as quickly as possible. The
 input transactions themselves may well be routed to a number of processes,
 e.g. starting an agent, written to a log file, etc. So it seems wrong to be
 putting the 'INSERT ... INTO ...' code into the class instantiation: it
 would seem more sensible to split this into a bunch of different spark
 processes, with an input handler, database insertion, create new python
 object, update log file, all happening on the spark cluster,  all written
 as atomically as possible.

 But I think my reservations here are more fundamental. Is python the wrong
 choice for this sort of thing? Would it not be better to use scala?
 Shouldn't we be dividing these tasks into atomic processes which execute as
 rapidly as possible? What about streaming events to the cluster, wouldn't
 python be a bottleneck here rather than scala with its more robust support
 for multithreading?  Is streaming even supported in python?

 What do people think?

 Best regards,

 Johnny

 --
 Johnny Kelsey
 Chief Technology Officer
 *Semblent*
 *jkkel...@semblent.com jkkel...@semblent.com*



Re: Object serialisation inside closures

2014-09-04 Thread Mohit Jaggi
I faced the same problem and ended up using the same approach that Sean
suggested
https://github.com/AyasdiOpenSource/df/blob/master/src/main/scala/com/ayasdi/df/DF.scala#L313

Option 3 also seems reasonable. It should create a CSVParser per executor.
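
For reference, a minimal sketch of the mapPartitions variant Sean describes in 
the quoted reply below; the field indices are taken from the example in the 
original question and are otherwise arbitrary.

import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD

def parseCsv(lines: RDD[String]): RDD[(String, String)] =
  lines.mapPartitions { it =>
    val parser = new CSVParser(';')   // created on the executor, once per partition; never serialized
    it.map { line =>
      val temp = parser.parseLine(line)
      (temp(1), temp(4))
    }
  }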


On Thu, Sep 4, 2014 at 6:58 AM, Andrianasolo Fanilo 
fanilo.andrianas...@worldline.com wrote:

 Thank you for the quick answer, looks good to me

 Though that brings me to another question. Suppose we want to open a
 connection to a database, an ElasticSearch, etc...

 I now have two proceedings :
 1/ use .mapPartitions and setup the connection at the start of each
 partition, so I get a connection per partition
 2/ use a singleton object, which loads a connection per executor if my
 understanding is correct

 I would have used the second possibility, so I don't create a new
 connection for a partition each time the partition fails to compute for
 whatever reason.  I also don't have a lot of connections in parallel
 because I have only one connection per worker. If I have 200 partitions in
 parallel, that makes 200 connections.
 But in the second case a partition could kill the connection on the worker
 during computation and because that connection is shared for all tasks of
 the executor, all partitions would fail. Also, only one connection object
 would have to manage 200 partitions trying to output to
 Elasticsearch/database/etc...that may be bad performance-wise.

 Can't see a case where second is preferable for now. Doesn't seem I could
 use that singleton object to share data within an executor sadly...

 Thanks for the input
 Fanilo


 -Message d'origine-
 De : Sean Owen [mailto:so...@cloudera.com]
 Envoyé : jeudi 4 septembre 2014 15:36
 À : Andrianasolo Fanilo
 Cc : user@spark.apache.org
 Objet : Re: Object serialisation inside closures

 In your original version, the object is referenced by the function but
 it's on the driver, and so has to be serialized. This leads to an error
 since it's not serializable. Instead, you want to recreate the object
 locally on each of the remote machines.

 In your third version you are holding the parser in a static member of a
 class, in your Scala object. When you call the parse method, you're calling
 it on the instance of the CSVParserPlus class that was loaded on the remote
 worker. It loads and creates its own copy of the parser.

 A maybe more compact solution is to use mapPartitions, and create the
 parser once at the start. This avoids needing this static / singleton
 pattern, but also means the parser is created only once per partition.

 On Thu, Sep 4, 2014 at 2:29 PM, Andrianasolo Fanilo 
 fanilo.andrianas...@worldline.com wrote:
  Hello Spark fellows :)
 
 
 
  I’m a new user of Spark and Scala and have been using both for 6
  months without too many problems.
 
  Here I’m looking for best practices for using non-serializable classes
  inside closure. I’m using Spark-0.9.0-incubating here with Hadoop 2.2.
 
 
 
  Suppose I am using OpenCSV parser to parse an input file. So inside my
  main
  :
 
 
 
   val sc = new SparkContext("local[2]", "App")
   val heyRDD = sc.textFile(…)
  
   val csvparser = new CSVParser(';')
   val heyMap = heyRDD.map { line =>
     val temp = csvparser.parseLine(line)
     (temp(1), temp(4))
   }
 
 
 
 
 
  This gives me a java.io.NotSerializableException:
  au.com.bytecode.opencsv.CSVParser, which seems reasonable.
 
 
 
  From here I could see 3 solutions :
 
  1/ Extending CSVParser with Serialisable properties, which adds a lot
  of boilerplate code if you ask me
 
  2/ Using Kryo Serialization (still need to define a serializer)
 
  3/ Creating an object with an instance of the class I want to use,
  typically
  :
 
 
 
   object CSVParserPlus {
     val csvParser = new CSVParser(';')
  
     def parse(line: String) = {
       csvParser.parseLine(line)
     }
   }
  
   val heyMap = heyRDD.map { line =>
     val temp = CSVParserPlus.parse(line)
     (temp(1), temp(4))
   }
 
 
 
  Third solution works and I don’t get how, so I was wondering how
  worked the closure system inside Spark to be able to serialize an
  object with a non-serializable instance. How does that work ? Does it
 hinder performance ?
  Is it a good solution ? How do you manage this problem ?
 
 
 
  Any input would be greatly appreciated
 
 
 
  Best regards,
 
  Fanilo
 
 
  
 

Re: pandas-like dataframe in spark

2014-09-04 Thread Mohit Jaggi
Thanks Matei. I will take a look at SchemaRDDs.


On Thu, Sep 4, 2014 at 11:24 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Hi Mohit,

 This looks pretty interesting, but just a note on the implementation -- it
 might be worthwhile to try doing this on top of Spark SQL SchemaRDDs. The
 reason is that SchemaRDDs already have an efficient in-memory
 representation (columnar storage), and can be read from a variety of data
 sources (JSON, Hive, soon things like CSV as well). Using the operators in
 Spark SQL you can also get really efficient code-generated operations on
 them. I know that stuff like zipping two data frames might become harder,
 but the overall benefit in performance could be substantial.

 Matei

 On September 4, 2014 at 9:28:12 AM, Mohit Jaggi (mohitja...@gmail.com)
 wrote:

 Folks,
 I have been working on a pandas-like dataframe DSL on top of spark. It is
 written in Scala and can be used from spark-shell. The APIs have the look
 and feel of pandas which is a wildly popular piece of software data
 scientists use. The goal is to let people familiar with pandas scale their
 efforts to larger datasets by using spark but not having to go through a
 steep learning curve for Spark and Scala.
 It is open sourced with Apache License and can be found here:
 https://github.com/AyasdiOpenSource/df

 I welcome your comments, suggestions and feedback. Any help in developing
 it further is much appreciated. I have the following items on the roadmap
 (and happy to change this based on your comments)
 - Python wrappers most likely in the same way as MLLib
 - Sliding window aggregations
 - Row indexing
 - Graphing/charting
 - Efficient row-based operations
 - Pretty printing of output on the spark-shell
 - Unit test completeness and automated nightly runs

 Mohit.

 P.S.: Thanks to my awesome employer Ayasdi http://www.ayasdi.com for
 open sourcing this software

 P.P.S.: I need some design advice on making row operations efficient and
 I'll start a new thread for that




why classTag not typeTag?

2014-08-22 Thread Mohit Jaggi
Folks,
I am wondering why Spark uses ClassTag in RDD[T: ClassTag] instead of the
more functional TypeTag option.
I have some code that needs TypeTag functionality and I don't know if a
typeTag can be converted to a classTag.

Mohit.
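
For what it's worth, going from a TypeTag to a ClassTag is possible (the 
reverse is not, since a ClassTag only carries the erased runtime class); a 
small sketch:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

def typeToClassTag[T: TypeTag]: ClassTag[T] =
  ClassTag[T](typeTag[T].mirror.runtimeClass(typeTag[T].tpe))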


kryo out of buffer exception

2014-08-16 Thread Mohit Jaggi
Hi All,
I was doing a groupBy and apparently some keys were very frequent making
the serializer fail with a buffer overflow exception. I did not need a
groupBy so I switched to combineByKey in this case but would like to know
how to increase the kryo buffer sizes to avoid this error. I hope there is
a way to grow the buffers dynamically based on the size of the data.

Mohit.
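
For reference, a sketch of raising the Kryo buffer limits via SparkConf. The 
exact keys are an assumption here: they have changed across Spark versions 
(older releases used *.mb variants), so check the tuning guide for the version 
in use.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "1m")        // initial per-task buffer
  .set("spark.kryoserializer.buffer.max", "256m")  // ceiling the buffer is allowed to grow to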


closure issue - works in scalatest but not in spark-shell

2014-08-15 Thread Mohit Jaggi
Folks,

I wrote the following wrapper on top on combineByKey. The RDD is of
Array[Any] and I am extracting a field at a given index for combining.
There are two ways in which I tried this:

Option A: leave colIndex abstract in Aggregator class and define in derived
object Aggtor with value -1. It is set later in function myAggregate. Works
fine but I want to keep the API user unaware of colIndex.

Option B(shown in code below): Set colIndex to -1 in abstract class. Aggtor
does not mention it at all. It is set later in myAggregate.

Option B works from scalatest in Eclipse but runs into a closure mishap in
the spark-shell. I am looking for an explanation and a possible
solution/workaround. I appreciate any help!

Thanks,

Mohit.

-- API helper -

abstract class Aggregator[U] {
  var colIndex: Int = -1

  def convert(a: Array[Any]): U = {
    a(colIndex).asInstanceOf[U]
  }

  def mergeValue(a: U, b: Array[Any]): U = {
    aggregate(a, convert(b))
  }

  def mergeCombiners(x: U, y: U): U = {
    aggregate(x, y)
  }

  def aggregate(p: U, q: U): U
}

-- API handler -

def myAggregate[U: ClassTag](...aggtor: Aggregator[U]) = {
  aggtor.colIndex = something
  keyBy(aggByCol).combineByKey(aggtor.convert, aggtor.mergeValue, aggtor.mergeCombiners)
}

 call the API 

case object Aggtor extends Aggregator[List[String]] {
  //var colIndex = -1
  def aggregate = 
}

myAggregate(...Aggtor)


sparkcontext stop and then start again

2014-07-25 Thread Mohit Jaggi
Folks,
I had some pyspark code which used to hang with no useful debug logs. It
got fixed when I changed my code to keep the sparkcontext forever instead
of stopping it and then creating another one later. Is this a bug or
expected behavior?

Mohit.


Re: pyspark sc.parallelize running OOM with smallish data

2014-07-14 Thread Mohit Jaggi
Continuing to debug with Scala, I tried this on local with enough memory
(10g) and it is able to count the dataset. With more memory (for executor
and driver) in a cluster it still fails. The data is about 2 GB; it is
30k * 4k doubles.


On Sat, Jul 12, 2014 at 6:31 PM, Aaron Davidson ilike...@gmail.com wrote:

 I think this is probably dying on the driver itself, as you are probably
 materializing the whole dataset inside your python driver. How large is
 spark_data_array compared to your driver memory?


 On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 I put the same dataset into scala (using spark-shell) and it acts weird.
 I cannot do a count on it, the executors seem to hang. The WebUI shows 0/96
 in the status bar, shows details about the worker nodes but there is no
 progress.
 sc.parallelize does finish (takes too long for the data size) in scala.


 On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
 in the cluster and gave 48g to executors. I also tried Kryo serialization.

 traceback (most recent call last):

   File /mohit/./m.py, line 58, in module

 spark_data = sc.parallelize(spark_data_array)

   File /mohit/spark/python/pyspark/context.py, line 265, in parallelize

 jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

   File
 /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line
 537, in __call__

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

 : java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

 at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

 at py4j.Gateway.invoke(Gateway.java:259)

 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

 at py4j.commands.CallCommand.execute(CallCommand.java:79)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)

 at java.lang.Thread.run(Thread.java:745)






pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in
the cluster and gave 48g to executors. I also tried Kryo serialization.

traceback (most recent call last):

  File /mohit/./m.py, line 58, in module

spark_data = sc.parallelize(spark_data_array)

  File /mohit/spark/python/pyspark/context.py, line 265, in parallelize

jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

  File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
line 537, in __call__

  File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line
300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

: java.lang.OutOfMemoryError: Java heap space

at
org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)

at java.lang.Thread.run(Thread.java:745)


Re: pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
I put the same dataset into scala (using spark-shell) and it acts weird. I
cannot do a count on it, the executors seem to hang. The WebUI shows 0/96
in the status bar, shows details about the worker nodes but there is no
progress.
sc.parallelize does finish (takes too long for the data size) in scala.


On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
 in the cluster and gave 48g to executors. I also tried Kryo serialization.

 traceback (most recent call last):

   File /mohit/./m.py, line 58, in module

 spark_data = sc.parallelize(spark_data_array)

   File /mohit/spark/python/pyspark/context.py, line 265, in parallelize

 jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line
 300, in get_return_value

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

 : java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

 at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

 at py4j.Gateway.invoke(Gateway.java:259)

 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

 at py4j.commands.CallCommand.execute(CallCommand.java:79)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)

 at java.lang.Thread.run(Thread.java:745)



Re: pyspark regression results way off

2014-06-25 Thread Mohit Jaggi
Is a python binding for LBFGS in the works? My co-worker has written one
and can contribute back if it helps.


On Mon, Jun 16, 2014 at 11:00 AM, DB Tsai dbt...@stanford.edu wrote:

 Is your data normalized? Sometimes, GD doesn't work well if the data
 has a wide range. If you are willing to write Scala code, you can try
 LBFGS optimizer which converges better than GD.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Mon, Jun 16, 2014 at 8:14 AM, jamborta jambo...@gmail.com wrote:
  forgot to mention that I'm running spark 1.0
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-regression-results-way-off-tp7672p7673.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



kibana like frontend for spark

2014-06-20 Thread Mohit Jaggi
Folks,
I want to analyse logs and I want to use spark for that. However,
elasticsearch has a fancy frontend in Kibana. Kibana's docs indicate that
it works with elasticsearch only. Is there a similar frontend that can work
with spark?

Mohit.

P.S.: On MapR's Spark FAQ I read a statement like "Kibana can use any
ODBC/JDBC backend and Shark has that interface".


Re: spark with docker: errors with akka, NAT?

2014-06-19 Thread Mohit Jaggi
Based on Jacob's suggestion, I started using --net=host, which is a new
option in the latest version of Docker. I also set SPARK_LOCAL_IP to the host's
IP address; then Akka does not use the hostname and I don't need the
Spark driver's hostname to be resolvable.

Thanks guys for your help!


On Tue, Jun 17, 2014 at 7:49 PM, Aaron Davidson ilike...@gmail.com wrote:

 Yup, alright, same solution then :)


 On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 I used --privileged to start the container and then unmounted /etc/hosts.
 Then I created a new /etc/hosts file


 On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 I remember having to do a similar thing in the spark docker scripts for
 testing purposes. Were you able to modify the /etc/hosts directly? I
 remember issues with that as docker apparently mounts it as part of its
 read-only filesystem.


 On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 It was a DNS issue. AKKA apparently uses the hostname of the endpoints
 and hence they need to be resolvable. In my case the hostname of the docker
 container was a randomly generated string and was not resolvable. I added a
 workaround (entry in etc/hosts file of spark master) for now. If anyone can
 point to a more elegant solution, that would be awesome!


 On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 I am using cutting edge code from git but doing my own sbt assembly.


 On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher 
 schum...@icsi.berkeley.edu wrote:


 Hi,

 are you using the amplab/spark-1.0.0 images from the global registry?

 Andre

 On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
  Hi Folks,
 
  I am having trouble getting spark driver running in docker. If I
 run a
  pyspark example on my mac it works but the same example on a docker
 image
  (Via boot2docker) fails with following logs. I am pointing the
 spark driver
  (which is running the example) to a spark cluster (driver is not
 part of
  the cluster). I guess this has something to do with docker's
 networking
  stack (it may be getting NAT'd) but I am not sure why (if at all)
 the
  spark-worker or spark-master is trying to create a new TCP
 connection to
  the driver, instead of responding on the connection initiated by
 the driver.
 
  I would appreciate any help in figuring this out.
 
  Thanks,
 
  Mohit.
 
  logs
 
  Spark Executor Command: java -cp
 
 ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
  -Xms2g -Xmx2g -Xms512M -Xmx512M
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 1
  cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker
  app-20140616152201-0021
 
  
 
 
  log4j:WARN No appenders could be found for logger
  (org.apache.hadoop.conf.Configuration).
 
  log4j:WARN Please initialize the log4j system properly.
 
  log4j:WARN See
 http://logging.apache.org/log4j/1.2/faq.html#noconfig for
  more info.
 
  14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
  profile: org/apache/spark/log4j-defaults.properties
 
  14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
 ayasdi,root
 
  14/06/16 15:22:05 INFO SecurityManager: SecurityManager:
 authentication
  disabled; ui acls disabled; users with view permissions: Set(xxx,
 xxx)
 
  14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
 
  14/06/16 15:22:05 INFO Remoting: Starting remoting
 
  14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
 addresses
  :[akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
  [akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 
  14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
  akka.tcp://sparkWorker@:33952/user/Worker
 
  14/06/16 15:22:06 WARN Remoting: Tried to associate with
 unreachable remote
  address [akka.tcp://spark@fc31887475e3:43921]. Address is now
 gated for
  6 ms, all messages to this address will be delivered to dead
 letters.
 
  14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
  [akka.tcp://sparkExecutor@:33536] -
 [akka.tcp://spark@fc31887475e3:43921]
  disassociated! Shutting down.
 









Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
I am using cutting edge code from git but doing my own sbt assembly.


On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher 
schum...@icsi.berkeley.edu wrote:


 Hi,

 are you using the amplab/spark-1.0.0 images from the global registry?

 Andre

 On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
  Hi Folks,
 
  I am having trouble getting spark driver running in docker. If I run a
  pyspark example on my mac it works but the same example on a docker image
  (Via boot2docker) fails with following logs. I am pointing the spark
 driver
  (which is running the example) to a spark cluster (driver is not part of
  the cluster). I guess this has something to do with docker's networking
  stack (it may be getting NAT'd) but I am not sure why (if at all) the
  spark-worker or spark-master is trying to create a new TCP connection to
  the driver, instead of responding on the connection initiated by the
 driver.
 
  I would appreciate any help in figuring this out.
 
  Thanks,
 
  Mohit.
 
  logs
 
  Spark Executor Command: java -cp
 
 ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
  -Xms2g -Xmx2g -Xms512M -Xmx512M
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1
  cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker
  app-20140616152201-0021
 
  
 
 
  log4j:WARN No appenders could be found for logger
  (org.apache.hadoop.conf.Configuration).
 
  log4j:WARN Please initialize the log4j system properly.
 
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
  more info.
 
  14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
  profile: org/apache/spark/log4j-defaults.properties
 
  14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
 ayasdi,root
 
  14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
  disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
 
  14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
 
  14/06/16 15:22:05 INFO Remoting: Starting remoting
 
  14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses
  :[akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
  [akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 
  14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
  akka.tcp://sparkWorker@:33952/user/Worker
 
  14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
 remote
  address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
  6 ms, all messages to this address will be delivered to dead letters.
 
  14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
  [akka.tcp://sparkExecutor@:33536] - [akka.tcp://spark@fc31887475e3
 :43921]
  disassociated! Shutting down.
 




Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
I used --privileged to start the container and then unmounted /etc/hosts.
Then I created a new /etc/hosts file


On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson ilike...@gmail.com wrote:

 I remember having to do a similar thing in the spark docker scripts for
 testing purposes. Were you able to modify the /etc/hosts directly? I
 remember issues with that as docker apparently mounts it as part of its
 read-only filesystem.


 On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 It was a DNS issue. AKKA apparently uses the hostname of the endpoints
 and hence they need to be resolvable. In my case the hostname of the docker
 container was a randomly generated string and was not resolvable. I added a
 workaround (entry in etc/hosts file of spark master) for now. If anyone can
 point to a more elegant solution, that would be awesome!


 On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:

 I am using cutting edge code from git but doing my own sbt assembly.


 On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher 
 schum...@icsi.berkeley.edu wrote:


 Hi,

 are you using the amplab/spark-1.0.0 images from the global registry?

 Andre

 On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
  Hi Folks,
 
  I am having trouble getting spark driver running in docker. If I run a
  pyspark example on my mac it works but the same example on a docker
 image
  (Via boot2docker) fails with following logs. I am pointing the spark
 driver
  (which is running the example) to a spark cluster (driver is not part
 of
  the cluster). I guess this has something to do with docker's
 networking
  stack (it may be getting NAT'd) but I am not sure why (if at all) the
  spark-worker or spark-master is trying to create a new TCP connection
 to
  the driver, instead of responding on the connection initiated by the
 driver.
 
  I would appreciate any help in figuring this out.
 
  Thanks,
 
  Mohit.
 
  logs
 
  Spark Executor Command: java -cp
 
 ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
  -Xms2g -Xmx2g -Xms512M -Xmx512M
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1
  cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker
  app-20140616152201-0021
 
  
 
 
  log4j:WARN No appenders could be found for logger
  (org.apache.hadoop.conf.Configuration).
 
  log4j:WARN Please initialize the log4j system properly.
 
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
 for
  more info.
 
  14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
  profile: org/apache/spark/log4j-defaults.properties
 
  14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
 ayasdi,root
 
  14/06/16 15:22:05 INFO SecurityManager: SecurityManager:
 authentication
  disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
 
  14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
 
  14/06/16 15:22:05 INFO Remoting: Starting remoting
 
  14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
 addresses
  :[akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
  [akka.tcp://sparkExecutor@:33536]
 
  14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
  akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 
  14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
  akka.tcp://sparkWorker@:33952/user/Worker
 
  14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
 remote
  address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated
 for
  6 ms, all messages to this address will be delivered to dead
 letters.
 
  14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
  [akka.tcp://sparkExecutor@:33536] -
 [akka.tcp://spark@fc31887475e3:43921]
  disassociated! Shutting down.
 







spark with docker: errors with akka, NAT?

2014-06-16 Thread Mohit Jaggi
Hi Folks,

I am having trouble getting spark driver running in docker. If I run a
pyspark example on my mac it works but the same example on a docker image
(Via boot2docker) fails with following logs. I am pointing the spark driver
(which is running the example) to a spark cluster (driver is not part of
the cluster). I guess this has something to do with docker's networking
stack (it may be getting NAT'd) but I am not sure why (if at all) the
spark-worker or spark-master is trying to create a new TCP connection to
the driver, instead of responding on the connection initiated by the driver.

I would appreciate any help in figuring this out.

Thanks,

Mohit.

logs

Spark Executor Command: java -cp
::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
-Xms2g -Xmx2g -Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1
cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker
app-20140616152201-0021




log4j:WARN No appenders could be found for logger
(org.apache.hadoop.conf.Configuration).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.

14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
profile: org/apache/spark/log4j-defaults.properties

14/06/16 15:22:05 INFO SecurityManager: Changing view acls to: ayasdi,root

14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)

14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started

14/06/16 15:22:05 INFO Remoting: Starting remoting

14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkExecutor@:33536]

14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@:33536]

14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler

14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@:33952/user/Worker

14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
6 ms, all messages to this address will be delivered to dead letters.

14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@:33536] - [akka.tcp://spark@fc31887475e3:43921]
disassociated! Shutting down.


Re: ExternalAppendOnlyMap: Spilling in-memory map

2014-05-22 Thread Mohit Jaggi
Andrew,
I did not register anything explicitly, based on the belief that the class
name is written out in full only once. I also wondered why that problem
would be specific to Joda-Time and not show up with java.util.Date... I guess
it is possible based on the internals of Joda-Time.
If I remove DateTime from my RDD, the problem goes away.
I will try explicit registration (and add DateTime back to my RDD) and see
if that makes things better.

Mohit.
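
For reference, a sketch of the explicit registration. registerKryoClasses is 
only available in newer Spark releases (older ones need a custom KryoRegistrator 
set via spark.kryo.registrator), and whether plain registration is enough for 
Joda's DateTime, as opposed to a dedicated serializer, is an assumption.

import org.apache.spark.SparkConf
import org.joda.time.DateTime

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[DateTime]))   // avoids writing the class name per instance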




On Wed, May 21, 2014 at 8:36 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Mohit,

 The log line about the ExternalAppendOnlyMap is more of a symptom of
 slowness than causing slowness itself.  The ExternalAppendOnlyMap is used
 when a shuffle is causing too much data to be held in memory.  Rather than
 OOM'ing, Spark writes the data out to disk in a sorted order and reads it
 back from disk later on when it's needed.  That's the job of the
 ExternalAppendOnlyMap.

 I wouldn't normally expect a conversion from Date to a Joda DateTime to
 take significantly more memory.  But since you're using Kryo and classes
 should be registered with it, may may have forgotten to register DateTime
 with Kryo.  If you don't register a class, it writes the class name at the
 beginning of every serialized instance, which for DateTime objects of size
 roughly 1 long, that's a ton of extra space and very inefficient.

 Can you confirm that DateTime is registered with Kryo?

 http://spark.apache.org/docs/latest/tuning.html#data-serialization


 On Wed, May 21, 2014 at 2:35 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 Hi,

 I changed my application to use Joda time instead of java.util.Date and I
 started getting this:

 WARN ExternalAppendOnlyMap: Spilling in-memory map of 484 MB to disk (1
 time so far)

 What does this mean? How can I fix this? Due to this a small job takes
 forever.

 Mohit.


 P.S.: I am using Kryo serialization, and have played around with several
 values of sparkRddMemFraction





Re: accessing partition i+1 from mapper of partition i

2014-05-22 Thread Mohit Jaggi
Austin,
I made up a mock example... my real use case is more complex. I used
foreach() instead of collect/cache; that forces the accumulable to be
evaluated. On another thread Xiangrui pointed me to a sliding-window RDD in
MLlib that is a great alternative (although I did not switch to using it).
Mohit.


On Thu, May 22, 2014 at 2:30 PM, Austin Gibbons aus...@quantifind.comwrote:

 Mohit, if you want to end up with (1 .. N) , why don't you skip the logic
 for finding missing values, and generate it directly?

 val max = myCollection.reduce(math.max)
 sc.parallelize((0 until max))

 In either case, you don't need to call cache, which will force it into
 memory - you can do something like count which will not necessarily store
 the RDD in memory.

 Additionally, instead of an accumulable, you could consider mapping that
 value directly:

 rdd.mapPartitionsWithIndex { (index, partition) =>
 Iterator(index -> partition.reduce(math.max)) }.collectAsMap()


 On Mon, May 19, 2014 at 9:50 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 Thanks Brian. This works. I used Accumulable to do the collect in step
 B. While doing that I found that Accumulable.value is not a Spark action,
 I need to call cache in the underlying RDD for value to work. Not sure
 if that is intentional or a bug.
 The collect of Step B can be done as a new RDD too.


 On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt bgaw...@gmail.com wrote:

 I don't think there's a direct way of bleeding elements across
 partitions. But you could write it yourself relatively succinctly:

 A) Sort the RDD
 B) Look at the sorted RDD's partitions with the
 .mapParititionsWithIndex( ) method. Map each partition to its partition ID,
 and its maximum element. Collect the (partID, maxElements) in the driver.
 C) Broadcast the collection of (partID, part's max element) tuples
 D) Look again at the sorted RDD's partitions with
 mapPartitionsWithIndex( ). For each partition *K:*
 D1) Find the immediately-preceding partition* K -1 , *and its
 associated maximum value. Use that to decide how many values are missing
 between the last element of part *K-1 *and the first element of part *K*
 .
 D2) Step through part *K*'s elements and find the rest of the missing
 elements in that part

 This approach sidesteps worries you might have over the hack of using
 .filter to remove the first element, as well as the zipping.

 --Brian



 On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi mohitja...@gmail.comwrote:

 Hi,
 I am trying to find a way to fill in missing values in an RDD. The RDD
 is a sorted sequence.
 For example, (1, 2, 3, 5, 8, 11, ...)
 I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)

 One way to do this is to slide and zip
 rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
 x = rdd1.first
 rdd2 = rdd1 filter (_ != x)
 rdd3 = rdd2 zip rdd1
 rdd4 = rdd3 flatMap { (x, y) => generate missing elements between x and
 y }

 Another method which I think is more efficient is to use
 mapPartitions() on rdd1 to be able to iterate on elements of rdd1 in each
 partition. However, that leaves the boundaries of the partitions to be
 unfilled. *Is there a way within the function passed to
 mapPartitions, to read the first element in the next partition?*

 The latter approach also appears to work for a general sliding window
 calculation on the RDD. The former technique requires a lot of sliding and
 zipping and I believe it is not efficient. If only I could read the next
 partition...I have tried passing a pointer to rdd1 to the function passed
 to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
 Spark cannot deal with a mapper calling another mapper (since it happens on
 a worker not the driver)

 Mohit.






 --
 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
 Austin Gibbons
 Research | quantiFind http://www.quantifind.com/ | 708 601 4894
 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .



Re: filling missing values in a sequence

2014-05-20 Thread Mohit Jaggi
Xiangrui,
Thanks for the pointer. I think it should work...for now I cooked up my
own, which is similar but built on top of the Spark core APIs. I would suggest
moving the sliding-window RDD to the Spark core library. It seems quite general
to me, and a cursory look at the code indicates nothing specific to machine
learning.

Mohit.


On Mon, May 19, 2014 at 10:13 PM, Xiangrui Meng men...@gmail.com wrote:

 Actually there is a sliding method implemented in
 mllib.rdd.RDDFunctions. Since this is not for general use cases, we
 didn't include it in spark-core. You can take a look at the
 implementation there and see whether it fits. -Xiangrui
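
 For the archive, a rough sketch of how that could look for the gap-filling
 case (the import and exact signature may vary by Spark version; assumes a
 sorted RDD[Int] with at least two elements):

 import org.apache.spark.mllib.rdd.RDDFunctions._   // adds sliding() to RDDs

 val last = sortedRdd.reduce(math.max)              // final element; no window fills past it
 val filled = sortedRdd.sliding(2)                  // RDD of consecutive pairs, e.g. Array(3, 5)
   .flatMap { case Array(a, b) => a until b }       // emit a, a+1, ..., b-1 for each pair
   .union(sc.parallelize(Seq(last)))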

 On Mon, May 19, 2014 at 10:06 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:
  Thanks Sean. Yes, your solution works :-) I did oversimplify my real
  problem, which has other parameters that go along with the sequence.
 
 
  On Fri, May 16, 2014 at 3:03 AM, Sean Owen so...@cloudera.com wrote:
 
  Not sure if this is feasible, but this literally does what I think you
  are describing:
 
  sc.parallelize(rdd1.first to rdd1.last)
 
  On Tue, May 13, 2014 at 4:56 PM, Mohit Jaggi mohitja...@gmail.com
 wrote:
   Hi,
   I am trying to find a way to fill in missing values in an RDD. The RDD
   is a
   sorted sequence.
   For example, (1, 2, 3, 5, 8, 11, ...)
   I need to fill in the missing numbers and get
 (1,2,3,4,5,6,7,8,9,10,11)
  
   One way to do this is to slide and zip
   rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
   x = rdd1.first
   rdd2 = rdd1 filter (_ != x)
   rdd3 = rdd2 zip rdd1
   rdd4 = rdd3 flatMap { (x, y) => generate missing elements between x
  and
   y }
  
   Another method which I think is more efficient is to use
  mapPartitions() on
   rdd1 to be able to iterate on elements of rdd1 in each partition.
   However,
   that leaves the boundaries of the partitions to be unfilled. Is
 there
   a
   way within the function passed to mapPartitions, to read the first
   element
   in the next partition?
  
   The latter approach also appears to work for a general sliding
 window
   calculation on the RDD. The former technique requires a lot of
 sliding
   and
   zipping and I believe it is not efficient. If only I could read the
   next
   partition...I have tried passing a pointer to rdd1 to the function
   passed to
   mapPartitions but the rdd1 pointer turns out to be NULL, I guess
 because
   Spark cannot deal with a mapper calling another mapper (since it
 happens
   on
   a worker not the driver)
  
   Mohit.
 
 



Re: filling missing values in a sequence

2014-05-19 Thread Mohit Jaggi
Thanks Sean. Yes, your solution works :-) I did oversimplify my real
problem, which has other parameters that go along with the sequence.


On Fri, May 16, 2014 at 3:03 AM, Sean Owen so...@cloudera.com wrote:

 Not sure if this is feasible, but this literally does what I think you
 are describing:

 sc.parallelize(rdd1.first to rdd1.last)
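
 (An RDD has no last method; a hedged variant of the same idea would compute
 the upper bound with a reduce, for example:)

 val lo = rdd1.first()
 val hi = rdd1.reduce(math.max)        // stand-in for the nonexistent rdd1.last
 sc.parallelize(lo to hi)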

 On Tue, May 13, 2014 at 4:56 PM, Mohit Jaggi mohitja...@gmail.com wrote:
  Hi,
  I am trying to find a way to fill in missing values in an RDD. The RDD
 is a
  sorted sequence.
  For example, (1, 2, 3, 5, 8, 11, ...)
  I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
 
  One way to do this is to slide and zip
  rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
  x = rdd1.first
  rdd2 = rdd1 filter (_ != x)
  rdd3 = rdd2 zip rdd1
  rdd4 = rdd3 flatMap { (x, y) => generate missing elements between x and
 y }
 
  Another method which I think is more efficient is to use
  mapPartitions() on
  rdd1 to be able to iterate on elements of rdd1 in each partition.
 However,
  that leaves the boundaries of the partitions to be unfilled. Is there a
  way within the function passed to mapPartitions, to read the first
 element
  in the next partition?
 
  The latter approach also appears to work for a general sliding window
  calculation on the RDD. The former technique requires a lot of sliding
 and
  zipping and I believe it is not efficient. If only I could read the next
  partition...I have tried passing a pointer to rdd1 to the function
 passed to
  mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
  Spark cannot deal with a mapper calling another mapper (since it happens
 on
  a worker not the driver)
 
  Mohit.



Re: life of an executor

2014-05-19 Thread Mohit Jaggi
I guess it needs to be this way to benefit from caching of RDDs in
memory. It would be nice, however, if the RDD cache could be dissociated from
the JVM heap, so that in cases where garbage collection is difficult to
tune, one could choose to discard the JVM and run the next operation in a
new one.


On Mon, May 19, 2014 at 10:06 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 They’re tied to the SparkContext (application) that launched them.

 Matei

 On May 19, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 From looking at the source code I see that executors run in their own JVM
 subprocesses.

 How long do they live for? As long as the worker/slave? Or are they tied
 to the SparkContext and live/die with it?

 thx





filling missing values in a sequence

2014-05-15 Thread Mohit Jaggi
Hi,
I am trying to find a way to fill in missing values in an RDD. The RDD is a
sorted sequence.
For example, (1, 2, 3, 5, 8, 11, ...)
I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)

One way to do this is to slide and zip
rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
x = rdd1.first
rdd2 = rdd1 filter (_ != x)
rdd3 = rdd2 zip rdd1
rdd4 = rdd3 flatMap { (x, y) => generate missing elements between x and y }
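
Making the flatMap step concrete, roughly (a sketch; note that RDD.zip also
requires both sides to have the same number of elements per partition, so the
filter-then-zip above may need a repartitioned or collection-based variant):

val rdd4 = rdd3 flatMap { case (next, prev) => prev until next } // fill prev .. next-1 per pair
val last = rdd1.reduce(math.max)        // the final element never appears in rdd4
val filled = rdd4.union(sc.parallelize(Seq(last)))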

Another method which I think is more efficient is to use mapPartitions()
on rdd1 to be able to iterate on elements of rdd1 in each partition.
However, that leaves the boundaries of the partitions to be unfilled. Is
there a way within the function passed to mapPartitions, to read the first
element in the next partition?

The latter approach also appears to work for a general sliding window
calculation on the RDD. The former technique requires a lot of sliding and
zipping and I believe it is not efficient. If only I could read the next
partition...I have tried passing a pointer to rdd1 to the function passed
to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
Spark cannot deal with a mapper calling another mapper (since it happens on
a worker not the driver)

Mohit.


accessing partition i+1 from mapper of partition i

2014-05-14 Thread Mohit Jaggi
Hi,
I am trying to find a way to fill in missing values in an RDD. The RDD is a
sorted sequence.
For example, (1, 2, 3, 5, 8, 11, ...)
I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)

One way to do this is to slide and zip
rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
x = rdd1.first
rdd2 = rdd1 filter (_ != x)
rdd3 = rdd2 zip rdd1
rdd4 = rdd3 flatmap { (x, y) = generate missing elements between x and y }

Another method which I think is more efficient is to use mapPartitions()
on rdd1 to be able to iterate on elements of rdd1 in each partition.
However, that leaves the boundaries of the partitions to be unfilled. *Is
there a way within the function passed to mapPartitions, to read the first
element in the next partition?*

The latter approach also appears to work for a general sliding window
calculation on the RDD. The former technique requires a lot of sliding and
zipping and I believe it is not efficient. If only I could read the next
partition...I have tried passing a pointer to rdd1 to the function passed
to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
Spark cannot deal with a mapper calling another mapper (since it happens on
a worker not the driver)

Mohit.


rdd ordering gets scrambled

2014-04-29 Thread Mohit Jaggi
Hi,
I started with a text file (CSV) of sorted data (by the first column), parsed it
into Scala objects using a map operation in Scala. Then I used more maps to
add some extra info to the data and saved it as a text file.
The final text file is not sorted. What do I need to do to keep the order
from the original input intact?

My code looks like:

val csvFile = sc.textFile(...)                      // file is CSV and ordered by first column
val splitRdd = csvFile map { line => line.split(",", -1) }
val parsedRdd = splitRdd map { parts =>
  val key = parts(0)                                // use first column as key
  val value = new MyObject(parts(0), parts(1))      // parse into Scala objects
  (key, value)
}

val augmentedRdd = parsedRdd map { x =>
  val key = x._1
  val value = ...                                   // add extra fields to x._2
  (key, value)
}
augmentedRdd.saveAsTextFile(...)                    // this output is not sorted
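
One workaround I could try (a sketch only; the path is illustrative): re-sort
by key right before the save, at the cost of a shuffle, to guarantee the
output order regardless of what happened upstream.

import org.apache.spark.SparkContext._    // pair-RDD functions such as sortByKey

augmentedRdd.sortByKey().saveAsTextFile("/path/to/output")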

Mohit.


Re: error in mllib lr example code

2014-04-24 Thread Mohit Jaggi
Thanks Xiangrui, Matei and Arpit. It does work fine after adding
Vectors.dense. I have a follow-up question, which I will post in a new thread.
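
For the archive, the corrected parsing with a dense vector (a sketch following
Xiangrui's suggestion quoted below):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}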


On Thu, Apr 24, 2014 at 2:49 AM, Arpit Tak arpi...@sigmoidanalytics.comwrote:

 Also try out these examples; all of them work:

 http://docs.sigmoidanalytics.com/index.php/MLlib

 if you spot any problems in those, let us know.

 Regards,
 arpit


 On Wed, Apr 23, 2014 at 11:08 PM, Matei Zaharia 
 matei.zaha...@gmail.comwrote:

 See http://people.csail.mit.edu/matei/spark-unified-docs/ for a more
 recent build of the docs; if you spot any problems in those, let us know.

 Matei

 On Apr 23, 2014, at 9:49 AM, Xiangrui Meng men...@gmail.com wrote:

  The doc is for 0.9.1. You are running a later snapshot, which added
  sparse vectors. Try LabeledPoint(parts(0).toDouble,
  Vectors.dense(parts(1).split(' ').map(x => x.toDouble))). The examples
  are updated in the master branch. You can also check the examples
  there. -Xiangrui
 
  On Wed, Apr 23, 2014 at 9:34 AM, Mohit Jaggi mohitja...@gmail.com
 wrote:
 
  sorry...added a subject now
 
  On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi mohitja...@gmail.com
 wrote:
 
  I am trying to run the example linear regression code from
 
  http://spark.apache.org/docs/latest/mllib-guide.html
 
  But I am getting the following error...am I missing an import?
 
  code
 
  import org.apache.spark._
 
  import org.apache.spark.mllib.regression.LinearRegressionWithSGD
 
  import org.apache.spark.mllib.regression.LabeledPoint
 
 
  object ModelLR {
 
   def main(args: Array[String]) {
 
  val sc = new SparkContext(args(0), "SparkLR",
 
    System.getenv("SPARK_HOME"),
  SparkContext.jarOfClass(this.getClass).toSeq)
 
  // Load and parse the data
 
   val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
 
   val parsedData = data.map { line =>
 
   val parts = line.split(',')
 
    LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
  x.toDouble).toArray)
 
  }
 
  ...snip...
 
  }
 
  error
 
  - polymorphic expression cannot be instantiated to expected type;
 found :
   [U >: Double]Array[U] required:
 
  org.apache.spark.mllib.linalg.Vector
 
  - polymorphic expression cannot be instantiated to expected type;
 found :
   [U >: Double]Array[U] required:
 
  org.apache.spark.mllib.linalg.Vector
 
 





spark mllib to jblas calls..and comparison with VW

2014-04-24 Thread Mohit Jaggi
Folks,
I am wondering how MLlib interacts with jblas and LAPACK. Does it make
copies of data from my RDD format to jblas's format? Does jblas copy it
again before passing to LAPACK native code?

I also saw some comparisons with VW and it seems MLlib is slower on a
single node but scales better and outperforms VW on 16 nodes. Any idea why?
Are improvements in the pipeline?

Mohit.


error in mllib lr example code

2014-04-23 Thread Mohit Jaggi
sorry...added a subject now

On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi mohitja...@gmail.com wrote:

 I am trying to run the example linear regression code from

  http://spark.apache.org/docs/latest/mllib-guide.html

 But I am getting the following error...am I missing an import?

 code

 import org.apache.spark._

 import org.apache.spark.mllib.regression.LinearRegressionWithSGD

 import org.apache.spark.mllib.regression.LabeledPoint


 object ModelLR {

   def main(args: Array[String]) {

 val sc = new SparkContext(args(0), "SparkLR",

   System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)
 .toSeq)

 // Load and parse the data

 val data = sc.textFile("mllib/data/ridge-data/lpsa.data")

 val parsedData = data.map { line =>

   val parts = line.split(',')

   LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
 x.toDouble).toArray)

  }

 ...snip...

 }

 error

 - polymorphic expression cannot be instantiated to expected type; found :
  [U >: Double]Array[U] required:

  org.apache.spark.mllib.linalg.Vector

 - polymorphic expression cannot be instantiated to expected type; found :
  [U >: Double]Array[U] required:

  org.apache.spark.mllib.linalg.Vector



scheduler question

2014-04-15 Thread Mohit Jaggi
Hi Folks,
I have some questions about how the Spark scheduler works:
- How does Spark know how many resources a job might need?
- How does it fairly share resources between multiple jobs?
- Does it know about data and partition sizes and use that information
for scheduling?

Mohit.