Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-07-02 Thread Tobias Pfeiffer
Hi,

On Thu, Jan 29, 2015 at 9:52 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote:

 My thinking is to maintain state in an RDD and update it and persist it
 with each 2-second pass, but this also seems like it could get messy.  Any
 thoughts or examples that might help me?


 I have just implemented some timestamp-based windowing on DStreams (can't
 share the code now, but it will be published in a couple of months),
 although with the assumption that items are in correct order. The main
 challenge (rather technical) was to keep proper state across RDD boundaries
 and to tell the state "you can mark this partial window from the last
 interval as 'complete' now" without shuffling too much data around. For
 example, if there are some empty intervals, you don't know when the next
 item to go into the partial window will arrive, or if there will be one at
 all. I guess if you want to have out-of-order tolerance, that will become
 even trickier, as you need to define and think about some timeout for
 partial windows in your state...


Sorry, it took ages to get the code published, and I am not involved in
that project any more and cannot provide a lot of support, but if you are
interested in the code, here it is:

https://github.com/jubatus/jubaql-server/blob/master/processor/src/main/scala/us/jubat/jubaql_server/processor/SlidingWindow.scala

The usage can be seen in

https://github.com/jubatus/jubaql-server/blob/master/processor/src/test/scala/us/jubat/jubaql_server/processor/SlidingStreamSpec.scala

and it basically boils down to

  val inputStream: DStream[(Long, T)] =
    ssc.queueStream(itemsQueue, oneAtATime = true)
  val windowStream: DStream[(Long, (Long, T))] =
    SlidingWindow.byTimestamp(inputStream, length, step)

where the first Long in the result is a window ID and the second Long is
the original timestamp, so now you can use any of the groupByKey,
reduceByKey etc. functions.
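
For illustration, a per-window aggregation could then look roughly like
this (an untested sketch that simply counts the items falling into each
window; depending on the Spark version you may need
import org.apache.spark.streaming.StreamingContext._ for the pair functions):

  val countsPerWindow: DStream[(Long, Long)] = windowStream
    .map { case (windowId, (timestamp, item)) => (windowId, 1L) }
    .reduceByKey(_ + _)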

Hope this helps anyone,
Tobias


Re: Spark shell never leaves ACCEPTED state in YARN CDH5

2015-03-25 Thread Tobias Pfeiffer
Hi,

On Thu, Mar 26, 2015 at 4:08 AM, Khandeshi, Ami 
ami.khande...@fmr.com.invalid wrote:

 I am seeing the same behavior.  I have enough resources…..


CPU *and* memory are sufficient? No previous (unfinished) jobs eating them?

Tobias


Re: SchemaRDD: SQL Queries vs Language Integrated Queries

2015-03-11 Thread Tobias Pfeiffer
Hi,

On Wed, Mar 11, 2015 at 11:05 PM, Cesar Flores ces...@gmail.com wrote:

 Thanks for both answers. One final question. *This registerTempTable is
 not an extra process that the SQL queries need to do that may decrease
 performance over the language integrated method calls? *


As far as I know, registerTempTable is just a Map[String, SchemaRDD]
insertion, nothing that would be measurable; there are no
distributed/RDD operations involved, I think.

Tobias


Re: Timed out while stopping the job generator plus subsequent failures

2015-03-11 Thread Tobias Pfeiffer
Sean,

On Wed, Mar 11, 2015 at 7:43 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 it seems like I am unable to shut down my StreamingContext properly, both
 in local[n] and yarn-cluster mode. In addition, (only) in yarn-cluster
 mode, subsequent use of a new StreamingContext will raise
 an InvalidActorNameException.


I was wondering if this is related to your question on spark-dev
  http://tinyurl.com/q5cd5px
Did you get any additional insight into this issue?

In my case the processing of the first batch completes, but I don't know if
there is anything wrong with the checkpoints? When I look to the
corresponding checkpoint directory in HDFS, it doesn't seem like all state
RDDs are persisted there, just a subset. Any ideas?

Thanks
Tobias


Re: SQL with Spark Streaming

2015-03-11 Thread Tobias Pfeiffer
Hi,

On Thu, Mar 12, 2015 at 12:08 AM, Huang, Jie jie.hu...@intel.com wrote:

 According to my understanding, your approach is to register a series of
 tables by using transformWith, right? And then, you can get a new Dstream
 (i.e., SchemaDstream), which consists of lots of SchemaRDDs.


Yep, it's basically the following:

case class SchemaDStream(sqlc: SQLContext,
 dataStream: DStream[Row],
 schemaStream: DStream[StructType]) {
  def registerStreamAsTable(name: String): Unit = {
foreachRDD(_.registerTempTable(name))
  }

  def foreachRDD(func: SchemaRDD => Unit): Unit = {
    def executeFunction(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[Unit] = {
      val schema: StructType = schemaRDD.collect.head
      val dataWithSchema: SchemaRDD = sqlc.applySchema(dataRDD, schema)
      val result = func(dataWithSchema)
      schemaRDD.map(x => result) // return RDD[Unit]
    }
    dataStream.transformWith(schemaStream, executeFunction _).foreachRDD(_.count())
  }

}

In a similar way you could add a `transform(func: SchemaRDD => SchemaRDD)`
method. But as I said, I am not sure about performance.
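
Such a transform method could look roughly like the following (an untested
sketch along the lines of the foreachRDD above, assuming that func does not
change the schema):

  def transform(func: SchemaRDD => SchemaRDD): SchemaDStream = {
    def executeFunction(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[Row] = {
      val schema: StructType = schemaRDD.collect.head
      val dataWithSchema: SchemaRDD = sqlc.applySchema(dataRDD, schema)
      func(dataWithSchema)  // a SchemaRDD is an RDD[Row], so it can be returned directly
    }
    // reuses the old schemaStream; if func changed the schema, that stream
    // would have to be recomputed here as well
    SchemaDStream(sqlc, dataStream.transformWith(schemaStream, executeFunction _), schemaStream)
  }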

Tobias


Timed out while stopping the job generator plus subsequent failures

2015-03-11 Thread Tobias Pfeiffer
Hi,

it seems like I am unable to shut down my StreamingContext properly, both
in local[n] and yarn-cluster mode. In addition, (only) in yarn-cluster
mode, subsequent use of a new StreamingContext will raise
an InvalidActorNameException.

I use

  logger.info("stopping StreamingContext")
  staticStreamingContext.stop(stopSparkContext = false, stopGracefully = true)
  logger.debug("done")

and have in my output logs

  19:16:47.708 [ForkJoinPool-2-worker-11] INFO  stopping StreamingContext
[... output from other threads ...]
  19:17:07.729 [ForkJoinPool-2-worker-11] WARN  scheduler.JobGenerator -
Timed out while stopping the job generator (timeout = 2)
  19:17:07.739 [ForkJoinPool-2-worker-11] DEBUG done

The processing itself is complete, i.e., the batch currently processed at
the time of stop() is finished and no further batches are processed.
However, something keeps the streaming context from stopping properly. In
local[n] mode, this is not actually a problem (other than I have to wait 20
seconds for shutdown), but in yarn-cluster mode, I get an error

  akka.actor.InvalidActorNameException: actor name [JobGenerator] is not
unique!

when I start a (newly created) StreamingContext, and I was wondering what
* is the issue with stop()
* is the difference between local[n] and yarn-cluster mode.

Some possible reasons:
* On my executors, I use a networking library that depends on netty and
doesn't properly shut down the event loop. (That has not been a problem in
the past, though.)
* I have a non-empty state (from using updateStateByKey()) that is
checkpointed to /tmp/spark (in local mode) and hdfs:///tmp/spark (in
yarn-cluster mode), could that be an issue? (In fact, I have not seen this
error in any non-stateful stream applications before.)

Any help much appreciated!

Thanks
Tobias


Re: Timed out while stopping the job generator plus subsequent failures

2015-03-11 Thread Tobias Pfeiffer
Hi,

I discovered what caused my issue when running on YARN and was able to work
around it.

On Wed, Mar 11, 2015 at 7:43 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 The processing itself is complete, i.e., the batch currently processed at
 the time of stop() is finished and no further batches are processed.
 However, something keeps the streaming context from stopping properly. In
 local[n] mode, this is not actually a problem (other than I have to wait 20
 seconds for shutdown), but in yarn-cluster mode, I get an error

   akka.actor.InvalidActorNameException: actor name [JobGenerator] is not
 unique!


It seems that not all checkpointed RDDs are cleaned (metadata cleared,
checkpoint directories deleted etc.?) at the time when the streamingContext
is stopped, but only afterwards. In particular, when I add
`Thread.sleep(5000)` after my streamingContext.stop() call, then it works
and I can start a different streamingContext afterwards.
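
In code, the workaround is essentially just (a sketch; the 5000 ms are
simply a value that happened to work for me, and the variable names are
placeholders):

  streamingContext.stop(stopSparkContext = false, stopGracefully = true)
  Thread.sleep(5000)  // wait for checkpointed RDDs to be cleaned up
  val nextStreamingContext = new StreamingContext(sparkContext, batchDuration)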

This is pretty ugly, so does anyone know a method to poll whether it's safe
to continue or whether there are still some RDDs waiting to be cleaned up?

Thanks
Tobias


Re: SchemaRDD: SQL Queries vs Language Integrated Queries

2015-03-10 Thread Tobias Pfeiffer
Hi,

On Tue, Mar 10, 2015 at 2:13 PM, Cesar Flores ces...@gmail.com wrote:

 I am new to the SchemaRDD class, and I am trying to decide in using SQL
 queries or Language Integrated Queries (
 https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ).

 Can someone tell me what is the main difference between the two
 approaches, besides using different syntax? Are they interchangeable? Which
 one has better performance?


One difference is that the language-integrated queries are method calls on
the SchemaRDD you want to work on, which requires that you have access to
that object at hand. The SQL queries are passed to a method of the
SQLContext, and you have to call registerTempTable() on the SchemaRDD you
want to use beforehand, which can basically happen at an arbitrary location
in your program. (I don't know if I could express what I wanted to say.) That may
have an influence on how you design your program and how the different
parts work together.
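
To make the difference concrete, a small sketch (assuming a SchemaRDD
`people` with columns `name` and `age`, and `import sqlContext._` for the
symbol-based DSL):

  // language-integrated query: a method call on the SchemaRDD itself
  val adults1 = people.where('age >= 21).select('name)

  // SQL query: register the SchemaRDD under a name first, then refer to it
  // by that name wherever the SQLContext is available
  people.registerTempTable("people")
  val adults2 = sqlContext.sql("SELECT name FROM people WHERE age >= 21")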

Tobias


Re: SQL with Spark Streaming

2015-03-10 Thread Tobias Pfeiffer
Hi,

On Wed, Mar 11, 2015 at 9:33 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Intel has a prototype for doing this, SaiSai and Jason are the authors.
 Probably you can ask them for some materials.


The github repository is here: https://github.com/intel-spark/stream-sql

Also, what I did is write a wrapper class SchemaDStream that internally
holds a DStream[Row] and a DStream[StructType] (the latter having just one
element in every RDD) and then allows you to do
- operations SchemaRDD => SchemaRDD using
  `rowStream.transformWith(schemaStream, ...)`
- in particular, you can register this stream's data as a table this way
- and via a companion object with a method `fromSQL(sql: String):
  SchemaDStream` you can get a new stream from previously registered tables.

However, you are limited to batch-internal operations, i.e., you can't
aggregate across batches.

I am not able to share the code at the moment, but will within the next few
months. It is not very advanced code, though, and should be easy to
replicate. Also, I have no idea about the performance of transformWith.

Tobias


Re: Spark with data on NFS v HDFS

2015-03-05 Thread Tobias Pfeiffer
Hi,

On Thu, Mar 5, 2015 at 10:58 PM, Ashish Mukherjee 
ashish.mukher...@gmail.com wrote:

 I understand Spark can be used with Hadoop or standalone. I have certain
 questions related to use of the correct FS for Spark data.

 What is the efficiency trade-off in feeding data to Spark from NFS v HDFS?


As I understand it, one performance advantage of using HDFS is that the
task will be computed at a cluster node that has data on its local disk
already, so the tasks go to where the data is. In the case of NFS, all data
must be downloaded from the file server(s) first, so there is no such thing
as data locality.
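
For illustration (the paths are made up, and the NFS share must be mounted
at the same path on every worker node):

  val fromHdfs = sc.textFile("hdfs://namenode:8020/data/events.log") // tasks scheduled near the blocks
  val fromNfs  = sc.textFile("file:///mnt/nfs/data/events.log")      // data fetched from the file server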

Tobias


Re: scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Tobias Pfeiffer
Hi,

On Thu, Mar 5, 2015 at 12:20 AM, Imran Rashid iras...@cloudera.com wrote:

 This doesn't involve spark at all, I think this is entirely an issue with
 how scala deals w/ primitives and boxing.  Often it can hide the details
 for you, but IMO it just leads to far more confusing errors when things
 don't work out.  The issue here is that your map has value type Any, which
 leads scala to leave it as a boxed java.lang.Double.


I see, thank you very much for your explanation and the code examples!
Helps very much!
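
For the record, the boxing can be seen even without Spark; a minimal REPL
sketch of the behavior described above:

  val m: Map[String, Any] = Map("score" -> 1.75)
  m("score").getClass              // class java.lang.Double, boxed because the value type is Any
  m("score").asInstanceOf[Double]  // 1.75: explicit unboxing works fine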

Thanks
Tobias


scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Tobias Pfeiffer
Hi,

I have a function with signature

  def aggFun1(rdd: RDD[(Long, (Long, Double))]): RDD[(Long, Any)]

and one with

  def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]): RDD[(_Key, Double)]

where all Double classes involved are scala.Double classes (according
to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type
parameters _Key and _Index are inferred by the Scala compiler).

Now I am writing a test as follows:

  val result: Map[Long, Any] = aggFun1(input).collect().toMap
  result.values.foreach(v => println(v.getClass))
  result.values.foreach(_ shouldBe a[Double])

and I get the following output:

  class java.lang.Double
  class java.lang.Double
  [info] avg
  [info] - should compute the average *** FAILED ***
  [info]   1.75 was not an instance of double, but an instance of
java.lang.Double

So I am wondering about what magic is going on here. Are scala.Double
values in RDDs automatically converted to java.lang.Doubles or am I just
missing the implicit back-conversion etc.?

Any help appreciated,
Tobias


Re: Spark sql results can't be printed out to system console from spark streaming application

2015-03-03 Thread Tobias Pfeiffer
Hi,

can you explain how you copied that into your *streaming* application?
Like, how do you issue the SQL, what data do you operate on, how do you
view the logs etc.?

Tobias

On Wed, Mar 4, 2015 at 8:55 AM, Cui Lin cui@hds.com wrote:



 Dear all,
 
 I found the below sample code can be printed out only in spark shell, but
 when I moved them into my spark streaming application, nothing can be
 printed out into system console. Can you explain why it happened? anything
 related to new spark context? Thanks a lot!
 
 
 val anotherPeopleRDD = sc_context.parallelize(
   """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
 
 anotherPeopleRDD.toArray().foreach(line => System.out.println(line))
 
 val jsonMessage = sqlContext.jsonRDD(anotherPeopleRDD)
 
 jsonMessage.toArray().foreach(line => System.out.println(line))
 
 jsonMessage.registerTempTable("people")
 
 val test: SchemaRDD = sqlContext.sql("select count(*) from people")
 
 test.toArray().foreach(line => System.out.println(line))
 
 
 
 
 Best regards,
 
 Cui Lin
 
 
 
 
 






Re: Issue with yarn cluster - hangs in accepted state.

2015-03-03 Thread Tobias Pfeiffer
Hi,

On Wed, Mar 4, 2015 at 6:20 AM, Zhan Zhang zzh...@hortonworks.com wrote:

  Do you have enough resource in your cluster? You can check your resource
 manager to see the usage.


Yep, I can confirm that this is a very annoying issue. If there is not
enough memory or VCPUs available, your app will just stay in ACCEPTED state
until resources are available.

You can have a look at

https://github.com/jubatus/jubaql-docker/blob/master/hadoop/yarn-site.xml#L35
to see some settings that might help.

Tobias


Re: Best practices for query creation in Spark SQL.

2015-03-02 Thread Tobias Pfeiffer
Hi,

I think your chances for a satisfying answer would increase dramatically if
you elaborated a bit more on what you actually want to know.
(Holds for any of your last four questions about Spark SQL...)

Tobias


Re: Dealing with 'smaller' data

2015-02-26 Thread Tobias Pfeiffer
Hi

On Fri, Feb 27, 2015 at 10:50 AM, Gary Malouf malouf.g...@gmail.com wrote:

 The honest answer is that it is unclear to me at this point.  I guess what
 I am really wondering is if there are cases where one would find it
 beneficial to use Spark against one or more RDBs?


Well, RDBs are all about *storage*, while Spark is about *computation*. If
you have a very expensive computation (that can be parallelized in some
way), then you might want to use Spark, even though your data lives in an
ordinary RDB. Think raytracing, where you do something for every pixel in
the output image and you could get your scene description from a database,
write the result to a database, but use Spark to do two minutes of
calculation for every pixel in parallel (or so).
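
If the data lives in an RDB, one way to get it into Spark for such a
computation would be JdbcRDD. A rough sketch (connection string, table and
the expensiveRaytrace stub are all made up):

  import java.sql.DriverManager
  import org.apache.spark.rdd.JdbcRDD

  // stand-in for the expensive, parallelizable computation
  def expensiveRaytrace(scene: String): String = scene

  val scenes = new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:postgresql://dbhost/render", "user", "pass"),
    "SELECT id, scene FROM scenes WHERE id >= ? AND id <= ?",
    1, 1000000, 100,  // lower bound, upper bound, number of partitions
    rs => (rs.getLong("id"), rs.getString("scene")))
  val rendered = scenes.map { case (id, scene) => (id, expensiveRaytrace(scene)) }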

Tobias


Re: Dealing with 'smaller' data

2015-02-26 Thread Tobias Pfeiffer
Gary,

On Fri, Feb 27, 2015 at 8:40 AM, Gary Malouf malouf.g...@gmail.com wrote:

 I'm considering whether or not it is worth introducing Spark at my new
 company.  The data is no-where near Hadoop size at this point (it sits in
 an RDS Postgres cluster).


Will it ever become Hadoop size? Looking at the overhead of running even
a simple Hadoop setup (securely and with good performance, given about 1e6
configuration parameters), I think it makes sense to stay in non-Hadoop
mode as long as possible. People may disagree ;-)

Tobias

PS. You may also want to have a look at
http://aadrake.com/command-line-tools-can-be-235x-faster-than-your-hadoop-cluster.html


Re: Dealing with 'smaller' data

2015-02-26 Thread Tobias Pfeiffer
On Fri, Feb 27, 2015 at 10:57 AM, Gary Malouf malouf.g...@gmail.com wrote:

 So when deciding whether to take on installing/configuring Spark, the size
 of the data does not automatically make that decision in your mind.


You got me there ;-)

Tobias


Re: Spark Streaming - Collecting RDDs into array in the driver program

2015-02-25 Thread Tobias Pfeiffer
Hi,

On Thu, Feb 26, 2015 at 11:24 AM, Thanigai Vellore 
thanigai.vell...@gmail.com wrote:

 It appears that the function immediately returns even before the
 foreachrdd stage is executed. Is that possible?

Sure, that's exactly what happens. foreachRDD() schedules a computation, it
does not perform it. Maybe your streaming application would not ever
terminate, but still the function needs to return, right?

If you remove the toArray(), you will return a reference to the ArrayBuffer
that will be appended to over time. You can then, in a different thread,
check the contents of that ArrayBuffer as processing happens, or wait until
processing ends.
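
A rough sketch of that pattern (assuming a DStream[String] called dstream,
a StreamingContext ssc, and that the collected items fit into driver
memory):

  import scala.collection.mutable.ArrayBuffer

  val collected = ArrayBuffer[String]()
  dstream.foreachRDD(rdd => collected ++= rdd.collect())  // runs on the driver
  ssc.start()
  // in a different thread (or after ssc.awaitTermination()) you can
  // inspect `collected` to see what has been processed so far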

Tobias


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tobias Pfeiffer
Sean,

thanks for your message!

On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote:

 What I haven't investigated is whether you can enable checkpointing
 for the state in updateStateByKey separately from this mechanism,
 which is exactly your question. What happens if you set a checkpoint
 dir, but do *not* use StreamingContext.getOrCreate, but *do* call
 DStream.checkpoint?


I didn't even use StreamingContext.getOrCreate(), just calling
streamingContext.checkpoint(...) blew everything up. Well, blew up in the
sense that actor.OneForOneStrategy will print the stack trace of
the java.io.NotSerializableException every couple of seconds and
something is not going right with execution (I think).

BUT, indeed, just calling sparkContext.setCheckpointDir seems to be
sufficient for updateStateByKey! Looking at what
streamingContext.checkpoint() does, I don't get why ;-) and I am not sure
that this is a robust solution, but in fact that seems to work!
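
For reference, what I ended up with is essentially just this (the path is
an example, updateFunc is whatever you pass to updateStateByKey):

  ssc.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
  val stateStream = inputStream.updateStateByKey(updateFunc)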

Thanks a lot,
Tobias


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tobias Pfeiffer
Hi,

On Tue, Feb 24, 2015 at 4:34 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 There are different kinds of checkpointing going on. updateStateByKey
 requires RDD checkpointing which can be enabled only by calling
 sparkContext.setCheckpointDirectory. But that does not enable Spark
 Streaming driver checkpoints, which is necessary for recovering from driver
 failures. That is enabled only by streamingContext.checkpoint(...) which
 internally calls sparkContext.setCheckpointDirectory and also enables other
 stuff.


I see, thank you very much! I'm happy to see I will not have to rewrite the
entire application :-)

Tobias


Re: Use Spark Streaming for Batch?

2015-02-22 Thread Tobias Pfeiffer
Hi,

On Sat, Feb 21, 2015 at 1:05 AM, craigv craigvanderbo...@gmail.com wrote:
  Might it be possible to perform large batches processing on HDFS time
  series data using Spark Streaming?
 
  1. I understand that there is not currently an InputDStream that could do
  what's needed. I would have to create such a thing.
  2. Time is a problem. I would have to use the timestamps on our events
  for any time-based logic and state management.
  3. The batch duration would become meaningless in this scenario. Could I
  just set it to something really small (say 1 second) and then let it
  fall behind, processing the data as quickly as it could?


So, if it is not an issue for you if everything is processed in one batch,
you can use streamingContext.textFileStream(hdfsDirectory). This will
create a DStream that has a huge RDD with all data in the first batch and
then empty batches afterwards. You can have small batch size, should not be
a problem.
An alternative would be to write some code that creates one RDD per file in
your HDFS directory, create a Queue of those RDDs and then use
streamingContext.queueStream(), possibly with the oneAtATime=true parameter
(which will process only one RDD per batch).
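
A sketch of that queue-based variant (file names are just examples):

  import scala.collection.mutable.Queue
  import org.apache.spark.rdd.RDD

  val fileRdds: Queue[RDD[String]] = Queue(
    sc.textFile("hdfs:///data/2015-02-01.log"),
    sc.textFile("hdfs:///data/2015-02-02.log"))
  val fileStream = streamingContext.queueStream(fileRdds, oneAtATime = true)  // one RDD per batch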

However, to do window computations etc with the timestamps embedded *in*
your data will be a major effort, as in: You cannot use the existing
windowing functionality from Spark Streaming. If you want to read more
about that, there have been a number of discussions about that topic on
this list; maybe you can look them up.

Tobias


Re: SQL query over (Long, JSON string) tuples

2015-01-29 Thread Tobias Pfeiffer
Hi Ayoub,

thanks for your mail!

On Thu, Jan 29, 2015 at 6:23 PM, Ayoub benali.ayoub.i...@gmail.com wrote:

 SQLContext and hiveContext have a jsonRDD method which accepts an
 RDD[String] where the string is a JSON string and returns a SchemaRDD; it
 extends RDD[Row] which is the type you want.

 Afterwards you should be able to do a join to keep your tuple.


I'm afraid that's not so easy, because you can only join on a certain key,
and the key is exactly what I have to drop in order to infer the schema.

Thanks
Tobias


SQL query over (Long, JSON string) tuples

2015-01-29 Thread Tobias Pfeiffer
Hi,

I have data as RDD[(Long, String)], where the Long is a timestamp and the
String is a JSON-encoded string. I want to infer the schema of the JSON and
then do a SQL statement on the data (no aggregates, just column selection
and UDF application), but still have the timestamp associated with each row
of the result. I completely fail to see how that would be possible. Any
suggestions?

I can't even see how I would get an RDD[(Long, Row)] so that I *might* be
able to add the timestamp to the row after schema inference. Is there *any*
way other than string-manipulating the JSON string and adding the timestamp
to it?

Thanks
Tobias


Re: spark challenge: zip with next???

2015-01-29 Thread Tobias Pfeiffer
Hi,

On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya ilya.gane...@capitalone.com
wrote:

  Make a copy of your RDD with an extra entry in the beginning to offset.
 The you can zip the two RDDs and run a map to generate an RDD of
 differences.


Does that work? I recently tried something to compute differences between
each entry and the next, so I did
  val rdd1 = ... // null element + rdd
  val rdd2 = ... // rdd + null element
but got an error message about zip requiring data sizes in each partition
to match.

Tobias


Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-01-28 Thread Tobias Pfeiffer
Hi,

On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote:

 My thinking is to maintain state in an RDD and update it and persist it with
 each 2-second pass, but this also seems like it could get messy.  Any
 thoughts or examples that might help me?


I have just implemented some timestamp-based windowing on DStreams (can't
share the code now, but it will be published in a couple of months),
although with the assumption that items are in correct order. The main
challenge (rather technical) was to keep proper state across RDD boundaries
and to tell the state "you can mark this partial window from the last
interval as 'complete' now" without shuffling too much data around. For
example, if there are some empty intervals, you don't know when the next
item to go into the partial window will arrive, or if there will be one at
all. I guess if you want to have out-of-order tolerance, that will become
even trickier, as you need to define and think about some timeout for
partial windows in your state...

Tobias


Error reporting/collecting for users

2015-01-27 Thread Tobias Pfeiffer
Hi,

in my Spark Streaming application, computations depend on users' input in
terms of
 * user-defined functions
 * computation rules
 * etc.
that can throw exceptions in various cases (think: exception in UDF,
division by zero, invalid access by key etc.).

Now I am wondering about what is a good/reasonable way to deal with those
errors. I think I want to continue processing (the whole stream processing
pipeline should not die because of one single malformed item in the
stream), i.e., catch the exception, but still I need a way to tell the user
something went wrong.

So how can I get the information that something went wrong back to the
driver and what is a reasonable way to do that?

While writing this, something like the following came into my mind:
  val errors = sc.accumulator(...) // of type List[Throwable]
  dstream.map(item => {
    Try {
      someUdf(item)
    } match {
      case Success(value) =>
        value
      case Failure(err) =>
        errors += err  // remember error
        0  // default value
    }
  })
Does this make sense?

Thanks
Tobias


Re: Error reporting/collecting for users

2015-01-27 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 28, 2015 at 1:45 PM, Soumitra Kumar kumar.soumi...@gmail.com
wrote:

 It is a Streaming application, so how/when do you plan to access the
 accumulator on driver?


Well... maybe there would be some user command or web interface showing the
errors that have happened during processing...?

Thanks
Tobias


Re: Error reporting/collecting for users

2015-01-27 Thread Tobias Pfeiffer
Hi,

thanks for your mail!

On Wed, Jan 28, 2015 at 11:44 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 That seems reasonable to me. Are you having any problems doing it this way?


Well, actually I haven't done that yet. The idea of using accumulators to
collect errors just came while writing the email, but I thought I'd just
keep writing and see if anyone has any other suggestions ;-)

Thanks
Tobias


Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Tobias Pfeiffer
Hi,

I want to do something like

dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and
runs completely within the driver. However, this fails with a
NotSerializableException because $outer is not serializable etc. The
DStream code says:

  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
  }

To be honest, I don't understand the comment. Why must that function be
serializable even when there is no RDD action involved?

Thanks
Tobias


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Tobias Pfeiffer
Hi,

thanks for the answers!

On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai saisai.s...@intel.com
wrote:

 Also this `foreachFunc` is more like an action function of RDD, thinking
 of rdd.foreach(func), in which `func` need to be serializable. So maybe I
 think your way of use it is not a normal way :).


Yeah I totally understand why func in rdd.foreach(func) must be
serializable (because it's sent to the executors), but I didn't get why a
function that's not shipped around must be serializable, too.

The explanations made sense, though :-)

Thanks
Tobias


Re: Pairwise Processing of a List

2015-01-25 Thread Tobias Pfeiffer
Hi,

On Mon, Jan 26, 2015 at 9:32 AM, Steve Nunez snu...@hortonworks.com wrote:

  I’ve got a list of points: List[(Float, Float)]) that represent (x,y)
 coordinate pairs and need to sum the distance. It’s easy enough to compute
 the distance:


Are you saying you want all combinations (N^2) of distances? That should be
possible with rdd.cartesian():

val points = sc.parallelize(List((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)))
points.cartesian(points).collect
-- Array[((Double, Double), (Double, Double))] =
Array(((1.0,2.0),(1.0,2.0)), ((1.0,2.0),(3.0,4.0)), ((1.0,2.0),(5.0,6.0)),
((3.0,4.0),(1.0,2.0)), ((3.0,4.0),(3.0,4.0)), ((3.0,4.0),(5.0,6.0)),
((5.0,6.0),(1.0,2.0)), ((5.0,6.0),(3.0,4.0)), ((5.0,6.0),(5.0,6.0)))
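
Computing the actual distances is then just a map over these pairs,
something like (a sketch):

  val distances = points.cartesian(points).map { case ((x1, y1), (x2, y2)) =>
    math.sqrt(math.pow(x2 - x1, 2) + math.pow(y2 - y1, 2))
  }
  distances.reduce(_ + _)  // includes each pair twice plus the zero-length self-pairs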

I guess this is a very expensive operation, though.

Tobias


Re: spark streaming with checkpoint

2015-01-25 Thread Tobias Pfeiffer
Hi,

On Tue, Jan 20, 2015 at 8:16 PM, balu.naren balu.na...@gmail.com wrote:

 I am a beginner to spark streaming. So have a basic doubt regarding
 checkpoints. My use case is to calculate the no of unique users by day. I
 am using reduce by key and window for this. Where my window duration is 24
 hours and slide duration is 5 mins.

Adding to what others said, this feels more like a task for "run a Spark
job every five minutes using cron" than for the sliding window
functionality from Spark Streaming.

Tobias


Re: [SQL] Conflicts in inferred Json Schemas

2015-01-25 Thread Tobias Pfeiffer
Hi,

On Thu, Jan 22, 2015 at 2:26 AM, Corey Nolet cjno...@gmail.com wrote:

 Let's say I have 2 formats for json objects in the same file
 schema1 = { "location": "12345 My Lane" }
 schema2 = { "location": {"houseAddres": "1234 My Lane"} }

 From my tests, it looks like the current inferSchema() function will end
 up with only StructField("location", StringType).


In Spark SQL columns need to have a well-defined type (as in SQL in
general). So inferring the schema requires that there is a schema, and
I am afraid that there is not an easy way to achieve what you want in Spark
SQL, as there is no data type covering both values you see. (I am pretty
sure it can be done if you dive deep into the internals, add data types
etc., though.)

Tobias


Re: Serializability: for vs. while loops

2015-01-25 Thread Tobias Pfeiffer
Aaron,

On Thu, Jan 15, 2015 at 5:05 PM, Aaron Davidson ilike...@gmail.com wrote:

 Scala for-loops are implemented as closures using anonymous inner classes
 which are instantiated once and invoked many times. This means, though,
 that the code inside the loop is actually sitting inside a class, which
 confuses Spark's Closure Cleaner, whose job is to remove unused references
 from closures to make otherwise-unserializable objects serializable.

 My understanding is, in particular, that the closure cleaner will null out
 unused fields in the closure, but cannot go past the first level of depth
 (i.e., it will not follow field references and null out *their *unused,
 and possibly unserializable, references), because this could end up
 mutating state outside of the closure itself. Thus, the extra level of
 depth of the closure that was introduced by the anonymous class (where
 presumably the outer this pointer is considered used by the closure
 cleaner) is sufficient to make it unserializable.


Now, two weeks later, let me add that this is one of the most helpful
comments I have received on this mailing list! This insight helped me save
90% of the time I spent with debugging NotSerializableExceptions.
Thank you very much!

Tobias


Re: Pairwise Processing of a List

2015-01-25 Thread Tobias Pfeiffer
Sean,

On Mon, Jan 26, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Note that RDDs don't really guarantee anything about ordering though,
 so this only makes sense if you've already sorted some upstream RDD by
 a timestamp or sequence number.


Speaking of order, is there some reading on guarantees and non-guarantees
about order in RDDs? For example, when reading a file and doing
zipWithIndex, can I assume that the lines are numbered in order? Does this
hold for receiving data from Kafka, too?

Tobias


Re: Closing over a var with changing value in Streaming application

2015-01-21 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 21, 2015 at 9:13 PM, Bob Tiernay btier...@hotmail.com wrote:

 Maybe I'm misunderstanding something here, but couldn't this be done with
 broadcast variables? I there is the following caveat from the docs:

 In addition, the object v should not be modified after it is broadcast
 in order to ensure that all nodes get the same value of the broadcast
 variable (e.g. if the variable is shipped to a new node later)


Well, I think I need a modifiable state (modifiable = changes once per
interval) that stores the number of total items seen so far in the
lifetime of my application, and I need this number on each executor. Since
this number changes after every interval processed, I think broadcast
variables are probably not appropriate in this case.

Thanks
Tobias


Closing over a var with changing value in Streaming application

2015-01-20 Thread Tobias Pfeiffer
Hi,

I am developing a Spark Streaming application where I want every item in my
stream to be assigned a unique, strictly increasing Long. My input data
already has RDD-local integers (from 0 to N-1) assigned, so I am doing the
following:

  var totalNumberOfItems = 0L
  // update the keys of the stream data
  val globallyIndexedItems = inputStream.map(keyVal =>
    (keyVal._1 + totalNumberOfItems, keyVal._2))
  // increase the number of total seen items
  inputStream.foreachRDD(rdd => {
    totalNumberOfItems += rdd.count
  })

Now this works on my local[*] Spark instance, but I was wondering if this
is actually an ok thing to do. I don't want this to break when going to a
YARN cluster...

The function increasing totalNumberOfItems is closing over a var and
running in the driver, so I think this is ok. Here is my concern: What
about the function in the inputStream.map(...) block? This one is closing
over a var that has a different value in every interval. Will the closure
be serialized with that new value in every interval? Or only once with the
initial value and this will always be 0 during the runtime of the program?

As I said, it works locally, but I was wondering if I can really assume
that the closure is serialized with a new value in every interval.

Thanks,
Tobias


Re: Closing over a var with changing value in Streaming application

2015-01-20 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 How about using accumulators
 http://spark.apache.org/docs/1.2.0/programming-guide.html#accumulators?


As far as I understand, they solve the part of the problem that I am not
worried about, namely increasing the counter. I was more worried about
getting that counter/accumulator value back to the executors.

Thanks
Tobias


Re: If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-19 Thread Tobias Pfeiffer
Hi,

On Sat, Jan 17, 2015 at 3:37 AM, Peng Cheng pc...@uow.edu.au wrote:

 I'm talking about RDD1 (not persisted or checkpointed) in this situation:

 ...(somewhere) -> RDD1 -> RDD2
                     |       |
                     V       V
                    RDD3 -> RDD4 -> Action!

 In my experience, the chance that RDD1 gets recalculated is volatile, sometimes
 once, sometimes twice.


That should not happen if your access pattern to RDD2 and RDD3 is always
the same.

A related problem might be in SQLContext.jsonRDD(), since the source
 jsonRDD is used twice (once for schema inference, once for reading the data). It
 almost guarantees that the source jsonRDD is calculated twice. Has this
 problem been addressed so far?


That's exactly why schema inference is expensive. However, I am afraid in
general you have to make a decision between store or recompute (cf.
http://en.wikipedia.org/wiki/Space%E2%80%93time_tradeoff). There is no way
to avoid recomputation on each access other than storing the value, I
guess.
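
A minimal sketch of the "store" option (the input path is made up):

  val rdd1 = sc.textFile("hdfs:///some/input").map(line => line.toUpperCase)
  rdd1.cache()                        // computed once, reused below
  val rdd2 = rdd1.map(_.length)       // first branch of the DAG
  val rdd3 = rdd1.filter(_.nonEmpty)  // second branch of the DAG
  rdd2.count(); rdd3.count()          // rdd1 is not recomputed for the second count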

Tobias


Re: How to get the master URL at runtime inside driver program?

2015-01-19 Thread Tobias Pfeiffer
Hi,

On Sun, Jan 18, 2015 at 11:08 AM, guxiaobo1982 guxiaobo1...@qq.com wrote:

 Driver programs submitted by the spark-submit script will get the runtime
 spark master URL, but how does it get the URL inside the main method when
 creating the SparkConf object?


The master will be stored in the spark.master property. I use the following
snippet:

    // When run through spark-submit, the Java system property "spark.master"
    // will contain the master passed to spark-submit and we *must* use the
    // same; otherwise use "local[3]".
    val master = scala.util.Properties.propOrElse("spark.master", "local[3]")

Tobias


Re: MatchError in JsonRDD.toLong

2015-01-19 Thread Tobias Pfeiffer
Hi,

On Fri, Jan 16, 2015 at 6:14 PM, Wang, Daoyuan daoyuan.w...@intel.com
wrote:

 The second parameter of jsonRDD is the sampling ratio when we infer schema.


OK, I was aware of this, but I guess I understand the problem now. My
sampling ratio is so low that I only see the Long values of data items and
infer it's a Long. When I meet the data that's actually longer than Long, I
get the error I posted; basically it's the same situation as when
specifying a wrong schema manually.

So is there any way around this other than increasing the sample ratio to
discover also the very BigDecimal-sized numbers?

Thanks
Tobias


Re: Determine number of running executors

2015-01-19 Thread Tobias Pfeiffer
Hi,

On Sat, Jan 17, 2015 at 3:05 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 Can you share more information about how do you do that? I also have
 similar question about this.


Not very proud about it ;-), but here you go:

// find the number of workers available to us.
val _runCmd = scala.util.Properties.propOrElse("sun.java.command", "")
val numCoresRe = ".*--executor-cores ([0-9]+) --num-executors ([0-9]+).*".r
val totalNumCores = _runCmd match {
  case numCoresRe(coresPerExecutor, numExecutors) =>
    coresPerExecutor.toInt * numExecutors.toInt
  case _ =>
    0
}
if (totalNumCores > 0)
  logger.debug("total number of cores: " + totalNumCores)
else
  logger.warn("could not extract number of cores from run command: " + _runCmd)

Tobias


Re: MatchError in JsonRDD.toLong

2015-01-16 Thread Tobias Pfeiffer
Hi again,

On Fri, Jan 16, 2015 at 4:25 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Now I'm wondering where this comes from (I haven't touched this component
 in a while, nor upgraded Spark etc.) [...]


So the reason that the error is showing up now is that suddenly data from a
different dataset is showing up in my test dataset... don't ask me...
anyway, this different dataset contains data like

  {"Click": "nonclicked", "Impression": 1,
   "DisplayURL": 4401798909506983219, "AdId": 21215341, ...}
  {"Click": "nonclicked", "Impression": 1,
   "DisplayURL": 14452800566866169008, "AdId": 10587781, ...}

and the DisplayURL seems to be too long for Long, while it is still
inferred as a Long column.

So, what to do about this? Is jsonRDD inherently incapable of handling
those long numbers or is it just an issue in the schema inference and I
should file a JIRA issue?
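
One workaround I can think of (a sketch, using only the fields shown above;
assuming `rdd` is the RDD[String] of JSON lines and `sqlc` the SQLContext)
would be to skip the inference entirely and pass an explicit schema,
declaring DisplayURL as a string:

  import org.apache.spark.sql.catalyst.types._

  val schema = StructType(
    StructField("Click", StringType, true) ::
    StructField("Impression", LongType, true) ::
    StructField("DisplayURL", StringType, true) ::
    StructField("AdId", LongType, true) :: Nil)
  val json = sqlc.jsonRDD(rdd, schema)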

Thanks
Tobias


Re: MatchError in JsonRDD.toLong

2015-01-16 Thread Tobias Pfeiffer
Hi,

On Fri, Jan 16, 2015 at 5:55 PM, Wang, Daoyuan daoyuan.w...@intel.com
wrote:

 Can you provide how you create the JsonRDD?


This should be reproducible in the Spark shell:

-
import org.apache.spark.sql._
val sqlc = new SQLContext(sc)
val rdd = sc.parallelize(
  """{"Click": "nonclicked", "Impression": 1, "DisplayURL": 4401798909506983219, "AdId": 21215341}""" ::
  """{"Click": "nonclicked", "Impression": 1, "DisplayURL": 14452800566866169008, "AdId": 10587781}""" :: Nil)

// works fine
val json = sqlc.jsonRDD(rdd)
json.registerTempTable("test")
sqlc.sql("SELECT * FROM test").collect

// -> MatchError
val json2 = sqlc.jsonRDD(rdd, 0.1)
json2.registerTempTable("test2")
sqlc.sql("SELECT * FROM test2").collect
-

I guess the issue in the latter case is that the column is inferred as Long
when some rows actually are too big for Long...

Thanks
Tobias


Re: Testing if an RDD is empty?

2015-01-15 Thread Tobias Pfeiffer
Hi,

On Fri, Jan 16, 2015 at 7:31 AM, freedafeng freedaf...@yahoo.com wrote:

 I myself saw many times that my app threw out exceptions because an empty
 RDD cannot be saved. This is not big issue, but annoying. Having a cheap
 solution testing if an RDD is empty would be nice if there is no such thing
 available now.


I think the cheapest you can have is computing at least one element in the
RDD, which in the case of, say,

  val maybeEmptyRDD = veryExpensiveRDD.filter(_ => false)

will be just as expensive as .count().
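
In other words, something like the following is about the cheapest check
you can get (a sketch):

  def isEmpty[T](rdd: org.apache.spark.rdd.RDD[T]): Boolean = rdd.take(1).isEmpty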

Tobias


MatchError in JsonRDD.toLong

2015-01-15 Thread Tobias Pfeiffer
Hi,

I am experiencing a weird error that suddenly popped up in my unit tests. I
have a couple of HDFS files in JSON format and my test is basically
creating a JsonRDD and then issuing a very simple SQL query over it. This
used to work fine, but now suddenly I get:

15:58:49.039 [Executor task launch worker-1] ERROR executor.Executor -
Exception in task 1.0 in stage 29.0 (TID 117)
scala.MatchError: 14452800566866169008 (of class java.math.BigInteger)
at org.apache.spark.sql.json.JsonRDD$.toLong(JsonRDD.scala:282)
at org.apache.spark.sql.json.JsonRDD$.enforceCorrectType(JsonRDD.scala:353)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1$$anonfun$apply$12.apply(JsonRDD.scala:381)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:380)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:365)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$asRow(JsonRDD.scala:365)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The stack trace contains none of my classes, so it's a bit hard to track
down where this starts.

The code of JsonRDD.toLong is in fact

  private def toLong(value: Any): Long = {
    value match {
      case value: java.lang.Integer => value.asInstanceOf[Int].toLong
      case value: java.lang.Long => value.asInstanceOf[Long]
    }
  }

so if value is a BigInteger, toLong doesn't work. Now I'm wondering where
this comes from (I haven't touched this component in a while, nor upgraded
Spark etc.), but in particular I would like to know how to work around this.

Thanks
Tobias


Re: Serializability: for vs. while loops

2015-01-15 Thread Tobias Pfeiffer
Aaron,

thanks for your mail!

On Thu, Jan 15, 2015 at 5:05 PM, Aaron Davidson ilike...@gmail.com wrote:

 Scala for-loops are implemented as closures using anonymous inner classes
 [...]
 While loops, on the other hand, involve none of this trickery, and
 everyone is happy.


Ah, I was suspecting something like that... thank you very much for the
detailed explanation!

Tobias


Re: *ByKey aggregations: performance + order

2015-01-14 Thread Tobias Pfeiffer
Sean,

thanks for your message.

On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote:

 On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote:
  OK, it seems like even on a local machine (with no network overhead), the
  groupByKey version is about 5 times slower than any of the other
  (reduceByKey, combineByKey etc.) functions...

 Even without network overhead, you're still paying the cost of setting
 up the shuffle and serialization.


Can I pick an appropriate scheduler some time before so that Spark knows
all items with the same key are on the same host? (Or enforce this?)

Thanks
Tobias


Re:

2015-01-14 Thread Tobias Pfeiffer
Hi,

On Thu, Jan 15, 2015 at 12:23 AM, Ted Yu yuzhih...@gmail.com wrote:

 On Wed, Jan 14, 2015 at 6:58 AM, Jianguo Li flyingfromch...@gmail.com
 wrote:

 I am using Spark-1.1.1. When I used "sbt test", I ran into the following
 exceptions. Any idea how to solve it? Thanks! I think somebody posted this
 question before, but no one seemed to have answered it. Could it be the
 version of io.netty I put in my build.sbt? I included a dependency
 libraryDependencies += "io.netty" % "netty" % "3.6.6.Final" in my build.sbt
 file.

From my personal experience, netty dependencies are very painful to get
right with Spark. I recommend looking at the dependency tree using
https://github.com/jrudolph/sbt-dependency-graph and then fine-tuning your
sbt ignores until it works. There are too many issues depending on what
other packages you use to give general advice, I'm afraid.

And once you have them right and use `sbt assembly` to build your
application jar and want to run it on a cluster with spark-submit, you'll
find that the netty version bundled with Spark will be put on the classpath
before the version you want to use. It seems like there are various  Spark
configuration options to change this,

http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html
and a unification process is running, I think:
  https://issues.apache.org/jira/browse/SPARK-2996
  https://github.com/apache/spark/pull/3233

I'm also looking forward to this one, as I am stuck with an ancient version
of Finagle due to these Netty issues.

Good luck,
Tobias


Serializability: for vs. while loops

2015-01-14 Thread Tobias Pfeiffer
Hi,

sorry, I don't like questions about serializability myself, but still...

Can anyone give me a hint why

  for (i <- 0 to (maxId - 1)) {  ...  }

throws a NotSerializableException in the loop body while

  var i = 0
  while (i < maxId) {
// same code as in the for loop
i += 1
  }

works fine? I guess there is something fundamentally different in the way
Scala realizes for loops?

Thanks
Tobias


Re: quickly counting the number of rows in a partition?

2015-01-13 Thread Tobias Pfeiffer
Hi again,

On Wed, Jan 14, 2015 at 10:06 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 If you think of
 items.map(x => /* throw exception */).count()
 then even though the count you want to get does not necessarily require
 the evaluation of the function in map() (i.e., the number is the same), you
 may not want to get the count if that code actually fails.


Sorry, I think that was a bit confusing. What I mean is: You have to
compute the whole RDD in order to give a meaningful count() result (whether
you use rdd.count() or the mapPartitions() approach).

Tobias


Re: *ByKey aggregations: performance + order

2015-01-13 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 14, 2015 at 12:11 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Now I don't know (yet) if all of the functions I want to compute can be
 expressed in this way and I was wondering about *how much* more expensive
 we are talking about.


OK, it seems like even on a local machine (with no network overhead), the
groupByKey version is about 5 times slower than any of the other
(reduceByKey, combineByKey etc.) functions...

  val rdd = sc.parallelize(1 to 500)
  val withKeys = rdd.zipWithIndex.map(kv => (kv._2/10, kv._1))
  withKeys.cache()
  withKeys.count

  // around 850-1100 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.reduceByKey(_ + _).count()
    System.currentTimeMillis - start
  }

  // around 800-1100 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.combineByKey((x: Int) => x, (x: Int, y: Int) => x + y,
      (x: Int, y: Int) => x + y).count()
    System.currentTimeMillis - start
  }

  // around 1500-1900 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.foldByKey(0)(_ + _).count()
    System.currentTimeMillis - start
  }

  // around 1400-1800 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.aggregateByKey(0)(_ + _, _ + _).count()
    System.currentTimeMillis - start
  }

  // around 5500-6200 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.groupByKey().mapValues(_.reduceLeft(_ + _)).count()
    System.currentTimeMillis - start
  }

Tobias


*ByKey aggregations: performance + order

2015-01-13 Thread Tobias Pfeiffer
Hi,

I have an RDD[(Long, MyData)] where I want to compute various functions on
lists of MyData items with the same key (this will in general be a rather
short lists, around 10 items per key).

Naturally I was thinking of groupByKey() but was a bit intimidated by the
warning: "This operation may be very expensive. If you are grouping in
order to perform an aggregation (such as a sum or average) over each key,
using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will
provide much better performance."

Now I don't know (yet) if all of the functions I want to compute can be
expressed in this way and I was wondering about *how much* more expensive
we are talking about. Say I have something like

  rdd.zipWithIndex.map(kv => (kv._2/10, kv._1)).groupByKey(),

i.e. items that will be grouped will 99% live in the same partition (do
they?), does this change the performance?

Also, if my operations depend on the order in the original RDD (say, string
concatenation), how could I make sure the order of the original RDD is
respected?

Thanks
Tobias


Re: quickly counting the number of rows in a partition?

2015-01-13 Thread Tobias Pfeiffer
Hi,

On Mon, Jan 12, 2015 at 8:09 PM, Ganelin, Ilya ilya.gane...@capitalone.com
 wrote:

 Use the mapPartitions function. It returns an iterator to each partition.
 Then just get that length by converting to an array.


On Tue, Jan 13, 2015 at 2:50 PM, Kevin Burton bur...@spinn3r.com wrote:

 Doesn’t that just read in all the values?  The count isn’t pre-computed?
 It’s not the end of the world if it’s not but would be faster.


Well, converting to an array may not work due to memory constraints,
counting the items in the iterator may be better. However, there is no
pre-computed value. For counting, you need to compute all values in the
RDD, in general. If you think of

items.map(x => /* throw exception */).count()

then even though the count you want to get does not necessarily require the
evaluation of the function in map() (i.e., the number is the same), you may
not want to get the count if that code actually fails.

Tobias


Re: RDD Moving Average

2015-01-08 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 7, 2015 at 9:47 AM, Asim Jalis asimja...@gmail.com wrote:

 One approach I was considering was to use mapPartitions. It is
 straightforward to compute the moving average over a partition, except for
 near the end point. Does anyone see how to fix that?


Well, I guess this is not a perfect use case for mapPartitions, in
particular since you would have to implement the behavior near the
beginning and end of a partition yourself. I would rather go with the
high-level RDD functions that are partition-independent.

By the way, I am now also trying to implement sliding windows based on
count and embedded timestamp... seems like I should have had a look at
rdd.sliding() before...
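
For purely count-based windows there is, as far as I can see, a developer
API in MLlib that slides over an RDD. A small sketch:

  import org.apache.spark.mllib.rdd.RDDFunctions._

  val values = sc.parallelize(1 to 10)
  val windows = values.sliding(3)  // RDD[Array[Int]]: Array(1,2,3), Array(2,3,4), ...
  val movingAvg = windows.map(w => w.sum / w.length.toDouble)

This does not help with timestamp-based windows, though.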

Tobias


Re: Create DStream consisting of HDFS and (then) Kafka data

2015-01-07 Thread Tobias Pfeiffer
Hi,

On Thu, Jan 8, 2015 at 2:19 PM, rekt...@voodoowarez.com wrote:

 dstream processing bulk HDFS data - is something I don't feel is super
 well socialized yet, fingers crossed that base gets built up a little
 more.


Just out of interest (and hoping not to hijack my own thread), why are you
not doing plain RDD processing when you are only processing HDFS data?
What's the advantage of doing DStream?

Thanks
Tobias


Create DStream consisting of HDFS and (then) Kafka data

2015-01-07 Thread Tobias Pfeiffer
Hi,

I have a setup where data from an external stream is piped into Kafka and
also written to HDFS periodically for long-term storage. Now I am trying to
build an application that will first process the HDFS files and then switch
to Kafka, continuing with the first item that was not yet in HDFS. (The
items have an increasing timestamp that I can use to find the first item
not yet processed.) I am wondering what an elegant method to provide a
unified view of the data would be.

Currently, I am using two StreamingContexts one after another:
 - start one StreamingContext A and process all data found in HDFS
(updating the largest seen timestamp in an accumulator), stopping when
there was an RDD with 0 items in it,
 - stop that StreamingContext A,
 - start a new StreamingContext B and process the Kafka stream (filtering
out all items with a timestamp smaller than the value in the accumulator),
 - stop when the user requests it.

This *works* as it is now, but I was planning to add sliding windows (based
on item counts or the timestamps in the data), which will get unmanageably
complicated when I have a window spanning data in both HDFS and Kafka, I
guess. Therefore I would like to have a single DStream that is fed with
first HDFS and then Kafka data. Does anyone have a suggestion on how to
realize that (with as few copying of private[spark] classes as possible)? I
guess one issue is that the Kafka processing requires a receiver and
therefore needs to be treated quite a bit differently than HDFS?

Thanks
Tobias


Re: I think I am almost lost in the internals of Spark

2015-01-06 Thread Tobias Pfeiffer
Hi,

On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote:

 I am a bit new to Spark, except that I tried simple things like word
 count, and the examples given in the spark sql programming guide.
 Now, I am investigating the internals of Spark, but I think I am almost
 lost, because I could not grasp a whole picture what spark does when it
 executes the word count.


I recommend understanding what an RDD is and how it is processed, using

http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds
and probably also
  http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
  (once the server is back).
Understanding how an RDD is processed is probably most helpful to
understand the whole of Spark.

Tobias


Re: How to replace user.id to user.names in a file

2015-01-06 Thread Tobias Pfeiffer
Hi,

it looks to me as if you need the whole user database on every node, so
maybe put the id-name information as a Map[Id, String] in a broadcast
variable and then do something like

recommendations.map(line => {
  line.map(uid => usernames(uid))
})

or so?
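
More explicitly, with a broadcast variable (a sketch, assuming userMap is a
Map[Int, String] built on the driver and recommendations holds sequences of
user ids):

  val usernames = sc.broadcast(userMap)
  val withNames = recommendations.map(line =>
    line.map(uid => usernames.value(uid)))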

Tobias


Re: How to replace user.id to user.names in a file

2015-01-06 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 7, 2015 at 10:47 AM, Riginos Samaras samarasrigi...@gmail.com
wrote:

 Yes something like this. Can you please give me an example to create a Map?


That depends heavily on the shape of your input file. What about something
like:

import scala.io.Source

(for (line <- Source.fromFile(filename).getLines()) yield {
  val items = line.trim.split(" ")
  (items(0).toInt, items(1))
}).toMap

Tobias


Re: How to replace user.id to user.names in a file

2015-01-06 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 7, 2015 at 11:13 AM, Riginos Samaras samarasrigi...@gmail.com
wrote:

 exactly thats what I'm looking for, my code is like this:
 //code

 val users_map = users_file.map { s =>
   val parts = s.split(",")
   (parts(0).toInt, parts(1))
 }.distinct

 //code


 but i get the error:

 error: value toMap is not a member of org.apache.spark.rdd.RDD[(Int,
 String)]

   user_map.toMap

If you want to distribute the Map as a broadcast variable, it must not be
an RDD but a normal Scala map. Make your users_file a regular List, then it
should work.
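
For example (a sketch, assuming the user data is small enough to collect to
the driver):

  val usersMap: Map[Int, String] = users_map.collect().toMap
  val usersBc = sc.broadcast(usersMap)
  // then use usersBc.value(id) inside your RDD operations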

Tobias


Re: Add StructType column to SchemaRDD

2015-01-05 Thread Tobias Pfeiffer
Hi Michael,

On Tue, Jan 6, 2015 at 3:43 PM, Michael Armbrust mich...@databricks.com
wrote:

 Oh sorry, I'm rereading your email more carefully.  Its only because you
 have some setup code that you want to amortize?


Yes, exactly that.

Concerning the docs, I'd be happy to contribute, but I don't really
understand what is happening here and why ;-)

Thanks
Tobias


Add StructType column to SchemaRDD

2015-01-05 Thread Tobias Pfeiffer
Hi,

I have a SchemaRDD where I want to add a column with a value that is
computed from the rest of the row. As the computation involves a
network operation and requires setup code, I can't use
  SELECT *, myUDF(*) FROM rdd,
but I wanted to use a combination of:

 - get schema of input SchemaRDD
 - issue a mapPartitions call (including the setup code), obtaining a
   new RDD[Row]
 - extend the schema manually
 - create a new RDD by combining the RDD[Row] with the extended
   schema.

This works very well, but I run into trouble querying that resulting
SchemaRDD with SQL if:

 - the result of my computation is a case class
 - and I want to use values in this case class in the SQL query.

In particular, while

  SELECT column FROM resultrdd

works well,

  SELECT column.key_name FROM resultrdd

gives a

  java.lang.ClassCastException: example.MyCaseClass cannot be cast to
org.apache.spark.sql.catalyst.expressions.Row

Here is an example to illustrate that:

---

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types._

val sc = new SparkContext("local[3]", "Test")
val sqlc = new SQLContext(sc)
import sqlc._

// this is the case class that my operation is returning
case class Result(string_values: Map[String, String],
                  num_values: Map[String, Double])

// dummy result data
val data = (Result(Map("team" -> "a"), Map("score" -> 0.8)) ::
            Result(Map("team" -> "b"), Map("score" -> 0.5)) :: Nil)
val rdd = sc.parallelize(data)

// simulate my computation by creating an RDD[Row] and creating
// a schema programmatically
val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))
val progSchema = StructType(StructField("hello", IntegerType, false) ::
                            StructField("newcol", rdd.schema, true) :: Nil)
val progRdd = sqlc.applySchema(rowRdd, progSchema)
progRdd.registerTempTable("progrdd")

// the following call will *fail* with a ClassCastException
sqlc.sql("SELECT newcol.string_values['team'] FROM progrdd").foreach(println)

// however, the schema I specified is correct. see how embedding
// my result in a proper case class works:
case class ResultContainer(hello: Int, newcol: Result)
val caseClassRdd = rdd.map(dr => ResultContainer(7, dr))
caseClassRdd.registerTempTable("caseclassrdd")

// the following call will *work*
sqlc.sql("SELECT newcol.string_values['team'] FROM caseclassrdd").foreach(println)

// even though the schema for both RDDs is the same:
progRdd.schema == caseClassRdd.schema


---

It turns out that I cannot use the case class directly, but I have to
convert it to a Row as well. That is, instead of

  val rowRdd = rdd.map(dr => Row.fromSeq(7 :: dr :: Nil))

I have to use

  val rowRdd = rdd.map(dr => Row.fromSeq(7 ::
    Row.fromSeq(dr.productIterator.toSeq) :: Nil))

and then, I can use

  SELECT newcol.string_values['team'] FROM progrdd

So now I found that out and I'm happy that it works, but it was quite
hard to track it down, so I was wondering if this is the most
intuitive way to add a column to a SchemaRDD using mapPartitions (as
opposed to using a UDF, where the conversion case class - Row
seems to happen automatically).

Or, even if there is no more intuitive way, just wanted to have this
documented ;-)

Thanks
Tobias


Re: unable to do group by with 1st column

2014-12-25 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 26, 2014 at 5:22 AM, Amit Behera amit.bd...@gmail.com wrote:

 How can I do it? Please help me to do.


Have you considered using groupByKey?
http://spark.apache.org/docs/latest/programming-guide.html#transformations
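For example, if the first column is the key, a rough sketch (file name and
comma separator are made up) would be:

val grouped = sc.textFile("data.csv")
  .map { line =>
    val cols = line.split(",")
    (cols(0), cols.drop(1))        // (first column, rest of the row)
  }
  .groupByKey()                    // RDD[(String, Iterable[Array[String]])]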

Tobias


Re: serialization issue with mapPartitions

2014-12-25 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 26, 2014 at 1:32 AM, ey-chih chow eyc...@hotmail.com wrote:

 I got some issues with mapPartitions with the following piece of code:

 val sessions = sc
   .newAPIHadoopFile(
 ... path to an avro file ...,
 classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
 classOf[AvroKey[ByteBuffer]],
 classOf[NullWritable],
 job.getConfiguration())
   .mapPartitions { valueIterator =>
 val config = job.getConfiguration()
  .
   }
   .collect()

 Why job.getConfiguration() in the function mapPartitions will generate the
 following message?

 Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job


The functions inside mapPartitions() will be executed on the Spark
executors, not the Spark driver. Therefore, the function body needs to be
serialized and sent to the executors via network. If that is not possible
(in your case, `job` cannot be serialized), you will get a
NotSerializableException. It works inside newAPIHadoopFile because this is
executed on the driver.

Tobias


Re: Discourse: A proposed alternative to the Spark User list

2014-12-25 Thread Tobias Pfeiffer
Nick,

uh, I would have expected a rather heated discussion, but the opposite
seems to be the case ;-)

Independent of my personal preferences w.r.t. usability, habits etc., I
think it is not good for a software/tool/framework if questions and
discussions are spread over too many places. I guess everyone of us knows
an example of where this makes/has made it very hard for newcomers to get
started ;-)

As it is now, I think the mailing list has somewhat of an official touch,
while Stack Overflow is, well, Stack Overflow ;-) To introduce another
discussion platform next to the mailing list (your proposal (2.)) would
increase confusion, the number of double-postings and, as you said,
effectively fork the community. Your proposal (1.) sounds attractive, but I
highly doubt that the user experience can match people's expectations
towards the pure solution on either the mailing list or Discourse, given
the rather different discussion styles.

Having said that, I totally agree to the points you mentioned; even just
linking to a thread where a question has been discussed before is very
time-consuming and I would be happy to use a platform where all those
points are addressed. Stack Overflow seems to provide that, too, and except
for the broader range of discussions you mentioned, I don't see the
benefit of using Discourse over Stack Overflow. So personally, I would
suggest to go with (3.) and encourage SO as a platform for questions that
are ok to be asked there and try to reduce/focus mailing list communication
for everything else. (Note that this is pretty much the same state as now
plus encouraging people in an unspecified way, which means that maybe
nothing changes at all.)

Just my 2 cent,
Tobias


On Wed Dec 24 2014 at 21:50:48 Nick Chammas nicholas.cham...@gmail.com
 wrote:

 When people have questions about Spark, there are 2 main places (as far
 as I can tell) where they ask them:

- Stack Overflow, under the apache-spark tag
http://stackoverflow.com/questions/tagged/apache-spark
- This mailing list

 The mailing list is valuable as an independent place for discussion that
 is part of the Spark project itself. Furthermore, it allows for a broader
 range of discussions than would be allowed on Stack Overflow
 http://stackoverflow.com/help/dont-ask.

 As the Spark project has grown in popularity, I see that a few problems
 have emerged with this mailing list:

- It’s hard to follow topics (e.g. Streaming vs. SQL) that you’re
interested in, and it’s hard to know when someone has mentioned you
specifically.
- It’s hard to search for existing threads and link information
across disparate threads.
- It’s hard to format code and log snippets nicely, and by extension,
hard to read other people’s posts with this kind of information.

 There are existing solutions to all these (and other) problems based
 around straight-up discipline or client-side tooling, which users have to
 conjure up for themselves.

 I’d like us as a community to consider using Discourse
 http://www.discourse.org/ as an alternative to, or overlay on top of,
 this mailing list, that provides better out-of-the-box solutions to these
 problems.

 Discourse is a modern discussion platform built by some of the same
 people who created Stack Overflow. It has many neat features
 http://v1.discourse.org/about/ that I believe this community would
 benefit from.

 For example:

- When a user starts typing up a new post, they get a panel *showing
existing conversations that look similar*, just like on Stack
Overflow.
- It’s easy to search for posts and link between them.
- *Markdown support* is built-in to composer.
- You can *specifically mention people* and they will be notified.
- Posts can be categorized (e.g. Streaming, SQL, etc.).
- There is a built-in option for mailing list support which forwards
all activity on the forum to a user’s email address and which allows for
creation of new posts via email.

 What do you think of Discourse as an alternative, more manageable way to
 discus Spark?

 There are a few options we can consider:

1. Work with the ASF as well as the Discourse team to allow Discourse
to act as an overlay on top of this mailing list

 https://meta.discourse.org/t/discourse-as-a-front-end-for-existing-asf-mailing-lists/23167?u=nicholaschammas,
allowing people to continue to use the mailing list as-is if they want.
(This is the toughest but perhaps most attractive option.)
2. Create a new Discourse forum for Spark that is not bound to this
user list. This is relatively easy but will effectively fork the community
on this list. (We cannot shut down this mailing in favor of one managed by
Discourse.)
3. Don’t use Discourse. Just encourage people on this list to post
instead on Stack Overflow whenever possible.
4. Something else.

 What does everyone think?

 Nick




Re: serialization issue with mapPartitions

2014-12-25 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.com wrote:

 I should rephrase my question as follows:

 How to use the corresponding Hadoop Configuration of a HadoopRDD in
 defining
 a function as an input parameter to the MapPartitions function?


Well, you could try to pull the `val config = job.getConfiguration()` out
of the function and just use `config` inside the function, hoping that this
one is serializable.
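If the Configuration itself turns out not to be serializable either, you can
read the plain values you need from it on the driver and capture only those
in the closure. A sketch (the property name and process() are made up; rdd
stands for the RDD returned by newAPIHadoopFile):

// read a plain String on the driver; Strings serialize without problems
val myProperty = job.getConfiguration().get("my.custom.property")

rdd.mapPartitions { valueIterator =>
  // only myProperty is captured by the closure, not the Job/Configuration
  valueIterator.map(value => process(value, myProperty))
}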

Tobias


Re: SchemaRDD to RDD[String]

2014-12-24 Thread Tobias Pfeiffer
Hi,

On Wed, Dec 24, 2014 at 3:18 PM, Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 I want to convert a schemaRDD into RDD of String. How can we do that?

 Currently I am doing like this which is not converting correctly no
 exception but resultant strings are empty

 here is my code


Hehe, this is the most Java-ish Scala code I have ever seen ;-)

Having said that, are you sure that your rows are not empty? The code looks
correct to me, actually.

Tobias


Re: How to run an action and get output?

2014-12-23 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 19, 2014 at 6:53 PM, Ashic Mahtab as...@live.com wrote:

 val doSomething(entry:SomeEntry, session:Session) : SomeOutput = {
 val result = session.SomeOp(entry)
 SomeOutput(entry.Key, result.SomeProp)
 }

 I could use a transformation for rdd.map, but in case of failures, the map
 would run on another executor for the same rdd. I could do rdd.foreach, but
 that returns unit. Is there something like a foreach that can return values?


I think `map()` is pretty much `foreach()` that can return values. If you
want to prevent re-execution on errors, wrap the whole thing in a
scala.util.Try{} block or something.

import scala.util.{Try, Success, Failure}

rdd.map(item => {
  Try { ... }
}).flatMap(_ match {
  case Success(something) => Some(something)
  case Failure(e) => None
})

or so.

Tobias


Semantics of foreachPartition()

2014-12-18 Thread Tobias Pfeiffer
Hi,

I have the following code in my application:

tmpRdd.foreach(item => {
  println("abc: " + item)
})
tmpRdd.foreachPartition(iter => {
  iter.map(item => {
    println("xyz: " + item)
  })
})

In the output, I see only the "abc" prints (i.e. from the foreach() call).
(The result is the same also if I exchange the order.) What exactly is the
meaning of foreachPartition and how would I use it correctly?

Thanks
Tobias


Re: Semantics of foreachPartition()

2014-12-18 Thread Tobias Pfeiffer
Hi again,

On Thu, Dec 18, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 tmpRdd.foreachPartition(iter => {
   iter.map(item => {
     println("xyz: " + item)
   })
 })


Uh, with iter.foreach(...) it works... the reason apparently being that
iter.map() itself returns an iterator and is thus evaluated lazily (in this
case: never), while iter.foreach() is evaluated immediately.
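So the working version of the snippet is simply:

tmpRdd.foreachPartition(iter => {
  iter.foreach(item => {
    println("xyz: " + item)
  })
})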

Thanks
Tobias


Re: spark streaming kafa best practices ?

2014-12-17 Thread Tobias Pfeiffer
Hi,

On Thu, Dec 18, 2014 at 3:08 AM, Patrick Wendell pwend...@gmail.com wrote:

 On Wed, Dec 17, 2014 at 5:43 AM, Gerard Maas gerard.m...@gmail.com
 wrote:
  I was wondering why one would choose for rdd.map vs rdd.foreach to
 execute a
  side-effecting function on an RDD.


Personally, I like to get the count of processed items, so I do something
like
  rdd.map(item => processItem(item)).count()
instead of
  rdd.foreach(item => processItem(item))
but I would be happy to learn about a better way.

Tobias


Re: Spark SQL DSL for joins?

2014-12-16 Thread Tobias Pfeiffer
Jerry,

On Wed, Dec 17, 2014 at 3:35 PM, Jerry Raj jerry@gmail.com wrote:

 Another problem with the DSL:

 t1.where('term == "dmin").count() returns zero.


Looks like you need ===:
https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
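That is, something like this (adapted from your snippet):

t1.where('term === "dmin").count()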

Tobias


Re: Running spark-submit from a remote machine using a YARN application

2014-12-14 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 12, 2014 at 7:01 AM, ryaminal tacmot...@gmail.com wrote:

 Now our solution is to make a very simple YARN application which executes
 as its command spark-submit --master yarn-cluster
 s3n://application/jar.jar
  This seemed so simple and elegant, but it has some weird issues. We
 get NoClassDefFoundErrors. When we ssh to the box, run the same
 spark-submit command it works, but doing this through YARN leads in the
 NoClassDefFoundErrors mentioned.


I do something similar, I start Spark using spark-submit from a non-Spark
server application. Make sure that HADOOP_CONF_DIR is set correctly when
running spark-submit from your program so that the YARN configuration can
be found correctly.

Also, keep in mind that some parameters to spark-submit behave differently
when using the yarn-cluster vs. the local[*] master. For example, system
properties set using `--conf` will be available in your Spark application
only in local[*] mode; for YARN you need to wrap them with `--conf
spark.executor.extraJavaOptions=...`.

Tobias


Re: Adding a column to a SchemaRDD

2014-12-14 Thread Tobias Pfeiffer
Nathan,

On Fri, Dec 12, 2014 at 3:11 PM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 I can see how to do it if can express the added values in SQL - just run
 SELECT *,valueCalculation AS newColumnName FROM table

 I've been searching all over for how to do this if my added value is a
 scala function, with no luck.

 Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a
 new column, D, calculated using Utility.process(b, c), and I want (of
 course) to pass in the value B and C from each row, ending up with a new
 SchemaRDD with columns A, B, C, and D.


I guess you would have to do three things:
- schemardd.map(row => { extend the row here }),
  which will give you a plain RDD[Row] without a schema,
- take the schema from the schemardd and extend it manually by the name and
  type of the newly added column,
- create a new SchemaRDD from your mapped RDD and the manually extended
  schema.

Does that make sense?
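A rough sketch of those three steps (untested; I assume here that
Utility.process returns a Double, that B and C are the second and third
columns, and that your SQLContext is called sqlc):

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types._

// 1. extend each row with the computed value
val extendedRows = schemardd.map { row =>
  val d = Utility.process(row(1), row(2))   // B and C, cast as needed
  Row.fromSeq(row.toSeq :+ d)
}

// 2. extend the schema by the new column D
val extendedSchema = StructType(
  schemardd.schema.fields :+ StructField("D", DoubleType, nullable = true))

// 3. create a new SchemaRDD from the rows and the extended schema
val extended = sqlc.applySchema(extendedRows, extendedSchema)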

Tobias


Count-based windows

2014-12-08 Thread Tobias Pfeiffer
Hi,

I am interested in building an application that uses sliding windows not
based on the time when the item was received, but on either
* a timestamp embedded in the data, or
* a count (like: every 10 items, look at the last 100 items).

Also, I want to do this on stream data received from Kafka, but also on
HDFS data (where clearly the aspect received in is not present). I found 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-windowing-Driven-by-absolutely-time-td1733.html#a1843
as an instruction for how to use the timestamp, but does anyone have a
suggestion on how to use item count as window size constraint?

Thanks
Tobias


Re: spark-submit on YARN is slow

2014-12-08 Thread Tobias Pfeiffer
Hi,

On Tue, Dec 9, 2014 at 4:39 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Can you try using the YARN Fair Scheduler and set
 yarn.scheduler.fair.continuous-scheduling-enabled to true?


I'm using Cloudera 5.2.0 and my configuration says
  yarn.resourcemanager.scheduler.class =
    org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
  yarn.scheduler.fair.continuous-scheduling-enabled = true
by default. Changing to a different scheduler doesn't really change
anything; going from ACCEPTED to RUNNING always takes about 10 seconds.

Thanks
Tobias


Re: spark-submit on YARN is slow

2014-12-07 Thread Tobias Pfeiffer
Hi,

thanks for your responses!

On Sat, Dec 6, 2014 at 4:22 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What version are you using?  In some recent versions, we had a couple of
 large hardcoded sleeps on the Spark side.


I am using Spark 1.1.1.

As Andrew mentioned, I guess most of the 10 seconds waiting time probably
comes from YARN itself. (Other YARN applications also take a while to start
up.) I'm just really puzzled about what exactly takes so long there... for
a job that runs an hour or so, that is of course negligible, but I am
starting up an instance per client to do interactive job processing *for
this client*, and it feels like "yeah, thanks for logging in, now please
wait a while until you can actually use the program", which is a bit
suboptimal.

Tobias


Re: Market Basket Analysis

2014-12-04 Thread Tobias Pfeiffer
Hi,

On Thu, Dec 4, 2014 at 11:58 PM, Rohit Pujari rpuj...@hortonworks.com
wrote:

 I'd like to do market basket analysis using spark, what're my options?


To do it or not to do it ;-)

Seriously, could you elaborate a bit on what you want to know?

Tobias


Re: Stateful mapPartitions

2014-12-04 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 5, 2014 at 3:56 AM, Akshat Aranya aara...@gmail.com wrote:

 Is it possible to have some state across multiple calls to mapPartitions
 on each partition, for instance, if I want to keep a database connection
 open?


If you're using Scala, you can use a singleton object; it will exist once
per JVM (i.e., once per executor), like

object DatabaseConnector {
  lazy val conn = ...
}

Please be aware that shutting down the connection is much harder than
opening it, because you basically have no idea when processing is done for
an executor, AFAIK.
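For reference, a sketch of how that is typically used (the connection URL and
the lookup function are made up):

object DatabaseConnector {
  // created lazily, once per JVM (i.e. once per executor)
  lazy val conn =
    java.sql.DriverManager.getConnection("jdbc:postgresql://dbhost/mydb")
}

rdd.mapPartitions { iter =>
  val conn = DatabaseConnector.conn              // nothing is serialized/shipped here
  iter.map(item => lookupSomething(conn, item))  // hypothetical lookup function
}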

Tobias


Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-04 Thread Tobias Pfeiffer
On Fri, Dec 5, 2014 at 12:53 PM, Rahul Bindlish 
rahul.bindl...@nectechnologies.in wrote:

 Is it a limitation that spark does not support more than one case class at
 a
 time.


What do you mean? I do not have the slightest idea what you *could*
possibly mean by "to support a case class".

Tobias


Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-04 Thread Tobias Pfeiffer
Rahul,

On Fri, Dec 5, 2014 at 1:29 PM, Rahul Bindlish 
rahul.bindl...@nectechnologies.in wrote:

 I have created  objectfiles [person_obj,office_obj] from
 csv[person_csv,office_csv] files using case classes[person,office] with API
 (saveAsObjectFile)

 Now I restarted spark-shell and load objectfiles using API(objectFile).

 *Once any of one object-class is loaded successfully, rest of object-class
 gives serialization error.*


I have not used saveAsObjectFile, but I think that if you define your case
classes in the spark-shell and serialize the objects, and then you restart
the spark-shell, the *classes* (structure, names etc.) will not be known to
the JVM any more. So if you try to restore the *objects* from a file, the
JVM may fail in restoring them, because there is no class it could create
objects of. Just a guess. Try to write a Scala program, compile it and see
if it still fails when executed.

Tobias


Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-04 Thread Tobias Pfeiffer
Rahul,

On Fri, Dec 5, 2014 at 2:50 PM, Rahul Bindlish 
rahul.bindl...@nectechnologies.in wrote:

 I have done so thats why spark is able to load objectfile [e.g. person_obj]
 and spark has maintained serialVersionUID [person_obj].

 Next time when I am trying to load another objectfile [e.g. office_obj] and
 I think spark is matching serialVersionUID [person_obj] with previous
 serialVersionUID [person_obj] and giving mismatch error.

 In my first post, I have give statements which can be executed easily to
 replicate this issue.


Can you post the Scala source for your case classes? I have tried the
following in spark-shell:

case class Dog(name: String)
case class Cat(age: Int)
val dogs = sc.parallelize(Dog("foo") :: Dog("bar") :: Nil)
val cats = sc.parallelize(Cat(1) :: Cat(2) :: Nil)
dogs.saveAsObjectFile("test_dogs")
cats.saveAsObjectFile("test_cats")

This gives two directories test_dogs/ and test_cats/. Then I restarted
spark-shell and entered:

case class Dog(name: String)
case class Cat(age: Int)
val dogs = sc.objectFile("test_dogs")
val cats = sc.objectFile("test_cats")

I don't get an exception, but:

dogs: org.apache.spark.rdd.RDD[Nothing] = FlatMappedRDD[1] at objectFile at
<console>:12

Trying to access the elements of the RDD gave:

scala> dogs.collect()
14/12/05 15:08:58 INFO FileInputFormat: Total input paths to process : 8
...
org.apache.spark.SparkDriverExecutionException: Execution error
at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:980)
...
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object;
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
at
org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1129)
...
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:976)
... 10 more

So even in the simplest of cases, this doesn't work for me in the
spark-shell, but with a different error. I guess we need to see more of
your code to help.

Tobias


Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-04 Thread Tobias Pfeiffer
Rahul,

On Fri, Dec 5, 2014 at 3:51 PM, Rahul Bindlish 
rahul.bindl...@nectechnologies.in wrote:

 1. Copy csv files in current directory.
 2. Open spark-shell from this directory.
 3. Run one_scala file which will create object-files from csv-files in
 current directory.
 4. Restart spark-shell
 5. a. Run two_scala file, while running it is giving error during loading
 of office_csv
 b. If we edit two_scala file by below contents

 ---
 case class person(id: Int, name: String, fathername: String, officeid: Int)
 case class office(id: Int, name: String, landmark: String, areacode: String)
 sc.objectFile[office]("office_obj").count
 sc.objectFile[person]("person_obj").count

 
 while running it is giving error during loading of person_csv


One good news is: I can reproduce the error you see.

Another good news is: I can tell you how to fix this. In your one.scala
file, define all case classes *before* you use saveAsObjectFile() for the
first time. With
  case class person(id: Int, name: String, fathername: String, officeid: Int)
  case class office(id: Int, name: String, landmark: String, areacode: String)
  val baseperson = sc.textFile("person_csv") ... .saveAsObjectFile("person_obj")
  val baseoffice = sc.textFile("office_csv") ... .saveAsObjectFile("office_obj")
I can deserialize the obj files (in any order).

The bad news is: I have no idea about the reason for this. I blame it on
the REPL/shell and assume it would not happen for a compiled application.

Tobias


Does count() evaluate all mapped functions?

2014-12-03 Thread Tobias Pfeiffer
Hi,

I have an RDD and a function that should be called on every item in this
RDD once (say it updates an external database). So far, I used
  rdd.map(myFunction).count()
or
  rdd.mapPartitions(iter => iter.map(myFunction))
but I am wondering if this always triggers the call of myFunction in both
cases. Actually, in the first case, the count() will be the same whether or
not myFunction is called for each element, so I was just wondering if I can
rely on count() evaluating the whole pipeline including functions that
cannot change the count.

Thanks
Tobias


Re: Spark SQL UDF returning a list?

2014-12-03 Thread Tobias Pfeiffer
Hi,

On Wed, Dec 3, 2014 at 4:31 PM, Jerry Raj jerry@gmail.com wrote:

 Exception in thread main java.lang.RuntimeException: [1.57] failure:
 ``('' expected but identifier myudf found

 I also tried returning a List of Ints, that did not work either. Is there
 a way to write a UDF that returns a list?


You seem to be hitting a parser limitation before your function is even
called. The message you are seeing says "there must be an opening
bracket here", and I am afraid you won't get around this whatever function
you write... (maybe the HiveContext provides a possibility, though).

Tobias


Re: textFileStream() issue?

2014-12-03 Thread Tobias Pfeiffer
Hi,

On Wed, Dec 3, 2014 at 5:31 PM, Bahubali Jain bahub...@gmail.com wrote:

 I am trying to use textFileStream(some_hdfs_location) to pick new files
 from a HDFS location.I am seeing a pretty strange behavior though.
 textFileStream() is not detecting new files when I move them from a
 location with in hdfs to location at which textFileStream() is checking for
 new files.
 But when I copy files from a location in linux filesystem to hdfs then the
 textFileStream is detecting the new files.


Is it possible that the timestamp of the moved files is actually older than
that of previously processed files? I think only new files are picked
up. Try moving the file and setting its timestamp to now() to see if it
makes a difference.
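If the files are on HDFS, something like this might do (the path is made up):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
// set the modification time to "now"; -1 leaves the access time unchanged
fs.setTimes(new Path("/streaming/in/file.txt"), System.currentTimeMillis(), -1)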

Tobias


Re: Best way to have some singleton per worker

2014-12-03 Thread Tobias Pfeiffer
Hi,

On Thu, Dec 4, 2014 at 2:59 AM, Ashic Mahtab as...@live.com wrote:

 I've been doing this with foreachPartition (i.e. have the parameters for
 creating the singleton outside the loop, do a foreachPartition, create the
 instance, loop over entries in the partition, close the partition), but
 it's quite cludgy. Is there a pattern by which I can have an instance of
 something nonserializable on each worker?


I think the pattern you describe is the standard way of doing this, several
people on this list (including me) have used it for database access etc.

Tobias


spark-submit on YARN is slow

2014-12-03 Thread Tobias Pfeiffer
Hi,

I am using spark-submit to submit my application to YARN in yarn-cluster
mode. I have both the Spark assembly jar file as well as my application jar
file put in HDFS and can see from the logging output that both files are
used from there. However, it still takes about 10 seconds for my
application's yarnAppState to switch from ACCEPTED to RUNNING.

I am aware that this is probably not a Spark issue, but some YARN
configuration setting (or YARN-inherent slowness), I was just wondering if
anyone has an advice for how to speed this up.

Thanks
Tobias


Re: netty on classpath when using spark-submit

2014-12-03 Thread Tobias Pfeiffer
Markus,

On Tue, Nov 11, 2014 at 10:40 AM, M. Dale medal...@yahoo.com wrote:

   I never tried to use this property. I was hoping someone else would jump
 in. When I saw your original question I remembered that Hadoop has
 something similar. So I searched and found the link below. A quick JIRA
 search seems to indicate that there is another property:

 https://issues.apache.org/jira/browse/SPARK-2996

 Maybe that property will work with yarn: spark.yarn.user.classpath.first


Thank you very much! That property does in fact load the classes from my
jar file first when running on YARN, great!

However, in local[N] mode, neither that one nor
the spark.files.userClassPathFirst one works. So when using spark-submit
with --master local[3] instead of --master yarn-cluster, the value
for spark.files.userClassPathFirst is displayed correctly, but the classes
are still loaded from the wrong jar...

Tobias


Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Tobias Pfeiffer
Hi,

have a look at the documentation for spark.driver.extraJavaOptions (which
seems to have disappeared since I looked it up last week)
and spark.executor.extraJavaOptions at 
http://spark.apache.org/docs/latest/configuration.html#runtime-environment.

Tobias


Re: kafka pipeline exactly once semantics

2014-11-30 Thread Tobias Pfeiffer
Josh,

On Sun, Nov 30, 2014 at 10:17 PM, Josh J joshjd...@gmail.com wrote:

 I would like to setup a Kafka pipeline whereby I write my data to a single
 topic 1, then I continue to process using spark streaming and write the
 transformed results to topic2, and finally I read the results from topic 2.


Not really related to your question, but you may also want to look into
Samza http://samza.incubator.apache.org/ which was built exactly for this
kind of processing.

Tobias


Re: Determine number of running executors

2014-11-25 Thread Tobias Pfeiffer
Hi,

Thanks for your help!

Sandy, I had a bit of trouble finding the spark.executor.cores property.
(It wasn't there although its value should have been 2.)
I ended up throwing regular expressions
on scala.util.Properties.propOrElse("sun.java.command", ""), which worked
surprisingly well ;-)
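Roughly like this (the exact flag I parse here is an assumption):

// the full command line that launched this JVM, e.g. containing "--executor-cores 2"
val cmd = scala.util.Properties.propOrElse("sun.java.command", "")

val executorCores = "--executor-cores\\s+(\\d+)".r
  .findFirstMatchIn(cmd)
  .map(_.group(1).toInt)
  .getOrElse(1)   // fall back to a default of one core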

Thanks
Tobias


Re: Is spark streaming +MlLib for online learning?

2014-11-24 Thread Tobias Pfeiffer
Hi,

On Tue, Nov 25, 2014 at 9:40 AM, Joanne Contact joannenetw...@gmail.com
wrote:

 I seemed to read somewhere that spark is still batch learning, but spark
 streaming could allow online learning.


Spark doesn't do Machine Learning itself, but MLlib does. MLlib currently
can do online learning only for linear regression 
https://spark.apache.org/docs/1.1.0/mllib-linear-methods.html#streaming-linear-regression,
as far as I know.

Tobias


Re: Setup Remote HDFS for Spark

2014-11-24 Thread Tobias Pfeiffer
Hi,

On Sat, Nov 22, 2014 at 12:13 AM, EH eas...@gmail.com wrote:

 Unfortunately whether it is possible to have both Spark and HDFS running on
 the same machine is not under our control.  :(  Right now we have Spark and
 HDFS running in different machines.  In this case, is it still possible to
 hook up a remote HDFS with Spark so that we can use Spark Streaming
 checkpoints?  Thank you for your help.


In my case, after I copied my cluster's core-site.xml, yarn-site.xml, and
hdfs-site.xml to my CLASSPATH, I could access YARN and HDFS remotely
without any problems (read: modulo network/firewall issues), which
surprised me quite a bit.

Tobias


spark-submit and logging

2014-11-20 Thread Tobias Pfeiffer
Hi,

I am using spark-submit to submit my application jar to a YARN cluster.  I
want to deliver a single jar file to my users, so I would like to avoid
telling them "also, please put that log4j.xml file somewhere and add that
path to the spark-submit command".

I thought it would be sufficient that my application jar file contains a
log4j.xml file, but that does not seem to be the case.  If I don't add a
log4j.xml file to the classpath before launching spark-submit, the one
bundled with spark will be used -- which has a negative influence on my
program execution.  Is there any way I can tell spark-submit to use the
log4j configuration bundled in my jar file?

Thanks
Tobias


Re: Cannot access data after a join (error: value _1 is not a member of Product with Serializable)

2014-11-19 Thread Tobias Pfeiffer
Hi,

it looks like what you are trying to use as a Tuple cannot be inferred to be
a Tuple by the compiler. Try adding type declarations and maybe you will
see where things fail.

Tobias


Re: Spark Streaming with Kafka is failing with Error

2014-11-18 Thread Tobias Pfeiffer
Hi,

do you have some logging backend (log4j, logback) on your classpath? This
seems a bit like there is no particular implementation of the abstract
`log()` method available.

Tobias


Re: Parsing a large XML file using Spark

2014-11-18 Thread Tobias Pfeiffer
Hi,

see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for one
solution.

One issue with those XML files is that they cannot be processed line by
line in parallel; plus you inherently need shared/global state to parse XML
or check for well-formedness, I think. (Same issue with multi-line JSON, by
the way.)

Tobias

