Re: Dataset -- Schema for type scala.collection.Set[scala.Int] is not supported

2019-08-09 Thread Mohit Jaggi
switched to immutable.Set and it works. this is weird as the code in
ScalaReflection.scala seems to support scala.collection.Set

cc: dev list, in case this is a bug
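
For reference, a minimal sketch of the failing case and the workaround (assuming
Spark 2.x with a SparkSession named spark and import spark.implicits._ in scope):

import scala.collection.immutable

case class Broken(ps: scala.collection.Set[Int], x: Int) // .toDS fails with the error below
case class Works(ps: immutable.Set[Int], x: Int)         // .toDS works after switching

val ds = Seq(Works(immutable.Set(1, 2), 1), Works(immutable.Set(2), 2)).toDS()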

On Thu, Aug 8, 2019 at 8:41 PM Mohit Jaggi  wrote:

> Is this not supported? I found this diff
> <https://github.com/apache/spark/pull/18416/files> and wonder if this is
> a bug or am I doing something wrong?
>
>  see below =
>
> import scala.collection.Set
> case class A(ps: Set[Int], x: Int)
>
> val az = Seq(A(Set(1, 2), 1), A(Set(2), 2))
>
> az.toDS
> java.lang.UnsupportedOperationException: Schema for type
> scala.collection.Set[scala.Int] is not supported
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
> at
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:388)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:387)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:387)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:157)
> at
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:157)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:145)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
> at org.apache.spark.sql.Encoders$.product(Encoders.scala:276)
> at
> org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
> at
> org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:6)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:82)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:84)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:86)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:88)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:90)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:92)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:94)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:96)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:98)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:100)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:102)
> at
> line9f6df40864bf4b14acca

Dataset -- Schema for type scala.collection.Set[scala.Int] is not supported

2019-08-08 Thread Mohit Jaggi
Is this not supported? I found this diff
<https://github.com/apache/spark/pull/18416/files> and wonder if this is a
bug or am I doing something wrong?

 see below =

import scala.collection.Set
case class A(ps: Set[Int], x: Int)

val az = Seq(A(Set(1, 2), 1), A(Set(2), 2))

az.toDS
java.lang.UnsupportedOperationException: Schema for type
scala.collection.Set[scala.Int] is not supported
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
at
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:388)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:387)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:387)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:157)
at
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:157)
at
org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:145)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:276)
at
org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at
org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:6)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:82)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:84)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:86)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:88)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:90)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:92)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:94)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:96)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:98)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:100)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:102)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw.(command-661233094182065:104)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw.(command-661233094182065:106)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw.(command-661233094182065:108)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw.(command-661233094182065:110)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read.(command-661233094182065:112)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$.(command-661233094182065:116)
at
line9f6df40864bf4b14acca9f5c334e0286112.$read$.(command-661233094182065)
at
line9f6df40864bf4b14acca9f5c334e0286112.$eval$.$print$lzycompute(:7)
at line9f6df40864bf4b14acca9f5c334e0286112.$ev

Re: dataset best practice question

2019-01-18 Thread Mohit Jaggi
Thanks! I wanted to avoid repeating f1, f2, f3 in class B. I wonder whether
the encoders/decoders still work if I use mixins.
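
A rough sketch of the mixin idea (untested here): Spark builds product encoders
from the case class constructor, so a trait can share the field contract, but the
fields still have to appear in each constructor. Names below are hypothetical.

trait Common {
  def f1: String; def f2: Int; def f3: Double
}
case class A(f1: String, f2: Int, f3: Double) extends Common
case class B(f1: String, f2: Int, f3: Double, f4: Double) extends Common

// The mixin should not affect encoder derivation, since the encoders only look
// at the constructors of A and B, so ds.as[B] works as in Kevin's example.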

On Tue, Jan 15, 2019 at 7:57 PM  wrote:

> Hi Mohit,
>
>
>
> I’m not sure that there is a “correct” answer here, but I tend to use
> classes whenever the input or output data represents something meaningful
> (such as a domain model object). I would recommend against creating many
> temporary classes for each and every transformation step as that may be
> difficult to maintain over time.
>
>
>
> Using *withColumn* statements will continue to work, and you don’t need
> to cast to your output class until you’ve set up all transformations.
> Therefore, you can do things like:
>
>
>
> case class A (f1, f2, f3)
>
> case class B (f1, f2, f3, f4, f5, f6)
>
>
>
> ds_a = spark.read.csv("path").as[A]
>
> ds_b = ds_a
>
>   .withColumn("f4", someUdf)
>
>   .withColumn("f5", someUdf)
>
>   .withColumn("f6", someUdf)
>
>   .as[B]
>
>
>
> Kevin
>
>
>
> *From:* Mohit Jaggi 
> *Sent:* Tuesday, January 15, 2019 1:31 PM
> *To:* user 
> *Subject:* dataset best practice question
>
>
>
> Fellow Spark Coders,
>
> I am trying to move from using Dataframes to Datasets for a reasonably
> large code base. Today the code looks like this:
>
>
>
> df_a= read_csv
>
> df_b = df.withColumn ( some_transform_that_adds_more_columns )
>
> //repeat the above several times
>
>
>
> With datasets, this will require defining
>
>
>
> case class A { f1, f2, f3 } //fields from csv file
>
> case class B { f1, f2, f3, f4 } //union of A and new field added by
> some_transform_that_adds_more_columns
>
> //repeat this 10 times
>
>
>
> Is there a better way?
>
>
>
> Mohit.
>


dataset best practice question

2019-01-15 Thread Mohit Jaggi
Fellow Spark Coders,
I am trying to move from using Dataframes to Datasets for a reasonably
large code base. Today the code looks like this:

df_a= read_csv
df_b = df.withColumn ( some_transform_that_adds_more_columns )
//repeat the above several times

With datasets, this will require defining

case class A { f1, f2, f3 } //fields from csv file
case class B { f1, f2, f3, f4 } //union of A and new field added by
some_transform_that_adds_more_columns
//repeat this 10 times

Is there a better way?

Mohit.


Re: Pyspark access to scala/java libraries

2018-07-17 Thread Mohit Jaggi
Thanks 0xF0F0F0 and Ashutosh for the pointers.

Holden,
I am trying to look into sparklingml...what am I looking for? Also which
chapter/page of your book should I look at?

Mohit.

On Sun, Jul 15, 2018 at 3:02 AM Holden Karau  wrote:

> If you want to see some examples in a library shows a way to do it -
> https://github.com/sparklingpandas/sparklingml and high performance spark
> also talks about it.
>
> On Sun, Jul 15, 2018, 11:57 AM <0xf0f...@protonmail.com.invalid> wrote:
>
>> Check
>> https://stackoverflow.com/questions/31684842/calling-java-scala-function-from-a-task
>>
>> ​Sent with ProtonMail Secure Email.​
>>
>> ‐‐‐ Original Message ‐‐‐
>>
>> On July 15, 2018 8:01 AM, Mohit Jaggi  wrote:
>>
>> > Trying again…anyone know how to make this work?
>> >
>> > > On Jul 9, 2018, at 3:45 PM, Mohit Jaggi mohitja...@gmail.com wrote:
>> > >
>> > > Folks,
>> > >
>> > > I am writing some Scala/Java code and want it to be usable from
>> pyspark.
>> > >
>> > > For example:
>> > >
>> > > class MyStuff(addend: Int) {
>> > >
>> > > def myMapFunction(x: Int) = x + addend
>> > >
>> > > }
>> > >
>> > > I want to call it from pyspark as:
>> > >
>> > > df = ...
>> > >
>> > > mystuff = sc._jvm.MyStuff(5)
>> > >
>> > > df['x'].map(lambda x: mystuff.myMapFunction(x))
>> > >
>> > > How can I do this?
>> > >
>> > > Mohit.
>> >
>> > --
>> >
>> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>


Re: Pyspark access to scala/java libraries

2018-07-14 Thread Mohit Jaggi
Trying again…anyone know how to make this work?

> On Jul 9, 2018, at 3:45 PM, Mohit Jaggi  wrote:
> 
> Folks,
> I am writing some Scala/Java code and want it to be usable from pyspark.
> 
> For example:
> class MyStuff(addend: Int)  {
>   def myMapFunction(x: Int) = x + addend
> }
> 
> I want to call it from pyspark as:
> 
> df = ...
> mystuff = sc._jvm.MyStuff(5)
> df['x'].map(lambda x: mystuff.myMapFunction(x))
> 
> How can I do this?
> 
> Mohit.
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Pyspark access to scala/java libraries

2018-07-09 Thread Mohit Jaggi
Folks,
I am writing some Scala/Java code and want it to be usable from pyspark.

For example:
class MyStuff(addend: Int)  {
def myMapFunction(x: Int) = x + addend
}

I want to call it from pyspark as:

df = ...
mystuff = sc._jvm.MyStuff(5)
df['x'].map(lambda x: mystuff.myMapFunction(x))

How can I do this?

Mohit.



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SparkILoop doesn't run

2016-11-28 Thread Mohit Jaggi
Thanks, I will look into the classpaths and check.

On Mon, Nov 21, 2016 at 3:28 PM, Jakob Odersky  wrote:

> The issue I was having had to do with missing classpath settings; in
> sbt it can be solved by setting `fork:=true` to run tests in new jvms
> with appropriate classpaths.
>
> Mohit, from the looks of the error message, it also appears to be some
> classpath issue. This typically happens when there are libraries of
> multiple scala versions on the same classpath. You mention that it
> worked before, can you recall what libraries you upgraded before it
> broke?
>
> --Jakob
>
> On Mon, Nov 21, 2016 at 2:34 PM, Jakob Odersky  wrote:
> > Trying it out locally gave me an NPE. I'll look into it in more
> > detail, however the SparkILoop.run() method is dead code. It's used
> > nowhere in spark and can be removed without any issues.
> >
> > On Thu, Nov 17, 2016 at 11:16 AM, Mohit Jaggi 
> wrote:
> >> Thanks Holden. I did post to the user list but since this is not a
> common
> >> case, I am trying the developer list as well. Yes there is a reason: I
> get
> >> code from somewhere e.g. a notebook. This type of code did work for me
> >> before.
> >>
> >> Mohit Jaggi
> >> Founder,
> >> Data Orchard LLC
> >> www.dataorchardllc.com
> >>
> >>
> >>
> >>
> >> On Nov 17, 2016, at 8:53 AM, Holden Karau  wrote:
> >>
> >> Moving to user list
> >>
> >> So this might be a better question for the user list - but is there a
> reason
> >> you are trying to use the SparkILoop for tests?
> >>
> >> On Thu, Nov 17, 2016 at 5:47 PM Mohit Jaggi 
> wrote:
> >>>
> >>>
> >>>
> >>> I am trying to use SparkILoop to write some tests(shown below) but the
> >>> test hangs with the following stack trace. Any idea what is going on?
> >>>
> >>>
> >>> import org.apache.log4j.{Level, LogManager}
> >>> import org.apache.spark.repl.SparkILoop
> >>> import org.scalatest.{BeforeAndAfterAll, FunSuite}
> >>>
> >>> class SparkReplSpec extends FunSuite with BeforeAndAfterAll {
> >>>
> >>>   override def beforeAll(): Unit = {
> >>>   }
> >>>
> >>>   override def afterAll(): Unit = {
> >>>   }
> >>>
> >>>   test("yay!") {
> >>> val rootLogger = LogManager.getRootLogger
> >>> val logLevel = rootLogger.getLevel
> >>> rootLogger.setLevel(Level.ERROR)
> >>>
> >>> val output = SparkILoop.run(
> >>>   """
> >>> |println("hello")
> >>>   """.stripMargin)
> >>>
> >>> println(s" $output ")
> >>>
> >>>   }
> >>> }
> >>>
> >>>
> >>> /Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/bin/java
> >>> -Dspark.master=local[*] -Didea.launcher.port=7532
> >>> "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA
> CE.app/Contents/bin"
> >>> -Dfile.encoding=UTF-8 -classpath "/Users/mohit/Library/Application
> >>> Support/IdeaIC2016.2/Scala/lib/scala-plugin-runners.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.
> 8.0_66.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/
> Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/
> lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.
> 8.0_66.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/
> Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/
> lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/
> JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/
> ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.
> 8.0_66.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/
> Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/
> lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_
> 66.jdk/Contents/Home/jre/lib/jce.

SparkILoop doesn't run

2016-11-16 Thread Mohit Jaggi
nsc.interpreter.ReplGlobal$$anon$1$$anon$2.typed(ReplGlobal.scala:36)
at scala.tools.nsc.typechecker.Typers$Typer.typed(Typers.scala:5448)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3.apply(Analyzer.scala:102)
at 
scala.tools.nsc.Global$GlobalPhase$$anonfun$applyPhase$1.apply$mcV$sp(Global.scala:440)
at scala.tools.nsc.Global$GlobalPhase.withCurrentUnit(Global.scala:431)
at scala.tools.nsc.Global$GlobalPhase.applyPhase(Global.scala:440)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3$$anonfun$run$1.apply(Analyzer.scala:94)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3$$anonfun$run$1.apply(Analyzer.scala:93)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.tools.nsc.typechecker.Analyzer$typerFactory$$anon$3.run(Analyzer.scala:93)
at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1501)
at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1486)
at scala.tools.nsc.Global$Run.compileSources(Global.scala:1481)
at 
scala.tools.nsc.interpreter.IMain.compileSourcesKeepingRun(IMain.scala:435)
at 
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.compileAndSaveRun(IMain.scala:855)
at 
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.compile(IMain.scala:813)
at scala.tools.nsc.interpreter.IMain.bind(IMain.scala:675)
at scala.tools.nsc.interpreter.IMain.bind(IMain.scala:712)
at 
scala.tools.nsc.interpreter.IMain$$anonfun$quietBind$1.apply(IMain.scala:711)
at 
scala.tools.nsc.interpreter.IMain$$anonfun$quietBind$1.apply(IMain.scala:711)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
at scala.tools.nsc.interpreter.IMain.quietBind(IMain.scala:711)
at 
scala.tools.nsc.interpreter.ILoop.scala$tools$nsc$interpreter$ILoop$$loopPostInit(ILoop.scala:891)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$1.apply$mcZ$sp(ILoop.scala:917)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$1.apply(ILoop.scala:915)
at 
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$1.apply(ILoop.scala:915)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at 
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Process finished with exit code 137 (interrupted by signal 9: SIGKILL)



Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com






Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-03 Thread Mohit Jaggi
For linear regression, it should be fairly easy. Just sort the coefficients :)
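
A hedged sketch of that (feature names are assumed to be known separately, and the
ranking is only meaningful if the features were standardized before fitting):

import org.apache.spark.mllib.regression.LinearRegressionModel

def rankByImportance(model: LinearRegressionModel,
                     featureNames: Array[String]): Seq[(String, Double)] =
  featureNames.zip(model.weights.toArray)      // pair each feature with its fitted weight
    .sortBy { case (_, w) => -math.abs(w) }    // largest absolute weight first
    .toSeq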

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca  wrote:
> 
> Hi All,
> 
> I am using SPARK and in particular the MLib library.
> 
> import org.apache.spark.mllib.regression.LabeledPoint;
> import org.apache.spark.mllib.regression.LinearRegressionModel;
> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
> 
> For my problem I am using the LinearRegressionWithSGD and I would like to 
> perform a “Rank Features By Importance”.
> 
> I checked the documentation and it seems that it does not provide such methods.
> 
> Am I missing anything?  Please, could you provide any help on this?
> Should I change the approach?
> 
> Many Thanks in advance,
> 
> Best Regards,
> Carlo
> 
> 
> -- The Open University is incorporated by Royal Charter (RC 000391), an 
> exempt charity in England & Wales and a charity registered in Scotland (SC 
> 038302). The Open University is authorised and regulated by the Financial 
> Conduct Authority.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



using SparkILoop.run

2016-09-26 Thread Mohit Jaggi
I want to use the following API, SparkILoop.run(...). I am writing a test
case that passes some Scala code to the Spark interpreter and receives the
result as a string.

I couldn't figure out how to pass the right settings into the run() method.
I get an error about "master" not being set.

object SparkILoop {

  /**
   * Creates an interpreter loop with default settings and feeds
   * the given code to it as input.
   */
  def run(code: String, sets: Settings = new Settings): String = {
    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }

    stringFromStream { ostream =>
      Console.withOut(ostream) {
        val input = new BufferedReader(new StringReader(code))
        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
        val repl = new SparkILoop(input, output)

        if (sets.classpath.isDefault) {
          sets.classpath.value = sys.props("java.class.path")
        }
        repl process sets
      }
    }
  }

  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}


Re: Model abstract class in spark ml

2016-08-31 Thread Mohit Jaggi
Thanks Cody. That was a good explanation!

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 31, 2016, at 7:32 AM, Cody Koeninger  wrote:
> 
> http://blog.originate.com/blog/2014/02/27/types-inside-types-in-scala/
> 
> On Wed, Aug 31, 2016 at 2:19 AM, Sean Owen  wrote:
>> Weird, I recompiled Spark with a similar change to Model and it seemed
>> to work but maybe I missed a step in there.
>> 
>> On Wed, Aug 31, 2016 at 6:33 AM, Mohit Jaggi  wrote:
>>> I think I figured it out. There is indeed "something deeper in Scala” :-)
>>> 
>>> abstract class A {
>>>  def a: this.type
>>> }
>>> 
>>> class AA(i: Int) extends A {
>>>  def a = this
>>> }
>>> 
>>> the above works ok. But if you return anything other than “this”, you will
>>> get a compile error.
>>> 
>>> abstract class A {
>>>  def a: this.type
>>> }
>>> 
>>> class AA(i: Int) extends A {
>>>  def a = new AA(1)
>>> }
>>> 
>>> Error:(33, 11) type mismatch;
>>> found   : com.dataorchard.datagears.AA
>>> required: AA.this.type
>>>  def a = new AA(1)
>>>  ^
>>> 
>>> So you have to do:
>>> 
>>> abstract class A[T <: A[T]]  {
>>>  def a: T
>>> }
>>> 
>>> class AA(i: Int) extends A[AA] {
>>>  def a = new AA(1)
>>> }
>>> 
>>> 
>>> 
>>> Mohit Jaggi
>>> Founder,
>>> Data Orchard LLC
>>> www.dataorchardllc.com
>>> 
>>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
I think I figured it out. There is indeed "something deeper in Scala” :-)

abstract class A {
  def a: this.type
}

class AA(i: Int) extends A {
  def a = this
}
the above works ok. But if you return anything other than “this”, you will get 
a compile error.

abstract class A {
  def a: this.type
}

class AA(i: Int) extends A {
  def a = new AA(1)
}
Error:(33, 11) type mismatch;
 found   : com.dataorchard.datagears.AA
 required: AA.this.type
  def a = new AA(1)
  ^

So you have to do:

abstract class A[T <: A[T]]  {
  def a: T
}

class AA(i: Int) extends A[AA] {
  def a = new AA(1)
}


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 30, 2016, at 9:51 PM, Mohit Jaggi  wrote:
> 
> thanks Sean. I am cross posting on dev to see why the code was written that 
> way. Perhaps, this.type doesn’t do what is needed.
> 
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com <http://www.dataorchardllc.com/>
> 
> 
> 
> 
>> On Aug 30, 2016, at 2:08 PM, Sean Owen <so...@cloudera.com> wrote:
>> 
>> I think it's imitating, for example, how Enum is declared in Java:
>> 
>> abstract class Enum<E extends Enum<E>>
>> 
>> this is done so that Enum can refer to the actual type of the derived
>> enum class when declaring things like public final int compareTo(E o)
>> to implement Comparable<E>. The type is redundant in a sense, because
>> you effectively have MyEnum extending Enum<MyEnum>.
>> 
>> Java allows this self-referential definition. However Scala has
>> "this.type" for this purpose and (unless I'm about to learn something
>> deeper about Scala) it would have been the better way to express this
>> so that Model methods can for example state that copy() returns a
>> Model of the same concrete type.
>> 
>> I don't know if it can be changed now without breaking compatibility
>> but you're welcome to give it a shot with MiMa to see. It does
>> compile, using this.type.
>> 
>> 
>> On Tue, Aug 30, 2016 at 9:47 PM, Mohit Jaggi > <mailto:mohitja...@gmail.com>> wrote:
>>> Folks,
>>> I am having a bit of trouble understanding the following:
>>> 
>>> abstract class Model[M <: Model[M]]
>>> 
>>> Why is M <: Model[M]?
>>> 
>>> Cheers,
>>> Mohit.
>>> 
> 



Re: Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
thanks Sean. I am cross posting on dev to see why the code was written that
way. Perhaps, this.type doesn’t do what is needed.

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




On Aug 30, 2016, at 2:08 PM, Sean Owen  wrote:

I think it's imitating, for example, how Enum is declared in Java:

abstract class Enum<E extends Enum<E>>

this is done so that Enum can refer to the actual type of the derived
enum class when declaring things like public final int compareTo(E o)
to implement Comparable<E>. The type is redundant in a sense, because
you effectively have MyEnum extending Enum<MyEnum>.

Java allows this self-referential definition. However Scala has
"this.type" for this purpose and (unless I'm about to learn something
deeper about Scala) it would have been the better way to express this
so that Model methods can for example state that copy() returns a
Model of the same concrete type.

I don't know if it can be changed now without breaking compatibility
but you're welcome to give it a shot with MiMa to see. It does
compile, using this.type.


On Tue, Aug 30, 2016 at 9:47 PM, Mohit Jaggi  wrote:

Folks,
I am having a bit of trouble understanding the following:

abstract class Model[M <: Model[M]]

Why is M <: Model[M]?

Cheers,
Mohit.


Re: Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
thanks Sean. I am cross posting on dev to see why the code was written that 
way. Perhaps, this.type doesn’t do what is needed.

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 30, 2016, at 2:08 PM, Sean Owen  wrote:
> 
> I think it's imitating, for example, how Enum is declared in Java:
> 
> abstract class Enum<E extends Enum<E>>
> 
> this is done so that Enum can refer to the actual type of the derived
> enum class when declaring things like public final int compareTo(E o)
> to implement Comparable<E>. The type is redundant in a sense, because
> you effectively have MyEnum extending Enum<MyEnum>.
> 
> Java allows this self-referential definition. However Scala has
> "this.type" for this purpose and (unless I'm about to learn something
> deeper about Scala) it would have been the better way to express this
> so that Model methods can for example state that copy() returns a
> Model of the same concrete type.
> 
> I don't know if it can be changed now without breaking compatibility
> but you're welcome to give it a shot with MiMa to see. It does
> compile, using this.type.
> 
> 
> On Tue, Aug 30, 2016 at 9:47 PM, Mohit Jaggi  wrote:
>> Folks,
>> I am having a bit of trouble understanding the following:
>> 
>> abstract class Model[M <: Model[M]]
>> 
>> Why is M <: Model[M]?
>> 
>> Cheers,
>> Mohit.
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Model abstract class in spark ml

2016-08-30 Thread Mohit Jaggi
Folks,
I am having a bit of trouble understanding the following:

abstract class Model[M <: Model[M]]

Why is M <: Model[M]?

Cheers,
Mohit.


Re: How Spark HA works

2016-08-23 Thread Mohit Jaggi
what did you mean by “link” ? an HTTP URL to the spark monitoring UI? AFAIK, it 
is not directly supported. i typically go to both masters and check which one 
is active :-)

did you check if the failover actually happened in other ways (i don’t know 
what the znode should say)? you can try sending a spark job and if you used the 
right master config in your code, it should go to the new master. that will 
confirm that failover worked.
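
As a hedged illustration, the standalone HA master URL can list both masters so the
driver follows whichever one is active (host names below are made up):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("failover-check")
  .setMaster("spark://master1:7077,master2:7077") // both masters; the active one serves the app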



Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 19, 2016, at 8:56 PM, Charles Nnamdi Akalugwu  
> wrote:
> 
> I am experiencing this exact issue. Does anyone know what's going on with the 
> zookeeper setup?
> 
> On Jul 5, 2016 10:34 AM, "Akmal Abbasov"  <mailto:akmal.abba...@icloud.com>> wrote:
> >
> > Hi, 
> > I'm trying to understand how Spark HA works. I'm using Spark 1.6.1 and 
> > Zookeeper 3.4.6.
> > I've add the following line to $SPARK_HOME/conf/spark-env.sh
> > export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
> > -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 
> > -Dspark.deploy.zookeeper.dir=/spark
> > It's working so far.
> > I'd like to setup a link which will always go to active master UI(I'm using 
> > Spark in Standalone).  
> > I've checked the znode /spark, and it contains 
> > [leader_election, master_status]
> > I'm assuming that master_status znode will contain ip address of the 
> > current active master, is it true? Because in my case this znode isn't 
> > updated after failover.
> > And how /spark/leader_election works, because it doesn't contain any data.
> > Thank you.
> >
> > Regards,
> > Akmal
> >
> >
> 



Re: Using spark to distribute jobs to standalone servers

2016-08-23 Thread Mohit Jaggi
It is a bit hacky but possible. A lot depends on what kind of queries etc you 
want to run. You could write a data source that reads your data and keeps it 
partitioned the way you want, then use mapPartitions() to execute your code…
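
A very rough sketch of that idea; serverUrls and runAggregateQuery are hypothetical
stand-ins for the real per-server job, and only aggregates come back to the driver:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("fan-out-sketch"))

val serverUrls = Seq("server1.example.org", "server2.example.org") // one entry per site
def runAggregateQuery(url: String): Long = 0L                      // stub for the real remote call

// one partition per server, so each server's query runs in its own task
val results = sc.parallelize(serverUrls, numSlices = serverUrls.size)
  .mapPartitions(urls => urls.map(runAggregateQuery))
  .collect()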


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 22, 2016, at 7:59 AM, Larry White  wrote:
> 
> Hi,
> 
> I have a bit of an unusual use-case and would greatly appreciate some 
> feedback as to whether it is a good fit for spark.
> 
> I have a network of compute/data servers configured as a tree as shown below
> controller
> server 1
> server 2
> server 3
> etc.
> There are ~20 servers, but the number is increasing to ~100. 
> 
> Each server contains a different dataset, all in the same format. Each is 
> hosted by a different organization, and the data on every individual server 
> is unique to that organization.
> 
> Data cannot be replicated across servers using RDDs or any other means, for 
> privacy/ownership reasons.
> 
> Data cannot be retrieved to the controller, except in aggregate form, as the 
> result of a query, for example. 
> 
> Because of this, there are currently no operations that treats the data as if 
> it were a single data set: We could run a classifier on each site 
> individually, but cannot for legal reasons, pull all the data into a single 
> physical dataframe to run the classifier on all of it together. 
> 
> The servers are located across a wide geographic region (1,000s of miles)
> 
> We would like to send jobs from the controller to be executed in parallel on 
> all the servers, and retrieve the results to the controller. The jobs would 
> consist of SQL-Heavy Java code for 'production' queries, and python or R code 
> for ad-hoc queries and predictive modeling. 
> 
> Spark seems to have the capability to meet many of the individual 
> requirements, but is it a reasonable platform overall for building this 
> application?
> 
> Thank you very much for your assistance. 
> 
> Larry 
>  



Re: Spark with Parquet

2016-08-23 Thread Mohit Jaggi
something like this should work….

// you may have to provide a schema if the guessed schema is not accurate
val df = sparkSession.read.csv("myfile.csv")
df.write.parquet("myfile.parquet")


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Apr 27, 2014, at 11:41 PM, Sai Prasanna  wrote:
> 
> Hi All,
> 
> I want to store a csv-text file in Parquet format in HDFS and then do some 
> processing in Spark.
> 
> Somehow my search to find the way to do was futile. More help was available 
> for parquet with impala. 
> 
> Any guidance here? Thanks !!
> 



Re: SAS_TO_SPARK_SQL_(Could be a Bug?)

2016-06-12 Thread Mohit Jaggi
Looks like a bug in the code generating the SQL query…why would it be specific 
to SAS, I can’t guess. Did you try the same with another database? As a 
workaround you can write the select statement yourself instead of just 
providing the table name.
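
A hedged sketch of that workaround using the usual JDBC subquery form for "dbtable"
(the URL, credentials and column names below are placeholders):

val sasJdbcUrl = "jdbc:..." // placeholder for the SAS JDBC URL
val df = sqlContext.read.format("jdbc")
  .option("url", sasJdbcUrl)
  .option("dbtable", "(SELECT col1, col2 FROM mylib.mytable) t") // hand-written select, not a bare table name
  .option("user", "me")
  .option("password", "secret")
  .load()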

> On Jun 11, 2016, at 6:27 PM, Ajay Chander  wrote:
> 
> I tried implementing the same functionality through Scala as well. But no 
> luck so far. Just wondering if anyone here tried using Spark SQL to read SAS 
> dataset? Thank you
> 
> Regards,
> Ajay
> 
> On Friday, June 10, 2016, Ajay Chander  > wrote:
> Mich, I completely agree with you. I built another Spark SQL application 
> which reads data from MySQL and SQL server and writes the data into 
> Hive(parquet+snappy format). I have this problem only when I read directly 
> from remote SAS system. The interesting part is I am using same driver to 
> read data through pure Java app and spark app. It works fine in Java app, so 
> I cannot blame SAS driver here. Trying to understand where the problem could 
> be. Thanks for sharing this with me.
> 
> On Friday, June 10, 2016, Mich Talebzadeh  > wrote:
> I personally use Scala to do something similar. For example here I extract 
> data from an Oracle table and store in ORC table in Hive. This is compiled 
> via sbt as run with SparkSubmit.
> 
> It is similar to your code but in Scala. Note that I do not enclose my column 
> names in double quotes.  
> 
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions._
> 
> object ETL_scratchpad_dummy {
>   def main(args: Array[String]) {
>   val conf = new SparkConf().
>setAppName("ETL_scratchpad_dummy").
>set("spark.driver.allowMultipleContexts", "true")
>   val sc = new SparkContext(conf)
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   println ("\nStarted at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
>   HiveContext.sql("use oraclehadoop")
>   var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
>   var _username : String = "scratchpad"
>   var _password : String = ""
> 
>   // Get data from Oracle table scratchpad.dummy
>   val d = HiveContext.load("jdbc",
>   Map("url" -> _ORACLEserver,
>   "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS CLUSTERED, 
> to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS RANDOMISED, 
> RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>   "user" -> _username,
>   "password" -> _password))
>  
>d.registerTempTable("tmp")
>   //
>   // Need to create and populate target ORC table oraclehadoop.dummy
>   //
>   HiveContext.sql("use oraclehadoop")
>   //
>   // Drop and create table dummy
>   //
>   HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.dummy")
>   var sqltext : String = ""
>   sqltext = """
>   CREATE TABLE oraclehadoop.dummy (
>  ID INT
>, CLUSTERED INT
>, SCATTERED INT
>, RANDOMISED INT
>, RANDOM_STRING VARCHAR(50)
>, SMALL_VC VARCHAR(10)
>, PADDING  VARCHAR(10)
>   )
>   CLUSTERED BY (ID) INTO 256 BUCKETS
>   STORED AS ORC
>   TBLPROPERTIES (
>   "orc.create.index"="true",
>   "orc.bloom.filter.columns"="ID",
>   "orc.bloom.filter.fpp"="0.05",
>   "orc.compress"="SNAPPY",
>   "orc.stripe.size"="16777216",
>   "orc.row.index.stride"="1" )
>   """
>HiveContext.sql(sqltext)
>   //
>   // Put data in Hive table. Clean up is already done
>   //
>   sqltext = """
>   INSERT INTO TABLE oraclehadoop.dummy
>   SELECT
>   ID
> , CLUSTERED
> , SCATTERED
> , RANDOMISED
> , RANDOM_STRING
> , SMALL_VC
> , PADDING
>   FROM tmp
>   """
>HiveContext.sql(sqltext)
>   println ("\nFinished at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
>   sys.exit()
>  }
> }
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 10 June 2016 at 23:38, Ajay Chander > wrote:
> Hi Mich,
> 
> Thanks for the response. If you look at my programs, I am not writings my 
> queries to include column names in a pair of "". My driver in spark program 
> is generating such query with column names in "" which I do not want. On the 
> other hand, I am using the same driver in my pure Java program which is 
> attached, in that program the same driver is generating 

Re: Calling Python code from Scala

2016-04-18 Thread Mohit Jaggi
When faced with this issue I followed the approach taken by pyspark and used 
py4j. You have to:
- ensure your code is Java compatible
- use py4j to call the java (scala) code from python
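
A small sketch of the Scala side of that approach (package and class names are made
up); keeping the signature Java-friendly is what lets py4j reach it from the driver:

package com.example.bridge

// Plain Int signatures, no default arguments or Scala-only types, so py4j can
// construct and call it from the PySpark driver, roughly as in the question below:
//   mystuff = sc._jvm.com.example.bridge.MyStuff(5)
//   mystuff.myMapFunction(3)
class MyStuff(addend: Int) extends Serializable {
  def myMapFunction(x: Int): Int = x + addend
}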


> On Apr 18, 2016, at 10:29 AM, Holden Karau  wrote:
> 
> So if there is just a few python functions your interested in accessing you 
> can also use the pipe interface (you'll have to manually serialize your data 
> on both ends in ways that Python and Scala can respectively parse) - but its 
> a very generic approach and can work with many different languages.
> 
> On Mon, Apr 18, 2016 at 10:23 AM, Ndjido Ardo BAR  > wrote:
> Hi Didier,
> 
> I think with PySpark you can wrap your legacy Python functions into UDFs and 
> use it in your DataFrames. But you have to use DataFrames instead of RDD. 
> 
> cheers,
> Ardo
> 
> On Mon, Apr 18, 2016 at 7:13 PM, didmar  > wrote:
> Hi,
> 
> I have a Spark project in Scala and I would like to call some Python
> functions from within the program.
> Both parts are quite big, so re-coding everything in one language is not
> really an option.
> 
> The workflow would be:
> - Creating a RDD with Scala code
> - Mapping a Python function over this RDD
> - Using the result directly in Scala
> 
> I've read about PySpark internals, but that didn't help much.
> Is it possible to do so, and preferably in an efficent manner ?
> 
> Cheers,
> Didier
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Calling-Python-code-from-Scala-tp26798.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau 


spark dataframe gc

2015-07-23 Thread Mohit Jaggi
Hi There,
I am testing Spark DataFrame and haven't been able to get my code to finish
due to what I suspect are GC issues. My guess is that GC interferes with
heartbeating and executors are detected as failed. The data is ~50 numeric
columns, ~100million rows in a CSV file.
We are doing a groupBy using one of the columns and trying to calculate the
average of each of the other columns. The groupBy key has about 250k unique
values.
It seems that Spark is creating a lot of temp objects (see jmap output
below) while calculating the average which I am surprised to see. Why
doesn't it use the same temp variable? Am I missing something? Do I need to
specify a config flag to enable code generation and not do this?
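
For context, a hedged reconstruction of the kind of query being described (column
names and the spark-csv reader are assumptions, not the actual job):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

// group on one column and average the other ~50 numeric columns
val averaged = df.groupBy("key").avg("c1", "c2", "c3" /* ... remaining numeric columns ... */)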


Mohit.

[x app-20150723142604-0002]$ jmap -histo 12209


 num     #instances         #bytes  class name
----------------------------------------------
   1:     258615458     8275694656  scala.collection.immutable.$colon$colon
   2:     103435856     7447381632  org.apache.spark.sql.catalyst.expressions.Cast
   3:     103435856     4964921088  org.apache.spark.sql.catalyst.expressions.Coalesce
   4:       1158643     4257400112  [B
   5:      51717929     4137434320  org.apache.spark.sql.catalyst.expressions.SumFunction
   6:      51717928     3723690816  org.apache.spark.sql.catalyst.expressions.Add
   7:      51717929     2896204024  org.apache.spark.sql.catalyst.expressions.CountFunction
   8:      51717928     2896203968  org.apache.spark.sql.catalyst.expressions.MutableLiteral
   9:      51717928     2482460544  org.apache.spark.sql.catalyst.expressions.Literal
  10:      51803728     1243289472  java.lang.Double
  11:      51717755     1241226120  org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5
  12:        975810      850906320  [Lorg.apache.spark.sql.catalyst.expressions.AggregateFunction;
  13:      51717754      827484064  org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$org$apache$spark$sql$catalyst$expressions$Cast$$cast$1
  14:        982451       47157648  java.util.HashMap$Entry
  15:        981132       34981720  [Ljava.lang.Object;
  16:       1049984       25199616  org.apache.spark.sql.types.UTF8String
  17:        978296       23479104  org.apache.spark.sql.catalyst.expressions.GenericRow
  18:        117166       15944560  
  19:        117166       14986224  
  20:          1567       12891952  [Ljava.util.HashMap$Entry;
  21:          9103       10249728  
  22:          9103        9278592  
  23:          5072        5691320  [I
  24:          7281        5335040  
  25:         46420        4769600  [C
  26:        105984        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry


Re: Grouping runs of elements in a RDD

2015-07-02 Thread Mohit Jaggi
if you are joining successive lines together based on a predicate, then you
are doing a "flatMap" not an "aggregate". you are on the right track with a
multi-pass solution. i had the same challenge when i needed a sliding
window over an RDD (see below).

[ i had suggested that the sliding window API be moved to spark-core. not
sure if that happened ]
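
for example, the MLlib helper linked below can be used roughly like this (it is a
developer API, so treat the exact import as an assumption for your Spark version):

import org.apache.spark.mllib.rdd.RDDFunctions._ // adds .sliding to RDDs

val rdd = sc.parallelize(1 to 10)
val consecutivePairs = rdd.sliding(2).map { case Array(a, b) => (a, b) }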

- previous posts ---

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions

> On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi 
> wrote:
>
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
>
> you can use the MLLib function or do the following (which is what I had
> done):
>
> - in the first pass over the data, using mapPartitionsWithIndex, gather the
> first item in each partition. you can use collect (or aggregator) for this.
> “key” them by the partition index. at the end, you will have a map
>    (partition index) --> first item
> - in the second pass over the data, using mapPartitionsWithIndex again,
> look at two (or, in the general case, N) items at a time (you can use scala’s
> sliding iterator) and check the time difference (or any
> sliding window computation). To this mapPartitions call, pass the map created in
> the previous step. You will need to use it to check the last item in this
> partition.
>
> If you can tolerate a few inaccuracies then you can just do the second
> step. You will miss the “boundaries” of the partitions but it might be
> acceptable for your use case.


On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling  wrote:

> That's an interesting idea!  I hadn't considered that.  However, looking
> at the Partitioner interface, I would need to know from looking at a single
> key which doesn't fit my case, unfortunately.  For my case, I need to
> compare successive pairs of keys.  (I'm trying to re-join lines that were
> split prematurely.)
>
> On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
>> could you use a custom partitioner to preserve boundaries such that all
>> related tuples end up on the same partition?
>>
>> On Jun 30, 2015, at 12:00 PM, RJ Nowling  wrote:
>>
>> Thanks, Reynold.  I still need to handle incomplete groups that fall
>> between partition boundaries. So, I need a two-pass approach. I came up
>> with a somewhat hacky way to handle those using the partition indices and
>> key-value pairs as a second pass after the first.
>>
>> OCaml's std library provides a function called group() that takes a break
>> function that operators on pairs of successive elements.  It seems a
>> similar approach could be used in Spark and would be more efficient than my
>> approach with key-value pairs since you know the ordering of the partitions.
>>
>> Has this need been expressed by others?
>>
>> On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin  wrote:
>>
>>> Try mapPartitions, which gives you an iterator, and you can produce an
>>> iterator back.
>>>
>>>
>>> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a problem where I have a RDD of elements:
>>>>
>>>> Item1 Item2 Item3 Item4 Item5 Item6 ...
>>>>
>>>> and I want to run a function over them to decide which runs of elements
>>>> to group together:
>>>>
>>>> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>>>>
>>>> Technically, I could use aggregate to do this, but I would have to use
>>>> a List of List of T which would produce a very large collection in memory.
>>>>
>>>> Is there an easy way to accomplish this?  e.g.,, it would be nice to
>>>> have a version of aggregate where the combination function can return a
>>>> complete group that is added to the new RDD and an incomplete group which
>>>> is passed to the next call of the reduce function.
>>>>
>>>> Thanks,
>>>> RJ
>>>>
>>>
>>>
>>
>>
>


Re: Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Mohit Jaggi
I have used VoltDB and Spark. The use cases for the two are quite different. 
VoltDB is intended for transactions and also supports queries on the 
same(custom to voltdb) store. Spark(SQL) is NOT suitable for transactions; it 
is designed for querying immutable data (which may exist in several different 
forms of stores).

> On May 28, 2015, at 7:48 AM, Ashish Mukherjee  
> wrote:
> 
> Hello,
> 
> I was wondering if there is any documented comparison of SparkSQL with 
> MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. too allow queries 
> to be run in a clustered environment. What is  the major differentiation?
> 
> Regards,
> Ashish


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parsing CSV files in Spark

2015-02-06 Thread Mohit Jaggi
As Sean said, this is just a few lines of code. You can see an example here:

https://github.com/AyasdiOpenSource/bigdf/blob/master/src/main/scala/com/ayasdi/bigdf/DF.scala#L660
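
For a sense of scale, a rough sketch of the manual textFile route Sean describes in
the quoted replies below (paths and the column-name pattern are placeholders, and
the naive split ignores quoted commas):

val lines  = sc.textFile("hdfs:///data/*.csv")
val header = lines.first()
val rows   = lines.filter(_ != header).map(_.split(",", -1))

// indices of the columns whose header names match the pattern of interest
val wanted = header.split(",").zipWithIndex.collect { case (name, i) if name.startsWith("sensor_") => i }

rows.map(r => wanted.map(r(_)).mkString(",")).saveAsTextFile("hdfs:///out/selected")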
 



> On Feb 6, 2015, at 7:29 AM, Charles Feduke  wrote:
> 
> I've been doing a bunch of work with CSVs in Spark, mostly saving them as a 
> merged CSV (instead of the various part-n files). You might find the 
> following links useful:
> 
> - This article is about combining the part files and outputting a header as 
> the first line in the merged results:
> 
> http://java.dzone.com/articles/spark-write-csv-file-header 
> 
> 
> - This was my take on the previous author's original article, but it doesn't 
> yet handle the header row:
> 
> http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/ 
> 
> 
> spark-csv helps with reading CSV data and mapping a schema for Spark SQL, but 
> as of now doesn't save CSV data.
> 
> On Fri Feb 06 2015 at 9:49:06 AM Sean Owen  > wrote:
> You can do this manually without much trouble: get your files on a
> distributed store like HDFS, read them with textFile, filter out
> headers, parse with a CSV library like Commons CSV, select columns,
> format and store the result. That's tens of lines of code.
> 
> However you probably want to start by looking at
> https://github.com/databricks/spark-csv 
>  which may make it even easier
> than that and give you a richer query syntax.
> 
> On Fri, Feb 6, 2015 at 8:37 AM, Spico Florin  > wrote:
> > Hi!
> >   I'm new to Spark. I have a case study that where the data is store in CSV
> > files. These files have headers with morte than 1000 columns. I would like
> > to know what are the best practice to parsing them and in special the
> > following points:
> > 1. Getting and parsing all the files from a folder
> > 2. What CSV parser do you use?
> > 3. I would like to select just some columns whose names matches a pattern
> > and then pass the selected columns values (plus the column names) to the
> > processing and save the output to a CSV (preserving the selected columns).
> >
> > If you have any experience with some points above, it will be really helpful
> > (for me and for the others that will encounter the same cases) if you can
> > share your thoughts.
> > Thanks.
> >   Regards,
> >  Florin
> >
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 



Re: spark with cdh 5.2.1

2015-02-04 Thread Mohit Jaggi
yep...it was unnecessary to create a 2.5 profile. i struggled a bit because
it wasn't clear that i *need* to select a profile using -P option. i didn't
have to do that for earlier hadoop versions.

On Fri, Jan 30, 2015 at 12:11 AM, Sean Owen  wrote:

> There is no need for a 2.5 profile. The hadoop-2.4 profile is for
> Hadoop 2.4 and beyond. You can set the particular version you want
> with -Dhadoop.version=
>
> You do not need to make any new profile to compile vs 2.5.0-cdh5.2.1.
> Again, the hadoop-2.4 profile is what you need.
>
> On Thu, Jan 29, 2015 at 11:33 PM, Mohit Jaggi 
> wrote:
> > Hi All,
> > I noticed in pom.xml that there is no entry for Hadoop 2.5. Has anyone
> tried Spark with 2.5.0-cdh5.2.1? Will replicating the 2.4 entry be
> sufficient to make this work?
> >
> > Mohit.
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: spark with cdh 5.2.1

2015-01-29 Thread Mohit Jaggi
Thanks Nobuhiro. I have my own fork so I can’t use precompiled repos. I copy 
pasted the hadoop 2.4 profile in pom.xml and replaced 2.4 with 2.5. It seems to 
have worked, though I wouldn’t be sure until I run more tests.

> On Jan 29, 2015, at 5:41 PM, Nobuhiro Sue  wrote:
> 
> Mohit,
> 
> I'm using spark modules provided by Cloudera repos, it works fine.
> 
> Please add Cloudera maven repo, and specify dependencies with CDH
> version, like "spark-core_2.10-1.1.0-cdh5.2.1".
> 
> To add Cloudera maven repo, see:
> http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html
> 
> Nobuhiro Sue(PetaData inc.)
> 
> 2015-01-30 8:33 GMT+09:00 Mohit Jaggi :
>> Hi All,
>> I noticed in pom.xml that there is no entry for Hadoop 2.5. Has anyone tried 
>> Spark with 2.5.0-cdh5.2.1? Will replicating the 2.4 entry be sufficient to 
>> make this work?
>> 
>> Mohit.
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark challenge: zip with next???

2015-01-29 Thread Mohit Jaggi
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
 


you can use the MLLib function or do the following (which is what I had done):

- in the first pass over the data, using mapPartitionsWithIndex, gather the first
item in each partition. you can use collect (or aggregator) for this. “key”
them by the partition index. at the end, you will have a map
   (partition index) --> first item
- in the second pass over the data, using mapPartitionsWithIndex again, look at
two (or, in the general case, N) items at a time (you can use scala’s sliding
iterator) and check the time difference (or any sliding window
computation). To this mapPartitions call, pass the map created in the previous
step. You will need to use it to check the last item in this partition.

If you can tolerate a few inaccuracies then you can just do the second step. 
You will miss the “boundaries” of the partitions but it might be acceptable for 
your use case.
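
A hedged sketch of those two passes for the "zip with next" case (standard RDD API;
this is an illustration, not tested code):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def zipWithNext[T: ClassTag](rdd: RDD[T]): RDD[(T, T)] = {
  // pass 1: first element of each partition, keyed by partition index
  val firstOfPartition: Map[Int, T] = rdd
    .mapPartitionsWithIndex((i, it) => if (it.hasNext) Iterator((i, it.next())) else Iterator.empty)
    .collect()
    .toMap

  // pass 2: slide over each partition, appending the first element of the next
  // partition so pairs that straddle a partition boundary are not lost
  rdd.mapPartitionsWithIndex { (i, it) =>
    val extended = firstOfPartition.get(i + 1).map(n => it ++ Iterator(n)).getOrElse(it)
    extended.sliding(2).collect { case Seq(a, b) => (a, b) }
  }
}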



> On Jan 29, 2015, at 4:36 PM, Tobias Pfeiffer  wrote:
> 
> Hi,
> 
> On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya  > wrote:
> Make a copy of your RDD with an extra entry in the beginning to offset. The 
> you can zip the two RDDs and run a map to generate an RDD of differences.
> 
> Does that work? I recently tried something to compute differences between 
> each entry and the next, so I did
>   val rdd1 = ... // null element + rdd
>   val rdd2 = ... // rdd + null element
> but got an error message about zip requiring data sizes in each partition to 
> match.
> 
> Tobias
> 



spark with cdh 5.2.1

2015-01-29 Thread Mohit Jaggi
Hi All,
I noticed in pom.xml that there is no entry for Hadoop 2.5. Has anyone tried 
Spark with 2.5.0-cdh5.2.1? Will replicating the 2.4 entry be sufficient to make 
this work?

Mohit.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.combineBy without intermediate (k,v) pair allocation

2015-01-29 Thread Mohit Jaggi
Francois,
RDD.aggregate() does not support aggregation by key. But, indeed, that is the 
kind of implementation I am looking for, one that does not allocate 
intermediate space for storing (K,V) pairs. When working with large datasets 
this type of intermediate memory allocation wrecks havoc with garbage 
collection, not to mention unnecessarily increases the working memory 
requirement of the program.

I wonder if someone has already noticed this and there is an effort underway to 
optimize this. If not, I will take a shot at adding this functionality.
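
For concreteness, a small sketch of the pattern in question today: the key must be
materialized into an intermediate (K, V) pair RDD before combineByKey() can run,
and that intermediate allocation is what I would like to avoid:

import org.apache.spark.SparkContext._ // pair RDD functions (needed on older Spark versions)

val data  = sc.parallelize(Seq(1.0, 2.5, 3.0, 4.5))
val byKey = data.map(x => (x.toInt, x))              // intermediate (K, V) pairs
val sums  = byKey.combineByKey[Double](
  (v: Double) => v,                                  // createCombiner
  (acc: Double, v: Double) => acc + v,               // mergeValue
  (a: Double, b: Double) => a + b)                   // mergeCombiners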

Mohit.

> On Jan 27, 2015, at 1:52 PM, francois.garil...@typesafe.com wrote:
> 
> Have you looked at the `aggregate` function in the RDD API ? 
> 
> If your way of extracting the “key” (identifier) and “value” (payload) parts 
> of the RDD elements is uniform (a function), it’s unclear to me how this 
> would be more efficient than extracting key and value and then using combine, 
> however.
> 
> —
> FG
> 
> 
> On Tue, Jan 27, 2015 at 10:17 PM, Mohit Jaggi  wrote:
> 
> Hi All, 
> I have a use case where I have an RDD (not a k,v pair) where I want to do a 
> combineByKey() operation. I can do that by creating an intermediate RDD of 
> k,v pairs and using PairRDDFunctions.combineByKey(). However, I believe it 
> will be more efficient if I can avoid this intermediate RDD. Is there a way I 
> can do this by passing in a function that extracts the key, like in 
> RDD.groupBy()? [oops, RDD.groupBy seems to create the intermediate RDD 
> anyway, maybe a better implementation is possible for that too?] 
> If not, is it worth adding to the Spark API? 
> 
> Mohit. 
> - 
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



RDD.combineBy

2015-01-27 Thread Mohit Jaggi
Hi All,
I have a use case where I have an RDD (not a k,v pair) where I want to do a 
combineByKey() operation. I can do that by creating an intermediate RDD of k,v 
pairs and using PairRDDFunctions.combineByKey(). However, I believe it will be 
more efficient if I can avoid this intermediate RDD. Is there a way I can do 
this by passing in a function that extracts the key, like in RDD.groupBy()? 
[oops, RDD.groupBy seems to create the intermediate RDD anyway, maybe a better 
implementation is possible for that too?]
If not, is it worth adding to the Spark API?

Mohit.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD Moving Average

2015-01-09 Thread Mohit Jaggi
Read this:
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
 






Re: Modifying an RDD in forEach

2014-12-06 Thread Mohit Jaggi
“Ideal for iterative workloads” is a comparison to Hadoop MapReduce. If you 
are happy with a single machine, by all means, do that.

Scaling out may be useful when:
1) you want to finish the task faster by using more machines. This may not 
involve any additional cost if you are using utility computing like AWS; e.g. 
if the cost of 1 machine for 1 hour is the same as the cost of 60 machines for 
a minute, you get your results 60 times faster for the same money.
2) you may have larger data. At some point you will run out of “vertical 
scaling” options or they will become prohibitively expensive [e.g. you had 
everything working for 100 million docs but then you got 10 more docs; now do 
you buy and install more DIMMs in your server?]
3) you are using utility computing like AWS and there is a cliff drop in 
pricing for smaller machines [this is common]
4) you want to modify your algorithm and now it needs a few more bytes of 
memory. Do you go to the store, buy DIMMs, and install them in your server?

> On Dec 6, 2014, at 1:42 PM, Ron Ayoub  wrote:
> 
> These are very interesting comments. The vast majority of cases I'm working 
> on are going to be in the 3 million range and 100 million was thrown out as 
> something to shoot for. I upped it to 500 million. But all things 
> considering, I believe I may be able to directly translate what I have to 
> Java Streams API and run 100 million docs on 32 cores in under an hour or two 
> which would suit our needs. Up until this point I've been focused on 
> computational aspect 
> 
> If I can scale up to clustering 100 million documents on a single machine I 
> can probably directly translate what I have to Java Streams API and be 
> faster. It is that scaling out that changes things. These are interesting 
> comments. I think in this hierarchical k-means case the lazy evaluation 
> becomes almost useless and perhaps even an impediment. Part of the problem is 
> that I've been a bit too focused on math/information retrieval and have to 
> update a bit on functional approach to programming so I can better utilize 
> the tools But it does appear that Spark may not be the best option for this 
> need. I don't need resiliency or fault tolerance as much as I need to be able 
> to execute an algorithm on a large amount of data fast and then be done with 
> it. I'm now thinking that in the 100 million document range I may be ok 
> clustering feature vectors with no more than 25 features per doc on a single 
> machine with 32 cores and a load of memory. I might directly translate what I 
> have to Java 8 Streams API. 
> 
> There is also questions of proportion. Perhaps what I have is not big enough 
> to warrant or require scaling out. I may have other uses for Spark in 
> traditional map-reduce algorithms such as counting pairs of shared shingles 
> for near dupe detection but to this point I've found Oracles 
> parallel-pipelined table functions, while not glamorous are doing quite well 
> in DB. 
> 
> I'm just a bit confused still on why it is advertised ideal for iterative 
> algorithms when iterative algorithms have that point per iteration where 
> things do get evaluated and laziness is not terribly useful. Ideal for 
> massive in-memory cluster computing yes - but iterative... ? not sure. I have 
> that book "Functional Programming in Scala" and I hope to read it someday and 
> enrich my understanding here. 
> 
> Subject: Re: Modifying an RDD in forEach
> From: mohitja...@gmail.com
> Date: Sat, 6 Dec 2014 13:13:50 -0800
> CC: ronalday...@live.com; user@spark.apache.org
> To: mayur.rust...@gmail.com
> 
> Ron,
> “appears to be working” might be true when there are no failures. on large 
> datasets being processed on a large number of machines, failures of several 
> types(server, network, disk etc) can happen. At that time, Spark will not 
> “know” that you changed the RDD in-place and will use any version of any 
> partition of the RDD to be retried. Retries require idempotency and that is 
> difficult without immutability. I believe, this is one of the primary reasons 
> for making RDDs immutable in Spark (mutable isn't even an option worth 
> considering). In general mutating something in a distributed system is a hard 
> problem. It can be solved (e.g. in NoSQL or newSQL databases) but Spark is 
> not a transactional data store.
> 
> If you are building an iterative machine learning algorithm which usually 
> have a “reduce” step at the end of every iteration, then the lazy evaluation 
> is unlikely to be useful. On the other hand, if these intermediate RDDs stay 
> in the young generation of the JVM heap [I am not sure if RDD cache 
> management somehow changes this, so I could be wrong] they are garbage 
> collected quickly and with very little overhead.
> 
> This is the price of scaling out :-)
>   
> Hope this helps,
> Mohit.
> 
> On Dec 6, 2014, at 5:02 AM, Mayur Rustagi  > wrote:
> 
> You'll benefit by viewing Matei's talk in Yahoo

Re: Modifying an RDD in forEach

2014-12-06 Thread Mohit Jaggi
Ron,
“appears to be working” might be true when there are no failures. on large 
datasets being processed on a large number of machines, failures of several 
types(server, network, disk etc) can happen. At that time, Spark will not 
“know” that you changed the RDD in-place and will use any version of any 
partition of the RDD to be retried. Retries require idempotency and that is 
difficult without immutability. I believe, this is one of the primary reasons 
for making RDDs immutable in Spark (mutable isn't even an option worth 
considering). In general mutating something in a distributed system is a hard 
problem. It can be solved (e.g. in NoSQL or newSQL databases) but Spark is not 
a transactional data store.

If you are building an iterative machine learning algorithm, which usually has 
a “reduce” step at the end of every iteration, then the lazy evaluation is 
unlikely to be useful. On the other hand, if these intermediate RDDs stay in 
the young generation of the JVM heap [I am not sure if RDD cache management 
somehow changes this, so I could be wrong] they are garbage collected quickly 
and with very little overhead.

This is the price of scaling out :-)

Hope this helps,
Mohit.
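
To illustrate what "not mutating in place" looks like for an iterative job, 
here is a minimal k-means-style sketch (my own, not the poster's algorithm; 
all names are made up). Each pass derives a new RDD with map, and the 
reduce/collect at the end of the pass is the action that forces evaluation:

import org.apache.spark.SparkContext._ // pair RDD functions in Spark 1.x
import org.apache.spark.rdd.RDD

object KMeansSketch extends Serializable {
  case class Point(features: Array[Double], cluster: Int)

  def dist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def closest(p: Point, centers: Array[Array[Double]]): Int =
    centers.indices.minBy(i => dist(p.features, centers(i)))

  def run(points: RDD[Point], initial: Array[Array[Double]], iters: Int): RDD[Point] = {
    var centers  = initial
    var assigned = points
    for (_ <- 1 to iters) {
      val cs = centers // capture the current centers by value, not the var itself
      // a *new* RDD of assignments, rather than mutation of the old one
      assigned = assigned.map(p => p.copy(cluster = closest(p, cs)))
      // the "reduce" step of the iteration: recompute centers on the driver
      centers = assigned
        .map(p => (p.cluster, (p.features, 1L)))
        .reduceByKey { case ((s1, n1), (s2, n2)) =>
          (s1.zip(s2).map { case (a, b) => a + b }, n1 + n2) }
        .collect()
        .sortBy(_._1) // assumes no cluster ends up empty
        .map { case (_, (sum, n)) => sum.map(_ / n) }
    }
    assigned
  }
}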

> On Dec 6, 2014, at 5:02 AM, Mayur Rustagi  wrote:
> 
> You'll benefit by viewing Matei's talk in Yahoo on Spark internals and how it 
> optimizes execution of iterative jobs.
> Simple answer is 
> 1. Spark doesn't materialize RDD when you do an iteration but lazily captures 
> the transformation functions in RDD.(only function and closure , no data 
> operation actually happens)
> 2. When you finally execute and want to cause effects (save to disk , collect 
> on master etc) it views the DAG of execution and optimizes what it can reason 
> (eliminating intermediate states , performing multiple Transformations in one 
> tasks, leveraging partitioning where available among others)
> Bottom line it doesn't matter how many RDD you have in your DAG chain as long 
> as Spark can optimize the functions in that DAG to create minimal 
> materialization on its way to final output. 
> 
> Regards
> Mayur
> On 06-Dec-2014 6:12 pm, "Ron Ayoub"  > wrote:
> This is from a separate thread with a differently named title. 
> 
> Why can't you modify the actual contents of an RDD using forEach? It appears 
> to be working for me. What I'm doing is changing cluster assignments and 
> distances per data item for each iteration of the clustering algorithm. The 
> clustering algorithm is massive and iterates thousands of times. As I 
> understand it now, you are supposed to create new RDDs on each pass. This is 
> a hierarchical k-means that I'm doing and hence it consists of many 
> iterations rather than large iterations.
> 
> So I understand the restriction of why operations, when aggregating and 
> reducing etc., need to be associative. However, forEach operates on a single 
> item. So being that Spark is advertised as being great for iterative 
> algorithms since it operates in-memory, how can it be good to create 
> thousands upon thousands of RDDs during the course of an iterative algorithm? 
>  Does Spark have some kind of trick like reuse behind the scenes - fully 
> persistent data objects or whatever? How can it possibly be efficient for 
> 'iterative' algorithms when it is creating so many RDDs as opposed to one? 
> 
> Or is the answer that I should keep doing what I'm doing because it is 
> working even though it is not theoretically sound and aligned with functional 
> ideas. I personally just want it to be fast and be able to operate on up to 
> 500 million data items. 



Re: Bug in Accumulators...

2014-11-22 Thread Mohit Jaggi
perhaps the closure ends up including the "main" object which is not
defined as serializable...try making it a "case object" or "object main
extends Serializable".
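
A minimal sketch of that second suggestion (illustrative only; the object, app 
name and input data are made up), with the accumulator held in a local val as 
lordjoe recommends below:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // implicit AccumulatorParam instances in Spark 1.x

// if the closure ends up pulling in the enclosing object (e.g. because it
// references one of its members), that object must be serializable
object Main extends Serializable {
  def main(args: Array[String]): Unit = {
    val sc    = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-demo"))
    val total = sc.accumulator(0) // a final-style local val, per lordjoe's advice
    sc.parallelize(Seq("spark", "accumulator")).foreach(s => total += s.length)
    println(total.value) // read back on the driver
    sc.stop()
  }
}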

On Sat, Nov 22, 2014 at 4:16 PM, lordjoe  wrote:

> I posted several examples in java at http://lordjoesoftware.blogspot.com/
>
> Generally code like this works and I show how to accumulate more complex
> values.
>
> // Make two accumulators using Statistics
>  final Accumulator totalLetters = ctx.accumulator(0L,
> "ttl");
>  JavaRDD<String> lines = ...
>
> JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
> @Override
> public Iterable<String> call(final String s) throws Exception {
> // Handle accumulator here
> totalLetters.add(s.length()); // count all letters
> 
>  });
> 
>  Long numberCalls = totalCounts.value();
>
> I believe the mistake is to pass the accumulator to the function rather
> than
> letting the function find the accumulator - I do this in this case by using
> a final local variable
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: MEMORY_ONLY_SER question

2014-11-12 Thread Mohit Jaggi
thanks jerry and tathagata. does anyone know how kryo compresses data? are
there any other serializers that work with spark and have good compression
for basic data types?

On Tue, Nov 4, 2014 at 10:29 PM, Shao, Saisai  wrote:

>  From my understanding, the Spark code uses Kryo in a streaming manner for
> RDD partitions; the deserialization advances as the iteration moves forward.
> But whether Kryo deserializes all the objects at once or
> incrementally is really Kryo's own behavior; I guess Kryo will not
> deserialize the objects all at once.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Mohit Jaggi [mailto:mohitja...@gmail.com]
> *Sent:* Wednesday, November 05, 2014 2:01 PM
> *To:* Tathagata Das
> *Cc:* user@spark.apache.org
> *Subject:* Re: MEMORY_ONLY_SER question
>
>
>
> I used the word "streaming" but I did not mean to refer to spark
> streaming. I meant if a partition containing 10 objects was kryo-serialized
> into a single buffer, then in a mapPartitions() call, as I call iter.next()
> 10 times to access these objects one at a time, does the deserialization
> happen
>
> a) once to get all 10 objects,
>
> b) 10 times "incrementally" to get an object at a time, or
>
> c) 10 times to get 10 objects and discard the "wrong" 9 objects [ i doubt
> this would a design anyone would have adopted ]
>
> I think your answer is option (a) and you refered to Spark streaming to
> indicate that there is no difference in its behavior from spark
> core...right?
>
>
>
> If it is indeed option (a), I am happy with it and don't need to
> customize. If it is (b), I would like to have (a) instead.
>
>
>
> I am also wondering if kryo is good at compression of strings and numbers.
> Often I have the data type as "Double" but it could be encoded in much
> fewer bits.
>
>
>
>
>
>
>
> On Tue, Nov 4, 2014 at 1:02 PM, Tathagata Das 
> wrote:
>
>  It it deserialized in a streaming manner as the iterator moves over the
> partition. This is a functionality of core Spark, and Spark Streaming just
> uses it as is.
>
> What do you want to customize it to?
>
>
>
> On Tue, Nov 4, 2014 at 9:22 AM, Mohit Jaggi  wrote:
>
>  Folks,
>
> If I have an RDD persisted in MEMORY_ONLY_SER mode and then it is needed
> for a transformation/action later, is the whole partition of the RDD
> deserialized into Java objects first before my transform/action code works
> on it? Or is it deserialized in a streaming manner as the iterator moves
> over the partition? Is this behavior customizable? I generally use the Kryo
> serializer.
>
>
>
> Mohit.
>
>
>
>
>


Re: MEMORY_ONLY_SER question

2014-11-04 Thread Mohit Jaggi
I used the word "streaming" but I did not mean to refer to spark streaming.
I meant if a partition containing 10 objects was kryo-serialized into a
single buffer, then in a mapPartitions() call, as I call iter.next() 10
times to access these objects one at a time, does the deserialization happen
a) once to get all 10 objects,
b) 10 times "incrementally" to get an object at a time, or
c) 10 times to get 10 objects and discard the "wrong" 9 objects [I doubt
this would be a design anyone would have adopted]
I think your answer is option (a) and you referred to Spark Streaming to
indicate that there is no difference in its behavior from Spark
core... right?

If it is indeed option (a), I am happy with it and don't need to customize.
If it is (b), I would like to have (a) instead.

I am also wondering if kryo is good at compression of strings and numbers.
Often I have the data type as "Double" but it could be encoded in much
fewer bits.



On Tue, Nov 4, 2014 at 1:02 PM, Tathagata Das 
wrote:

> It it deserialized in a streaming manner as the iterator moves over the
> partition. This is a functionality of core Spark, and Spark Streaming just
> uses it as is.
> What do you want to customize it to?
>
> On Tue, Nov 4, 2014 at 9:22 AM, Mohit Jaggi  wrote:
>
>> Folks,
>> If I have an RDD persisted in MEMORY_ONLY_SER mode and then it is needed
>> for a transformation/action later, is the whole partition of the RDD
>> deserialized into Java objects first before my transform/action code works
>> on it? Or is it deserialized in a streaming manner as the iterator moves
>> over the partition? Is this behavior customizable? I generally use the Kryo
>> serializer.
>>
>> Mohit.
>>
>
>


MEMORY_ONLY_SER question

2014-11-04 Thread Mohit Jaggi
Folks,
If I have an RDD persisted in MEMORY_ONLY_SER mode and then it is needed
for a transformation/action later, is the whole partition of the RDD
deserialized into Java objects first before my transform/action code works
on it? Or is it deserialized in a streaming manner as the iterator moves
over the partition? Is this behavior customizable? I generally use the Kryo
serializer.

Mohit.
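
For reference, a minimal sketch of the setup I am asking about (Kryo plus 
MEMORY_ONLY_SER; names and sizes are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("ser-question")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// partitions are cached as serialized buffers rather than Java objects
val data = sc.parallelize(1 to 1000000).map(i => (i, i.toDouble))
data.persist(StorageLevel.MEMORY_ONLY_SER)

// the question is about what happens as this iterator advances over a
// cached partition: one deserialization per partition, or per object?
val sums = data.mapPartitions(it => Iterator(it.map(_._2).sum)).collect()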


Re: how to run a dev spark project without fully rebuilding the fat jar ?

2014-10-22 Thread Mohit Jaggi
I think you can give a list of jars (not just one) to spark-submit, so you
only need to rebuild the one whose source code has changed.

On Wed, Oct 22, 2014 at 10:29 PM, Yang  wrote:

> during tests, I often modify my code a little bit  and want to see the
> result.
> but spark-submit requires the full fat-jar, which takes quite a lot of
> time to build.
>
> I just need to run in --master local mode. is there a way to run it
> without rebuilding the fat jar?
>
> thanks
> Yang
>


Re: scala 2.11?

2014-09-16 Thread Mohit Jaggi
Can I load that plugin in spark-shell? Or perhaps, due to the 2-phase
compilation, quasiquotes won't work in the shell?

On Mon, Sep 15, 2014 at 7:15 PM, Mark Hamstra 
wrote:

> Okay, that's consistent with what I was expecting.  Thanks, Matei.
>
> On Mon, Sep 15, 2014 at 5:20 PM, Matei Zaharia 
> wrote:
>
>> I think the current plan is to put it in 1.2.0, so that's what I meant by
>> "soon". It might be possible to backport it too, but I'd be hesitant to do
>> that as a maintenance release on 1.1.x and 1.0.x since it would require
>> nontrivial changes to the build that could break things on Scala 2.10.
>>
>> Matei
>>
>> On September 15, 2014 at 12:19:04 PM, Mark Hamstra (
>> m...@clearstorydata.com) wrote:
>>
>> Are we going to put 2.11 support into 1.1 or 1.0?  Else "will be in
>> soon" applies to the master development branch, but actually in the Spark
>> 1.2.0 release won't occur until the second half of November at the earliest.
>>
>> On Mon, Sep 15, 2014 at 12:11 PM, Matei Zaharia 
>> wrote:
>>
>>>  Scala 2.11 work is under way in open pull requests though, so
>>> hopefully it will be in soon.
>>>
>>>  Matei
>>>
>>> On September 15, 2014 at 9:48:42 AM, Mohit Jaggi (mohitja...@gmail.com)
>>> wrote:
>>>
>>>  ah...thanks!
>>>
>>> On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra 
>>> wrote:
>>>
>>>> No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.
>>>>
>>>> On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi 
>>>> wrote:
>>>>
>>>>> Folks,
>>>>> I understand Spark SQL uses quasiquotes. Does that mean Spark has now
>>>>> moved to Scala 2.11?
>>>>>
>>>>> Mohit.
>>>>>
>>>>
>>>>
>>>
>>
>


Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
If your underlying filesystem is HDFS, you need to use HDFS APIs. A Google
search brought up this link which appears reasonable.

http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample

If you want to use java.io APIs, you have to make sure your filesystem is
accessible from all nodes in your cluster. You did not mention what errors
you get with your code. They may mean something.
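
A hedged sketch of the HDFS route for a small driver-side write (the path and 
contents are made up; for RDD data you would normally just use saveAsTextFile):

import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// write to HDFS through the Hadoop FileSystem API instead of java.io.File
val hconf = new Configuration() // picks up core-site.xml / hdfs-site.xml
val fs    = FileSystem.get(hconf)
val out   = new PrintWriter(fs.create(new Path("/tmp/test.txt")))
try out.println("Hello Scala") finally out.close()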


On Mon, Sep 15, 2014 at 9:51 AM, rapelly kartheek 
wrote:

> Can you please direct me to the right way of doing this.
>
> On Mon, Sep 15, 2014 at 10:18 PM, rapelly kartheek <
> kartheek.m...@gmail.com> wrote:
>
>> I came across these APIs in one the scala tutorials over the net.
>>
>> On Mon, Sep 15, 2014 at 10:14 PM, Mohit Jaggi 
>> wrote:
>>
>>> But the above APIs are not for HDFS.
>>>
>>> On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek <
>>> kartheek.m...@gmail.com> wrote:
>>>
>>>> Yes. I have HDFS. My cluster has 5 nodes. When I run the above
>>>> commands, I see that the file gets created in the master node. But, there
>>>> wont be any data written to it.
>>>>
>>>>
>>>> On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi 
>>>> wrote:
>>>>
>>>>> Is this code running in an executor? You need to make sure the file is
>>>>> accessible on ALL executors. One way to do that is to use a distributed
>>>>> filesystem like HDFS or GlusterFS.
>>>>>
>>>>> On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek <
>>>>> kartheek.m...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I am trying to perform some read/write file operations in spark.
>>>>>> Somehow I am neither able to write to a file nor read.
>>>>>>
>>>>>> import java.io._
>>>>>>
>>>>>>   val writer = new PrintWriter(new File("test.txt" ))
>>>>>>
>>>>>>   writer.write("Hello Scala")
>>>>>>
>>>>>>
>>>>>> Can someone please tell me how to perform file I/O in spark.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: scala 2.11?

2014-09-15 Thread Mohit Jaggi
ah...thanks!

On Mon, Sep 15, 2014 at 9:47 AM, Mark Hamstra 
wrote:

> No, not yet.  Spark SQL is using org.scalamacros:quasiquotes_2.10.
>
> On Mon, Sep 15, 2014 at 9:28 AM, Mohit Jaggi  wrote:
>
>> Folks,
>> I understand Spark SQL uses quasiquotes. Does that mean Spark has now
>> moved to Scala 2.11?
>>
>> Mohit.
>>
>
>


Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
But the above APIs are not for HDFS.

On Mon, Sep 15, 2014 at 9:40 AM, rapelly kartheek 
wrote:

> Yes. I have HDFS. My cluster has 5 nodes. When I run the above commands, I
> see that the file gets created in the master node. But, there wont be any
> data written to it.
>
>
> On Mon, Sep 15, 2014 at 10:06 PM, Mohit Jaggi 
> wrote:
>
>> Is this code running in an executor? You need to make sure the file is
>> accessible on ALL executors. One way to do that is to use a distributed
>> filesystem like HDFS or GlusterFS.
>>
>> On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek <
>> kartheek.m...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am trying to perform some read/write file operations in spark. Somehow
>>> I am neither able to write to a file nor read.
>>>
>>> import java.io._
>>>
>>>   val writer = new PrintWriter(new File("test.txt" ))
>>>
>>>   writer.write("Hello Scala")
>>>
>>>
>>> Can someone please tell me how to perform file I/O in spark.
>>>
>>>
>>
>


Re: File I/O in spark

2014-09-15 Thread Mohit Jaggi
Is this code running in an executor? You need to make sure the file is
accessible on ALL executors. One way to do that is to use a distributed
filesystem like HDFS or GlusterFS.

On Mon, Sep 15, 2014 at 8:51 AM, rapelly kartheek 
wrote:

> Hi
>
> I am trying to perform some read/write file operations in spark. Somehow I
> am neither able to write to a file nor read.
>
> import java.io._
>
>   val writer = new PrintWriter(new File("test.txt" ))
>
>   writer.write("Hello Scala")
>
>
> Can someone please tell me how to perform file I/O in spark.
>
>


scala 2.11?

2014-09-15 Thread Mohit Jaggi
Folks,
I understand Spark SQL uses quasiquotes. Does that mean Spark has now moved
to Scala 2.11?

Mohit.


Re: sc.textFile problem due to newlines within a CSV record

2014-09-13 Thread Mohit Jaggi
Thanks Xiangrui. This file already exists w/o escapes. I could probably try
to preprocess it and add the escaping.

On Fri, Sep 12, 2014 at 9:38 PM, Xiangrui Meng  wrote:

> I wrote an input format for Redshift tables unloaded using UNLOAD with the
> ESCAPE option: https://github.com/mengxr/redshift-input-format , which
> can recognize multi-line records.
>
> Redshift puts a backslash before any in-record `\\`, `\r`, `\n`, and
> the delimiter character. You can apply the same escaping before
> calling saveAsTextFile, then use the input format to load them back.
>
> Xiangrui
>
> On Fri, Sep 12, 2014 at 7:43 PM, Mohit Jaggi  wrote:
> > Folks,
> > I think this might be due to the default TextInputFormat in Hadoop. Any
> > pointers to solutions much appreciated.
> >>>
> > More powerfully, you can define your own InputFormat implementations to
> > format the input to your programs however you want. For example, the
> default
> > TextInputFormat reads lines of text files. The key it emits for each
> record
> > is the byte offset of the line read (as a LongWritable), and the value is
> > the contents of the line up to the terminating '\n' character (as a Text
> > object). If you have multi-line records each separated by a $character,
> you
> > could write your own InputFormat that parses files into records split on
> > this character instead.
> >>>
> >
> > Thanks,
> > Mohit
>


sc.textFile problem due to newlines within a CSV record

2014-09-12 Thread Mohit Jaggi
Folks,
I think this might be due to the default TextInputFormat in Hadoop. Any
pointers to solutions much appreciated.
>>
More powerfully, you can define your own *InputFormat* implementations to
format the input to your programs however you want. For example, the
default TextInputFormat reads lines of text files. The key it emits for
each record is the byte offset of the line read (as a LongWritable), and
the value is the contents of the line up to the terminating '\n' character
(as a Text object). If you have multi-line records each separated by a
$ character,
you could write your own *InputFormat* that parses files into records split
on this character instead.
>>

Thanks,
Mohit
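
For the simpler case in the excerpt above, where records are separated by a 
known character such as '$' rather than '\n', newer Hadoop LineRecordReaders 
honor a custom record delimiter, so a custom InputFormat may not be needed. A 
sketch (assuming spark-shell so sc is in scope, and a made-up path; note this 
does not by itself handle newlines hidden inside quoted CSV fields, which 
still need the escaping approach):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter", "$") // records split on '$'

val records = sc.newAPIHadoopFile(
    "hdfs:///data/input.csv",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    hadoopConf)
  .map { case (_, text) => text.toString }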


slides from df talk at global big data conference

2014-09-12 Thread Mohit Jaggi
http://engineering.ayasdi.com/2014/09/11/df-dataframes-on-spark/


Re: efficient zipping of lots of RDDs

2014-09-11 Thread Mohit Jaggi
filed  jira SPARK-3489  <https://issues.apache.org/jira/browse/SPARK-3489>

On Thu, Sep 4, 2014 at 9:36 AM, Mohit Jaggi  wrote:

> Folks,
> I sent an email announcing
> https://github.com/AyasdiOpenSource/df
>
> This dataframe is basically a map of RDDs of columns(along with DSL
> sugar), as column based operations seem to be most common. But row
> operations are not uncommon. To get rows out of columns right now I zip the
> column RDDs together. I use RDD.zip then flatten the tuples I get. I
> realize that RDD.zipPartitions might be faster. However, I believe an even
> better approach should be possible. Surely we can have a zip method that
> can combine a large variable number of RDDs? Can that be added to
> Spark-core? Or is there an alternative equally good or better approach?
>
> Cheers,
> Mohit.
>


Re: pandas-like dataframe in spark

2014-09-04 Thread Mohit Jaggi
Thanks Matei. I will take a look at SchemaRDDs.


On Thu, Sep 4, 2014 at 11:24 AM, Matei Zaharia 
wrote:

> Hi Mohit,
>
> This looks pretty interesting, but just a note on the implementation -- it
> might be worthwhile to try doing this on top of Spark SQL SchemaRDDs. The
> reason is that SchemaRDDs already have an efficient in-memory
> representation (columnar storage), and can be read from a variety of data
> sources (JSON, Hive, soon things like CSV as well). Using the operators in
> Spark SQL you can also get really efficient code-generated operations on
> them. I know that stuff like zipping two data frames might become harder,
> but the overall benefit in performance could be substantial.
>
> Matei
>
> On September 4, 2014 at 9:28:12 AM, Mohit Jaggi (mohitja...@gmail.com)
> wrote:
>
> Folks,
> I have been working on a pandas-like dataframe DSL on top of spark. It is
> written in Scala and can be used from spark-shell. The APIs have the look
> and feel of pandas which is a wildly popular piece of software data
> scientists use. The goal is to let people familiar with pandas scale their
> efforts to larger datasets by using spark but not having to go through a
> steep learning curve for Spark and Scala.
> It is open sourced with Apache License and can be found here:
> https://github.com/AyasdiOpenSource/df
>
> I welcome your comments, suggestions and feedback. Any help in developing
> it further is much appreciated. I have the following items on the roadmap
> (and happy to change this based on your comments)
> - Python wrappers most likely in the same way as MLLib
> - Sliding window aggregations
> - Row indexing
> - Graphing/charting
> - Efficient row-based operations
> - Pretty printing of output on the spark-shell
> - Unit test completeness and automated nightly runs
>
> Mohit.
>
> P.S.: Thanks to my awesome employer Ayasdi <http://www.ayasdi.com> for
> open sourcing this software
>
> P.P.S.: I need some design advice on making row operations efficient and
> I'll start a new thread for that
>
>


Re: Object serialisation inside closures

2014-09-04 Thread Mohit Jaggi
I faced the same problem and ended up using the same approach that Sean
suggested
https://github.com/AyasdiOpenSource/df/blob/master/src/main/scala/com/ayasdi/df/DF.scala#L313

Option 3 also seems reasonable. It should create a CSVParser per executor.


On Thu, Sep 4, 2014 at 6:58 AM, Andrianasolo Fanilo <
fanilo.andrianas...@worldline.com> wrote:

> Thank you for the quick answer, looks good to me
>
> Though that brings me to another question. Suppose we want to open a
> connection to a database, an ElasticSearch, etc...
>
> I now have two options:
> 1/ use .mapPartitions and setup the connection at the start of each
> partition, so I get a connection per partition
> 2/ use a singleton object, which loads a connection per executor if my
> understanding is correct
>
> I would have used the second possibility, so I don't create a new
> connection for a partition each time the partition fails to compute for
> whatever reason.  I also don't have a lot of connections in parallel
> because I have only one connection per worker. If I have 200 partitions in
> parallel, that makes 200 connections.
> But in the second case a partition could kill the connection on the worker
> during computation and because that connection is shared for all tasks of
> the executor, all partitions would fail. Also, only one connection object
> would have to manage 200 partitions trying to output to
> Elasticsearch/database/etc...that may be bad performance-wise.
>
> Can't see a case where second is preferable for now. Doesn't seem I could
> use that singleton object to share data within an executor sadly...
>
> Thanks for the input
> Fanilo
>
>
> -Message d'origine-
> De : Sean Owen [mailto:so...@cloudera.com]
> Envoyé : jeudi 4 septembre 2014 15:36
> À : Andrianasolo Fanilo
> Cc : user@spark.apache.org
> Objet : Re: Object serialisation inside closures
>
> In your original version, the object is referenced by the function but
> it's on the driver, and so has to be serialized. This leads to an error
> since it's not serializable. Instead, you want to recreate the object
> locally on each of the remote machines.
>
> In your third version you are holding the parser in a static member of a
> class, in your Scala object. When you call the parse method, you're calling
> it on the instance of the CSVParserPlus class that was loaded on the remote
> worker. It loads and creates its own copy of the parser.
>
> A maybe more compact solution is to use mapPartitions, and create the
> parser once at the start. This avoids needing this static / singleton
> pattern, but also means the parser is created only once per partition.
>
> On Thu, Sep 4, 2014 at 2:29 PM, Andrianasolo Fanilo <
> fanilo.andrianas...@worldline.com> wrote:
> > Hello Spark fellows :)
> >
> >
> >
> > I’m a new user of Spark and Scala and have been using both for 6
> > months without too many problems.
> >
> > Here I’m looking for best practices for using non-serializable classes
> > inside closure. I’m using Spark-0.9.0-incubating here with Hadoop 2.2.
> >
> >
> >
> > Suppose I am using OpenCSV parser to parse an input file. So inside my
> > main
> > :
> >
> >
> >
> > val sc = new SparkContext("local[2]", "App")
> >
> > val heyRDD = sc.textFile("…")
> >
> >
> >
> > val csvparser = new CSVParser(';')
> >
> > val heyMap = heyRDD.map { line =>
> >
> >   val temp = csvparser.parseLine(line)
> >
> >   (temp(1), temp(4))
> >
> > }
> >
> >
> >
> >
> >
> > This gives me a java.io.NotSerializableException:
> > au.com.bytecode.opencsv.CSVParser, which seems reasonable.
> >
> >
> >
> > From here I could see 3 solutions :
> >
> > 1/ Extending CSVParser with Serialisable properties, which adds a lot
> > of boilerplate code if you ask me
> >
> > 2/ Using Kryo Serialization (still need to define a serializer)
> >
> > 3/ Creating an object with an instance of the class I want to use,
> > typically
> > :
> >
> >
> >
> > object CSVParserPlus {
> >
> >
> >
> >   val csvParser = new CSVParser(';')
> >
> >
> >
> >   def parse(line: String) = {
> >
> > csvParser.parseLine(line)
> >
> >   }
> >
> > }
> >
> >
> >
> >
> >
> > val heyMap = heyRDD.map { line =>
> >
> >   val temp = CSVParserPlus.parse(line)
> >
> >   (temp(1), temp(4))
> >
> > }
> >
> >
> >
> > The third solution works and I don’t get how, so I was wondering how
> > the closure system inside Spark works to be able to serialize an
> > object with a non-serializable instance. How does that work? Does it
> > hinder performance?
> > Is it a good solution ? How do you manage this problem ?
> >
> >
> >
> > Any input would be greatly appreciated
> >
> >
> >
> > Best regards,
> >
> > Fanilo
> >
> >
> > 
> >

Re: advice sought on spark/cassandra input development - scala or python?

2014-09-04 Thread Mohit Jaggi
Johnny,
Without knowing the domain of the problem it is hard to choose a
programming language. I would suggest you ask yourself the following
questions:
- What if your project depends on a lot of python libraries that don't have
Scala/Java counterparts? It is unlikely but possible.
- What if Python programmers are in good supply and Scala ones not as much?
- Do you need to rewrite a lot of code, is that feasible?
- Is the rest of your team willing to learn Scala?
- If you are processing streams in a long lived process, how does Python
perform?

Mohit.
P.S.: I end up choosing Scala more often than Python.


On Thu, Sep 4, 2014 at 8:03 AM, Johnny Kelsey  wrote:

> Hi guys,
>
> We're testing out a spark/cassandra cluster, & we're very impressed with
> what we've seen so far. However, I'd very much like some advice from the
> shiny brains on the mailing list.
>
> We have a large collection of python code that we're in the process of
> adapting to move into spark/cassandra, & I have some misgivings on using
> python for any further development.
>
> As a concrete example, we have a python class (part of a fairly large
> class library) which, as part of its constructor, also creates a record of
> itself in the cassandra key space. So we get an initialised class & a row
> in a table on the cluster. My problem is this: should we even be doing this?
>
> By this I mean, we could be facing an increasing number of transactions,
> which we (naturally) would like to process as quickly as possible. The
> input transactions themselves may well be routed to a number of processes,
> e.g. starting an agent, written to a log file, etc. So it seems wrong to be
> putting the 'INSERT ... INTO ...' code into the class instantiation: it
> would seem more sensible to split this into a bunch of different spark
> processes, with an input handler, database insertion, create new python
> object, update log file, all happening on the spark cluster, & all written
> as atomically as possible.
>
> But I think my reservations here are more fundamental. Is python the wrong
> choice for this sort of thing? Would it not be better to use scala?
> Shouldn't we be dividing these tasks into atomic processes which execute as
> rapidly as possible? What about streaming events to the cluster, wouldn't
> python be a bottleneck here rather than scala with its more robust support
> for multithreading?  Is streaming even supported in python?
>
> What do people think?
>
> Best regards,
>
> Johnny
>
> --
> Johnny Kelsey
> Chief Technology Officer
> *Semblent*
> *jkkel...@semblent.com *
>


efficient zipping of lots of RDDs

2014-09-04 Thread Mohit Jaggi
Folks,
I sent an email announcing
https://github.com/AyasdiOpenSource/df

This dataframe is basically a map of RDDs of columns(along with DSL sugar),
as column based operations seem to be most common. But row operations are
not uncommon. To get rows out of columns right now I zip the column RDDs
together. I use RDD.zip then flatten the tuples I get. I realize that
RDD.zipPartitions might be faster. However, I believe an even better
approach should be possible. Surely we can have a zip method that can
combine a large variable number of RDDs? Can that be added to Spark-core?
Or is there an alternative equally good or better approach?

Cheers,
Mohit.
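
One possible shape for such a helper, sketched on top of zipPartitions (my own 
illustration, not what df currently does; it assumes a non-empty Seq of column 
RDDs that are identically partitioned with equal counts, which is what zip 
requires anyway). A variadic zip in Spark core could avoid building the 
intermediate Seq for every row:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// fold pairwise zipPartitions over the column RDDs, producing an RDD of rows
def zipMany[T: ClassTag](cols: Seq[RDD[T]]): RDD[Seq[T]] =
  cols.tail.foldLeft(cols.head.map(Seq(_))) { (rows, col) =>
    rows.zipPartitions(col) { (rowIt, colIt) =>
      rowIt.zip(colIt).map { case (row, v) => row :+ v }
    }
  }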


pandas-like dataframe in spark

2014-09-04 Thread Mohit Jaggi
Folks,
I have been working on a pandas-like dataframe DSL on top of spark. It is
written in Scala and can be used from spark-shell. The APIs have the look
and feel of pandas which is a wildly popular piece of software data
scientists use. The goal is to let people familiar with pandas scale their
efforts to larger datasets by using spark but not having to go through a
steep learning curve for Spark and Scala.
It is open sourced with Apache License and can be found here:
https://github.com/AyasdiOpenSource/df

I welcome your comments, suggestions and feedback. Any help in developing
it further is much appreciated. I have the following items on the roadmap
(and happy to change this based on your comments)
- Python wrappers most likely in the same way as MLLib
- Sliding window aggregations
- Row indexing
- Graphing/charting
- Efficient row-based operations
- Pretty printing of output on the spark-shell
- Unit test completeness and automated nightly runs

Mohit.

P.S.: Thanks to my awesome employer Ayasdi  for open
sourcing this software

P.P.S.: I need some design advice on making row operations efficient and
I'll start a new thread for that


why classTag not typeTag?

2014-08-22 Thread Mohit Jaggi
Folks,
I am wondering why Spark uses ClassTag in RDD[T: ClassTag] instead of the
more functional TypeTag option.
I have some code that needs TypeTag functionality and I don't know if a
typeTag can be converted to a classTag.

Mohit.
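
A sketch of one way such a conversion can be written (standard Scala 
reflection only, nothing Spark-specific; note it keeps only the erased class, 
so type arguments are lost):

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

// derive a ClassTag from a TypeTag via the runtime mirror
def typeToClassTag[T: TypeTag]: ClassTag[T] =
  ClassTag[T](typeTag[T].mirror.runtimeClass(typeTag[T].tpe))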


kryo out of buffer exception

2014-08-16 Thread Mohit Jaggi
Hi All,
I was doing a groupBy and apparently some keys were very frequent making
the serializer fail with buffer overflow exception. I did not need a
groupBy so I switched to combineByKey in this case but would like to know
how to increase the kryo buffer sizes to avoid this error. I hope there is
a way to grow the buffers dynamically based on the size of the data.

Mohit.
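
For reference, the knobs I am aware of look like the sketch below. The key 
names are the Spark 1.x configuration names, and the max setting may not exist 
in every 1.x release, so treat the exact keys as an assumption to verify 
against your version:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kryo-buffer")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "8")       // initial per-task Kryo buffer
  .set("spark.kryoserializer.buffer.max.mb", "512") // upper bound the buffer may grow to
val sc = new SparkContext(conf)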


closure issue - works in scalatest but not in spark-shell

2014-08-15 Thread Mohit Jaggi
Folks,

I wrote the following wrapper on top on combineByKey. The RDD is of
Array[Any] and I am extracting a field at a given index for combining.
There are two ways in which I tried this:

Option A: leave colIndex abstract in Aggregator class and define in derived
object Aggtor with value -1. It is set later in function myAggregate. Works
fine but I want to keep the API user unaware of colIndex.

Option B(shown in code below): Set colIndex to -1 in abstract class. Aggtor
does not mention it at all. It is set later in myAggregate.

Option B works from scalatest in Eclipse but runs into a closure mishap in
spark-shell. I am looking for an explanation and a possible
solution/workaround. Appreciate any help!

Thanks,

Mohit.

-- API helper -

abstract class Aggregator[U] {
  var colIndex: Int = -1

  def convert(a: Array[Any]): U = {
    a(colIndex).asInstanceOf[U]
  }

  def mergeValue(a: U, b: Array[Any]): U = {
    aggregate(a, convert(b))
  }

  def mergeCombiners(x: U, y: U): U = {
    aggregate(x, y)
  }

  def aggregate(p: U, q: U): U
}

-- API handler -

def myAggregate[U: ClassTag](...aggtor: Aggregator[U]) = {
  aggtor.colIndex = 
  keyBy(aggByCol).combineByKey(aggtor.convert, aggtor.mergeValue,
    aggtor.mergeCombiners)
}

 call the API 

case object Aggtor extends Aggregator[List[String]] {
  //var colIndex = -1
  def aggregate = 
}

myAggregate(...Aggtor)


spark-shell ip address issues

2014-08-02 Thread Mohit Jaggi
Folks,
When my MacBook's IP address changes, spark-shell throws up when I restart
it. It somehow remembers the old address. I worked around this by using
SPARK_LOCAL_IP=
Mohit

Welcome to

    __

 / __/__  ___ _/ /__

_\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.0.0-SNAPSHOT

  /_/


Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
1.7.0_40)

Type in expressions to have them evaluated.

Type :help for more information.

14/08/02 21:46:35 WARN SparkConf:

SPARK_JAVA_OPTS was detected (set to '-Xmx10g').

This is deprecated in Spark 1.0+.


Please instead use:

 - ./spark-submit with conf/spark-defaults.conf to set defaults for an
application

 - ./spark-submit with --driver-java-options to set -X options for a driver

 - spark.executor.extraJavaOptions to set -X options for executors

 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons
(master or worker)



14/08/02 21:46:35 WARN SparkConf: Setting 'spark.executor.extraJavaOptions'
to '-Xmx10g' as a work-around.

14/08/02 21:46:35 WARN SparkConf: Setting 'spark.driver.extraJavaOptions'
to '-Xmx10g' as a work-around.

14/08/02 21:46:35 INFO SecurityManager: Changing view acls to: mohit

14/08/02 21:46:35 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(mohit)

14/08/02 21:46:36 INFO Slf4jLogger: Slf4jLogger started

14/08/02 21:46:36 INFO Remoting: Starting remoting

14/08/02 21:46:36 ERROR Remoting: Remoting error: [Startup failed] [

akka.remote.RemoteTransportException: Startup failed

at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)

at akka.remote.Remoting.start(Remoting.scala:194)

at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)

at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)

at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)

at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)

at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)

at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:104)

at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)

at org.apache.spark.SparkContext.(SparkContext.scala:202)

at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:957)

at $line3.$read$$iwC$$iwC.(:8)

at $line3.$read$$iwC.(:14)

at $line3.$read.(:16)

at $line3.$read$.(:20)

at $line3.$read$.()

at $line3.$eval$.(:7)

at $line3.$eval$.()

at $line3.$eval.$print()

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)

at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)

at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)

at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)

at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)

at
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:121)

at
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:120)

at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:263)

at
org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:120)

at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:56)

at
org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:913)

at
org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:142)

at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:56)

at
org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:104)

at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:56)

at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:930)

at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)

at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)

at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)

at org.apache.spark.repl.Main$.main(Main.scala:31)

at org.apache.spark.repl.Main.main(Main.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native

sparkcontext stop and then start again

2014-07-25 Thread Mohit Jaggi
Folks,
I had some pyspark code which used to hang with no useful debug logs. It
got fixed when I changed my code to keep the sparkcontext forever instead
of stopping it and then creating another one later. Is this a bug or
expected behavior?

Mohit.


Re: pyspark sc.parallelize running OOM with smallish data

2014-07-14 Thread Mohit Jaggi
Continuing to debug with Scala, I tried this on local with enough memory
(10g) and it is able to count the dataset. With more memory (for executor
and driver) in a cluster it still fails. The data is about 2 GB: it is
30k * 4k doubles.


On Sat, Jul 12, 2014 at 6:31 PM, Aaron Davidson  wrote:

> I think this is probably dying on the driver itself, as you are probably
> materializing the whole dataset inside your python driver. How large is
> spark_data_array compared to your driver memory?
>
>
> On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi  wrote:
>
>> I put the same dataset into scala (using spark-shell) and it acts weird.
>> I cannot do a count on it, the executors seem to hang. The WebUI shows 0/96
>> in the status bar, shows details about the worker nodes but there is no
>> progress.
>> sc.parallelize does finish (takes too long for the data size) in scala.
>>
>>
>> On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi 
>> wrote:
>>
>>> spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
>>> in the cluster and gave 48g to executors. also tried kyro serialization.
>>>
>>> traceback (most recent call last):
>>>
>>>   File "/mohit/./m.py", line 58, in 
>>>
>>> spark_data = sc.parallelize(spark_data_array)
>>>
>>>   File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize
>>>
>>> jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
>>>
>>>   File
>>> "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line
>>> 537, in __call__
>>>
>>>   File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
>>> line 300, in get_return_value
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
>>>
>>> : java.lang.OutOfMemoryError: Java heap space
>>>
>>> at
>>> org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
>>>
>>> at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>>>
>>> at py4j.Gateway.invoke(Gateway.java:259)
>>>
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>
>>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>
>>
>


Re: pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
I put the same dataset into scala (using spark-shell) and it acts weird. I
cannot do a count on it, the executors seem to hang. The WebUI shows 0/96
in the status bar, shows details about the worker nodes but there is no
progress.
sc.parallelize does finish (takes too long for the data size) in scala.


On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi  wrote:

> spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
> in the cluster and gave 48g to executors. also tried kyro serialization.
>
> traceback (most recent call last):
>
>   File "/mohit/./m.py", line 58, in 
>
> spark_data = sc.parallelize(spark_data_array)
>
>   File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize
>
> jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
>
>   File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 537, in __call__
>
>   File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
>
> : java.lang.OutOfMemoryError: Java heap space
>
> at
> org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
>
> at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>
> at py4j.Gateway.invoke(Gateway.java:259)
>
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
> at java.lang.Thread.run(Thread.java:745)
>


pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in
the cluster and gave 48g to executors. I also tried Kryo serialization.

traceback (most recent call last):

  File "/mohit/./m.py", line 58, in 

spark_data = sc.parallelize(spark_data_array)

  File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize

jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

  File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 537, in __call__

  File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

: java.lang.OutOfMemoryError: Java heap space

at
org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)

at java.lang.Thread.run(Thread.java:745)


Re: pyspark regression results way off

2014-06-25 Thread Mohit Jaggi
Is a python binding for LBFGS in the works? My co-worker has written one
and can contribute back if it helps.


On Mon, Jun 16, 2014 at 11:00 AM, DB Tsai  wrote:

> Is your data normalized? Sometimes, GD doesn't work well if the data
> has wide range. If you are willing to write scala code, you can try
> LBFGS optimizer which converges better than GD.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Mon, Jun 16, 2014 at 8:14 AM, jamborta  wrote:
> > forgot to mention that I'm running spark 1.0
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-regression-results-way-off-tp7672p7673.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


kibana like frontend for spark

2014-06-20 Thread Mohit Jaggi
Folks,
I want to analyse logs and I want to use spark for that. However,
elasticsearch has a fancy frontend in Kibana. Kibana's docs indicate that
it works with elasticsearch only. Is there a similar frontend that can work
with spark?

Mohit.

P.S.: On MapR's spark FAQ I read a statement like "Kibana can use any
ODBC/JDBC backend and Shark has that interface"


Re: spark with docker: errors with akka, NAT?

2014-06-19 Thread Mohit Jaggi
Based on Jacob's suggestion, I started using --net=host, which is a new
option in the latest version of Docker. I also set SPARK_LOCAL_IP to the host's
IP address; then Akka does not use the hostname and I don't need the
Spark driver's hostname to be resolvable.

Thanks guys for your help!


On Tue, Jun 17, 2014 at 7:49 PM, Aaron Davidson  wrote:

> Yup, alright, same solution then :)
>
>
> On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi  wrote:
>
>> I used --privileged to start the container and then unmounted /etc/hosts.
>> Then I created a new /etc/hosts file
>>
>>
>> On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson 
>> wrote:
>>
>>> I remember having to do a similar thing in the spark docker scripts for
>>> testing purposes. Were you able to modify the /etc/hosts directly? I
>>> remember issues with that as docker apparently mounts it as part of its
>>> read-only filesystem.
>>>
>>>
>>> On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi 
>>> wrote:
>>>
>>>> It was a DNS issue. AKKA apparently uses the hostname of the endpoints
>>>> and hence they need to be resolvable. In my case the hostname of the docker
>>>> container was a randomly generated string and was not resolvable. I added a
>>>> workaround (entry in etc/hosts file of spark master) for now. If anyone can
>>>> point to a more elegant solution, that would be awesome!
>>>>
>>>>
>>>> On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi 
>>>> wrote:
>>>>
>>>>> I am using cutting edge code from git but doing my own sbt assembly.
>>>>>
>>>>>
>>>>> On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
>>>>> schum...@icsi.berkeley.edu> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> are you using the amplab/spark-1.0.0 images from the global registry?
>>>>>>
>>>>>> Andre
>>>>>>
>>>>>> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
>>>>>> > Hi Folks,
>>>>>> >
>>>>>> > I am having trouble getting spark driver running in docker. If I
>>>>>> run a
>>>>>> > pyspark example on my mac it works but the same example on a docker
>>>>>> image
>>>>>> > (Via boot2docker) fails with following logs. I am pointing the
>>>>>> spark driver
>>>>>> > (which is running the example) to a spark cluster (driver is not
>>>>>> part of
>>>>>> > the cluster). I guess this has something to do with docker's
>>>>>> networking
>>>>>> > stack (it may be getting NAT'd) but I am not sure why (if at all)
>>>>>> the
>>>>>> > spark-worker or spark-master is trying to create a new TCP
>>>>>> connection to
>>>>>> > the driver, instead of responding on the connection initiated by
>>>>>> the driver.
>>>>>> >
>>>>>> > I would appreciate any help in figuring this out.
>>>>>> >
>>>>>> > Thanks,
>>>>>> >
>>>>>> > Mohit.
>>>>>> >
>>>>>> > logs
>>>>>> >
>>>>>> > Spark Executor Command: "java" "-cp"
>>>>>> >
>>>>>> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
>>>>>> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
>>>>>> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
>>>>>> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler"
>>>>>> "1"
>>>>>> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
>>>>>> > "app-20140616152201-0021"
>>>>>> >
>>>>>> > 
>>>>>> >
>>>>>> >
>>>>>> > log4j:WARN No appenders could be found for logger
>>>>>> > (org.apache.hadoop.conf.Configuration).
>>>>>> >
>>>>>> > log4j:WARN Please initialize the log4j system pr

Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
I used --privileged to start the container and then unmounted /etc/hosts.
Then I created a new /etc/hosts file.


On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson  wrote:

> I remember having to do a similar thing in the spark docker scripts for
> testing purposes. Were you able to modify the /etc/hosts directly? I
> remember issues with that as docker apparently mounts it as part of its
> read-only filesystem.
>
>
> On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi  wrote:
>
>> It was a DNS issue. AKKA apparently uses the hostname of the endpoints
>> and hence they need to be resolvable. In my case the hostname of the docker
>> container was a randomly generated string and was not resolvable. I added a
>> workaround (entry in etc/hosts file of spark master) for now. If anyone can
>> point to a more elegant solution, that would be awesome!
>>
>>
>> On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi 
>> wrote:
>>
>>> I am using cutting edge code from git but doing my own sbt assembly.
>>>
>>>
>>> On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
>>> schum...@icsi.berkeley.edu> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> are you using the amplab/spark-1.0.0 images from the global registry?
>>>>
>>>> Andre
>>>>
>>>> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
>>>> > Hi Folks,
>>>> >
>>>> > I am having trouble getting spark driver running in docker. If I run a
>>>> > pyspark example on my mac it works but the same example on a docker
>>>> image
>>>> > (Via boot2docker) fails with following logs. I am pointing the spark
>>>> driver
>>>> > (which is running the example) to a spark cluster (driver is not part
>>>> of
>>>> > the cluster). I guess this has something to do with docker's
>>>> networking
>>>> > stack (it may be getting NAT'd) but I am not sure why (if at all) the
>>>> > spark-worker or spark-master is trying to create a new TCP connection
>>>> to
>>>> > the driver, instead of responding on the connection initiated by the
>>>> driver.
>>>> >
>>>> > I would appreciate any help in figuring this out.
>>>> >
>>>> > Thanks,
>>>> >
>>>> > Mohit.
>>>> >
>>>> > logs
>>>> >
>>>> > Spark Executor Command: "java" "-cp"
>>>> >
>>>> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
>>>> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
>>>> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
>>>> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
>>>> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
>>>> > "app-20140616152201-0021"
>>>> >
>>>> > 
>>>> >
>>>> >
>>>> > log4j:WARN No appenders could be found for logger
>>>> > (org.apache.hadoop.conf.Configuration).
>>>> >
>>>> > log4j:WARN Please initialize the log4j system properly.
>>>> >
>>>> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
>>>> for
>>>> > more info.
>>>> >
>>>> > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
>>>> > profile: org/apache/spark/log4j-defaults.properties
>>>> >
>>>> > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
>>>> ayasdi,root
>>>> >
>>>> > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager:
>>>> authentication
>>>> > disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
>>>> >
>>>> > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
>>>> >
>>>> > 14/06/16 15:22:05 INFO Remoting: Starting remoting
>>>> >
>>>> > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
>>>> addresses
>>>> > :[akka.tcp://sparkExecutor@:33536]
>>>> >
>>>> > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
>>>> > [akka.tcp://sparkExecutor@:33536]
>>>> >
>>>> > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
>>>> driver:
>>>> > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
>>>> >
>>>> > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
>>>> > akka.tcp://sparkWorker@:33952/user/Worker
>>>> >
>>>> > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
>>>> remote
>>>> > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated
>>>> for
>>>> > 6 ms, all messages to this address will be delivered to dead
>>>> letters.
>>>> >
>>>> > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
>>>> Disassociated
>>>> > [akka.tcp://sparkExecutor@:33536] ->
>>>> [akka.tcp://spark@fc31887475e3:43921]
>>>> > disassociated! Shutting down.
>>>> >
>>>>
>>>>
>>>
>>
>


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
It was a DNS issue. Akka apparently uses the hostnames of the endpoints, and
hence they need to be resolvable. In my case the hostname of the Docker
container was a randomly generated string and was not resolvable. I added a
workaround (an entry in the /etc/hosts file of the Spark master) for now. If
anyone can point to a more elegant solution, that would be awesome!


On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi  wrote:

> I am using cutting edge code from git but doing my own sbt assembly.
>
>
> On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
> schum...@icsi.berkeley.edu> wrote:
>
>>
>> Hi,
>>
>> are you using the amplab/spark-1.0.0 images from the global registry?
>>
>> Andre
>>
>> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
>> > Hi Folks,
>> >
>> > I am having trouble getting spark driver running in docker. If I run a
>> > pyspark example on my mac it works but the same example on a docker
>> image
>> > (Via boot2docker) fails with following logs. I am pointing the spark
>> driver
>> > (which is running the example) to a spark cluster (driver is not part of
>> > the cluster). I guess this has something to do with docker's networking
>> > stack (it may be getting NAT'd) but I am not sure why (if at all) the
>> > spark-worker or spark-master is trying to create a new TCP connection to
>> > the driver, instead of responding on the connection initiated by the
>> driver.
>> >
>> > I would appreciate any help in figuring this out.
>> >
>> > Thanks,
>> >
>> > Mohit.
>> >
>> > logs
>> >
>> > Spark Executor Command: "java" "-cp"
>> >
>> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
>> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
>> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
>> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
>> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
>> > "app-20140616152201-0021"
>> >
>> > 
>> >
>> >
>> > log4j:WARN No appenders could be found for logger
>> > (org.apache.hadoop.conf.Configuration).
>> >
>> > log4j:WARN Please initialize the log4j system properly.
>> >
>> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
>> for
>> > more info.
>> >
>> > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
>> > profile: org/apache/spark/log4j-defaults.properties
>> >
>> > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
>> ayasdi,root
>> >
>> > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
>> > disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
>> >
>> > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
>> >
>> > 14/06/16 15:22:05 INFO Remoting: Starting remoting
>> >
>> > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
>> addresses
>> > :[akka.tcp://sparkExecutor@:33536]
>> >
>> > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
>> > [akka.tcp://sparkExecutor@:33536]
>> >
>> > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
>> driver:
>> > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
>> >
>> > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
>> > akka.tcp://sparkWorker@:33952/user/Worker
>> >
>> > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
>> remote
>> > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
>> > 6 ms, all messages to this address will be delivered to dead
>> letters.
>> >
>> > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
>> Disassociated
>> > [akka.tcp://sparkExecutor@:33536] -> [akka.tcp://spark@fc31887475e3
>> :43921]
>> > disassociated! Shutting down.
>> >
>>
>>
>


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
I am using cutting edge code from git but doing my own sbt assembly.


On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
schum...@icsi.berkeley.edu> wrote:

>
> Hi,
>
> are you using the amplab/spark-1.0.0 images from the global registry?
>
> Andre
>
> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
> > Hi Folks,
> >
> > I am having trouble getting spark driver running in docker. If I run a
> > pyspark example on my mac it works but the same example on a docker image
> > (Via boot2docker) fails with following logs. I am pointing the spark
> driver
> > (which is running the example) to a spark cluster (driver is not part of
> > the cluster). I guess this has something to do with docker's networking
> > stack (it may be getting NAT'd) but I am not sure why (if at all) the
> > spark-worker or spark-master is trying to create a new TCP connection to
> > the driver, instead of responding on the connection initiated by the
> driver.
> >
> > I would appreciate any help in figuring this out.
> >
> > Thanks,
> >
> > Mohit.
> >
> > logs
> >
> > Spark Executor Command: "java" "-cp"
> >
> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
> > "app-20140616152201-0021"
> >
> > 
> >
> >
> > log4j:WARN No appenders could be found for logger
> > (org.apache.hadoop.conf.Configuration).
> >
> > log4j:WARN Please initialize the log4j system properly.
> >
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> > more info.
> >
> > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
> > profile: org/apache/spark/log4j-defaults.properties
> >
> > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
> ayasdi,root
> >
> > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
> > disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
> >
> > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
> >
> > 14/06/16 15:22:05 INFO Remoting: Starting remoting
> >
> > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses
> > :[akka.tcp://sparkExecutor@:33536]
> >
> > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
> > [akka.tcp://sparkExecutor@:33536]
> >
> > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
> driver:
> > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
> >
> > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
> > akka.tcp://sparkWorker@:33952/user/Worker
> >
> > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
> remote
> > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
> > 6 ms, all messages to this address will be delivered to dead letters.
> >
> > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
> Disassociated
> > [akka.tcp://sparkExecutor@:33536] -> [akka.tcp://spark@fc31887475e3
> :43921]
> > disassociated! Shutting down.
> >
>
>


spark with docker: errors with akka, NAT?

2014-06-16 Thread Mohit Jaggi
Hi Folks,

I am having trouble getting the Spark driver running in Docker. If I run a
pyspark example on my Mac it works, but the same example on a Docker image
(via boot2docker) fails with the following logs. I am pointing the Spark driver
(which is running the example) to a Spark cluster (the driver is not part of
the cluster). I guess this has something to do with Docker's networking
stack (it may be getting NAT'd), but I am not sure why (if at all) the
spark-worker or spark-master is trying to create a new TCP connection to
the driver, instead of responding on the connection initiated by the driver.

I would appreciate any help in figuring this out.

Thanks,

Mohit.

logs

Spark Executor Command: "java" "-cp"
"::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
"-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
"org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
"cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
"app-20140616152201-0021"




log4j:WARN No appenders could be found for logger
(org.apache.hadoop.conf.Configuration).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.

14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
profile: org/apache/spark/log4j-defaults.properties

14/06/16 15:22:05 INFO SecurityManager: Changing view acls to: ayasdi,root

14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)

14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started

14/06/16 15:22:05 INFO Remoting: Starting remoting

14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkExecutor@:33536]

14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@:33536]

14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler

14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@:33952/user/Worker

14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
6 ms, all messages to this address will be delivered to dead letters.

14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@:33536] -> [akka.tcp://spark@fc31887475e3:43921]
disassociated! Shutting down.


Re: accessing partition i+1 from mapper of partition i

2014-05-22 Thread Mohit Jaggi
Austin,
I made up a mock example; my real use case is more complex. I used
foreach() instead of collect/cache...that forces the accumulable to be
evaluated. On another thread Xiangrui pointed me to a sliding-window RDD in
mllib that is a great alternative (although I did not switch to using it).

Mohit.
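For the record, a mock sketch of that foreach() trick (names such as "sorted" are
illustrative; the accumulable collects one (partition id, max) pair per partition):

import scala.collection.mutable

// "sorted" is assumed to be a sorted RDD[Int]. foreach is an action, so running it
// forces the job and populates the accumulable -- no collect or cache needed.
val partMax = sc.accumulableCollection(mutable.ArrayBuffer[(Int, Int)]())
sorted.mapPartitionsWithIndex { (id, it) =>
  val elems = it.toArray
  if (elems.nonEmpty) partMax += ((id, elems.max))
  elems.iterator
}.foreach(_ => ())
val maxByPartition = partMax.value.toMap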


On Thu, May 22, 2014 at 2:30 PM, Austin Gibbons wrote:

> Mohit, if you want to end up with (1 .. N) , why don't you skip the logic
> for finding missing values, and generate it directly?
>
> val max = myCollection.reduce(math.max)
> sc.parallelize((0 until max))
>
> In either case, you don't need to call cache, which will force it into
> memory - you can do something like "count" which will not necessarily store
> the RDD in memory.
>
> Additionally, instead of an accumulable, you could consider mapping that
> value directly:
>
> rdd.mapPartitionsWithIndex { (index, partition) => Iterator(index ->
> partition.reduce(math.max)) }.collectAsMap()
>
>
> On Mon, May 19, 2014 at 9:50 PM, Mohit Jaggi  wrote:
>
>> Thanks Brian. This works. I used Accumulable to do the "collect" in step
>> B. While doing that I found that Accumulable.value is not a Spark "action",
>> I need to call "cache" in the underlying RDD for "value" to work. Not sure
>> if that is intentional or a bug.
>> The "collect" of Step B can be done as a new RDD too.
>>
>>
>> On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt  wrote:
>>
>>> I don't think there's a direct way of bleeding elements across
>>> partitions. But you could write it yourself relatively succinctly:
>>>
>>> A) Sort the RDD
>>> B) Look at the sorted RDD's partitions with the
>>> .mapParititionsWithIndex( ) method. Map each partition to its partition ID,
>>> and its maximum element. Collect the (partID, maxElements) in the driver.
>>> C) Broadcast the collection of (partID, part's max element) tuples
>>> D) Look again at the sorted RDD's partitions with
>>> mapPartitionsWithIndex( ). For each partition *K:*
>>> D1) Find the immediately-preceding partition* K -1 , *and its
>>> associated maximum value. Use that to decide how many values are missing
>>> between the last element of part *K-1 *and the first element of part *K*
>>> .
>>> D2) Step through part *K*'s elements and find the rest of the missing
>>> elements in that part
>>>
>>> This approach sidesteps worries you might have over the hack of using
>>> .filter to remove the first element, as well as the zipping.
>>>
>>> --Brian
>>>
>>>
>>>
>>> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi wrote:
>>>
>>>> Hi,
>>>> I am trying to find a way to fill in missing values in an RDD. The RDD
>>>> is a sorted sequence.
>>>> For example, (1, 2, 3, 5, 8, 11, ...)
>>>> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>>>>
>>>> One way to do this is to "slide and zip"
>>>> rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>>>> x = rdd1.first
>>>> rdd2 = rdd1 filter (_ != x)
>>>> rdd3 = rdd2 zip rdd1
>>>> rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x and
>>>> y }
>>>>
>>>> Another method which I think is more efficient is to use
>>>> mapParititions() on rdd1 to be able to iterate on elements of rdd1 in each
>>>> partition. However, that leaves the boundaries of the partitions to be
>>>> "unfilled". *Is there a way within the function passed to
>>>> mapPartitions, to read the first element in the next partition?*
>>>>
>>>> The latter approach also appears to work for a general "sliding window"
>>>> calculation on the RDD. The former technique requires a lot of "sliding and
>>>> zipping" and I believe it is not efficient. If only I could read the next
>>>> partition...I have tried passing a pointer to rdd1 to the function passed
>>>> to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
>>>> Spark cannot deal with a mapper calling another mapper (since it happens on
>>>> a worker not the driver)
>>>>
>>>> Mohit.
>>>>
>>>>
>>>
>>
>
>
> --
> . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
> Austin Gibbons
> Research | quantiFind <http://www.quantifind.com/> | 708 601 4894
> . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
>


Re: ExternalAppendOnlyMap: Spilling in-memory map

2014-05-22 Thread Mohit Jaggi
Andrew,
I did not register anything explicitly, based on the belief that the class
name is written out in full only once. I also wondered why that problem
would be specific to Joda-Time and not show up with java.util.Date...I guess
it is possible given the internals of Joda-Time.
If I remove DateTime from my RDD, the problem goes away.
I will try explicit registration (and add DateTime back to my RDD) and see
if that makes things better.

Mohit.
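For the record, explicit registration goes through a custom KryoRegistrator; a
minimal sketch (the registrator class name is illustrative):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.DateTime

class JodaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[DateTime])  // stops Kryo writing the class name into every record
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[JodaKryoRegistrator].getName)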




On Wed, May 21, 2014 at 8:36 PM, Andrew Ash  wrote:

> Hi Mohit,
>
> The log line about the ExternalAppendOnlyMap is more of a symptom of
> slowness than causing slowness itself.  The ExternalAppendOnlyMap is used
> when a shuffle is causing too much data to be held in memory.  Rather than
> OOM'ing, Spark writes the data out to disk in a sorted order and reads it
> back from disk later on when it's needed.  That's the job of the
> ExternalAppendOnlyMap.
>
> I wouldn't normally expect a conversion from Date to a Joda DateTime to
> take significantly more memory.  But since you're using Kryo and classes
> should be registered with it, may may have forgotten to register DateTime
> with Kryo.  If you don't register a class, it writes the class name at the
> beginning of every serialized instance, which for DateTime objects of size
> roughly 1 long, that's a ton of extra space and very inefficient.
>
> Can you confirm that DateTime is registered with Kryo?
>
> http://spark.apache.org/docs/latest/tuning.html#data-serialization
>
>
> On Wed, May 21, 2014 at 2:35 PM, Mohit Jaggi  wrote:
>
>> Hi,
>>
>> I changed my application to use Joda time instead of java.util.Date and I
>> started getting this:
>>
>> WARN ExternalAppendOnlyMap: Spilling in-memory map of 484 MB to disk (1
>> time so far)
>>
>> What does this mean? How can I fix this? Due to this a small job takes
>> forever.
>>
>> Mohit.
>>
>>
>> P.S.: I am using Kryo serialization and have played around with several
>> values of sparkRddMemFraction
>>
>
>


ExternalAppendOnlyMap: Spilling in-memory map

2014-05-21 Thread Mohit Jaggi
Hi,

I changed my application to use Joda time instead of java.util.Date and I
started getting this:

WARN ExternalAppendOnlyMap: Spilling in-memory map of 484 MB to disk (1
time so far)

What does this mean? How can I fix this? Due to this a small job takes
forever.

Mohit.


P.S.: I am using Kryo serialization and have played around with several values
of sparkRddMemFraction


Re: filling missing values in a sequence

2014-05-20 Thread Mohit Jaggi
Xiangrui,
Thanks for the pointer. I think it should work...for now I cooked up my
own, which is similar but built on top of the Spark core APIs. I would suggest
moving the sliding-window RDD to the core Spark library. It seems quite general
to me, and a cursory look at the code indicates nothing specific to machine
learning.

Mohit.
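For reference, a minimal sketch of the sliding approach Xiangrui points at below
(it assumes a sorted RDD[Int]; the import path is mllib's on the 1.x line):

import org.apache.spark.mllib.rdd.RDDFunctions._  // adds sliding() to ordinary RDDs

val rdd1 = sc.parallelize(Seq(1, 2, 3, 5, 8, 11))
// Each length-2 window may straddle a partition boundary, which is exactly the hard
// part of doing this by hand; append the global max to close the sequence on the right.
val filled = rdd1.sliding(2).flatMap { case Array(x, y) => x until y } ++
             sc.parallelize(Seq(rdd1.reduce(math.max)))
filled.collect().sorted  // Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)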


On Mon, May 19, 2014 at 10:13 PM, Xiangrui Meng  wrote:

> Actually there is a sliding method implemented in
> mllib.rdd.RDDFunctions. Since this is not for general use cases, we
> didn't include it in spark-core. You can take a look at the
> implementation there and see whether it fits. -Xiangrui
>
> On Mon, May 19, 2014 at 10:06 PM, Mohit Jaggi 
> wrote:
> > Thanks Sean. Yes, your solution works :-) I did oversimplify my real
> > problem, which has other parameters that go along with the sequence.
> >
> >
> > On Fri, May 16, 2014 at 3:03 AM, Sean Owen  wrote:
> >>
> >> Not sure if this is feasible, but this literally does what I think you
> >> are describing:
> >>
> >> sc.parallelize(rdd1.first to rdd1.last)
> >>
> >> On Tue, May 13, 2014 at 4:56 PM, Mohit Jaggi 
> wrote:
> >> > Hi,
> >> > I am trying to find a way to fill in missing values in an RDD. The RDD
> >> > is a
> >> > sorted sequence.
> >> > For example, (1, 2, 3, 5, 8, 11, ...)
> >> > I need to fill in the missing numbers and get
> (1,2,3,4,5,6,7,8,9,10,11)
> >> >
> >> > One way to do this is to "slide and zip"
> >> > rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
> >> > x = rdd1.first
> >> > rdd2 = rdd1 filter (_ != x)
> >> > rdd3 = rdd2 zip rdd1
> >> > rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x
> and
> >> > y }
> >> >
> >> > Another method which I think is more efficient is to use
> >> > mapParititions() on
> >> > rdd1 to be able to iterate on elements of rdd1 in each partition.
> >> > However,
> >> > that leaves the boundaries of the partitions to be "unfilled". Is
> there
> >> > a
> >> > way within the function passed to mapPartitions, to read the first
> >> > element
> >> > in the next partition?
> >> >
> >> > The latter approach also appears to work for a general "sliding
> window"
> >> > calculation on the RDD. The former technique requires a lot of
> "sliding
> >> > and
> >> > zipping" and I believe it is not efficient. If only I could read the
> >> > next
> >> > partition...I have tried passing a pointer to rdd1 to the function
> >> > passed to
> >> > mapPartitions but the rdd1 pointer turns out to be NULL, I guess
> because
> >> > Spark cannot deal with a mapper calling another mapper (since it
> happens
> >> > on
> >> > a worker not the driver)
> >> >
> >> > Mohit.
> >
> >
>


Re: life if an executor

2014-05-19 Thread Mohit Jaggi
I guess it "needs" to be this way to benefit from caching of RDDs in
memory. It would be nice however if the RDD cache can be dissociated from
the JVM heap so that in cases where garbage collection is difficult to
tune, one could choose to discard the JVM and run the next operation in a
few one.
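A partial step in that direction is caching in serialized form, which keeps the
cached partitions as compact byte buffers rather than a large object graph and
usually eases GC pressure. A minimal sketch ("events" is an illustrative RDD):

import org.apache.spark.storage.StorageLevel

val cached = events.persist(StorageLevel.MEMORY_ONLY_SER)  // serialized in-memory cache
// ... run the jobs that reuse "cached" ...
cached.unpersist()  // drop the cached blocks explicitly when done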


On Mon, May 19, 2014 at 10:06 PM, Matei Zaharia wrote:

> They’re tied to the SparkContext (application) that launched them.
>
> Matei
>
> On May 19, 2014, at 8:44 PM, Koert Kuipers  wrote:
>
> from looking at the source code i see executors run in their own jvm
> subprocesses.
>
> how long to they live for? as long as the worker/slave? or are they tied
> to the sparkcontext and life/die with it?
>
> thx
>
>
>


Re: filling missing values in a sequence

2014-05-19 Thread Mohit Jaggi
Thanks Sean. Yes, your solution works :-) I did oversimplify my real
problem, which has other parameters that go along with the sequence.


On Fri, May 16, 2014 at 3:03 AM, Sean Owen  wrote:

> Not sure if this is feasible, but this literally does what I think you
> are describing:
>
> sc.parallelize(rdd1.first to rdd1.last)
>
> On Tue, May 13, 2014 at 4:56 PM, Mohit Jaggi  wrote:
> > Hi,
> > I am trying to find a way to fill in missing values in an RDD. The RDD
> is a
> > sorted sequence.
> > For example, (1, 2, 3, 5, 8, 11, ...)
> > I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
> >
> > One way to do this is to "slide and zip"
> > rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
> > x = rdd1.first
> > rdd2 = rdd1 filter (_ != x)
> > rdd3 = rdd2 zip rdd1
> > rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x and
> y }
> >
> > Another method which I think is more efficient is to use
> mapParititions() on
> > rdd1 to be able to iterate on elements of rdd1 in each partition.
> However,
> > that leaves the boundaries of the partitions to be "unfilled". Is there a
> > way within the function passed to mapPartitions, to read the first
> element
> > in the next partition?
> >
> > The latter approach also appears to work for a general "sliding window"
> > calculation on the RDD. The former technique requires a lot of "sliding
> and
> > zipping" and I believe it is not efficient. If only I could read the next
> > partition...I have tried passing a pointer to rdd1 to the function
> passed to
> > mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
> > Spark cannot deal with a mapper calling another mapper (since it happens
> on
> > a worker not the driver)
> >
> > Mohit.
>


Re: accessing partition i+1 from mapper of partition i

2014-05-19 Thread Mohit Jaggi
Thanks Brian. This works. I used Accumulable to do the "collect" in step B.
While doing that I found that Accumulable.value is not a Spark "action", so I
need to call "cache" on the underlying RDD for "value" to work. Not sure if
that is intentional or a bug.
The "collect" of Step B can be done as a new RDD too.


On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt  wrote:

> I don't think there's a direct way of bleeding elements across partitions.
> But you could write it yourself relatively succinctly:
>
> A) Sort the RDD
> B) Look at the sorted RDD's partitions with the .mapParititionsWithIndex(
> ) method. Map each partition to its partition ID, and its maximum element.
> Collect the (partID, maxElements) in the driver.
> C) Broadcast the collection of (partID, part's max element) tuples
> D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex(
> ). For each partition *K:*
> D1) Find the immediately-preceding partition* K -1 , *and its associated
> maximum value. Use that to decide how many values are missing between the
> last element of part *K-1 *and the first element of part *K*.
> D2) Step through part *K*'s elements and find the rest of the missing
> elements in that part
>
> This approach sidesteps worries you might have over the hack of using
> .filter to remove the first element, as well as the zipping.
>
> --Brian
>
>
>
> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi  wrote:
>
>> Hi,
>> I am trying to find a way to fill in missing values in an RDD. The RDD is
>> a sorted sequence.
>> For example, (1, 2, 3, 5, 8, 11, ...)
>> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>>
>> One way to do this is to "slide and zip"
>> rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>> x = rdd1.first
>> rdd2 = rdd1 filter (_ != x)
>> rdd3 = rdd2 zip rdd1
>> rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x and y
>> }
>>
>> Another method which I think is more efficient is to use mapParititions()
>> on rdd1 to be able to iterate on elements of rdd1 in each partition.
>> However, that leaves the boundaries of the partitions to be "unfilled". *Is
>> there a way within the function passed to mapPartitions, to read the first
>> element in the next partition?*
>>
>> The latter approach also appears to work for a general "sliding window"
>> calculation on the RDD. The former technique requires a lot of "sliding and
>> zipping" and I believe it is not efficient. If only I could read the next
>> partition...I have tried passing a pointer to rdd1 to the function passed
>> to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
>> Spark cannot deal with a mapper calling another mapper (since it happens on
>> a worker not the driver)
>>
>> Mohit.
>>
>>
>


filling missing values in a sequence

2014-05-15 Thread Mohit Jaggi
Hi,
I am trying to find a way to fill in missing values in an RDD. The RDD is a
sorted sequence.
For example, (1, 2, 3, 5, 8, 11, ...)
I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)

One way to do this is to "slide and zip"
rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
x = rdd1.first
rdd2 = rdd1 filter (_ != x)
rdd3 = rdd2 zip rdd1
rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x and y }

Another method which I think is more efficient is to use mapParititions()
on rdd1 to be able to iterate on elements of rdd1 in each partition.
However, that leaves the boundaries of the partitions to be "unfilled". Is
there a way within the function passed to mapPartitions, to read the first
element in the next partition?

The latter approach also appears to work for a general "sliding window"
calculation on the RDD. The former technique requires a lot of "sliding and
zipping" and I believe it is not efficient. If only I could read the next
partition...I have tried passing a pointer to rdd1 to the function passed
to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
Spark cannot deal with a mapper calling another mapper (since it happens on
a worker not the driver)

Mohit.


accessing partition i+1 from mapper of partition i

2014-05-14 Thread Mohit Jaggi
Hi,
I am trying to find a way to fill in missing values in an RDD. The RDD is a
sorted sequence.
For example, (1, 2, 3, 5, 8, 11, ...)
I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)

One way to do this is to "slide and zip"
rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
x = rdd1.first
rdd2 = rdd1 filter (_ != x)
rdd3 = rdd2 zip rdd1
rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x and y }

Another method which I think is more efficient is to use mapParititions()
on rdd1 to be able to iterate on elements of rdd1 in each partition.
However, that leaves the boundaries of the partitions to be "unfilled". *Is
there a way within the function passed to mapPartitions, to read the first
element in the next partition?*

The latter approach also appears to work for a general "sliding window"
calculation on the RDD. The former technique requires a lot of "sliding and
zipping" and I believe it is not efficient. If only I could read the next
partition...I have tried passing a pointer to rdd1 to the function passed
to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
Spark cannot deal with a mapper calling another mapper (since it happens on
a worker not the driver)

Mohit.


rdd ordering gets scrambled

2014-04-29 Thread Mohit Jaggi
Hi,
I started with a text file (CSV) of sorted data (sorted by the first column) and
parsed it into Scala objects using a map operation. Then I used more maps to
add some extra info to the data and saved it as a text file.
The final text file is not sorted. What do I need to do to keep the order
from the original input intact?

My code looks like:

val csvFile = sc.textFile(..)  // file is CSV and ordered by first column
val splitRdd = csvFile map { line => line.split(",", -1) }
val parsedRdd = splitRdd map { parts =>
  val key = parts(0)                           // use first column as key
  val value = new MyObject(parts(0), parts(1)) // parse into scala objects
  (key, value)
}

val augmentedRdd = parsedRdd map { x =>
  val key = x._1
  val value = // add extra fields to x._2
  (key, value)
}
augmentedRdd.saveAsTextFile(...) // this file is not sorted

Mohit.
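If the saved output must be globally sorted, one option is to sort by key right
before saving. A minimal sketch (the output path is illustrative; on the 1.x line
the extra import brings in the pair-RDD methods):

import org.apache.spark.SparkContext._  // pair-RDD functions such as sortByKey

augmentedRdd.sortByKey().saveAsTextFile("hdfs:///tmp/augmented-sorted")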


spark mllib to jblas calls..and comparison with VW

2014-04-24 Thread Mohit Jaggi
Folks,
I am wondering how mllib interacts with jblas and lapack. Does it make
copies of data from my RDD format to jblas's format? Does jblas copy it
again before passing to lapack native code?

I also saw some comparisons with VW and it seems mllib is slower on a
single node but scales better and outperforms VW on 16 nodes. Any idea why?
Are improvements in the pipeline?

Mohit.


Re: error in mllib lr example code

2014-04-24 Thread Mohit Jaggi
Thanks Xiangrui, Matei and Arpit. It does work fine after adding
Vectors.dense. I have a follow-up question, which I will post on a new thread.
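For later readers, the working parse (per Xiangrui's note quoted below) looks
like this:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}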


On Thu, Apr 24, 2014 at 2:49 AM, Arpit Tak wrote:

> Also try out these examples; all of them work
>
> http://docs.sigmoidanalytics.com/index.php/MLlib
>
> if you spot any problems in those, let us know.
>
> Regards,
> arpit
>
>
> On Wed, Apr 23, 2014 at 11:08 PM, Matei Zaharia 
> wrote:
>
>> See http://people.csail.mit.edu/matei/spark-unified-docs/ for a more
>> recent build of the docs; if you spot any problems in those, let us know.
>>
>> Matei
>>
>> On Apr 23, 2014, at 9:49 AM, Xiangrui Meng  wrote:
>>
>> > The doc is for 0.9.1. You are running a later snapshot, which added
>> > sparse vectors. Try LabeledPoint(parts(0).toDouble,
>> > Vectors.dense(parts(1).split(' ').map(x => x.toDouble)). The examples
>> > are updated in the master branch. You can also check the examples
>> > there. -Xiangrui
>> >
>> > On Wed, Apr 23, 2014 at 9:34 AM, Mohit Jaggi 
>> wrote:
>> >>
>> >> sorry...added a subject now
>> >>
>> >> On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi 
>> wrote:
>> >>>
>> >>> I am trying to run the example linear regression code from
>> >>>
>> >>> http://spark.apache.org/docs/latest/mllib-guide.html
>> >>>
>> >>> But I am getting the following error...am I missing an import?
>> >>>
>> >>> code
>> >>>
>> >>> import org.apache.spark._
>> >>>
>> >>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
>> >>>
>> >>> import org.apache.spark.mllib.regression.LabeledPoint
>> >>>
>> >>>
>> >>> object ModelLR {
>> >>>
>> >>>  def main(args: Array[String]) {
>> >>>
>> >>>val sc = new SparkContext(args(0), "SparkLR",
>> >>>
>> >>>  System.getenv("SPARK_HOME"),
>> >>> SparkContext.jarOfClass(this.getClass).toSeq)
>> >>>
>> >>> // Load and parse the data
>> >>>
>> >>> val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
>> >>>
>> >>> val parsedData = data.map { line =>
>> >>>
>> >>>  val parts = line.split(',')
>> >>>
>> >>>  LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
>> >>> x.toDouble).toArray)
>> >>>
>> >>> }
>> >>>
>> >>> ..
>> >>>
>> >>> }
>> >>>
>> >>> error
>> >>>
>> >>> - polymorphic expression cannot be instantiated to expected type;
>> found :
>> >>> [U >: Double]Array[U] required:
>> >>>
>> >>> org.apache.spark.mllib.linalg.Vector
>> >>>
>> >>> - polymorphic expression cannot be instantiated to expected type;
>> found :
>> >>> [U >: Double]Array[U] required:
>> >>>
>> >>> org.apache.spark.mllib.linalg.Vector
>> >>
>> >>
>>
>>
>


error in mllib lr example code

2014-04-23 Thread Mohit Jaggi
sorry...added a subject now

On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi  wrote:

> I am trying to run the example linear regression code from
>
>  http://spark.apache.org/docs/latest/mllib-guide.html
>
> But I am getting the following error...am I missing an import?
>
> code
>
> import org.apache.spark._
>
> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
>
> import org.apache.spark.mllib.regression.LabeledPoint
>
>
> object ModelLR {
>
>   def main(args: Array[String]) {
>
> val sc = new SparkContext(args(0), "SparkLR",
>
>   System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)
> .toSeq)
>
> // Load and parse the data
>
> val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
>
> val parsedData = data.map { line =>
>
>   val parts = line.split(',')
>
>   LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
> x.toDouble).toArray)
>
>  }
>
> ..
>
> }
>
> error
>
> - polymorphic expression cannot be instantiated to expected type; found :
> [U >: Double]Array[U] required:
>
>  org.apache.spark.mllib.linalg.Vector
>
> - polymorphic expression cannot be instantiated to expected type; found :
> [U >: Double]Array[U] required:
>
>  org.apache.spark.mllib.linalg.Vector
>


[no subject]

2014-04-23 Thread Mohit Jaggi
I am trying to run the example linear regression code from

 http://spark.apache.org/docs/latest/mllib-guide.html

But I am getting the following error...am I missing an import?

code

import org.apache.spark._

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

import org.apache.spark.mllib.regression.LabeledPoint


object ModelLR {

  def main(args: Array[String]) {

val sc = new SparkContext(args(0), "SparkLR",

  System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)
.toSeq)

// Load and parse the data

val data = sc.textFile("mllib/data/ridge-data/lpsa.data")

val parsedData = data.map { line =>

  val parts = line.split(',')

  LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
x.toDouble).toArray)

 }

..

}

error

- polymorphic expression cannot be instantiated to expected type; found :
[U >: Double]Array[U] required:

 org.apache.spark.mllib.linalg.Vector

- polymorphic expression cannot be instantiated to expected type; found :
[U >: Double]Array[U] required:

 org.apache.spark.mllib.linalg.Vector


scheduler question

2014-04-15 Thread Mohit Jaggi
Hi Folks,
I have some questions about how Spark scheduler works:
- How does Spark know how many resources a job might need?
- How does it fairly share resources between multiple jobs?
- Does it "know" about data and partition sizes and use that information
for scheduling?

Mohit.
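On the fair-sharing part of the question: within a single application, jobs can be
assigned to fair-scheduler pools; across applications, sharing is governed by the
cluster manager. A minimal sketch (pool name and path are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduler-demo")
  .set("spark.scheduler.mode", "FAIR")              // default within an app is FIFO
val sc = new SparkContext(conf)

// Jobs submitted from this thread go to the "adhoc" pool; pool weights and minShare
// can be defined in a fairscheduler.xml allocation file.
sc.setLocalProperty("spark.scheduler.pool", "adhoc")
sc.textFile("/tmp/data.txt").count()
sc.setLocalProperty("spark.scheduler.pool", null)   // revert to the default pool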