Re: Can anyone offer any insight at all?

2014-03-08 Thread Alan Burlison

On 07/03/2014 19:08, Ognen Duzlevski wrote:


I have had the most awful time figuring out these "looped" things. It
seems like it is next to impossible to run a .filter() operation in a
for loop, it seems to work if you yield .filter()


The equivalent of a filter in a for statement is an 'if'. Scala desugars 
for comprehensions into the equivalent sequence of map, flatMap and 
withFilter invocations; section 23.4 of "Programming in Scala" has a 
very good explanation of how the mapping is done.


$ scala -Xprint:parser -e 'println(for(i <- Seq(1,2,3,4) if (i % 2) == 
0; j <- Seq(10,11) if j <= 10) yield i * j)'
[[syntax trees at end of parser]] // scalacmd2816067526771161181.scala

package <empty> {
  object Main extends scala.AnyRef {
    def <init>() = {
      super.<init>();
      ()
    };
    def main(argv: Array[String]): scala.Unit = {
      val args = argv;
      {
        final class $anon extends scala.AnyRef {
          def <init>() = {
            super.<init>();
            ()
          };
          println(Seq(1, 2, 3, 4).withFilter(((i) => 
i.$percent(2).$eq$eq(0))).flatMap(((i) => Seq(10, 11).withFilter(((j) => 
j.$less$eq(10))).map(((j) => i.$times(j))))))
        };
        new $anon()
      }
    }
  }
}

List(20, 40)
$
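Written out by hand, the expansion above corresponds to the following (a minimal, self-contained sketch; the object and value names are illustrative, not from the thread):

```scala
object DesugarDemo {
  def main(args: Array[String]): Unit = {
    val xs = Seq(1, 2, 3, 4)

    // A guard in a for comprehension...
    val sugared = for (i <- xs if i % 2 == 0; j <- Seq(10, 11) if j <= 10) yield i * j
    // ...desugars to withFilter/flatMap/map, exactly as in the printed tree:
    val desugared = xs.withFilter(i => i % 2 == 0)
      .flatMap(i => Seq(10, 11).withFilter(j => j <= 10).map(j => i * j))

    println(sugared)              // List(20, 40)
    println(sugared == desugared) // true
  }
}
```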

--
Alan Burlison
--


Re: Can anyone offer any insight at all?

2014-03-07 Thread Ognen Duzlevski

No.

It was a logical error.

val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == 
event1).map(line => 
(line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
should have mapped to 0, not 1.
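The tag matters because the downstream filter looks specifically for a 0. A minimal sketch with plain Scala collections (not Spark RDDs; the names here are illustrative) of the union/groupByKey/filter step shows why tagging the event1 users with 1 makes every retained-user count come out 0, which is what then turns ev1c/c into Infinity:

```scala
object RetentionTagDemo {
  // Mimics ev1rdd.union(ev2rdd).groupByKey() followed by the
  // filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0) step.
  def retained(ev1: Seq[(String, Int)], ev2: Seq[(String, Int)]): Int =
    (ev1 ++ ev2)                          // union
      .groupBy(_._1)                      // groupByKey
      .count { case (_, pairs) =>
        val tags = pairs.map(_._2)
        tags.length > 1 && tags.contains(0) // seen in both, with an event1 (0) tag
      }

  def main(args: Array[String]): Unit = {
    val ev2 = Seq("a" -> 1, "b" -> 1)
    // Correct: event1 users tagged 0, so user "a" counts as retained.
    println(retained(Seq("a" -> 0, "c" -> 0), ev2)) // 1
    // The bug: event1 users also tagged 1 -- no group ever contains a 0,
    // so the count is 0 and ev1c / 0.0 later prints Infinity.
    println(retained(Seq("a" -> 1, "c" -> 1), ev2)) // 0
  }
}
```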


I have had the most awful time figuring out these "looped" things. It 
seems like it is next to impossible to run a .filter() operation in a 
for loop, it seems to work if you yield .filter()


Still don't understand why that is...

Ognen

On 3/7/14, 1:05 PM, Mayur Rustagi wrote:

the issue was with print?
printing on worker?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski 
<og...@plainvanillagames.com> wrote:


Strike that. Figured it out. Don't you just hate it when you fire
off an email and you figure it out as it is being sent? ;)
Ognen


On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:

What is wrong with this code?

A condensed set of this code works in the spark-shell.

It does not work when deployed via a jar.

def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = {
val spd = new PipelineDate(start)
val epd = new PipelineDate(end)
// filter for event1 events and return RDDs that are maps
of user_ids and 0
val f = sc.textFile(spd.toJsonHdfsFileName)
val ev1rdd =
f.filter(_.split(",")(0).split(":")(1).replace("\"","") ==
event1).map(line =>
(line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
val ev1c = ev1rdd.count.toDouble

// do the same as above for event2 events, only substitute
0s with 1s
val ev2rdds = for {
   dt <- PipelineDate.getPeriod(spd+1,epd)
   val f1 = sc.textFile(dt.toJsonHdfsFileName)
} yield
(f1.filter(_.split(",")(0).split(":")(1).replace("\"","") ==
event2).map(line =>
(line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)

// cache all event1 and event2 RDDs
ev2rdds.foreach(_.cache)
val cts = for {
  ev2 <- ev2rdds
} yield ev2.count

val retent = for {
  ev2rdd <- ev2rdds
  val ret = ev1rdd.union(ev2rdd).groupByKey()
} yield ret.filter(e => e._2.length > 1 &&
e._2.filter(_==0).length>0)

val rcts = retent.map(_.count)

println("--")

println(s"${rcts}")
println(s"${cts}")

for {
  c <- rcts
} yield(ev1c/c.toDouble)
//Map(result:_*)
  }

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: it does not appear that the
union().groupBy().filter() segment is working (the
List(0,0) output). The app is not failing, it finishes just fine.

Any ideas?
Ognen


-- 
Some people, when confronted with a problem, think "I know, I'll
use regular expressions." Now they have two problems.
-- Jamie Zawinski




--
Some people, when confronted with a problem, think "I know, I'll use regular 
expressions." Now they have two problems.
-- Jamie Zawinski



Re: Can anyone offer any insight at all?

2014-03-07 Thread Mayur Rustagi
the issue was with print?
printing on worker?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski <
og...@plainvanillagames.com> wrote:

> Strike that. Figured it out. Don't you just hate it when you fire off an
> email and you figure it out as it is being sent? ;)
> Ognen
>
>
> On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
>
>> What is wrong with this code?
>>
>> A condensed set of this code works in the spark-shell.
>>
>> It does not work when deployed via a jar.
>>
>> def calcSimpleRetention(start:String,end:String,event1:
>> String,event2:String):List[Double] = {
>> val spd = new PipelineDate(start)
>> val epd = new PipelineDate(end)
>> // filter for event1 events and return RDDs that are maps of user_ids
>> and 0
>> val f = sc.textFile(spd.toJsonHdfsFileName)
>> val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","")
>> == event1).map(line => (line.split(",")(2).split(":")
>> (1).replace("\"",""),1)).cache
>> val ev1c = ev1rdd.count.toDouble
>>
>> // do the same as above for event2 events, only substitute 0s with 1s
>> val ev2rdds = for {
>>dt <- PipelineDate.getPeriod(spd+1,epd)
>>val f1 = sc.textFile(dt.toJsonHdfsFileName)
>> } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") ==
>> event2).map(line => (line.split(",")(2).split(":")
>> (1).replace("\"",""),1)).distinct)
>>
>> // cache all event1 and event2 RDDs
>> ev2rdds.foreach(_.cache)
>> val cts = for {
>>   ev2 <- ev2rdds
>> } yield ev2.count
>>
>> val retent = for {
>>   ev2rdd <- ev2rdds
>>   val ret = ev1rdd.union(ev2rdd).groupByKey()
>> } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)
>>
>> val rcts = retent.map(_.count)
>> println("--")
>>
>> println(s"${rcts}")
>> println(s"${cts}")
>>
>> for {
>>   c <- rcts
>> } yield(ev1c/c.toDouble)
>> //Map(result:_*)
>>   }
>>
>> This is what this code prints:
>> List(0, 0)
>> List(785912, 825254)
>> List(Infinity, Infinity)
>>
>> My question is: it does not appear that the
>> union().groupBy().filter() segment is working (the List(0,0) output).
>> The app is not failing, it finishes just fine.
>>
>> Any ideas?
>> Ognen
>>
>
> --
> Some people, when confronted with a problem, think "I know, I'll use
> regular expressions." Now they have two problems.
> -- Jamie Zawinski
>
>


Re: Can anyone offer any insight at all?

2014-03-07 Thread Ognen Duzlevski
Strike that. Figured it out. Don't you just hate it when you fire off an 
email and you figure it out as it is being sent? ;)

Ognen

On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:

What is wrong with this code?

A condensed set of this code works in the spark-shell.

It does not work when deployed via a jar.

def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = {

val spd = new PipelineDate(start)
val epd = new PipelineDate(end)
// filter for event1 events and return RDDs that are maps of 
user_ids and 0

val f = sc.textFile(spd.toJsonHdfsFileName)
val ev1rdd = 
f.filter(_.split(",")(0).split(":")(1).replace("\"","") == 
event1).map(line => 
(line.split(",")(2).split(":")(1).replace("\"",""),1)).cache

val ev1c = ev1rdd.count.toDouble

// do the same as above for event2 events, only substitute 0s with 1s
val ev2rdds = for {
   dt <- PipelineDate.getPeriod(spd+1,epd)
   val f1 = sc.textFile(dt.toJsonHdfsFileName)
} yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") 
== event2).map(line => 
(line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)


// cache all event1 and event2 RDDs
ev2rdds.foreach(_.cache)
val cts = for {
  ev2 <- ev2rdds
} yield ev2.count

val retent = for {
  ev2rdd <- ev2rdds
  val ret = ev1rdd.union(ev2rdd).groupByKey()
} yield ret.filter(e => e._2.length > 1 && 
e._2.filter(_==0).length>0)


val rcts = retent.map(_.count)
println("--") 


println(s"${rcts}")
println(s"${cts}")

for {
  c <- rcts
} yield(ev1c/c.toDouble)
//Map(result:_*)
  }

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: it does not appear that the 
union().groupBy().filter() segment is working (the List(0,0) 
output). The app is not failing, it finishes just fine.


Any ideas?
Ognen


--
Some people, when confronted with a problem, think "I know, I'll use regular 
expressions." Now they have two problems.
-- Jamie Zawinski



Can anyone offer any insight at all?

2014-03-07 Thread Ognen Duzlevski

What is wrong with this code?

A condensed set of this code works in the spark-shell.

It does not work when deployed via a jar.

def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = {

val spd = new PipelineDate(start)
val epd = new PipelineDate(end)
// filter for event1 events and return RDDs that are maps of 
user_ids and 0

val f = sc.textFile(spd.toJsonHdfsFileName)
val ev1rdd = 
f.filter(_.split(",")(0).split(":")(1).replace("\"","") == 
event1).map(line => 
(line.split(",")(2).split(":")(1).replace("\"",""),1)).cache

val ev1c = ev1rdd.count.toDouble

// do the same as above for event2 events, only substitute 0s with 1s
val ev2rdds = for {
   dt <- PipelineDate.getPeriod(spd+1,epd)
   val f1 = sc.textFile(dt.toJsonHdfsFileName)
} yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") 
== event2).map(line => 
(line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)


// cache all event1 and event2 RDDs
ev2rdds.foreach(_.cache)
val cts = for {
  ev2 <- ev2rdds
} yield ev2.count

val retent = for {
  ev2rdd <- ev2rdds
  val ret = ev1rdd.union(ev2rdd).groupByKey()
} yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)

val rcts = retent.map(_.count)
println("--")
println(s"${rcts}")
println(s"${cts}")

for {
  c <- rcts
} yield(ev1c/c.toDouble)
//Map(result:_*)
  }

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: it does not appear that the 
union().groupBy().filter() segment is working (the List(0,0) 
output). The app is not failing, it finishes just fine.


Any ideas?
Ognen