Re: Why are there different "parts" in my CSV?

2015-02-12 Thread Akhil Das
For streaming application, for every batch it will create a new directory
and puts the data in it. If you don't want to have multiple files inside
the directory as part- then you can do a repartition before the saveAs*
call.

messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/","csv",String.class,
String.class, (Class) TextOutputFormat.class);


Thanks
Best Regards

On Fri, Feb 13, 2015 at 11:59 AM, Su She  wrote:

> Hello Everyone,
>
> I am writing simple word counts to hdfs using
> messages.saveAsHadoopFiles("hdfs://user/ec2-user/","csv",String.class,
> String.class, (Class) TextOutputFormat.class);
>
> 1) However, each 2 seconds I getting a new *directory *that is titled as
> a csv. So i'll have test.csv, which will be a directory that has two files
> inside of it called part-0 and part 1 (something like that). This
> obv makes it very hard for me to read the data stored in the csv files. I
> am wondering if there is a better way to store the JavaPairRecieverDStream
> and JavaPairDStream?
>
> 2) I know there is a copy/merge hadoop api for merging files...can this be
> done inside java? I am not sure the logic behind this api if I am using
> spark streaming which is constantly making new files.
>
> Thanks a lot for the help!
>


Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Yes, you can try it. For example, if you have a cluster of 10 executors, 60
Kafka partitions, you can try to choose 10 receivers * 2 consumer threads,
so each thread will consume 3 partitions ideally, if you increase the
threads to 6, each threads will consume 1 partitions ideally. What I think
importantly is that each executor will have a receiver, so the data will be
distributed to each executor.

If you have a large cluster even number of executors are more than the
Kafka partitions, maybe you need to increase the Kafka partitions to
increase the parallelism, otherwise some of the computation resources may
be idle.

Besides if executors * consumers > Kafka partitions, the left consumers
beyond partition numbers will be idle, each partition could only be
consumed by one consumer.

We have a in house benchmark cluster with such deploy criterion, I'm not
sure if it works for you, you can try it.

Thanks
Saisai

2015-02-13 15:19 GMT+08:00 Tim Smith :

> Hi Saisai,
>
> If I understand correctly, you are suggesting that control parallelism by
> having number of consumers/executors at least 1:1 for number of kafka
> partitions. For example, if I have 50 partitions for a kafka topic then
> either have:
> - 25 or more executors, 25 receivers, each receiver set to 2 consumer
> threads per topic, or,
> - 50 or more executors, 50 receivers, each receiver set to 1 consumer
> thread per topic
>
> Actually, both executors and total consumers can be more than the number
> of kafka partitions (some will probably sit idle).
>
> But do away with dStream partitioning altogether.
>
> Right?
>
> Thanks,
>
> - Tim
>
>
>
>
> On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao 
> wrote:
>
>> Hi Tim,
>>
>> I think maybe you can try this way:
>>
>> create Receiver per executor and specify thread for each topic large than
>> 1, and the total number of consumer thread will be: total consumer =
>> (receiver number) * (thread number), and make sure this total consumer is
>> less than or equal to Kafka partition number. In this case, I think the
>> parallelism is enough, received blocks are distributed to each executor. So
>> you don't need to repartition to increase the parallelism.
>>
>> Besides for Kafka's high-level API, Kafka partitions may not be equally
>> distributed to all the receivers, so some tasks may process more data than
>> other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
>> that will be more balanced because each Kafka partition mapping to Spark
>> partition.
>>
>>
>> Besides "set partition count to 1 for each dStream" means
>> dstream.repartition(1) ? If so I think it will still introduce shuffle and
>> move all the data into one partition.
>>
>> Thanks
>> Saisai
>>
>> 2015-02-13 13:54 GMT+08:00 Tim Smith :
>>
>>> TD - I will try count() and report back. Meanwhile, attached is the
>>> entire driver log that includes the error logs about missing blocks.
>>>
>>> Cody - Let me research a bit about how to do connection pooling. Sorry,
>>> I am not really a programmer. I did see the connection pooling advise in
>>> the Spark Streaming Programming guide as an optimization but wasn't sure
>>> how to implement it. But do you think it will have a significant impact on
>>> performance?
>>>
>>> Saisai - I think, ideally, I'd rather not do any dStream partitioning.
>>> Instead have 1 receiver for each kafka partition (so in this case 23
>>> receivers for 23 kafka partitions) and then have as many or more executors
>>> to handle processing of the dStreams. Right? Trouble is, I tried this
>>> approach and didn't work. Even If I set 23 receivers, and set partition
>>> count to 1 for each dStream (effectively, no stream splitting), my
>>> performance is extremely poor/laggy. Should I modify my code to remove
>>> dStream partitioning altogether and then try setting as many receivers as
>>> kafka partitions?
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao 
>>> wrote:
>>>
 Hi Tim,

 I think this code will still introduce shuffle even when you call
 repartition on each input stream. Actually this style of implementation
 will generate more jobs (job per each input stream) than union into one
 stream as called DStream.union(), and union normally has no special
 overhead as I understood.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
>>

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Saisai,

If I understand correctly, you are suggesting that control parallelism by
having number of consumers/executors at least 1:1 for number of kafka
partitions. For example, if I have 50 partitions for a kafka topic then
either have:
- 25 or more executors, 25 receivers, each receiver set to 2 consumer
threads per topic, or,
- 50 or more executors, 50 receivers, each receiver set to 1 consumer
thread per topic

Actually, both executors and total consumers can be more than the number of
kafka partitions (some will probably sit idle).

But do away with dStream partitioning altogether.

Right?

Thanks,

- Tim




On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao 
wrote:

> Hi Tim,
>
> I think maybe you can try this way:
>
> create Receiver per executor and specify thread for each topic large than
> 1, and the total number of consumer thread will be: total consumer =
> (receiver number) * (thread number), and make sure this total consumer is
> less than or equal to Kafka partition number. In this case, I think the
> parallelism is enough, received blocks are distributed to each executor. So
> you don't need to repartition to increase the parallelism.
>
> Besides for Kafka's high-level API, Kafka partitions may not be equally
> distributed to all the receivers, so some tasks may process more data than
> other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
> that will be more balanced because each Kafka partition mapping to Spark
> partition.
>
>
> Besides "set partition count to 1 for each dStream" means
> dstream.repartition(1) ? If so I think it will still introduce shuffle and
> move all the data into one partition.
>
> Thanks
> Saisai
>
> 2015-02-13 13:54 GMT+08:00 Tim Smith :
>
>> TD - I will try count() and report back. Meanwhile, attached is the
>> entire driver log that includes the error logs about missing blocks.
>>
>> Cody - Let me research a bit about how to do connection pooling. Sorry, I
>> am not really a programmer. I did see the connection pooling advise in the
>> Spark Streaming Programming guide as an optimization but wasn't sure how to
>> implement it. But do you think it will have a significant impact on
>> performance?
>>
>> Saisai - I think, ideally, I'd rather not do any dStream partitioning.
>> Instead have 1 receiver for each kafka partition (so in this case 23
>> receivers for 23 kafka partitions) and then have as many or more executors
>> to handle processing of the dStreams. Right? Trouble is, I tried this
>> approach and didn't work. Even If I set 23 receivers, and set partition
>> count to 1 for each dStream (effectively, no stream splitting), my
>> performance is extremely poor/laggy. Should I modify my code to remove
>> dStream partitioning altogether and then try setting as many receivers as
>> kafka partitions?
>>
>>
>>
>>
>>
>> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao 
>> wrote:
>>
>>> Hi Tim,
>>>
>>> I think this code will still introduce shuffle even when you call
>>> repartition on each input stream. Actually this style of implementation
>>> will generate more jobs (job per each input stream) than union into one
>>> stream as called DStream.union(), and union normally has no special
>>> overhead as I understood.
>>>
>>> Also as Cody said, creating Producer per partition could be a potential
>>> overhead, producer pool or sharing the Producer for one executor might be
>>> better :).
>>>
>>>
>>>  // Process stream from each receiver separately
>>>  // (do not attempt to merge all streams and then re-partition,
>>> this causes un-necessary and high amount of shuffle in the job)
>>>  for (k <- kInStreams)
>>> {
>>>  // Re-partition stream from each receiver across all
>>> compute nodes to spread out processing load and allows per partition
>>> processing
>>>  // and, set persistence level to spill to disk along
>>> with serialization
>>>  val kInMsgParts =
>>> k.repartition(otherConf("dStreamPartitions").toInt).
>>>
>>> 2015-02-13 13:27 GMT+08:00 Cody Koeninger :
>>>
 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

  val writer = new
 KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

  writer.output(rec)

 }) )


 So this is creating a new kafka producer for every new output
 partition, right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith  wrote:

> 1) Yes, if I disable writing out to kafka and replace it with some
> very light weight action is rdd.take(1), the app is stable.
>
> 2) The partitions I spoke of in the previous mail are the number of
> partitions I create from each dStream. But yes, since I do processing and
> writing out, per partition, each dStream partition ends up getting written
> to a kafka partition. Flow is, broadly:
> 5 Spark/Kafka Receivers -> Split each dStream into 30 partition

Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Hi Tim,

I think maybe you can try this way:

create Receiver per executor and specify thread for each topic large than
1, and the total number of consumer thread will be: total consumer =
(receiver number) * (thread number), and make sure this total consumer is
less than or equal to Kafka partition number. In this case, I think the
parallelism is enough, received blocks are distributed to each executor. So
you don't need to repartition to increase the parallelism.

Besides for Kafka's high-level API, Kafka partitions may not be equally
distributed to all the receivers, so some tasks may process more data than
other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
that will be more balanced because each Kafka partition mapping to Spark
partition.


Besides "set partition count to 1 for each dStream" means
dstream.repartition(1) ? If so I think it will still introduce shuffle and
move all the data into one partition.

Thanks
Saisai

2015-02-13 13:54 GMT+08:00 Tim Smith :

> TD - I will try count() and report back. Meanwhile, attached is the entire
> driver log that includes the error logs about missing blocks.
>
> Cody - Let me research a bit about how to do connection pooling. Sorry, I
> am not really a programmer. I did see the connection pooling advise in the
> Spark Streaming Programming guide as an optimization but wasn't sure how to
> implement it. But do you think it will have a significant impact on
> performance?
>
> Saisai - I think, ideally, I'd rather not do any dStream partitioning.
> Instead have 1 receiver for each kafka partition (so in this case 23
> receivers for 23 kafka partitions) and then have as many or more executors
> to handle processing of the dStreams. Right? Trouble is, I tried this
> approach and didn't work. Even If I set 23 receivers, and set partition
> count to 1 for each dStream (effectively, no stream splitting), my
> performance is extremely poor/laggy. Should I modify my code to remove
> dStream partitioning altogether and then try setting as many receivers as
> kafka partitions?
>
>
>
>
>
> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao 
> wrote:
>
>> Hi Tim,
>>
>> I think this code will still introduce shuffle even when you call
>> repartition on each input stream. Actually this style of implementation
>> will generate more jobs (job per each input stream) than union into one
>> stream as called DStream.union(), and union normally has no special
>> overhead as I understood.
>>
>> Also as Cody said, creating Producer per partition could be a potential
>> overhead, producer pool or sharing the Producer for one executor might be
>> better :).
>>
>>
>>  // Process stream from each receiver separately
>>  // (do not attempt to merge all streams and then re-partition,
>> this causes un-necessary and high amount of shuffle in the job)
>>  for (k <- kInStreams)
>> {
>>  // Re-partition stream from each receiver across all
>> compute nodes to spread out processing load and allows per partition
>> processing
>>  // and, set persistence level to spill to disk along
>> with serialization
>>  val kInMsgParts =
>> k.repartition(otherConf("dStreamPartitions").toInt).
>>
>> 2015-02-13 13:27 GMT+08:00 Cody Koeninger :
>>
>>> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>
>>>  val writer = new
>>> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>
>>>  writer.output(rec)
>>>
>>> }) )
>>>
>>>
>>> So this is creating a new kafka producer for every new output partition,
>>> right?  Have you tried pooling the producers?
>>>
>>> On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith  wrote:
>>>
 1) Yes, if I disable writing out to kafka and replace it with some very
 light weight action is rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out, per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
 partitions) -> Apply some transformation logic to each partition -> write
 out each partition to kafka (kafka has 23 partitions). Let me increase the
 number of partitions on the kafka side and see if that helps.

 Here's what the main block of code looks like (I added persistence
 back):

 val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
 KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
 Map(otherConf("kafkaConsumerTopic").toString -> 1),
 StorageLevel.MEMORY_AND_DISK_SER) }

 if (!configMap.keySet.isEmpty)
 {
  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partitio

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
I replaced the writeToKafka statements with a rdd.count() and sure enough,
I have a stable app with total delay well within the batch window (20
seconds). Here's the total delay lines from the driver log:
15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time
142380806 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time
142380808 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time
142380810 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time
142380812 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time
142380814 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time
142380816 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time
142380818 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time
142380820 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time
142380822 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time
142380824 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time
142380826 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time
142380828 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time
142380830 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time
142380832 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time
142380834 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time
142380836 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time
142380838 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time
142380840 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time
142380842 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time
142380844 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time
142380846 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time
142380848 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time
142380850 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time
142380852 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time
142380854 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time
142380856 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time
142380858 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time
142380860 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time
142380862 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time
142380864 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time
142380866 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time
142380868 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time
142380870 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time
142380872 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time
142380874 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time
142380876 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time
142380878 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time
142380880 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time
142380882 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time
142380884 ms (execution: 4.026 s)




On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith  wrote:

> TD - I will try count() and report back. Meanwhile, attached is the entire
> driver log that includes the error logs about missing blocks.
>
> Cody - Let me research a bit about how to do connection pooling. Sorry, I
> am not really a programmer. I did see the connection pooling advise in the
> Spark Streaming Programming guide as an optimization but wasn't sure how to
> implement it. But do you think it will have a significant impact on
> performance?
>
> Saisai - I think, ideally, I'd rather not do any dStream partit

How to sum up the values in the columns of a dataset in Scala?

2015-02-12 Thread Carter

I am new to Scala. I have a dataset with many columns, each column has a
column name. Given several column names (these column names are not fixed,
they are generated dynamically), I need to sum up the values of these
columns. Is there an efficient way of doing this?

I worked out a way by using for loop, but I don't think it is efficient:

val AllLabels = List("ID", "val1", "val2", "val3", "val4")
val lbla = List("val1", "val3", "val4")
val index_lbla = lbla.map(x => AllLabels.indexOf(x))

val dataRDD = sc.textFile("../test.csv").map(_.split(","))

dataRDD.map(x=>
 {
  var sum = 0.0
  for (i <- 1 to index_lbla.length) 
sum = sum + x(i).toDouble
  sum
 }
).collect

The test.csv looks like below (without column names):

"ID", "val1", "val2", "val3", "val4"
 A, 123, 523, 534, 893
 B, 536, 98, 1623, 98472
 C, 537, 89, 83640, 9265
 D, 7297, 98364, 9, 735
 ...

Your help is very much appreciated!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-sum-up-the-values-in-the-columns-of-a-dataset-in-Scala-tp21639.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Master dies after program finishes normally

2015-02-12 Thread Akhil Das
Increasing your driver memory might help.

Thanks
Best Regards

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar 
wrote:

> Hi,
>  I have a Hidden Markov Model running with 200MB data.
>  Once the program finishes (i.e. all stages/jobs are done) the program
> hangs for 20 minutes or so before killing master.
>
> In the spark master the following log appears.
>
> 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal
> error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting
> down ActorSystem [sparkMaster]
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at scala.collection.immutable.List$.newBuilder(List.scala:396)
> at
> scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
> at
> scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
> at
> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
> at
> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.json4s.MonadicJValue.org
> $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
> at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
> at
> org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
> at
> org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
> at
> org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
> at
> org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
> at
> org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
> at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)
>
> Can anyone help?
>
> ..Manas
>


Why are there different "parts" in my CSV?

2015-02-12 Thread Su She
Hello Everyone,

I am writing simple word counts to hdfs using
messages.saveAsHadoopFiles("hdfs://user/ec2-user/","csv",String.class,
String.class, (Class) TextOutputFormat.class);

1) However, each 2 seconds I getting a new *directory *that is titled as a
csv. So i'll have test.csv, which will be a directory that has two files
inside of it called part-0 and part 1 (something like that). This
obv makes it very hard for me to read the data stored in the csv files. I
am wondering if there is a better way to store the JavaPairRecieverDStream
and JavaPairDStream?

2) I know there is a copy/merge hadoop api for merging files...can this be
done inside java? I am not sure the logic behind this api if I am using
spark streaming which is constantly making new files.

Thanks a lot for the help!


An interesting and serious problem I encountered

2015-02-12 Thread Landmark
Hi foks,

My Spark cluster has 8 machines, each of which has 377GB physical memory,
and thus the total maximum memory can be used for Spark is more than
2400+GB. In my program, I have to deal with 1 billion of (key, value) pairs,
where the key is an integer and the value is an integer array with 43
elements.  Therefore, the memory cost of this raw dataset is [(1+43) *
10 * 4] / (1024 * 1024 * 1024) = 164GB.  

Since I have to use this dataset repeatedly, I have to cache it in memory.
Some key parameter settings are: 
spark.storage.fraction=0.6
spark.driver.memory=30GB
spark.executor.memory=310GB.

But it failed on running a simple countByKey() and the error message is
"java.lang.OutOfMemoryError: Java heap space...". Does this mean a Spark
cluster of 2400+GB memory cannot keep 164GB raw data in memory? 

The codes of my program is as follows:

def main(args: Array[String]):Unit = {
val sc = new SparkContext(new SparkConfig());

val rdd = sc.parallelize(0 until 10, 25600).map(i => (i, new
Array[Int](43))).cache();
println("The number of keys is " + rdd.countByKey());

//some other operations following here ...
}




To figure out the issue, I evaluated the memory cost of key-value pairs and
computed their memory cost using SizeOf.jar. The codes are as follows:

val arr = new Array[Int](43);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr)));

val tuple = (1, arr.clone);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(tuple)));

The output is:
192.0b
992.0b


*Hard to believe, but it is true!! This result means, to store a key-value
pair, Tuple2 needs more than 5+ times memory than the simplest method with
array. Even though it may take 5+ times memory, its size is less than
1000GB, which is still much less than the total memory size of my cluster,
i.e., 2400+GB. I really do not understand why this happened.*

BTW, if the number of pairs is 1 million, it works well. If the arr contains
only 1 integer, to store a pair, Tuples needs around 10 times memory.

So I have some questions:
1. Why does Spark choose such a poor data structure, Tuple2, for key-value
pairs? Is there any better data structure for storing (key, value)  pairs
with less memory cost ?
2. Given a dataset with size of M, in general Spark how many times of memory
to handle it?


Best,
Landmark




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/An-interesting-and-serious-problem-I-encountered-tp21637.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using Spark SQL for temporal data

2015-02-12 Thread Michael Armbrust
>
> I haven't been paying close attention to the JIRA tickets for
> PrunedFilteredScan but I noticed some weird behavior around the filters
> being applied when OR expressions were used in the WHERE clause. From what
> I was seeing, it looks like it could be possible that the "start" and "end"
> ranges you are proposing to place in the WHERE clause could actually never
> be pushed down to the PrunedFilteredScan if there's an OR expression in
> there, like: (start > "2014-12-01" and end < "2015-02-12") or (). I
> haven't done a unit test for this case yet, but I did file SPARK-5296
> because of the behavior I was seeing. I'm requiring a time range in the
> services I'm writing because without it, the full Accumulo table would be
> scanned- and that's no good.


Ah, I see.  Right now we only split up and pass down conjunctive (and)
predicates that can be expressed in the limited set of filters so far.  We
can easily add OR if it works for your use case. It'll be up to the data
source however to recurse down the ORs and either pass multiple time ranges
to accumulo or union multiple RDDs together to return them.  Lets discuss
more on the JIRA.

Are there any plans on making the CatalystScan public in the near future
> (possibly once SparkSQL reaches the proposed stability in 1.3?)


No, it'll remain public so people can experiment with it, but it is
unlikely it'll ever have the same stability guarantees that the Spark
public API does.  This is primarily due to its dependence on the whole
catalyst expression hierarchy.  Instead I'd like to add to the other scan
filters / interfaces that can provide useful information to the data
sources.


Re: 8080 port password protection

2015-02-12 Thread Akhil Das
Just to add to what Arush said, you can go through these links:

http://stackoverflow.com/questions/1162375/apache-port-proxy

http://serverfault.com/questions/153229/password-protect-and-serve-apache-site-by-port-number

Thanks
Best Regards

On Thu, Feb 12, 2015 at 10:43 PM, Arush Kharbanda <
ar...@sigmoidanalytics.com> wrote:

> You could apply a password using a filter using a server. Though it dosnt
> looks like the right grp for the question. It can be done for spark also
> for Spark UI.
>
> On Thu, Feb 12, 2015 at 10:19 PM, MASTER_ZION (Jairo Linux) <
> master.z...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Im creating a development machine in AWS and i would like to protect the
>> port 8080 using a password.
>>
>> Is it possible?
>>
>>
>> Best Regards
>>
>> *Jairo Moreno*
>>
>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Su She
Thanks Kevin for the link, I have had issues trying to install zeppelin as
I believe it is not yet supported for CDH 5.3, and Spark 1.2. Please
correct me if I am mistaken.

On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim 
wrote:

> Apache Zeppelin also has a scheduler and then you can reload your chart
> periodically,
> Check it out:
> http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html
>
>
>
>
> On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>>   One method I’ve used is to publish each batch to a message bus or
>> queue with a custom UI listening on the other end, displaying the results
>> in d3.js or some other app. As far as I’m aware there isn’t a tool that
>> will directly take a DStream.
>>
>>  Spark Notebook seems to have some support for updating graphs
>> periodically. I haven’t used it myself yet so not sure how well it works.
>> See here: https://github.com/andypetrella/spark-notebook
>>
>>   From: Su She
>> Date: Thursday, February 12, 2015 at 1:55 AM
>> To: Felix C
>> Cc: Kelvin Chu, "user@spark.apache.org"
>>
>> Subject: Re: Can spark job server be used to visualize streaming data?
>>
>>   Hello Felix,
>>
>>  I am already streaming in very simple data using Kafka (few messages /
>> second, each record only has 3 columns...really simple, but looking to
>> scale once I connect everything). I am processing it in Spark Streaming and
>> am currently writing word counts to hdfs. So the part where I am confused
>> is...
>>
>> Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
>> Spark Word Count -> *How do I visualize?*
>>
>>  is there a viz tool that I can set up to visualize JavaPairDStreams? or
>> do I have to write to hbase/hdfs first?
>>
>>  Thanks!
>>
>> On Wed, Feb 11, 2015 at 10:39 PM, Felix C 
>> wrote:
>>
>>>  What kind of data do you have? Kafka is a popular source to use with
>>> spark streaming.
>>> But, spark streaming also support reading from a file. Its called basic
>>> source
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers
>>>
>>> --- Original Message ---
>>>
>>> From: "Su She" 
>>> Sent: February 11, 2015 10:23 AM
>>> To: "Felix C" 
>>> Cc: "Kelvin Chu" <2dot7kel...@gmail.com>, user@spark.apache.org
>>> Subject: Re: Can spark job server be used to visualize streaming data?
>>>
>>>   Thank you Felix and Kelvin. I think I'll def be using the k-means
>>> tools in mlib.
>>>
>>>  It seems the best way to stream data is by storing in hbase and then
>>> using an api in my viz to extract data? Does anyone have any thoughts on
>>> this?
>>>
>>>   Thanks!
>>>
>>>
>>> On Tue, Feb 10, 2015 at 11:45 PM, Felix C 
>>> wrote:
>>>
>>>  Checkout
>>>
>>> https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html
>>>
>>> In there are links to how that is done.
>>>
>>>
>>> --- Original Message ---
>>>
>>> From: "Kelvin Chu" <2dot7kel...@gmail.com>
>>> Sent: February 10, 2015 12:48 PM
>>> To: "Su She" 
>>> Cc: user@spark.apache.org
>>> Subject: Re: Can spark job server be used to visualize streaming data?
>>>
>>>   Hi Su,
>>>
>>>  Out of the box, no. But, I know people integrate it with Spark
>>> Streaming to do real-time visualization. It will take some work though.
>>>
>>>  Kelvin
>>>
>>> On Mon, Feb 9, 2015 at 5:04 PM, Su She  wrote:
>>>
>>>  Hello Everyone,
>>>
>>>  I was reading this blog post:
>>> http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/
>>>
>>>  and was wondering if this approach can be taken to visualize streaming
>>> data...not just historical data?
>>>
>>>  Thank you!
>>>
>>>  -Suh
>>>
>>>
>>>
>>>
>>


Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Vladimir Protsenko
Thank's for reply. I solved porblem with importing
org.apache.spark.SparkContext._ by Imran Rashid suggestion.

In the sake of interest, does JavaPairRDD intended for use from java? What
is the purpose of this class? Does my rdd implicitly converted to it in
some circumstances?

2015-02-12 19:42 GMT+04:00 Ted Yu :

> You can use JavaPairRDD which has:
>
>   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
> JavaPairRDD.fromRDD(rdd)
>
> Cheers
>
> On Thu, Feb 12, 2015 at 7:36 AM, Vladimir Protsenko  > wrote:
>
>> Hi. I am stuck with how to save file to hdfs from spark.
>>
>> I have written MyOutputFormat extends FileOutputFormat,
>> then in spark calling this:
>>
>>   rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
>>   rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
>> classOf[MyObject],
>>classOf[MyOutputFormat])
>>
>> where rddres is RDD[(String, MyObject)] from up of transformation
>> pipeline.
>>
>> Compilation error is: /value saveAsHadoopFile is not a member of
>> org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]/.
>>
>> Could someone give me insights on what could be done here to make it
>> working? Why it is not a member? Because of wrong types?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Hi Tim,

I think this code will still introduce shuffle even when you call
repartition on each input stream. Actually this style of implementation
will generate more jobs (job per each input stream) than union into one
stream as called DStream.union(), and union normally has no special
overhead as I understood.

Also as Cody said, creating Producer per partition could be a potential
overhead, producer pool or sharing the Producer for one executor might be
better :).


 // Process stream from each receiver separately
 // (do not attempt to merge all streams and then re-partition,
this causes un-necessary and high amount of shuffle in the job)
 for (k <- kInStreams)
{
 // Re-partition stream from each receiver across all
compute nodes to spread out processing load and allows per partition
processing
 // and, set persistence level to spill to disk along with
serialization
 val kInMsgParts =
k.repartition(otherConf("dStreamPartitions").toInt).

2015-02-13 13:27 GMT+08:00 Cody Koeninger :

> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>
>  val writer = new
> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>
>  writer.output(rec)
> })
> )
>
>
> So this is creating a new kafka producer for every new output partition,
> right?  Have you tried pooling the producers?
>
> On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith  wrote:
>
>> 1) Yes, if I disable writing out to kafka and replace it with some very
>> light weight action is rdd.take(1), the app is stable.
>>
>> 2) The partitions I spoke of in the previous mail are the number of
>> partitions I create from each dStream. But yes, since I do processing and
>> writing out, per partition, each dStream partition ends up getting written
>> to a kafka partition. Flow is, broadly:
>> 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
>> partitions) -> Apply some transformation logic to each partition -> write
>> out each partition to kafka (kafka has 23 partitions). Let me increase the
>> number of partitions on the kafka side and see if that helps.
>>
>> Here's what the main block of code looks like (I added persistence back):
>>
>> val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
>> KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
>> Map(otherConf("kafkaConsumerTopic").toString -> 1),
>> StorageLevel.MEMORY_AND_DISK_SER) }
>>
>> if (!configMap.keySet.isEmpty)
>> {
>>  // Process stream from each receiver separately
>>  // (do not attempt to merge all streams and then re-partition,
>> this causes un-necessary and high amount of shuffle in the job)
>>  for (k <- kInStreams)
>> {
>>  // Re-partition stream from each receiver across all
>> compute nodes to spread out processing load and allows per partition
>> processing
>>  // and, set persistence level to spill to disk along
>> with serialization
>>  val kInMsgParts =
>> k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>
>>  val outdata =
>> kInMsgParts.map(x=>myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>
>>  // Write each transformed partition to Kafka via the
>> "writer" object
>>  outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>
>>  val writer = new
>> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>
>>  writer.output(rec)
>>
>> }) )
>> }
>> }
>>
>>
>> Here's the life-cycle of a lost block:
>>
>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
>> memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
>> memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
>> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
>> disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
>> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
>> disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
>> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
>> (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
>> compute split, block input-4-1423758372200 not found
>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
>> (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception
>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>> 1]
>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
>> (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception
>> (Could not compute 

Re: HiveContext in SparkSQL - concurrency issues

2015-02-12 Thread Felix C
Your earlier call stack clearly states that it fails because the Derby 
metastore has already been started by another instance, so I think that is 
explained by your attempt to run this concurrently.

Are you running Spark standalone? Do you have a cluster? You should be able to 
run spark in yarn-client mode against the hive metastore service. That should 
give you ability to run multiple concurrently. Be sure to copy hive-site.XML to 
SPARK_HOME/conf

--- Original Message ---

From: "Harika" 
Sent: February 12, 2015 8:22 PM
To: user@spark.apache.org
Subject: Re: HiveContext in SparkSQL - concurrency issues

Hi,

I've been reading about Spark SQL and people suggest that using HiveContext
is better. So can anyone please suggest a solution to the above problem.
This is stopping me from moving forward with HiveContext.

Thanks
Harika



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-in-SparkSQL-concurrency-issues-tp21491p21636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Vladimir Protsenko
Thank you. That worked.

2015-02-12 20:03 GMT+04:00 Imran Rashid :

> You need to import the implicit conversions to PairRDDFunctions with
>
> import org.apache.spark.SparkContext._
>
> (note that this requirement will go away in 1.3:
> https://issues.apache.org/jira/browse/SPARK-4397)
>
> On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko  > wrote:
>
>> Hi. I am stuck with how to save file to hdfs from spark.
>>
>> I have written MyOutputFormat extends FileOutputFormat,
>> then in spark calling this:
>>
>>   rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
>>   rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
>> classOf[MyObject],
>>classOf[MyOutputFormat])
>>
>> where rddres is RDD[(String, MyObject)] from up of transformation
>> pipeline.
>>
>> Compilation error is: /value saveAsHadoopFile is not a member of
>> org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]/.
>>
>> Could someone give me insights on what could be done here to make it
>> working? Why it is not a member? Because of wrong types?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Using Spark SQL for temporal data

2015-02-12 Thread Corey Nolet
Ok. I just verified that this is the case with a little test:

WHERE (a = 'v1' and b = 'v2')PrunedFilteredScan passes down 2 filters
WHERE(a = 'v1' and b = 'v2') or (a = 'v3') PrunedFilteredScan passes down 0
filters

On Fri, Feb 13, 2015 at 12:28 AM, Corey Nolet  wrote:

> Michael,
>
> I haven't been paying close attention to the JIRA tickets for
> PrunedFilteredScan but I noticed some weird behavior around the filters
> being applied when OR expressions were used in the WHERE clause. From what
> I was seeing, it looks like it could be possible that the "start" and "end"
> ranges you are proposing to place in the WHERE clause could actually never
> be pushed down to the PrunedFilteredScan if there's an OR expression in
> there, like: (start > "2014-12-01" and end < "2015-02-12") or (). I
> haven't done a unit test for this case yet, but I did file SPARK-5296
> because of the behavior I was seeing. I'm requiring a time range in the
> services I'm writing because without it, the full Accumulo table would be
> scanned- and that's no good.
>
> Are there any plans on making the CatalystScan public in the near future
> (possibly once SparkSQL reaches the proposed stability in 1.3?)
>
>
> On Fri, Feb 13, 2015 at 12:14 AM, Michael Armbrust  > wrote:
>
>> Hi Corey,
>>
>> I would not recommend using the CatalystScan for this.  Its lower level,
>> and not stable across releases.
>>
>> You should be able to do what you want with PrunedFilteredScan
>> ,
>> though.  The filters
>> 
>> that it pushes down are already normalized so you can easily look for range
>> predicates with the start/end columns you care about.
>>
>> val start = filters.find {
>>   case GreaterThan("start", startDate: String) => 
>> DateTime.parse(startDate).toDate
>> }.getOrElse()
>> val end = filters.find {
>>   case LessThan("end", endDate: String) => DateTime.parse(endDate).toDate
>> }.getOrElse()
>>
>> ...
>>
>> Filters are advisory, so you can ignore ones that aren't start/end.
>>
>> Michael
>>
>> On Thu, Feb 12, 2015 at 8:32 PM, Corey Nolet  wrote:
>>
>>> I have a temporal data set in which I'd like to be able to query using
>>> Spark SQL. The dataset is actually in Accumulo and I've already written a
>>> CatalystScan implementation and RelationProvider[1] to register with the
>>> SQLContext so that I can apply my SQL statements.
>>>
>>> With my current implementation, the start and stop time ranges are set
>>> on the RelationProvider (so ultimately they become a per-table setting).
>>> I'd much rather be able to register the table without the time ranges and
>>> just specify them through the SQL query string itself (perhaps a expression
>>> in the WHERE clause?)
>>>
>>>
>>> [1]
>>> https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala
>>>
>>
>>
>


Re: Using Spark SQL for temporal data

2015-02-12 Thread Corey Nolet
Michael,

I haven't been paying close attention to the JIRA tickets for
PrunedFilteredScan but I noticed some weird behavior around the filters
being applied when OR expressions were used in the WHERE clause. From what
I was seeing, it looks like it could be possible that the "start" and "end"
ranges you are proposing to place in the WHERE clause could actually never
be pushed down to the PrunedFilteredScan if there's an OR expression in
there, like: (start > "2014-12-01" and end < "2015-02-12") or (). I
haven't done a unit test for this case yet, but I did file SPARK-5296
because of the behavior I was seeing. I'm requiring a time range in the
services I'm writing because without it, the full Accumulo table would be
scanned- and that's no good.

Are there any plans on making the CatalystScan public in the near future
(possibly once SparkSQL reaches the proposed stability in 1.3?)


On Fri, Feb 13, 2015 at 12:14 AM, Michael Armbrust 
wrote:

> Hi Corey,
>
> I would not recommend using the CatalystScan for this.  Its lower level,
> and not stable across releases.
>
> You should be able to do what you want with PrunedFilteredScan
> ,
> though.  The filters
> 
> that it pushes down are already normalized so you can easily look for range
> predicates with the start/end columns you care about.
>
> val start = filters.find {
>   case GreaterThan("start", startDate: String) => 
> DateTime.parse(startDate).toDate
> }.getOrElse()
> val end = filters.find {
>   case LessThan("end", endDate: String) => DateTime.parse(endDate).toDate
> }.getOrElse()
>
> ...
>
> Filters are advisory, so you can ignore ones that aren't start/end.
>
> Michael
>
> On Thu, Feb 12, 2015 at 8:32 PM, Corey Nolet  wrote:
>
>> I have a temporal data set in which I'd like to be able to query using
>> Spark SQL. The dataset is actually in Accumulo and I've already written a
>> CatalystScan implementation and RelationProvider[1] to register with the
>> SQLContext so that I can apply my SQL statements.
>>
>> With my current implementation, the start and stop time ranges are set on
>> the RelationProvider (so ultimately they become a per-table setting). I'd
>> much rather be able to register the table without the time ranges and just
>> specify them through the SQL query string itself (perhaps a expression in
>> the WHERE clause?)
>>
>>
>> [1]
>> https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala
>>
>
>


Re: Streaming scheduling delay

2015-02-12 Thread Cody Koeninger
outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

 val writer = new
KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

 writer.output(rec)
}) )


So this is creating a new kafka producer for every new output partition,
right?  Have you tried pooling the producers?

On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith  wrote:

> 1) Yes, if I disable writing out to kafka and replace it with some very
> light weight action is rdd.take(1), the app is stable.
>
> 2) The partitions I spoke of in the previous mail are the number of
> partitions I create from each dStream. But yes, since I do processing and
> writing out, per partition, each dStream partition ends up getting written
> to a kafka partition. Flow is, broadly:
> 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
> partitions) -> Apply some transformation logic to each partition -> write
> out each partition to kafka (kafka has 23 partitions). Let me increase the
> number of partitions on the kafka side and see if that helps.
>
> Here's what the main block of code looks like (I added persistence back):
>
> val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
> KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
> Map(otherConf("kafkaConsumerTopic").toString -> 1),
> StorageLevel.MEMORY_AND_DISK_SER) }
>
> if (!configMap.keySet.isEmpty)
> {
>  // Process stream from each receiver separately
>  // (do not attempt to merge all streams and then re-partition,
> this causes un-necessary and high amount of shuffle in the job)
>  for (k <- kInStreams)
> {
>  // Re-partition stream from each receiver across all
> compute nodes to spread out processing load and allows per partition
> processing
>  // and, set persistence level to spill to disk along with
> serialization
>  val kInMsgParts =
> k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>  val outdata =
> kInMsgParts.map(x=>myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>  // Write each transformed partition to Kafka via the
> "writer" object
>  outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>
>  val writer = new
> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>
>  writer.output(rec)
> })
> )
> }
> }
>
>
> Here's the life-cycle of a lost block:
>
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
> memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
> memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
> disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
> disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
> (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
> compute split, block input-4-1423758372200 not found
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
> (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 1]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
> (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 2]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
> (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 3]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
> (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 4]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
> (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 5]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
> (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 6]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
> (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not comput

Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
1. Can you try count()? Take often does not force the entire computation.
2. Can you give the full log. From the log it seems that the blocks are
added to two nodes but the tasks seem to be launched to different nodes. I
dont see any message removing the blocks. So need the whole log to debug
this.

TD


On Feb 12, 2015 8:52 PM, "Tim Smith"  wrote:
>
> 1) Yes, if I disable writing out to kafka and replace it with some very
light weight action is rdd.take(1), the app is stable.
>
> 2) The partitions I spoke of in the previous mail are the number of
partitions I create from each dStream. But yes, since I do processing and
writing out, per partition, each dStream partition ends up getting written
to a kafka partition. Flow is, broadly:
> 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
partitions) -> Apply some transformation logic to each partition -> write
out each partition to kafka (kafka has 23 partitions). Let me increase the
number of partitions on the kafka side and see if that helps.

>
> Here's what the main block of code looks like (I added persistence back):
>
> val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
> Map(otherConf("kafkaConsumerTopic").toString -> 1),
StorageLevel.MEMORY_AND_DISK_SER) }
>
> if (!configMap.keySet.isEmpty)
> {
>  // Process stream from each receiver separately
>  // (do not attempt to merge all streams and then re-partition,
this causes un-necessary and high amount of shuffle in the job)
>  for (k <- kInStreams)
> {
>  // Re-partition stream from each receiver across all
compute nodes to spread out processing load and allows per partition
processing
>  // and, set persistence level to spill to disk along
with serialization
>  val kInMsgParts =
k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>  val outdata =
kInMsgParts.map(x=>myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>  // Write each transformed partition to Kafka via the
"writer" object
>  outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>
 val writer = new
KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>
 writer.output(rec)
>
}) )
> }
> }
>
>
> Here's the life-cycle of a lost block:
>
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
(TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute
split, block input-4-1423758372200 not found
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
(TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 1]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
(TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 2]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
(TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 3]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
(TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 4]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
(TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 5]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
(TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 6]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
(TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 7]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0
(TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compu

Re: Using Spark SQL for temporal data

2015-02-12 Thread Michael Armbrust
Hi Corey,

I would not recommend using the CatalystScan for this.  Its lower level,
and not stable across releases.

You should be able to do what you want with PrunedFilteredScan
,
though.  The filters

that it pushes down are already normalized so you can easily look for range
predicates with the start/end columns you care about.

val start = filters.find {
  case GreaterThan("start", startDate: String) =>
DateTime.parse(startDate).toDate
}.getOrElse()
val end = filters.find {
  case LessThan("end", endDate: String) => DateTime.parse(endDate).toDate
}.getOrElse()

...

Filters are advisory, so you can ignore ones that aren't start/end.

Michael

On Thu, Feb 12, 2015 at 8:32 PM, Corey Nolet  wrote:

> I have a temporal data set in which I'd like to be able to query using
> Spark SQL. The dataset is actually in Accumulo and I've already written a
> CatalystScan implementation and RelationProvider[1] to register with the
> SQLContext so that I can apply my SQL statements.
>
> With my current implementation, the start and stop time ranges are set on
> the RelationProvider (so ultimately they become a per-table setting). I'd
> much rather be able to register the table without the time ranges and just
> specify them through the SQL query string itself (perhaps a expression in
> the WHERE clause?)
>
>
> [1]
> https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala
>


Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
1) Yes, if I disable writing out to kafka and replace it with some very
light weight action is rdd.take(1), the app is stable.

2) The partitions I spoke of in the previous mail are the number of
partitions I create from each dStream. But yes, since I do processing and
writing out, per partition, each dStream partition ends up getting written
to a kafka partition. Flow is, broadly:
5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
partitions) -> Apply some transformation logic to each partition -> write
out each partition to kafka (kafka has 23 partitions). Let me increase the
number of partitions on the kafka side and see if that helps.

Here's what the main block of code looks like (I added persistence back):

val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
Map(otherConf("kafkaConsumerTopic").toString -> 1),
StorageLevel.MEMORY_AND_DISK_SER) }

if (!configMap.keySet.isEmpty)
{
 // Process stream from each receiver separately
 // (do not attempt to merge all streams and then re-partition,
this causes un-necessary and high amount of shuffle in the job)
 for (k <- kInStreams)
{
 // Re-partition stream from each receiver across all
compute nodes to spread out processing load and allows per partition
processing
 // and, set persistence level to spill to disk along with
serialization
 val kInMsgParts =
k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

 val outdata =
kInMsgParts.map(x=>myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

 // Write each transformed partition to Kafka via the
"writer" object
 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

 val writer = new
KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

 writer.output(rec)
}) )
}
}


Here's the life-cycle of a lost block:

15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID
1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute
split, block input-4-1423758372200 not found
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID
1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 1]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0 (TID
1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 2]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0 (TID
1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 3]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0 (TID
1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 4]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0 (TID
1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 5]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0 (TID
1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 6]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0 (TID
1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 7]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0 (TID
1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 8]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0 (TID
1042606) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 9]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0
(TID 104260

Re: Is there a separate mailing list for Spark Developers ?

2015-02-12 Thread Ted Yu
dev@spark is active.
e.g. see:

http://search-hadoop.com/m/JW1q5zQ1Xw/renaming+SchemaRDD+-%253E+DataFrame&subj=renaming+SchemaRDD+gt+DataFrame

Cheers

On Thu, Feb 12, 2015 at 8:09 PM, Manoj Samel 
wrote:

> d...@spark.apache.org
>  mentioned on
> http://spark.apache.org/community.html seems to be bouncing. Is there
> another one ?
>


Using Spark SQL for temporal data

2015-02-12 Thread Corey Nolet
I have a temporal data set in which I'd like to be able to query using
Spark SQL. The dataset is actually in Accumulo and I've already written a
CatalystScan implementation and RelationProvider[1] to register with the
SQLContext so that I can apply my SQL statements.

With my current implementation, the start and stop time ranges are set on
the RelationProvider (so ultimately they become a per-table setting). I'd
much rather be able to register the table without the time ranges and just
specify them through the SQL query string itself (perhaps a expression in
the WHERE clause?)


[1]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala


Re: streaming joining multiple streams

2015-02-12 Thread Tathagata Das
Sorry for the late response. With the amount of data you are planning join,
any system would take time. However, between Hive's MapRduce joins, and
Spark's basic shuffle, and Spark SQL's join, the latter wins hands down.
Furthermore, with the APIs of Spark and Spark Streaming, you will have to
do strictly less work to set the infrastructure that you want to build.

Yes, Spark Streaming currently does not support providing own timer,
because the logic to handle delays etc, is pretty complex and specific to
each application. Usually that logic can be implemented on top of the
windowing solutoin that Spark Streaming already provides.

TD



On Thu, Feb 5, 2015 at 7:37 AM, Zilvinas Saltys 
wrote:

> The challenge I have is this. There's two streams of data where an event
> might look like this in stream1: (time, hashkey, foo1) and in stream2:
> (time, hashkey, foo2)
> The result after joining should be (time, hashkey, foo1, foo2) .. The join
> happens on hashkey and the time difference can be ~30 mins between events.
> The amount of data is enormous .. hundreds of billions of events per
> month. I need not only join the existing history data but continue to do so
> with incoming data (comes in batches not really streamed)
>
> For now I was thinking to implement this in MapReduce and sliding windows
> .. I'm wondering if spark can actually help me with this sort of challenge?
> How would a join of two huge streams of historic data would actually
> happen internally within spark and would it be more efficient than let's
> say hive map reduce stream join of two big tables?
>
> I also saw spark streaming has windowing support but it seems you cannot
> provide your own timer? As in I cannot make the time be derived from events
> itself rather than having an actual clock running.
>
> Thanks,
>


Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-12 Thread Tathagata Das
Can you give me the whole logs?

TD

On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg  wrote:

> OK that worked and getting close here ... the job ran successfully for a
> bit and I got output for the first couple buckets before getting a
> "java.lang.Exception: Could not compute split, block input-0-1423593163000
> not found" error.
>
> So I bumped up the memory at the command line from 2 gb to 5 gb, ran it
> again ... this time I got around 8 successful outputs before erroring.
>
> Bumped up the memory from 5 gb to 10 gb ... got around 15 successful
> outputs before erroring.
>
>
> I'm not persisting or caching anything except for the broadcast IP table
> and another broadcast small user agents list used for the same type of
> filtering, and both files are tiny.  The Hadoop cluster is nearly empty
> right now and has more than enough available memory to handle this job.  I
> am connecting to Kafka as well and so there's a lot of data coming through
> as my index is trying to catch up to the current date, but yarn-client mode
> has several times in the past few weeks been able to catch up to the
> current date and run successfully for days without issue.
>
> My guess is memory isn't being cleared after each bucket?  Relevant
> portion of the log below.
>
>
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on
> phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on
> phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on
> phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on
> phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on
> phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on
> phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB)
> 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on
> phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB)
> 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754
> 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence
> list
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on
> phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on
> phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB)
> 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753
> 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on
> phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on
> phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on
> phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB)
> 15/02/10 13:34:54 INFO BlockManager: Removing RDD 750
> 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list
> 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117,
> Stage 114, Stage 115, Stage 116)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on
> phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on
> phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on
> phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on
> phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB)
> 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on
> phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB)
> 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on
> phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on
> phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB)
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on
> phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB)
> 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767
> 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314440

Re: HiveContext in SparkSQL - concurrency issues

2015-02-12 Thread Harika
Hi,

I've been reading about Spark SQL and people suggest that using HiveContext
is better. So can anyone please suggest a solution to the above problem.
This is stopping me from moving forward with HiveContext.

Thanks
Harika



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-in-SparkSQL-concurrency-issues-tp21491p21636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
Hey Tim,

Let me get the key points.
1. If you are not writing back to Kafka, the delay is stable? That is,
instead of "foreachRDD { // write to kafka }"  if you do "dstream.count",
then the delay is stable. Right?
2. If so, then Kafka is the bottleneck. Is the number of partitions, that
you spoke of the in the second mail, that determines the parallelism in
writes? Is it stable with 30 partitions?

Regarding the block exception, could you give me a trace of info level
logging that leads to this error? Basically I want trace the lifecycle of
the block.

On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith  wrote:

> Hi Gerard,
>
> Great write-up and really good guidance in there.
>
> I have to be honest, I don't know why but setting # of partitions for each
> dStream to a low number (5-10) just causes the app to choke/crash. Setting
> it to 20 gets the app going but with not so great delays. Bump it up to 30
> and I start winning the war where processing time is consistently below
> batch time window (20 seconds) except for a batch every few batches where
> the compute time spikes 10x the usual.
>
> Following your guide, I took out some "logInfo" statements I had in the
> app but didn't seem to make much difference :(
>
> With a higher time window (20 seconds), I got the app to run stably for a
> few hours but then ran into the dreaded "java.lang.Exception: Could not
> compute split, block input-0-1423761240800 not found". Wonder if I need to
> add RDD persistence back?
>
> Also, I am reaching out to Virdata with some ProServ inquiries.
>
> Thanks
>
>
>
>
>
> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas 
> wrote:
>
>> Hi Tim,
>>
>> From this: " There are 5 kafka receivers and each incoming stream is
>> split into 40 partitions"  I suspect that you're creating too many tasks
>> for Spark to process on time.
>> Could you try some of the 'knobs' I describe here to see if that would
>> help?
>>
>> http://www.virdata.com/tuning-spark/
>>
>> -kr, Gerard.
>>
>> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith  wrote:
>>
>>> Just read the thread "Are these numbers abnormal for spark streaming?"
>>> and I think I am seeing similar results - that is - increasing the window
>>> seems to be the trick here. I will have to monitor for a few hours/days
>>> before I can conclude (there are so many knobs/dials).
>>>
>>>
>>>
>>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith  wrote:
>>>
 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually >>> minutes to hours(s) (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, "spar...unpersist" to "true" in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain points seems useless (I guess
 excess ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for
 troubleshooting the issue?

 One nugget I found buried in the code says:
 "The scheduler delay includes the network delay to send the task to the
 worker machine and to send back t

Re: Spark streaming job throwing ClassNotFound exception when recovering from checkpointing

2015-02-12 Thread Tathagata Das
Could you come up with a minimal example through which I can reproduce the
problem?

On Tue, Feb 10, 2015 at 12:30 PM, conor  wrote:

> I am getting the following error when I kill the spark driver and restart
> the job:
>
> 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from
> file
>
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
> 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from
> file
>
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
> java.io.IOException: java.lang.ClassNotFoundException:
> com.example.spark.streaming.reporting.live.jobs.Bucket
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
> at
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
>
>
>
> Spark version is 1.2.0
>
> The streaming job is executing every 10 seconds with the following steps:
>
>   1. Consuming JSON from a kafka topic called journeys and converting to
>   case classes
>   2. Filters resulting journeys stream based on a time attribute being set
>   3. FlatMaps journeys into separate time buckets 15, 60 and 1440 minutes
>   e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000,
> hyperLogLog(journey
>   id), 360) )
>   4. ReduceByKey adding hyperloglogs
>   5. UpdateStateByKey to add to previous states hyperloglog
>   6. Then output results to Cassandra
>
>
> I have pasted in a sample app below to mimic the problem and put all
> classes
> into one file, it is also attached here  SampleJob.scala
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala
> >
>
> To get around the issue for the moment, I have removed the Bucket class and
> stopped passing in a bucket array to the ActiveJourney class.
> And instead I hard code all the time buckets I need in the ActiveJourney
> class; this approach works and recovers from checkpointing but is not
> extensible.
>
> Can the Spark gurus explain why I get that ClassNotFound exception?
>
> Need any more information, please let me know.
>
> Much thanks,
> Conor
>
>
>
> package com.example.spark.streaming.reporting.live.jobs
> import java.util.Date
> import scala.Array.canBuildFrom
> import scala.collection.mutable.MutableList
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.json4s.DefaultFormats
> import org.json4s.jackson.JsonMethods.parse
> import org.json4s.jvalue2extractable
> import org.json4s.string2JsonInput
> import com.example.spark.streaming.utils.MilliSecondUtils
> import com.example.spark.streaming.utils.constants.ColumnFamilies
> import com.example.spark.streaming.utils.constants.Constants
> import com.example.spark.streaming.utils.constants.Milliseconds
> import com.example.spark.streaming.utils.constants.SparkConfig
> import com.datastax.spark.connector.SomeColumns
> import com.datastax.spark.connector.streaming.toDStreamFunctions
> import com.datastax.spark.connector.toNamedColumnRef
> import com.twitter.algebird.HLL
> import com.twitter.algebird.HyperLogLogMonoid
> // Json parsing classes
> case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
> case class JourneyDetails(_id: String)
> case class JourneyCommand($set: Option[JourneySet])
> case class JourneySet(awayAt: Date)
> // Class not found bucket
> case class Bucket(val bucketType: String, val roundDown: (Long) => Long,
> val
> columnFamily: String, val size: Long, val maxIntervals: Int)
>
> // used for updateStateByKey
> case class ActiveState(var bucketType: String, var time: Long, var
> hyperLogLog: HLL, var ttl: Int)
>
> object SampleJob {
>  private final val Name = this.getClass().getSimpleName()
>  def main(args: Array[String]) {
>if (args.length < 8) {
>  System.err.println(s"Usage: $Name   
> ")
>  System.exit(1)
>}
>System.out.print(args)
>val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri,
> cassandra, intervalSeconds) = args
>val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().
> getPackage().getImplementationVersion()
>def functionToCreateContext(): StreamingContext = {
>
>  // how many buckets
>  val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.
> roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.
> FifteenMinutes, 90)
>  val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
> ColumnFamilies.Vis

Is there a separate mailing list for Spark Developers ?

2015-02-12 Thread Manoj Samel
d...@spark.apache.org
 mentioned on
http://spark.apache.org/community.html seems to be bouncing. Is there
another one ?


Re: Extract hour from Timestamp in Spark SQL

2015-02-12 Thread Michael Armbrust
This looks like your executors aren't running a version of spark with hive
support compiled in.
On Feb 12, 2015 7:31 PM, "Wush Wu"  wrote:

> Dear Michael,
>
> After use the org.apache.spark.sql.hive.HiveContext, the Exception:
> "java.util.
> NoSuchElementException: key not found: hour" is gone during the SQL
> planning.
>
> However, I got another error and the complete stacktrace is shown below. I
> am working on this now.
>
> Best,
> Wush
>
> Stacktrace:
>
> java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveSimpleUdf
> at
> org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:65)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:274)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task

Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Kevin (Sangwoo) Kim
Apache Zeppelin also has a scheduler and then you can reload your chart
periodically,
Check it out:
http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html




On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>   One method I’ve used is to publish each batch to a message bus or queue
> with a custom UI listening on the other end, displaying the results in
> d3.js or some other app. As far as I’m aware there isn’t a tool that will
> directly take a DStream.
>
>  Spark Notebook seems to have some support for updating graphs
> periodically. I haven’t used it myself yet so not sure how well it works.
> See here: https://github.com/andypetrella/spark-notebook
>
>   From: Su She
> Date: Thursday, February 12, 2015 at 1:55 AM
> To: Felix C
> Cc: Kelvin Chu, "user@spark.apache.org"
>
> Subject: Re: Can spark job server be used to visualize streaming data?
>
>   Hello Felix,
>
>  I am already streaming in very simple data using Kafka (few messages /
> second, each record only has 3 columns...really simple, but looking to
> scale once I connect everything). I am processing it in Spark Streaming and
> am currently writing word counts to hdfs. So the part where I am confused
> is...
>
> Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
> Spark Word Count -> *How do I visualize?*
>
>  is there a viz tool that I can set up to visualize JavaPairDStreams? or
> do I have to write to hbase/hdfs first?
>
>  Thanks!
>
> On Wed, Feb 11, 2015 at 10:39 PM, Felix C 
> wrote:
>
>>  What kind of data do you have? Kafka is a popular source to use with
>> spark streaming.
>> But, spark streaming also support reading from a file. Its called basic
>> source
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers
>>
>> --- Original Message ---
>>
>> From: "Su She" 
>> Sent: February 11, 2015 10:23 AM
>> To: "Felix C" 
>> Cc: "Kelvin Chu" <2dot7kel...@gmail.com>, user@spark.apache.org
>> Subject: Re: Can spark job server be used to visualize streaming data?
>>
>>   Thank you Felix and Kelvin. I think I'll def be using the k-means
>> tools in mlib.
>>
>>  It seems the best way to stream data is by storing in hbase and then
>> using an api in my viz to extract data? Does anyone have any thoughts on
>> this?
>>
>>   Thanks!
>>
>>
>> On Tue, Feb 10, 2015 at 11:45 PM, Felix C 
>> wrote:
>>
>>  Checkout
>>
>> https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html
>>
>> In there are links to how that is done.
>>
>>
>> --- Original Message ---
>>
>> From: "Kelvin Chu" <2dot7kel...@gmail.com>
>> Sent: February 10, 2015 12:48 PM
>> To: "Su She" 
>> Cc: user@spark.apache.org
>> Subject: Re: Can spark job server be used to visualize streaming data?
>>
>>   Hi Su,
>>
>>  Out of the box, no. But, I know people integrate it with Spark
>> Streaming to do real-time visualization. It will take some work though.
>>
>>  Kelvin
>>
>> On Mon, Feb 9, 2015 at 5:04 PM, Su She  wrote:
>>
>>  Hello Everyone,
>>
>>  I was reading this blog post:
>> http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/
>>
>>  and was wondering if this approach can be taken to visualize streaming
>> data...not just historical data?
>>
>>  Thank you!
>>
>>  -Suh
>>
>>
>>
>>
>


Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Gerard,

Great write-up and really good guidance in there.

I have to be honest, I don't know why but setting # of partitions for each
dStream to a low number (5-10) just causes the app to choke/crash. Setting
it to 20 gets the app going but with not so great delays. Bump it up to 30
and I start winning the war where processing time is consistently below
batch time window (20 seconds) except for a batch every few batches where
the compute time spikes 10x the usual.

Following your guide, I took out some "logInfo" statements I had in the app
but didn't seem to make much difference :(

With a higher time window (20 seconds), I got the app to run stably for a
few hours but then ran into the dreaded "java.lang.Exception: Could not
compute split, block input-0-1423761240800 not found". Wonder if I need to
add RDD persistence back?

Also, I am reaching out to Virdata with some ProServ inquiries.

Thanks





On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas  wrote:

> Hi Tim,
>
> From this: " There are 5 kafka receivers and each incoming stream is
> split into 40 partitions"  I suspect that you're creating too many tasks
> for Spark to process on time.
> Could you try some of the 'knobs' I describe here to see if that would
> help?
>
> http://www.virdata.com/tuning-spark/
>
> -kr, Gerard.
>
> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith  wrote:
>
>> Just read the thread "Are these numbers abnormal for spark streaming?"
>> and I think I am seeing similar results - that is - increasing the window
>> seems to be the trick here. I will have to monitor for a few hours/days
>> before I can conclude (there are so many knobs/dials).
>>
>>
>>
>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith  wrote:
>>
>>> On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
>>> streaming app that consumes data from Kafka and writes it back to Kafka
>>> (different topic). My big problem has been Total Delay. While execution
>>> time is usually >> minutes to hours(s) (keeps going up).
>>>
>>> For a little while, I thought I had solved the issue by bumping up the
>>> driver memory. Then I expanded my Kafka cluster to add more nodes and the
>>> issue came up again. I tried a few things to smoke out the issue and
>>> something tells me the driver is the bottleneck again:
>>>
>>> 1) From my app, I took out the entire write-out-to-kafka piece. Sure
>>> enough, execution, scheduling delay and hence total delay fell to sub
>>> second. This assured me that whatever processing I do before writing back
>>> to kafka isn't the bottleneck.
>>>
>>> 2) In my app, I had RDD persistence set at different points but my code
>>> wasn't really re-using any RDDs so I took out all explicit persist()
>>> statements. And added, "spar...unpersist" to "true" in the context. After
>>> this, it doesn't seem to matter how much memory I give my executor, the
>>> total delay seems to be in the same range. I tried per executor memory from
>>> 2G to 12G with no change in total delay so executors aren't memory starved.
>>> Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
>>> used when per executor memory is set to 2GB, for example.
>>>
>>> 3) Input rate in the kafka consumer restricts spikes in incoming data.
>>>
>>> 4) Tried FIFO and FAIR but didn't make any difference.
>>>
>>> 5) Adding executors beyond a certain points seems useless (I guess
>>> excess ones just sit idle).
>>>
>>> At any given point in time, the SparkUI shows only one batch pending
>>> processing. So with just one batch pending processing, why would the
>>> scheduling delay run into minutes/hours if execution time is within the
>>> batch window duration? There aren't any failed stages or jobs.
>>>
>>> Right now, I have 100 executors ( i have tried setting executors from
>>> 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
>>> are 5 kafka receivers and each incoming stream is split into 40 partitions.
>>> Per receiver, input rate is restricted to 2 messages per second.
>>>
>>> Can anyone help me with clues or areas to look into, for troubleshooting
>>> the issue?
>>>
>>> One nugget I found buried in the code says:
>>> "The scheduler delay includes the network delay to send the task to the
>>> worker machine and to send back the result (but not the time to fetch the
>>> task result, if it needed to be fetched from the block manager on the
>>> worker)."
>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>
>>> Could this be an issue with the driver being a bottlneck? All the
>>> executors posting their logs/stats to the driver?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Task not serializable problem in the multi-thread SQL query

2015-02-12 Thread lihu
Thanks very much, you're right.

I called the sc.stop() before the execute pool shutdown.

On Fri, Feb 13, 2015 at 7:04 AM, Michael Armbrust 
wrote:

> It looks to me like perhaps your SparkContext has shut down due to too
> many failures.  I'd look in the logs of your executors for more information.
>
> On Thu, Feb 12, 2015 at 2:34 AM, lihu  wrote:
>
>> I try to use the multi-thread to use the Spark SQL query.
>> some sample code just like this:
>>
>> val sqlContext = new SqlContext(sc)
>> val rdd_query = sc.parallelize(data,   part)
>> rdd_query.registerTempTable("MyTable")
>> sqlContext.cacheTable("MyTable")
>>
>> val serverPool = Executors.newFixedThreadPool(3)
>> val loopCnt = 10
>>
>> for(i <- 1 to loopCnt ){
>> serverPool.execute(new Runnable(){
>> override def run(){
>> if( some condition){
>> sqlContext.sql("SELECT * from
>> ...").collect().foreach(println)
>> }
>> else{
>> //some other query
>> }
>>
>> }
>> })
>> }
>>
>> this will throw a Task serializable Exception, if I do not use the
>> multi-thread, it works well.
>> Since there is no object is not serializable? so what is the problem?
>>
>>
>> java.lang.Error: org.apache.spark.SparkException: Task not serializable
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>> at java.lang.Thread.run(Thread.java:853)
>>
>> Caused by: org.apache.spark.SparkException: Task not serializable
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>> at
>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
>> at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>> at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>> at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>> at java.lang.Thread.run(Thread.java:853)
>>
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>> at
>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
>> at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
>> at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>> at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>> at java.lang.Thread.run(Thread.java:853)
>>
>> --
>> *Best Wishes!*
>>
>>
>>
>


-- 
*Best Wishes!*


Re: exception with json4s render

2015-02-12 Thread Mohnish Kodnani
Any ideas on how to figure out what is going on when using json4s 3.2.11.
I have a need to use 3.2.11 and just to see if things work I had downgraded
to 3.2.10 and things started working.


On Wed, Feb 11, 2015 at 11:45 AM, Charles Feduke 
wrote:

> I was having a similar problem to this trying to use the Scala Jackson
> module yesterday. I tried setting `spark.files.userClassPathFirst` to true
> but I was still having problems due to the older version of Jackson that
> Spark has a dependency on. (I think its an old org.codehaus version.)
>
> I ended up solving my problem by using Spray JSON (
> https://github.com/spray/spray-json) which has no dependency on Jackson
> and has great control over the JSON rendering process.
>
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries - based
> on that I looked for something that didn't rely on Jackson.
>
> Now that I see that there is some success with json4s on Spark 1.2.x I'll
> have to give that a try.
>
> On Wed Feb 11 2015 at 2:32:59 PM Jonathan Haddad 
> wrote:
>
>> Actually, yes, I was using 3.2.11.  I thought I would need the UUID
>> encoder that seems to have been added in that version, but I'm not using
>> it.  I've downgraded to 3.2.10 and it seems to work.
>>
>> I searched through the spark repo and it looks like it's got 3.2.10 in a
>> pom.  I don't know the first thing about how dependencies are resolved but
>> I'm guessing it's related?
>>
>> On Wed Feb 11 2015 at 11:20:42 AM Mohnish Kodnani <
>> mohnish.kodn...@gmail.com> wrote:
>>
>>> I was getting similar error after I upgraded to spark 1.2.1 from 1.1.1
>>> Are you by any chance using json4s 3.2.11.
>>> I downgraded to 3.2.10 and that seemed to have worked. But I didnt try
>>> to spend much time debugging the issue than that.
>>>
>>>
>>>
>>> On Wed, Feb 11, 2015 at 11:13 AM, Jonathan Haddad 
>>> wrote:
>>>
 I'm trying to use the json4s library in a spark job to push data back
 into kafka.  Everything was working fine when I was hard coding a string,
 but now that I'm trying to render a string from a simple map it's failing.
 The code works in sbt console.

 working console code:
 https://gist.github.com/rustyrazorblade/daa50bf05ff0d48ac6af

 failing spark job line:
 https://github.com/rustyrazorblade/killranalytics/blob/master/spark/src/main/scala/RawEventProcessing.scala#L114

 exception: https://gist.github.com/rustyrazorblade/1e220d87d41cfcad2bb9

 I've seen examples of using render / compact when I searched the ML
 archives, so I'm kind of at a loss here.

 Thanks in advance for any help.

 Jon

>>>
>>>


Re: Is it possible to expose SchemaRDD’s from thrift server?

2015-02-12 Thread Todd Nist
Thanks Michael.  I will give it a try.



On Thu, Feb 12, 2015 at 6:00 PM, Michael Armbrust 
wrote:

> You can start a JDBC server with an existing context.  See my answer here:
> http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-td20197.html
>
> On Thu, Feb 12, 2015 at 7:24 AM, Todd Nist  wrote:
>
>> I have a question with regards to accessing SchemaRDD’s and Spark SQL
>> temp tables via the thrift server.  It appears that a SchemaRDD when
>> created is only available in the local namespace / context and are
>> unavailable to external services accessing Spark through thrift server via
>> ODBC; is this correct?  Does the same apply to temp tables?
>>
>> If we process data on Spark how is it exposed to the thrift server for
>> access by third party BI applications via ODBC?  Dose one need to have two
>> spark context, one for processing, then dump it to metastore from which a
>> third party application can fetch the data or is it possible to expose the
>> resulting SchemaRDD via the thrift server?
>>
>> I am trying to do this with Tableau, Spark SQL Connector.  From what I
>> can see I need the spark context for processing and then dump to
>> metastore.  Is it possible to access the resulting SchemaRDD from doing
>> something like this:
>>
>> create temporary table test
>> using org.apache.spark.sql.json
>> options (path ‘/data/json/*');
>>
>> cache table test;
>>
>> I am using Spark 1.2.1.  If not available now will it be in 1.3.x? Or is
>> the only way to achieve this is store into the metastore and does the imply
>> hive.
>>
>> -Todd
>>
>
>


RE: spark left outer join with java.lang.UnsupportedOperationException: empty collection

2015-02-12 Thread java8964
OK. I think I have to use "None" instead null, then it works. Still switching 
from Java.
I can also just use the field name as what I assume.
Great experience.

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: spark left outer join with java.lang.UnsupportedOperationException: 
empty collection
Date: Thu, 12 Feb 2015 18:06:43 -0500




Hi, 
I am using Spark 1.2.0 with Hadoop 2.2. Now I have to 2 csv files, but have 8 
fields. I know that the first field from both files are IDs. I want to find all 
the IDs existed in the first file, but NOT in the 2nd file.
I am coming with the following code in spark-shell.
case class origAsLeft (id: String)case class newAsRight (id: String)val 
OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map( r=>(r(0), 
origAsLeft(r(0val NewData = 
sc.textFile("hdfs://secondfile").map(_.split(",")).map( r=>(r(0), 
newAsRight(r(0val output = OrigData.leftOuterJoin(NewData).filter{ case (k, 
v) => v._2 == null }
Find what I understand, after OrigData left outer join with NewData, it will 
use the id as the key, and a tuple with (leftObject, RightObject).Since I want 
to find out all the IDs existed in the first file, but not in the 2nd one, so 
the output RDD will be the one I want, as it will filter out only when there is 
no newAsRight object from NewData.
Then I run 
output.first
Spark does start to run, but give me the following error message:15/02/12 
16:43:38 INFO scheduler.DAGScheduler: Job 4 finished: first at :21, 
took 78.303549 sjava.lang.UnsupportedOperationException: empty collection   
 at org.apache.spark.rdd.RDD.first(RDD.scala:1095)   at 
$iwC$$iwC$$iwC$$iwC.(:21) at 
$iwC$$iwC$$iwC.(:26)  at $iwC$$iwC.(:28)   at 
$iwC.(:30)at (:32) at .(:36)   
 at .() at .(:7) at .() at 
$print()at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:619) at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)   at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)   at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at 
org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)   at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)   at 
org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)  at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
   at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) 
 at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) 
 at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)   
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)  at 
org.apache.spark.repl.Main$.main(Main.scala:31)  at 
org.apache.spark.repl.Main.main(Main.scala)  at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:619) at 
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)   at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)  at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Did I do anything wrong? What is the way to find all the id in the first file, 
but not in the 2nd file?
Second question is how can I use the object field to do the compare in this 
case? For example, if I define:
case class origAsLeft (id: String, name: String)case class newAsRight (id: 
String, name: String)val OrigData = 
sc.textFile("hdfs://firstfile").map(_.split(",")).map( r=>(r(0), 
origAsLeft(r(0), r(1val NewData = 
sc.textFile("hdfs://secondfile").map(_.split(",")).map( r=>(r(0), 
newAsRight(r(0), r(1// in this case, I want to list all the data in the 
first file which has the same ID as in the 2nd file, but with different value 
in name, I want to do something like below:
val output = OrigData.join(NewData).filter{ case (k, v) => v._1.name != 
v._2.name }
But what is the syntax to use the field in the case class I defined?
Thanks
Yong   

Re: Easy way to "partition" an RDD into chunks like Guava's Iterables.partition

2015-02-12 Thread Corey Nolet
The more I'm thinking about this- I may try this instead:

val myChunkedRDD: RDD[List[Event]] = inputRDD.mapPartitions(_
.grouped(300).toList)

I wonder if this would work. I'll try it when I get back to work tomorrow.


Yuyhao, I tried your approach too but it seems to be somehow moving all the
data to a single partition (no matter what window I set) and it seems to
lock up my jobs. I waited for 15 minutes for a stage that usually takes
about 15 seconds and I finally just killed the job in yarn.

On Thu, Feb 12, 2015 at 4:40 PM, Corey Nolet  wrote:

> So I tried this:
>
> .mapPartitions(itr => {
> itr.grouped(300).flatMap(items => {
> myFunction(items)
> })
> })
>
> and I tried this:
>
> .mapPartitions(itr => {
> itr.grouped(300).flatMap(myFunction)
> })
>
>  I tried making myFunction a method, a function val, and even moving it
> into a singleton object.
>
> The closure cleaner throws Task not serliazable exceptions with a distance
> outer class whenever I do this.
>
> Just to test, I tried this:
>
> .flatMap(it => myFunction(Seq(it)))
>
> And it worked just fine. What am I doing wrong here?
>
> Also, my function is a little more complicated and it does take arguments
> that depend on the class actually manipulating the RDD- but why would it
> work fine with a single flatMap and not with mapPartitions? I am somewhat
> new to Scala and maybe I'm missing something here.
>
> On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra 
> wrote:
>
>> No, only each group should need to fit.
>>
>> On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet  wrote:
>>
>>> Doesn't iter still need to fit entirely into memory?
>>>
>>> On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra 
>>> wrote:
>>>
 rdd.mapPartitions { iter =>
   val grouped = iter.grouped(batchSize)
   for (group <- grouped) { ... }
 }

 On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet  wrote:

> I think the word "partition" here is a tad different than the term
> "partition" that we use in Spark. Basically, I want something similar to
> Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
> want to run an algorithm that can be optimized by working on 30 people at 
> a
> time, I'd like to be able to say:
>
> val rdd: RDD[People] = .
> val partitioned: RDD[Seq[People]] = rdd.partition(30)
>
> I also don't want any shuffling- everything can still be processed
> locally.
>
>
> [1]
> http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)
>


>>>
>>
>


Re: Question about mllib als's implicit training

2015-02-12 Thread Crystal Xing
Many thanks!

On Thu, Feb 12, 2015 at 3:31 PM, Sean Owen  wrote:

> This all describes how the implementation operates, logically. The
> matrix P is never formed, for sure, certainly not by the caller.
>
> The implementation actually extends to handle negative values in R too
> but it's all taken care of by the implementation.
>
> On Thu, Feb 12, 2015 at 11:29 PM, Crystal Xing 
> wrote:
> > HI Sean,
> >
> > I am reading the paper of implicit training.
> >
> > Collaborative Filtering for Implicit Feedback Datasets
> >
> > It mentioned
> >
> > "To this end, let us introduce
> > a set of binary variables p_ui, which indicates the preference of user u
> to
> > item i. The p_ui values are derived by
> > binarizing the r_ui values:
> > p_ui = 1 if  r_ui > 0
> > and
> >
> > p_ui=0 if  r_ui = 0
> >
> > "
> >
> >
> > If for user_item without interactions, I do not include it in the
> training
> > data.  All the r_ui will >0 and all the p_ui is always 1?
> > Or the Mllib's implementation automatically takes care of those no
> > interaction user_product pairs ?
> >
> >
> > On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen  wrote:
> >>
> >> Where there is no user-item interaction, you provide no interaction,
> >> not an interaction with strength 0. Otherwise your input is fully
> >> dense.
> >>
> >> On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing  >
> >> wrote:
> >> > Hi,
> >> >
> >> > I have some implicit rating data, such as the purchasing data.  I read
> >> > the
> >> > paper about the implicit training algorithm used in spark and it
> >> > mentioned
> >> > the for user-prodct pairs which do not have implicit rating data, such
> >> > as no
> >> > purchase, we need to provide the value as 0.
> >> >
> >> > This is different from explicit training where when we provide
> training
> >> > data, for user-product pair without a rating, we just do not have them
> >> > in
> >> > the training data instead of adding a user-product pair with rating 0.
> >> >
> >> > Am I understand this correctly?
> >> >
> >> >  Or for implicit training implementation in spark, the missing data
> will
> >> > be
> >> > automatically filled out as zero and we do not need to add them in the
> >> > training data set?
> >> >
> >> > Thanks,
> >> >
> >> > Crystal.
> >
> >
>


Re: Question about mllib als's implicit training

2015-02-12 Thread Sean Owen
This all describes how the implementation operates, logically. The
matrix P is never formed, for sure, certainly not by the caller.

The implementation actually extends to handle negative values in R too
but it's all taken care of by the implementation.

On Thu, Feb 12, 2015 at 11:29 PM, Crystal Xing  wrote:
> HI Sean,
>
> I am reading the paper of implicit training.
>
> Collaborative Filtering for Implicit Feedback Datasets
>
> It mentioned
>
> "To this end, let us introduce
> a set of binary variables p_ui, which indicates the preference of user u to
> item i. The p_ui values are derived by
> binarizing the r_ui values:
> p_ui = 1 if  r_ui > 0
> and
>
> p_ui=0 if  r_ui = 0
>
> "
>
>
> If for user_item without interactions, I do not include it in the training
> data.  All the r_ui will >0 and all the p_ui is always 1?
> Or the Mllib's implementation automatically takes care of those no
> interaction user_product pairs ?
>
>
> On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen  wrote:
>>
>> Where there is no user-item interaction, you provide no interaction,
>> not an interaction with strength 0. Otherwise your input is fully
>> dense.
>>
>> On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing 
>> wrote:
>> > Hi,
>> >
>> > I have some implicit rating data, such as the purchasing data.  I read
>> > the
>> > paper about the implicit training algorithm used in spark and it
>> > mentioned
>> > the for user-prodct pairs which do not have implicit rating data, such
>> > as no
>> > purchase, we need to provide the value as 0.
>> >
>> > This is different from explicit training where when we provide training
>> > data, for user-product pair without a rating, we just do not have them
>> > in
>> > the training data instead of adding a user-product pair with rating 0.
>> >
>> > Am I understand this correctly?
>> >
>> >  Or for implicit training implementation in spark, the missing data will
>> > be
>> > automatically filled out as zero and we do not need to add them in the
>> > training data set?
>> >
>> > Thanks,
>> >
>> > Crystal.
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question about mllib als's implicit training

2015-02-12 Thread Crystal Xing
HI Sean,

I am reading the paper of implicit training.
Collaborative Filtering for Implicit Feedback Datasets


It mentioned

"To this end, let us introduce
a set of binary variables p_ui, which indicates the preference of user u to
item i. The p_ui values are derived by
binarizing the r_ui values:
p_ui = 1 if  r_ui > 0
and

p_ui=0 if  r_ui = 0

"


If for user_item without interactions, I do not include it in the training
data.  All the r_ui will >0 and all the p_ui is always 1?
Or the Mllib's implementation automatically takes care of those no
interaction user_product pairs ?


On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen  wrote:

> Where there is no user-item interaction, you provide no interaction,
> not an interaction with strength 0. Otherwise your input is fully
> dense.
>
> On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing 
> wrote:
> > Hi,
> >
> > I have some implicit rating data, such as the purchasing data.  I read
> the
> > paper about the implicit training algorithm used in spark and it
> mentioned
> > the for user-prodct pairs which do not have implicit rating data, such
> as no
> > purchase, we need to provide the value as 0.
> >
> > This is different from explicit training where when we provide training
> > data, for user-product pair without a rating, we just do not have them in
> > the training data instead of adding a user-product pair with rating 0.
> >
> > Am I understand this correctly?
> >
> >  Or for implicit training implementation in spark, the missing data will
> be
> > automatically filled out as zero and we do not need to add them in the
> > training data set?
> >
> > Thanks,
> >
> > Crystal.
>


RE: Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Ganelin, Ilya
Hi all - I've spent a while playing with this. Two significant sources of speed 
up that I've achieved are

1) Manually multiplying the feature vectors and caching either the user or 
product vector

2) By doing so, if one of the RDDs is a global it becomes possible to 
parallelize this step by running it in a thread and submitting multiple threads 
to yarn engine.

Doing so I've achieved an over 75x speed up compared with the packaged versio 
inside ml lib.



Sent with Good (www.good.com)


-Original Message-
From: Sean Owen [so...@cloudera.com]
Sent: Thursday, February 12, 2015 05:47 PM Eastern Standard Time
To: Crystal Xing
Cc: user@spark.apache.org
Subject: Re: Is there a fast way to do fast top N product recommendations for 
all users


Not now, but see https://issues.apache.org/jira/browse/SPARK-3066

As an aside, it's quite expensive to make recommendations for all
users. IMHO this is not something to do, if you can avoid it
architecturally. For example, consider precomputing recommendations
only for users whose probability of needing recommendations soon is
not very small. Usually, only a small number of users are active.

On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing  wrote:
> Hi,
>
>
> I wonder if there is a way to do fast top N product recommendations for all
> users in training using mllib's ALS algorithm.
>
> I am currently calling
>
> public Rating[] recommendProducts(int user,
>  int num)
>
> method in MatrixFactorizatoinModel for users one by one
> and it is quite slow since it does not operate on RDD input?
>
> I also tried to generate all possible
> user-product pairs and use
> public JavaRDD predict(JavaPairRDD usersProducts)
>
> to fill out the matrix. Since I have a large number of user and products,
>
> the job stucks and transforming all pairs.
>
>
> I wonder if there is a better way to do this.
>
> Thanks,
>
> Crystal.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Question about mllib als's implicit training

2015-02-12 Thread Sean Owen
Where there is no user-item interaction, you provide no interaction,
not an interaction with strength 0. Otherwise your input is fully
dense.

On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing  wrote:
> Hi,
>
> I have some implicit rating data, such as the purchasing data.  I read the
> paper about the implicit training algorithm used in spark and it mentioned
> the for user-prodct pairs which do not have implicit rating data, such as no
> purchase, we need to provide the value as 0.
>
> This is different from explicit training where when we provide training
> data, for user-product pair without a rating, we just do not have them in
> the training data instead of adding a user-product pair with rating 0.
>
> Am I understand this correctly?
>
>  Or for implicit training implementation in spark, the missing data will be
> automatically filled out as zero and we do not need to add them in the
> training data set?
>
> Thanks,
>
> Crystal.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Question about mllib als's implicit training

2015-02-12 Thread Crystal Xing
Hi,

I have some implicit rating data, such as the purchasing data.  I read the
paper about the implicit training algorithm used in spark and it mentioned
the for user-prodct pairs which do not have implicit rating data, such as
no purchase, we need to provide the value as 0.

This is different from explicit training where when we provide training
data, for user-product pair without a rating, we just do not have them in
the training data instead of adding a user-product pair with rating 0.

Am I understand this correctly?

 Or for implicit training implementation in spark, the missing data will be
automatically filled out as zero and we do not need to add them in the
training data set?

Thanks,

Crystal.


spark left outer join with java.lang.UnsupportedOperationException: empty collection

2015-02-12 Thread java8964
Hi, 
I am using Spark 1.2.0 with Hadoop 2.2. Now I have to 2 csv files, but have 8 
fields. I know that the first field from both files are IDs. I want to find all 
the IDs existed in the first file, but NOT in the 2nd file.
I am coming with the following code in spark-shell.
case class origAsLeft (id: String)case class newAsRight (id: String)val 
OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map( r=>(r(0), 
origAsLeft(r(0val NewData = 
sc.textFile("hdfs://secondfile").map(_.split(",")).map( r=>(r(0), 
newAsRight(r(0val output = OrigData.leftOuterJoin(NewData).filter{ case (k, 
v) => v._2 == null }
Find what I understand, after OrigData left outer join with NewData, it will 
use the id as the key, and a tuple with (leftObject, RightObject).Since I want 
to find out all the IDs existed in the first file, but not in the 2nd one, so 
the output RDD will be the one I want, as it will filter out only when there is 
no newAsRight object from NewData.
Then I run 
output.first
Spark does start to run, but give me the following error message:15/02/12 
16:43:38 INFO scheduler.DAGScheduler: Job 4 finished: first at :21, 
took 78.303549 sjava.lang.UnsupportedOperationException: empty collection   
 at org.apache.spark.rdd.RDD.first(RDD.scala:1095)   at 
$iwC$$iwC$$iwC$$iwC.(:21) at 
$iwC$$iwC$$iwC.(:26)  at $iwC$$iwC.(:28)   at 
$iwC.(:30)at (:32) at .(:36)   
 at .() at .(:7) at .() at 
$print()at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:619) at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)   at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)   at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at 
org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)   at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)   at 
org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)  at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
   at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) 
 at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) 
 at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)   
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)  at 
org.apache.spark.repl.Main$.main(Main.scala:31)  at 
org.apache.spark.repl.Main.main(Main.scala)  at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:619) at 
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)   at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)  at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Did I do anything wrong? What is the way to find all the id in the first file, 
but not in the 2nd file?
Second question is how can I use the object field to do the compare in this 
case? For example, if I define:
case class origAsLeft (id: String, name: String)case class newAsRight (id: 
String, name: String)val OrigData = 
sc.textFile("hdfs://firstfile").map(_.split(",")).map( r=>(r(0), 
origAsLeft(r(0), r(1val NewData = 
sc.textFile("hdfs://secondfile").map(_.split(",")).map( r=>(r(0), 
newAsRight(r(0), r(1// in this case, I want to list all the data in the 
first file which has the same ID as in the 2nd file, but with different value 
in name, I want to do something like below:
val output = OrigData.join(NewData).filter{ case (k, v) => v._1.name != 
v._2.name }
But what is the syntax to use the field in the case class I defined?
Thanks
Yong  

Predicting Class Probability with Gradient Boosting/Random Forest

2015-02-12 Thread nilesh
We are using Gradient Boosting/Random Forests that I have found provide the
best results for our recommendations. My issue is that I need the
probability of the 0/1 label, and not the predicted label. In the spark
scala api, I see that the predict method also has an option to provide the
probability. Can you provide any pointers to any documentation that I can
reference for implementing this. Thanks!

-Nilesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Predicting-Class-Probability-with-Gradient-Boosting-Random-Forest-tp21633.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Task not serializable problem in the multi-thread SQL query

2015-02-12 Thread Michael Armbrust
It looks to me like perhaps your SparkContext has shut down due to too many
failures.  I'd look in the logs of your executors for more information.

On Thu, Feb 12, 2015 at 2:34 AM, lihu  wrote:

> I try to use the multi-thread to use the Spark SQL query.
> some sample code just like this:
>
> val sqlContext = new SqlContext(sc)
> val rdd_query = sc.parallelize(data,   part)
> rdd_query.registerTempTable("MyTable")
> sqlContext.cacheTable("MyTable")
>
> val serverPool = Executors.newFixedThreadPool(3)
> val loopCnt = 10
>
> for(i <- 1 to loopCnt ){
> serverPool.execute(new Runnable(){
> override def run(){
> if( some condition){
> sqlContext.sql("SELECT * from
> ...").collect().foreach(println)
> }
> else{
> //some other query
> }
>
> }
> })
> }
>
> this will throw a Task serializable Exception, if I do not use the
> multi-thread, it works well.
> Since there is no object is not serializable? so what is the problem?
>
>
> java.lang.Error: org.apache.spark.SparkException: Task not serializable
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
> at java.lang.Thread.run(Thread.java:853)
>
> Caused by: org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
> at
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
> at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
> at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
> at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
> at java.lang.Thread.run(Thread.java:853)
>
> Caused by: java.lang.NullPointerException
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
> at
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
> at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
> at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
> at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
> at java.lang.Thread.run(Thread.java:853)
>
> --
> *Best Wishes!*
>
>
>


Re: Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Crystal Xing
Thanks, Sean! Glad to know it will be in the future release.

On Thu, Feb 12, 2015 at 2:45 PM, Sean Owen  wrote:

> Not now, but see https://issues.apache.org/jira/browse/SPARK-3066
>
> As an aside, it's quite expensive to make recommendations for all
> users. IMHO this is not something to do, if you can avoid it
> architecturally. For example, consider precomputing recommendations
> only for users whose probability of needing recommendations soon is
> not very small. Usually, only a small number of users are active.
>
> On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing 
> wrote:
> > Hi,
> >
> >
> > I wonder if there is a way to do fast top N product recommendations for
> all
> > users in training using mllib's ALS algorithm.
> >
> > I am currently calling
> >
> > public Rating[] recommendProducts(int user,
> >  int num)
> >
> > method in MatrixFactorizatoinModel for users one by one
> > and it is quite slow since it does not operate on RDD input?
> >
> > I also tried to generate all possible
> > user-product pairs and use
> > public JavaRDD predict(JavaPairRDD
> usersProducts)
> >
> > to fill out the matrix. Since I have a large number of user and products,
> >
> > the job stucks and transforming all pairs.
> >
> >
> > I wonder if there is a better way to do this.
> >
> > Thanks,
> >
> > Crystal.
>


Re: Is it possible to expose SchemaRDD’s from thrift server?

2015-02-12 Thread Michael Armbrust
You can start a JDBC server with an existing context.  See my answer here:
http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-td20197.html

On Thu, Feb 12, 2015 at 7:24 AM, Todd Nist  wrote:

> I have a question with regards to accessing SchemaRDD’s and Spark SQL temp
> tables via the thrift server.  It appears that a SchemaRDD when created is
> only available in the local namespace / context and are unavailable to
> external services accessing Spark through thrift server via ODBC; is this
> correct?  Does the same apply to temp tables?
>
> If we process data on Spark how is it exposed to the thrift server for
> access by third party BI applications via ODBC?  Dose one need to have two
> spark context, one for processing, then dump it to metastore from which a
> third party application can fetch the data or is it possible to expose the
> resulting SchemaRDD via the thrift server?
>
> I am trying to do this with Tableau, Spark SQL Connector.  From what I can
> see I need the spark context for processing and then dump to metastore.  Is
> it possible to access the resulting SchemaRDD from doing something like
> this:
>
> create temporary table test
> using org.apache.spark.sql.json
> options (path ‘/data/json/*');
>
> cache table test;
>
> I am using Spark 1.2.1.  If not available now will it be in 1.3.x? Or is
> the only way to achieve this is store into the metastore and does the imply
> hive.
>
> -Todd
>


Re: How to do broadcast join in SparkSQL

2015-02-12 Thread Michael Armbrust
In Spark 1.3, parquet tables that are created through the datasources API
will automatically calculate the sizeInBytes, which is used to broadcast.

On Thu, Feb 12, 2015 at 12:46 PM, Dima Zhiyanov 
wrote:

> Hello
>
> Has Spark implemented computing statistics for Parquet files? Or is there
> any other way I can enable broadcast joins between parquet file RDDs in
> Spark Sql?
>
> Thanks
> Dima
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21632.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Sean Owen
Not now, but see https://issues.apache.org/jira/browse/SPARK-3066

As an aside, it's quite expensive to make recommendations for all
users. IMHO this is not something to do, if you can avoid it
architecturally. For example, consider precomputing recommendations
only for users whose probability of needing recommendations soon is
not very small. Usually, only a small number of users are active.

On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing  wrote:
> Hi,
>
>
> I wonder if there is a way to do fast top N product recommendations for all
> users in training using mllib's ALS algorithm.
>
> I am currently calling
>
> public Rating[] recommendProducts(int user,
>  int num)
>
> method in MatrixFactorizatoinModel for users one by one
> and it is quite slow since it does not operate on RDD input?
>
> I also tried to generate all possible
> user-product pairs and use
> public JavaRDD predict(JavaPairRDD usersProducts)
>
> to fill out the matrix. Since I have a large number of user and products,
>
> the job stucks and transforming all pairs.
>
>
> I wonder if there is a better way to do this.
>
> Thanks,
>
> Crystal.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Concurrent batch processing

2015-02-12 Thread Tathagata Das
So you have come across spark.streaming.concurrentJobs already :)
Yeah, that is an undocumented feature that does allow multiple output
operations to submitted in parallel. However, this is not made public for
the exact reasons that you realized - the semantics in case of stateful
operations is not clear. It is semantically safe to enable, how ever it may
cause redundant computations, as next batches jobs may recompute some RDDs
twice rather than using the cached values of another RDDs.

In general, only in a very few cases is it useful to increase this
concurrency. If batch processing times > batch interval, then you need to
use more resources, and parallelize the ingestion and processing enough to
utilize those resources efficiently.
The spikes that you see despite average hardware utilization is low
probably indicates that the parallelization of the Spark Streaming jobs is
insufficient. There are bunch of optimizations that can be done.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

If you have already done this, can you tell me more about what sort of
utilization and psike do you see, and what sort of parallelization you have
already done?

TD

On Thu, Feb 12, 2015 at 12:09 PM, Matus Faro  wrote:

> I've been experimenting with my configuration for couple of days and
> gained quite a bit of power through small optimizations, but it may very
> well be something I'm doing crazy that is causing this problem.
>
> To give a little bit of a background, I am in the early stages of a
> project that consumes a stream of data in the order of 100,000 per second
> that requires processing over a sliding window over one day (ideally a
> week). Spark Streaming is a good candidate but I want to make sure I squash
> any performance issues ahead of time before I commit.
>
> With a 5 second batch size, in 40 minutes, the processing time is also 5
> seconds. I see the CPU spikes over two seconds out of five. I assume the
> sliding window operation is very expensive in this case and that's the root
> cause of this effect.
>
> I should've done a little bit more research before I posted, I just came
> across a post about an undocumented property spark.streaming.concurrentJobs
> that I am about to try. I'm still confused how exactly this works with a
> sliding window where the result of one batch depends on the other. I assume
> the concurrency can only be achieved up until the window action is
> executed. Either way, I am going to give this a try and post back here if
> that doesn't work.
>
> Thanks!
>
>
>
> On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda <
> ar...@sigmoidanalytics.com> wrote:
>
>> It could depend on the nature of your application but spark streaming
>> would use spark internally and concurrency should be there what is your use
>> case?
>>
>>
>> Are you sure that your configuration is good?
>>
>>
>> On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro  wrote:
>>
>>> Hi,
>>>
>>> Please correct me if I'm wrong, in Spark Streaming, next batch will
>>> not start processing until the previous batch has completed. Is there
>>> any way to be able to start processing the next batch if the previous
>>> batch is taking longer to process than the batch interval?
>>>
>>> The problem I am facing is that I don't see a hardware bottleneck in
>>> my Spark cluster, but Spark is not able to handle the amount of data I
>>> am pumping through (batch processing time is longer than batch
>>> interval). What I'm seeing is spikes of CPU, network and disk IO usage
>>> which I assume are due to different stages of a job, but on average,
>>> the hardware is under utilized. Concurrency in batch processing would
>>> allow the average batch processing time to be greater than batch
>>> interval while fully utilizing the hardware.
>>>
>>> Any ideas on what can be done? One option I can think of is to split
>>> the application into multiple applications running concurrently and
>>> dividing the initial stream of data between those applications.
>>> However, I would have to lose the benefits of having a single
>>> application.
>>>
>>> Thank you,
>>> Matus
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>>
>> [image: Sigmoid Analytics] 
>>
>> *Arush Kharbanda* || Technical Teamlead
>>
>> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>>
>
>


Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Silvio Fiorito
One method I’ve used is to publish each batch to a message bus or queue with a 
custom UI listening on the other end, displaying the results in d3.js or some 
other app. As far as I’m aware there isn’t a tool that will directly take a 
DStream.

Spark Notebook seems to have some support for updating graphs periodically. I 
haven’t used it myself yet so not sure how well it works. See here: 
https://github.com/andypetrella/spark-notebook

From: Su She
Date: Thursday, February 12, 2015 at 1:55 AM
To: Felix C
Cc: Kelvin Chu, "user@spark.apache.org"
Subject: Re: Can spark job server be used to visualize streaming data?

Hello Felix,

I am already streaming in very simple data using Kafka (few messages / second, 
each record only has 3 columns...really simple, but looking to scale once I 
connect everything). I am processing it in Spark Streaming and am currently 
writing word counts to hdfs. So the part where I am confused is...

Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data -> Spark 
Word Count -> How do I visualize?

is there a viz tool that I can set up to visualize JavaPairDStreams? or do I 
have to write to hbase/hdfs first?

Thanks!

On Wed, Feb 11, 2015 at 10:39 PM, Felix C 
mailto:felixcheun...@hotmail.com>> wrote:
What kind of data do you have? Kafka is a popular source to use with spark 
streaming.
But, spark streaming also support reading from a file. Its called basic source
https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers

--- Original Message ---

From: "Su She" mailto:suhsheka...@gmail.com>>
Sent: February 11, 2015 10:23 AM
To: "Felix C" mailto:felixcheun...@hotmail.com>>
Cc: "Kelvin Chu" <2dot7kel...@gmail.com>, 
user@spark.apache.org
Subject: Re: Can spark job server be used to visualize streaming data?

Thank you Felix and Kelvin. I think I'll def be using the k-means tools in mlib.

It seems the best way to stream data is by storing in hbase and then using an 
api in my viz to extract data? Does anyone have any thoughts on this?

Thanks!


On Tue, Feb 10, 2015 at 11:45 PM, Felix C 
mailto:felixcheun...@hotmail.com>> wrote:
Checkout
https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

In there are links to how that is done.


--- Original Message ---

From: "Kelvin Chu" <2dot7kel...@gmail.com>
Sent: February 10, 2015 12:48 PM
To: "Su She" mailto:suhsheka...@gmail.com>>
Cc: user@spark.apache.org
Subject: Re: Can spark job server be used to visualize streaming data?

Hi Su,

Out of the box, no. But, I know people integrate it with Spark Streaming to do 
real-time visualization. It will take some work though.

Kelvin

On Mon, Feb 9, 2015 at 5:04 PM, Su She 
mailto:suhsheka...@gmail.com>> wrote:
Hello Everyone,

I was reading this blog post: 
http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/

and was wondering if this approach can be taken to visualize streaming 
data...not just historical data?

Thank you!

-Suh





Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Crystal Xing
Hi,


I wonder if there is a way to do fast top N product recommendations for all
users in training using mllib's ALS algorithm.

I am currently calling

public Rating 
[]
recommendProducts(int user,
 int num)

method in MatrixFactorizatoinModel for users one by one
and it is quite slow since it does not operate on RDD input?

I also tried to generate all possible
user-product pairs
*and use*public JavaRDD
http://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/mllib/recommendation/Rating.html>>
predict(JavaPairRDD

usersProducts)

to fill out the matrix. Since I have a large number of user and products,

the job stucks and transforming all pairs.


I wonder if there is a better way to do this.

Thanks,

Crystal.


Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
The nm logs only seems to contain similar to the following. Nothing else in
the same time range. Any help?

2015-02-12 20:47:31,245 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_02
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_12
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_22
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_32
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_42
2015-02-12 21:24:30,515 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: FINISH_APPLICATION sent to absent application
application_1422406067005_0053

On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza 
wrote:

> It seems unlikely to me that it would be a 2.2 issue, though not entirely
> impossible.  Are you able to find any of the container logs?  Is the
> NodeManager launching containers and reporting some exit code?
>
> -Sandy
>
> On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg  wrote:
>
>> No, not submitting from windows, from a debian distribution. Had a quick
>> look at the rm logs, and it seems some containers are allocated but then
>> released again for some reason. Not easy to make sense of the logs, but
>> here is a snippet from the logs (from a test in our small test cluster) if
>> you'd like to have a closer look: http://pastebin.com/8WU9ivqC
>>
>> Sandy, sounds like it could possible be a 2.2 issue then, or what do you
>> think?
>>
>> Thanks,
>> Anders
>>
>> On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> This is tricky to debug. Check logs of node and resource manager of YARN
>>> to see if you can trace the error. In the past I have to closely look at
>>> arguments getting passed to YARN container (they get logged before
>>> attempting to launch containers). If I still don't get a clue, I had to
>>> check the script generated by YARN to execute the container and even run
>>> manually to trace at what line the error has occurred.
>>>
>>> BTW are you submitting the job from windows?
>>>
>>> On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg  wrote:
>>>
 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log message during startup, and can't see any other log
 messages since no executer gets launched. Does not seems to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread "main" org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza 
 wrote:

> Hi Anders,
>
> I just tried this out and was able to successfully acquire executors.
> Any stra

Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Felix C
You would probably write to hdfs or check out 
https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

You might be able to retrofit it to you use case.

--- Original Message ---

From: "Su She" 
Sent: February 11, 2015 10:55 PM
To: "Felix C" 
Cc: "Kelvin Chu" <2dot7kel...@gmail.com>, user@spark.apache.org
Subject: Re: Can spark job server be used to visualize streaming data?

Hello Felix,

I am already streaming in very simple data using Kafka (few messages /
second, each record only has 3 columns...really simple, but looking to
scale once I connect everything). I am processing it in Spark Streaming and
am currently writing word counts to hdfs. So the part where I am confused
is...

Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
Spark Word Count -> *How do I visualize?*

is there a viz tool that I can set up to visualize JavaPairDStreams? or do
I have to write to hbase/hdfs first?

Thanks!

On Wed, Feb 11, 2015 at 10:39 PM, Felix C  wrote:

>  What kind of data do you have? Kafka is a popular source to use with
> spark streaming.
> But, spark streaming also support reading from a file. Its called basic
> source
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers
>
> --- Original Message ---
>
> From: "Su She" 
> Sent: February 11, 2015 10:23 AM
> To: "Felix C" 
> Cc: "Kelvin Chu" <2dot7kel...@gmail.com>, user@spark.apache.org
> Subject: Re: Can spark job server be used to visualize streaming data?
>
>  Thank you Felix and Kelvin. I think I'll def be using the k-means tools
> in mlib.
>
>  It seems the best way to stream data is by storing in hbase and then
> using an api in my viz to extract data? Does anyone have any thoughts on
> this?
>
>   Thanks!
>
>
> On Tue, Feb 10, 2015 at 11:45 PM, Felix C 
> wrote:
>
>  Checkout
>
> https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html
>
> In there are links to how that is done.
>
>
> --- Original Message ---
>
> From: "Kelvin Chu" <2dot7kel...@gmail.com>
> Sent: February 10, 2015 12:48 PM
> To: "Su She" 
> Cc: user@spark.apache.org
> Subject: Re: Can spark job server be used to visualize streaming data?
>
>   Hi Su,
>
>  Out of the box, no. But, I know people integrate it with Spark Streaming
> to do real-time visualization. It will take some work though.
>
>  Kelvin
>
> On Mon, Feb 9, 2015 at 5:04 PM, Su She  wrote:
>
>  Hello Everyone,
>
>  I was reading this blog post:
> http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/
>
>  and was wondering if this approach can be taken to visualize streaming
> data...not just historical data?
>
>  Thank you!
>
>  -Suh
>
>
>
>


Re: spark, reading from s3

2015-02-12 Thread Kane Kim
Looks like my clock is in sync:

-bash-4.1$ date && curl -v s3.amazonaws.com
Thu Feb 12 21:40:18 UTC 2015
* About to connect() to s3.amazonaws.com port 80 (#0)
*   Trying 54.231.12.24... connected
* Connected to s3.amazonaws.com (54.231.12.24) port 80 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/
3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2
> Host: s3.amazonaws.com
> Accept: */*
>
< HTTP/1.1 307 Temporary Redirect
< x-amz-id-2:
sl8Tg81ZnBj3tD7Q9f2KFBBZKC83TbAUieHJu9IA3PrBibvB3M7NpwAlfTi/Tdwg
< x-amz-request-id: 48C14DF82BE1A970
< Date: Thu, 12 Feb 2015 21:40:19 GMT
< Location: http://aws.amazon.com/s3/
< Content-Length: 0
< Server: AmazonS3


On Thu, Feb 12, 2015 at 12:26 PM, Franc Carter  wrote:

>
> Check that your timezone is correct as well, an incorrect timezone can
> make it look like your time is correct when it is skewed.
>
> cheers
>
> On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim  wrote:
>
>> The thing is that my time is perfectly valid...
>>
>> On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das 
>> wrote:
>>
>>> Its with the timezone actually, you can either use an NTP to maintain
>>> accurate system clock or you can adjust your system time to match with the
>>> AWS one. You can do it as:
>>>
>>> telnet s3.amazonaws.com 80
>>> GET / HTTP/1.0
>>>
>>>
>>> [image: Inline image 1]
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim  wrote:
>>>
 I'm getting this warning when using s3 input:
 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
 response to
 RequestTimeTooSkewed error. Local machine and S3 server disagree on the
 time by approximately 0 seconds. Retrying connection.

 After that there are tons of 403/forbidden errors and then job fails.
 It's sporadic, so sometimes I get this error and sometimes not, what
 could be the issue?
 I think it could be related to network connectivity?

>>>
>>>
>>
>
>
> --
>
> *Franc Carter* | Systems Architect | Rozetta Technology
>
> franc.car...@rozettatech.com  |
> www.rozettatechnology.com
>
> Tel: +61 2 8355 2515
>
> Level 4, 55 Harrington St, The Rocks NSW 2000
>
> PO Box H58, Australia Square, Sydney NSW 1215
>
> AUSTRALIA
>
>


Re: Easy way to "partition" an RDD into chunks like Guava's Iterables.partition

2015-02-12 Thread Corey Nolet
So I tried this:

.mapPartitions(itr => {
itr.grouped(300).flatMap(items => {
myFunction(items)
})
})

and I tried this:

.mapPartitions(itr => {
itr.grouped(300).flatMap(myFunction)
})

 I tried making myFunction a method, a function val, and even moving it
into a singleton object.

The closure cleaner throws Task not serliazable exceptions with a distance
outer class whenever I do this.

Just to test, I tried this:

.flatMap(it => myFunction(Seq(it)))

And it worked just fine. What am I doing wrong here?

Also, my function is a little more complicated and it does take arguments
that depend on the class actually manipulating the RDD- but why would it
work fine with a single flatMap and not with mapPartitions? I am somewhat
new to Scala and maybe I'm missing something here.

On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra 
wrote:

> No, only each group should need to fit.
>
> On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet  wrote:
>
>> Doesn't iter still need to fit entirely into memory?
>>
>> On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra 
>> wrote:
>>
>>> rdd.mapPartitions { iter =>
>>>   val grouped = iter.grouped(batchSize)
>>>   for (group <- grouped) { ... }
>>> }
>>>
>>> On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet  wrote:
>>>
 I think the word "partition" here is a tad different than the term
 "partition" that we use in Spark. Basically, I want something similar to
 Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
 want to run an algorithm that can be optimized by working on 30 people at a
 time, I'd like to be able to say:

 val rdd: RDD[People] = .
 val partitioned: RDD[Seq[People]] = rdd.partition(30)

 I also don't want any shuffling- everything can still be processed
 locally.


 [1]
 http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)

>>>
>>>
>>
>


Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Sandy Ryza
It seems unlikely to me that it would be a 2.2 issue, though not entirely
impossible.  Are you able to find any of the container logs?  Is the
NodeManager launching containers and reporting some exit code?

-Sandy

On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg  wrote:

> No, not submitting from windows, from a debian distribution. Had a quick
> look at the rm logs, and it seems some containers are allocated but then
> released again for some reason. Not easy to make sense of the logs, but
> here is a snippet from the logs (from a test in our small test cluster) if
> you'd like to have a closer look: http://pastebin.com/8WU9ivqC
>
> Sandy, sounds like it could possible be a 2.2 issue then, or what do you
> think?
>
> Thanks,
> Anders
>
> On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> This is tricky to debug. Check logs of node and resource manager of YARN
>> to see if you can trace the error. In the past I have to closely look at
>> arguments getting passed to YARN container (they get logged before
>> attempting to launch containers). If I still don't get a clue, I had to
>> check the script generated by YARN to execute the container and even run
>> manually to trace at what line the error has occurred.
>>
>> BTW are you submitting the job from windows?
>>
>> On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg  wrote:
>>
>>> Interesting to hear that it works for you. Are you using Yarn 2.2 as
>>> well? No strange log message during startup, and can't see any other log
>>> messages since no executer gets launched. Does not seems to work in
>>> yarn-client mode either, failing with the exception below.
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>>> application has already ended! It might have been killed or unable to
>>> launch application master.
>>> at
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
>>> at
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>>> at
>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>>> at org.apache.spark.SparkContext.(SparkContext.scala:370)
>>> at
>>> com.spotify.analytics.AnalyticsSparkContext.(AnalyticsSparkContext.scala:8)
>>> at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
>>> at com.spotify.analytics.DataSampler.main(DataSampler.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>> at java.lang.reflect.Method.invoke(Method.java:597)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> /Anders
>>>
>>>
>>> On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza 
>>> wrote:
>>>
 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg 
 wrote:

> Hi,
>
> Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
> 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
> successfully with spark 1.2 (and also master from 2015-01-16), so 
> something
> has changed since then that prevents the job from receiving any executors
> on the cluster.
>
> Basic symptoms are that the jobs fires up the AM, but after examining
> the "executors" page in the web ui, only the driver is listed, no
> executors are ever received, and the driver keep waiting forever. Has
> anyone seemed similar problems?
>
> Thanks for any insights,
> Anders
>


>>>
>


Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
No, not submitting from windows, from a debian distribution. Had a quick
look at the rm logs, and it seems some containers are allocated but then
released again for some reason. Not easy to make sense of the logs, but
here is a snippet from the logs (from a test in our small test cluster) if
you'd like to have a closer look: http://pastebin.com/8WU9ivqC

Sandy, sounds like it could possible be a 2.2 issue then, or what do you
think?

Thanks,
Anders

On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> This is tricky to debug. Check logs of node and resource manager of YARN
> to see if you can trace the error. In the past I have to closely look at
> arguments getting passed to YARN container (they get logged before
> attempting to launch containers). If I still don't get a clue, I had to
> check the script generated by YARN to execute the container and even run
> manually to trace at what line the error has occurred.
>
> BTW are you submitting the job from windows?
>
> On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg  wrote:
>
>> Interesting to hear that it works for you. Are you using Yarn 2.2 as
>> well? No strange log message during startup, and can't see any other log
>> messages since no executer gets launched. Does not seems to work in
>> yarn-client mode either, failing with the exception below.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>> application has already ended! It might have been killed or unable to
>> launch application master.
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>> at org.apache.spark.SparkContext.(SparkContext.scala:370)
>> at
>> com.spotify.analytics.AnalyticsSparkContext.(AnalyticsSparkContext.scala:8)
>> at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
>> at com.spotify.analytics.DataSampler.main(DataSampler.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> /Anders
>>
>>
>> On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza 
>> wrote:
>>
>>> Hi Anders,
>>>
>>> I just tried this out and was able to successfully acquire executors.
>>> Any strange log messages or additional color you can provide on your
>>> setup?  Does yarn-client mode work?
>>>
>>> -Sandy
>>>
>>> On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg 
>>> wrote:
>>>
 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the jobs fires up the AM, but after examining
 the "executors" page in the web ui, only the driver is listed, no
 executors are ever received, and the driver keep waiting forever. Has
 anyone seemed similar problems?

 Thanks for any insights,
 Anders

>>>
>>>
>>


RE: PySpark 1.2 Hadoop version mismatch

2015-02-12 Thread Michael Nazario
I looked at the environment which I ran the spark-submit command in, and it 
looks like there is nothing that could be messing with the classpath.

Just to be sure, I checked the web UI which says the classpath contains:
- The two jars I added: /path/to/avro-mapred-1.7.4-hadoop2.jar and 
lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar
- The spark assembly jar in the same location: 
/path/to/spark/lib/spark-assembly-1.2.0-hadoop2.0.0-cdh4.7.0.jar
- The conf folder: /path/to/spark/conf
- The python script I was running


From: Sean Owen [so...@cloudera.com]
Sent: Thursday, February 12, 2015 12:13 AM
To: Akhil Das
Cc: Michael Nazario; user@spark.apache.org
Subject: Re: PySpark 1.2 Hadoop version mismatch

No, "mr1" should not be the issue here, and I think that would break
other things. The OP is not using mr1.

client 4 / server 7 means roughly "client is Hadoop 1.x, server is
Hadoop 2.0.x". Normally, I'd say I think you are packaging Hadoop code
in your app by brining in Spark and its deps. Your app shouldn't have
any of this code.

If you're running straight examples though, I'm less sure. If
anything, your client is later than your server. I wonder if you have
anything else set on your local classpath via env variables?


On Thu, Feb 12, 2015 at 6:52 AM, Akhil Das  wrote:
> Did you have a look at
> https://urldefense.proofpoint.com/v2/url?u=http-3A__spark.apache.org_docs_1.2.0_building-2Dspark.html&d=AwIBaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=yN4Yj1JskMkGMKoYoLUUIQViRLGShPc1wislP1YdU4g&m=qSlV9TOMGsnfA_9xycNM5biA5h11naL5ZuLVhMrxpHQ&s=vWYYgDt86TQENpK2Il3JBZHTEqQe3_bRp4TA83PUjkc&e=
>
> I think you can simply download the source and build for your hadoop version
> as:
>
> mvn -Dhadoop.version=2.0.0-mr1-cdh4.7.0 -DskipTests clean package
>
>
> Thanks
> Best Regards
>
> On Thu, Feb 12, 2015 at 11:45 AM, Michael Nazario 
> wrote:
>>
>> I also forgot some other information. I have made this error go away by
>> making my pyspark application use spark-1.1.1-bin-cdh4 for the driver, but
>> communicate with a spark 1.2 master and worker. It's not a good workaround,
>> so I would like to have the driver also be spark 1.2
>>
>> Michael
>> 
>> From: Michael Nazario
>> Sent: Wednesday, February 11, 2015 10:13 PM
>> To: user@spark.apache.org
>> Subject: PySpark 1.2 Hadoop version mismatch
>>
>> Hi Spark users,
>>
>> I seem to be having this consistent error which I have been trying to
>> reproduce and narrow down the problem. I've been running a PySpark
>> application on Spark 1.2 reading avro files from Hadoop. I was consistently
>> seeing the following error:
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
>> : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
>> communicate with client version 4
>>
>> After some searching, I noticed that this most likely meant my hadoop
>> versions were mismatched. I had the following versions at the time:
>>
>> Hadoop: hadoop-2.0.0-cdh4.7.0
>> Spark: spark-1.2.0-bin-cdh4.2.0
>>
>> In the past, I've never had a problem with this setup for Spark 1.1.1 or
>> Spark 1.0.2. I figured it was worth me rebuilding Spark in case I was wrong
>> about versions. To rebuild my Spark, I ran this command on the v1.2.0 tag:
>>
>> ./make-distribution.sh -Dhadoop.version=2.0.0-cdh4.7.0
>>
>> I then retried my previously mentioned application with this new build of
>> Spark. Same error.
>>
>> To narrow down the problem some more, I figured I should try out the
>> example which comes with spark which allows you to load an avro file. I ran
>> the below command (I know it uses a deprecated way of passing jars to the
>> driver classpath):
>>
>>
>> SPARK_CLASSPATH="/path/to/avro-mapred-1.7.4-hadoop2.jar:lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar:$SPARK_CLASSPATH"
>> bin/spark-submit ./examples/src/main/python/avro_inputformat.py
>> "hdfs://localhost:8020/path/to/file.avro"
>>
>> I ended up with the same error. The full stacktrace is below.
>>
>> Traceback (most recent call last):
>>   File "/git/spark/dist/./examples/src/main/python/avro_inputformat.py",
>> line 77, in 
>> conf=conf)
>>   File "/git/spark/dist/python/pyspark/context.py", line 503, in
>> newAPIHadoopFile
>> jconf, batchSize)
>>   File
>> "/git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
>> 538, in __call__
>>   File "/git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> line 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
>> : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
>> communicate with client version 4
>> at org.apache.hadoop.ipc.Client.call(Client.java:1113)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
>> at com.sun.proxy.$Proxy8.getProtocolVersion(Unkn

Re: Master dies after program finishes normally

2015-02-12 Thread Imran Rashid
The important thing here is the master's memory, that's where you're
getting the GC overhead limit.  The master is updating its UI to include
your finished app when your app finishes, which would cause a spike in
memory usage.

I wouldn't expect the master to need a ton of memory just to serve the UI
for a modest number of small apps, but maybe some of your apps have a lot
of jobs, stages, or tasks.  And there is always lots of overhead from the
jvm, so bumping it up might help.

On Thu, Feb 12, 2015 at 1:25 PM, Manas Kar 
wrote:

> I have 5 workers each executor-memory 8GB of memory. My driver memory is 8
> GB as well. They are all 8 core machines.
>
> To answer Imran's question my configurations are thus.
> executor_total_max_heapsize = 18GB
> This problem happens at the end of my program.
>
> I don't have to run a lot of jobs to see this behaviour.
> I can see my output correctly in HDFS and all.
> I will give it one more try after increasing master's memory(which is
> default 296MB to 512 MB)
>
> ..manas
>
> On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda <
> ar...@sigmoidanalytics.com> wrote:
>
>> How many nodes do you have in your cluster, how many cores, what is the
>> size of the memory?
>>
>> On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar 
>> wrote:
>>
>>> Hi Arush,
>>>  Mine is a CDH5.3 with Spark 1.2.
>>> The only change to my spark programs are
>>> -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.
>>>
>>> ..Manas
>>>
>>> On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda <
>>> ar...@sigmoidanalytics.com> wrote:
>>>
 What is your cluster configuration? Did you try looking at the Web UI?
 There are many tips here

 http://spark.apache.org/docs/1.2.0/tuning.html

 Did you try these?

 On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar >>> > wrote:

> Hi,
>  I have a Hidden Markov Model running with 200MB data.
>  Once the program finishes (i.e. all stages/jobs are done) the program
> hangs for 20 minutes or so before killing master.
>
> In the spark master the following log appears.
>
> 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught
> fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31]
> shutting down ActorSystem [sparkMaster]
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at scala.collection.immutable.List$.newBuilder(List.scala:396)
> at
> scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
> at
> scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
> at
> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
> at
> scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
> at
> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.json4s.MonadicJValue.org
> $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
> at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
> at
> org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
> at
> org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala

Re: How to do broadcast join in SparkSQL

2015-02-12 Thread Dima Zhiyanov
Hello 

Has Spark implemented computing statistics for Parquet files? Or is there
any other way I can enable broadcast joins between parquet file RDDs in
Spark Sql? 

Thanks 
Dima



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark, reading from s3

2015-02-12 Thread Franc Carter
Check that your timezone is correct as well, an incorrect timezone can make
it look like your time is correct when it is skewed.

cheers

On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim  wrote:

> The thing is that my time is perfectly valid...
>
> On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das 
> wrote:
>
>> Its with the timezone actually, you can either use an NTP to maintain
>> accurate system clock or you can adjust your system time to match with the
>> AWS one. You can do it as:
>>
>> telnet s3.amazonaws.com 80
>> GET / HTTP/1.0
>>
>>
>> [image: Inline image 1]
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim  wrote:
>>
>>> I'm getting this warning when using s3 input:
>>> 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
>>> response to
>>> RequestTimeTooSkewed error. Local machine and S3 server disagree on the
>>> time by approximately 0 seconds. Retrying connection.
>>>
>>> After that there are tons of 403/forbidden errors and then job fails.
>>> It's sporadic, so sometimes I get this error and sometimes not, what
>>> could be the issue?
>>> I think it could be related to network connectivity?
>>>
>>
>>
>


-- 

*Franc Carter* | Systems Architect | Rozetta Technology

franc.car...@rozettatech.com  |
www.rozettatechnology.com

Tel: +61 2 8355 2515

Level 4, 55 Harrington St, The Rocks NSW 2000

PO Box H58, Australia Square, Sydney NSW 1215

AUSTRALIA


Re: Why can't Spark find the classes in this Jar?

2015-02-12 Thread Sandy Ryza
What version of Java are you using?  Core NLP dropped support for Java 7 in
its 3.5.0 release.

Also, the correct command line option is --jars, not --addJars.

On Thu, Feb 12, 2015 at 12:03 PM, Deborah Siegel 
wrote:

> Hi Abe,
> I'm new to Spark as well, so someone else could answer better. A few
> thoughts which may or may not be the right line of thinking..
>
> 1) Spark properties can be set on the SparkConf, and with flags in
> spark-submit, but settings on SparkConf take precedence. I think your jars
> flag for spark-submit may be redundant.
>
> 1) Is there a chance that stanford-corenlp-3.5.0.jar relies on other
> dependencies? I could be wrong, but perhaps if there is no other reason not
> to, try building your application as an uber-jar with a build tool like
> Maven, which will package the whole transitive jar. You can find
> stanford-corenlp on maven central .. I think you would add the below
> dependencies to your pom.xml. After building simple-project-1.0.jar with
> these dependencies, you would not set jars on the sc or jar flags on
> spark-submit.
>
> 
> 
> edu.stanford.nlp
> stanford-corenlp
> 3.5.0
> 
> 
> edu.stanford.nlp
> stanford-corenlp
> 3.5.0
> models
> 
> 
>
> HTH.
> Deb
>
> On Tue, Feb 10, 2015 at 1:12 PM, Abe Handler  wrote:
>
>> I am new to spark. I am trying to compile and run a spark application that
>> requires classes from an (external) jar file on my local machine. If I
>> open
>> the jar (on ~/Desktop) I can see the missing class in the local jar but
>> when
>> I run spark I get
>>
>> NoClassDefFoundError: edu/stanford/nlp/ie/AbstractSequenceClassifier
>>
>> I add the jar to the spark context like this
>>
>> String[] jars = {"/home/pathto/Desktop/stanford-corenlp-3.5.0.jar"};
>> SparkConf conf = new SparkConf().setAppName("Simple
>> Application").setJars(jars);
>> Then I try to run a submit script like this
>>
>> /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
>>   --class "SimpleApp" \
>>   --master local[4] \
>>   target/simple-project-1.0.jar \
>>   --jars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
>> and hit the NoClassDefFoundError.
>>
>> I get that this means that the worker threads can't find the class from
>> the
>> jar. But I am not sure what I am doing wrong. I have tried different
>> syntaxes for the last line (below) but none works.
>>
>>   --addJars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
>>   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar
>>   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar
>>
>> How can I fix this error?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-can-t-Spark-find-the-classes-in-this-Jar-tp21584.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Concurrent batch processing

2015-02-12 Thread Matus Faro
I've been experimenting with my configuration for couple of days and gained
quite a bit of power through small optimizations, but it may very well be
something I'm doing crazy that is causing this problem.

To give a little bit of a background, I am in the early stages of a project
that consumes a stream of data in the order of 100,000 per second that
requires processing over a sliding window over one day (ideally a week).
Spark Streaming is a good candidate but I want to make sure I squash any
performance issues ahead of time before I commit.

With a 5 second batch size, in 40 minutes, the processing time is also 5
seconds. I see the CPU spikes over two seconds out of five. I assume the
sliding window operation is very expensive in this case and that's the root
cause of this effect.

I should've done a little bit more research before I posted, I just came
across a post about an undocumented property spark.streaming.concurrentJobs
that I am about to try. I'm still confused how exactly this works with a
sliding window where the result of one batch depends on the other. I assume
the concurrency can only be achieved up until the window action is
executed. Either way, I am going to give this a try and post back here if
that doesn't work.

Thanks!



On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda  wrote:

> It could depend on the nature of your application but spark streaming
> would use spark internally and concurrency should be there what is your use
> case?
>
>
> Are you sure that your configuration is good?
>
>
> On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro  wrote:
>
>> Hi,
>>
>> Please correct me if I'm wrong, in Spark Streaming, next batch will
>> not start processing until the previous batch has completed. Is there
>> any way to be able to start processing the next batch if the previous
>> batch is taking longer to process than the batch interval?
>>
>> The problem I am facing is that I don't see a hardware bottleneck in
>> my Spark cluster, but Spark is not able to handle the amount of data I
>> am pumping through (batch processing time is longer than batch
>> interval). What I'm seeing is spikes of CPU, network and disk IO usage
>> which I assume are due to different stages of a job, but on average,
>> the hardware is under utilized. Concurrency in batch processing would
>> allow the average batch processing time to be greater than batch
>> interval while fully utilizing the hardware.
>>
>> Any ideas on what can be done? One option I can think of is to split
>> the application into multiple applications running concurrently and
>> dividing the initial stream of data between those applications.
>> However, I would have to lose the benefits of having a single
>> application.
>>
>> Thank you,
>> Matus
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Re: Why can't Spark find the classes in this Jar?

2015-02-12 Thread Deborah Siegel
Hi Abe,
I'm new to Spark as well, so someone else could answer better. A few
thoughts which may or may not be the right line of thinking..

1) Spark properties can be set on the SparkConf, and with flags in
spark-submit, but settings on SparkConf take precedence. I think your jars
flag for spark-submit may be redundant.

1) Is there a chance that stanford-corenlp-3.5.0.jar relies on other
dependencies? I could be wrong, but perhaps if there is no other reason not
to, try building your application as an uber-jar with a build tool like
Maven, which will package the whole transitive jar. You can find
stanford-corenlp on maven central .. I think you would add the below
dependencies to your pom.xml. After building simple-project-1.0.jar with
these dependencies, you would not set jars on the sc or jar flags on
spark-submit.



edu.stanford.nlp
stanford-corenlp
3.5.0


edu.stanford.nlp
stanford-corenlp
3.5.0
models



HTH.
Deb

On Tue, Feb 10, 2015 at 1:12 PM, Abe Handler  wrote:

> I am new to spark. I am trying to compile and run a spark application that
> requires classes from an (external) jar file on my local machine. If I open
> the jar (on ~/Desktop) I can see the missing class in the local jar but
> when
> I run spark I get
>
> NoClassDefFoundError: edu/stanford/nlp/ie/AbstractSequenceClassifier
>
> I add the jar to the spark context like this
>
> String[] jars = {"/home/pathto/Desktop/stanford-corenlp-3.5.0.jar"};
> SparkConf conf = new SparkConf().setAppName("Simple
> Application").setJars(jars);
> Then I try to run a submit script like this
>
> /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
>   --class "SimpleApp" \
>   --master local[4] \
>   target/simple-project-1.0.jar \
>   --jars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
> and hit the NoClassDefFoundError.
>
> I get that this means that the worker threads can't find the class from the
> jar. But I am not sure what I am doing wrong. I have tried different
> syntaxes for the last line (below) but none works.
>
>   --addJars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
>   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar
>   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar
>
> How can I fix this error?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-can-t-Spark-find-the-classes-in-this-Jar-tp21584.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


correct way to broadcast a variable

2015-02-12 Thread freedafeng
Suppose I have an object to broadcast and then use it in a mapper function,
sth like follows, (Python codes)

obj2share = sc.broadcast("Some object here")

someRdd.map(createMapper(obj2share)).collect()

The createMapper function will create a mapper function using the shared
object's value. Another way to do this is

someRdd.map(createMapper(obj2share.value)).collect()

Here the creatMapper function directly uses the shared object to create the
mapper function. Is there a difference from spark side for the two methods?
If there is no difference at all, I'd prefer the second, because it hides
the spark from the createMapper function. 

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/correct-way-to-broadcast-a-variable-tp21631.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Concurrent batch processing

2015-02-12 Thread Arush Kharbanda
It could depend on the nature of your application but spark streaming would
use spark internally and concurrency should be there what is your use case?


Are you sure that your configuration is good?


On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro  wrote:

> Hi,
>
> Please correct me if I'm wrong, in Spark Streaming, next batch will
> not start processing until the previous batch has completed. Is there
> any way to be able to start processing the next batch if the previous
> batch is taking longer to process than the batch interval?
>
> The problem I am facing is that I don't see a hardware bottleneck in
> my Spark cluster, but Spark is not able to handle the amount of data I
> am pumping through (batch processing time is longer than batch
> interval). What I'm seeing is spikes of CPU, network and disk IO usage
> which I assume are due to different stages of a job, but on average,
> the hardware is under utilized. Concurrency in batch processing would
> allow the average batch processing time to be greater than batch
> interval while fully utilizing the hardware.
>
> Any ideas on what can be done? One option I can think of is to split
> the application into multiple applications running concurrently and
> dividing the initial stream of data between those applications.
> However, I would have to lose the benefits of having a single
> application.
>
> Thank you,
> Matus
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Concurrent batch processing

2015-02-12 Thread Matus Faro
Hi,

Please correct me if I'm wrong, in Spark Streaming, next batch will
not start processing until the previous batch has completed. Is there
any way to be able to start processing the next batch if the previous
batch is taking longer to process than the batch interval?

The problem I am facing is that I don't see a hardware bottleneck in
my Spark cluster, but Spark is not able to handle the amount of data I
am pumping through (batch processing time is longer than batch
interval). What I'm seeing is spikes of CPU, network and disk IO usage
which I assume are due to different stages of a job, but on average,
the hardware is under utilized. Concurrency in batch processing would
allow the average batch processing time to be greater than batch
interval while fully utilizing the hardware.

Any ideas on what can be done? One option I can think of is to split
the application into multiple applications running concurrently and
dividing the initial stream of data between those applications.
However, I would have to lose the benefits of having a single
application.

Thank you,
Matus

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Master dies after program finishes normally

2015-02-12 Thread Manas Kar
I have 5 workers each executor-memory 8GB of memory. My driver memory is 8
GB as well. They are all 8 core machines.

To answer Imran's question my configurations are thus.

executor_total_max_heapsize = 18GB

This problem happens at the end of my program.

I don't have to run a lot of jobs to see this behaviour.

I can see my output correctly in HDFS and all.

I will give it one more try after increasing master's memory(which is
default 296MB to 512 MB)


Re: Master dies after program finishes normally

2015-02-12 Thread Manas Kar
I have 5 workers each executor-memory 8GB of memory. My driver memory is 8
GB as well. They are all 8 core machines.

To answer Imran's question my configurations are thus.
executor_total_max_heapsize = 18GB
This problem happens at the end of my program.

I don't have to run a lot of jobs to see this behaviour.
I can see my output correctly in HDFS and all.
I will give it one more try after increasing master's memory(which is
default 296MB to 512 MB)

..manas

On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda  wrote:

> How many nodes do you have in your cluster, how many cores, what is the
> size of the memory?
>
> On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar 
> wrote:
>
>> Hi Arush,
>>  Mine is a CDH5.3 with Spark 1.2.
>> The only change to my spark programs are
>> -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.
>>
>> ..Manas
>>
>> On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda <
>> ar...@sigmoidanalytics.com> wrote:
>>
>>> What is your cluster configuration? Did you try looking at the Web UI?
>>> There are many tips here
>>>
>>> http://spark.apache.org/docs/1.2.0/tuning.html
>>>
>>> Did you try these?
>>>
>>> On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar 
>>> wrote:
>>>
 Hi,
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program
 hangs for 20 minutes or so before killing master.

 In the spark master the following log appears.

 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught
 fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31]
 shutting down ActorSystem [sparkMaster]
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at scala.collection.immutable.List$.newBuilder(List.scala:396)
 at
 scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
 at
 scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
 at
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
 at
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
 at
 scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
 at
 scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
 at
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 at
 scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at org.json4s.MonadicJValue.org
 $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
 at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
 at
 org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
 at
 org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at
 org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
 at
 org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
 at
 org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
 at
 org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
 at
 org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$

Re: Master dies after program finishes normally

2015-02-12 Thread Arush Kharbanda
How many nodes do you have in your cluster, how many cores, what is the
size of the memory?

On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar 
wrote:

> Hi Arush,
>  Mine is a CDH5.3 with Spark 1.2.
> The only change to my spark programs are
> -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.
>
> ..Manas
>
> On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda <
> ar...@sigmoidanalytics.com> wrote:
>
>> What is your cluster configuration? Did you try looking at the Web UI?
>> There are many tips here
>>
>> http://spark.apache.org/docs/1.2.0/tuning.html
>>
>> Did you try these?
>>
>> On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar 
>> wrote:
>>
>>> Hi,
>>>  I have a Hidden Markov Model running with 200MB data.
>>>  Once the program finishes (i.e. all stages/jobs are done) the program
>>> hangs for 20 minutes or so before killing master.
>>>
>>> In the spark master the following log appears.
>>>
>>> 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal
>>> error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting
>>> down ActorSystem [sparkMaster]
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> at scala.collection.immutable.List$.newBuilder(List.scala:396)
>>> at
>>> scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
>>> at
>>> scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
>>> at
>>> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
>>> at
>>> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
>>> at
>>> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
>>> at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
>>> at
>>> scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at
>>> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
>>> at
>>> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
>>> at
>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>> at
>>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>> at
>>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>> at org.json4s.MonadicJValue.org
>>> $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
>>> at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
>>> at
>>> org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
>>> at
>>> org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
>>> at
>>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
>>> at
>>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at
>>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
>>> at
>>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at
>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>> at
>>> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
>>> at
>>> org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
>>> at
>>> org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
>>> at
>>> org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
>>> at
>>> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)
>>>
>>> Can anyone help?
>>>
>>> ..Manas
>>>
>>
>>
>>
>> --
>>
>> [image: Sigmoid Analytics] 
>>
>> *Arush Kharbanda* || Technical Teamlead
>>
>> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>>
>
>


-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Master dies after program finishes normally

2015-02-12 Thread Manas Kar
Hi Arush,
 Mine is a CDH5.3 with Spark 1.2.
The only change to my spark programs are
-Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.

..Manas

On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda  wrote:

> What is your cluster configuration? Did you try looking at the Web UI?
> There are many tips here
>
> http://spark.apache.org/docs/1.2.0/tuning.html
>
> Did you try these?
>
> On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar 
> wrote:
>
>> Hi,
>>  I have a Hidden Markov Model running with 200MB data.
>>  Once the program finishes (i.e. all stages/jobs are done) the program
>> hangs for 20 minutes or so before killing master.
>>
>> In the spark master the following log appears.
>>
>> 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal
>> error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting
>> down ActorSystem [sparkMaster]
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at scala.collection.immutable.List$.newBuilder(List.scala:396)
>> at
>> scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
>> at
>> scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
>> at
>> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
>> at
>> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
>> at
>> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at
>> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
>> at
>> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> at
>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> at org.json4s.MonadicJValue.org
>> $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
>> at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
>> at
>> org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
>> at
>> org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
>> at
>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
>> at
>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
>> at
>> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at
>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>> at
>> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
>> at
>> org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
>> at
>> org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
>> at
>> org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
>> at
>> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)
>>
>> Can anyone help?
>>
>> ..Manas
>>
>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Re: Master dies after program finishes normally

2015-02-12 Thread Arush Kharbanda
What is your cluster configuration? Did you try looking at the Web UI?
There are many tips here

http://spark.apache.org/docs/1.2.0/tuning.html

Did you try these?

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar 
wrote:

> Hi,
>  I have a Hidden Markov Model running with 200MB data.
>  Once the program finishes (i.e. all stages/jobs are done) the program
> hangs for 20 minutes or so before killing master.
>
> In the spark master the following log appears.
>
> 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal
> error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting
> down ActorSystem [sparkMaster]
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at scala.collection.immutable.List$.newBuilder(List.scala:396)
> at
> scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
> at
> scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
> at
> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
> at
> org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.json4s.MonadicJValue.org
> $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
> at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
> at
> org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
> at
> org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
> at
> org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
> at
> org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
> at
> org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
> at
> org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
> at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)
>
> Can anyone help?
>
> ..Manas
>



-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: spark, reading from s3

2015-02-12 Thread Kane Kim
The thing is that my time is perfectly valid...

On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das 
wrote:

> Its with the timezone actually, you can either use an NTP to maintain
> accurate system clock or you can adjust your system time to match with the
> AWS one. You can do it as:
>
> telnet s3.amazonaws.com 80
> GET / HTTP/1.0
>
>
> [image: Inline image 1]
>
> Thanks
> Best Regards
>
> On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim  wrote:
>
>> I'm getting this warning when using s3 input:
>> 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
>> response to
>> RequestTimeTooSkewed error. Local machine and S3 server disagree on the
>> time by approximately 0 seconds. Retrying connection.
>>
>> After that there are tons of 403/forbidden errors and then job fails.
>> It's sporadic, so sometimes I get this error and sometimes not, what
>> could be the issue?
>> I think it could be related to network connectivity?
>>
>
>


Master dies after program finishes normally

2015-02-12 Thread Manas Kar
Hi,
 I have a Hidden Markov Model running with 200MB data.
 Once the program finishes (i.e. all stages/jobs are done) the program
hangs for 20 minutes or so before killing master.

In the spark master the following log appears.

2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal
error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting
down ActorSystem [sparkMaster]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.List$.newBuilder(List.scala:396)
at
scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
at
scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
at
scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
at
scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
at
scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
at
org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at
scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.json4s.MonadicJValue.org
$json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
at
org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
at
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
at
org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
at
org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
at
org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
at
org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)

Can anyone help?

..Manas


Re: can we insert and update with spark sql

2015-02-12 Thread Debasish Das
I thought more on it...can we provide access to the IndexedRDD through
thriftserver API and let the mapPartitions query the API ? I am not sure if
ThriftServer is as performant as opening up an API using other akka based
frameworks (like play or spray)...

Any pointers will be really helpful...

Neither play nor spray is being used in Spark right nowso it brings
dependencies and we already know about the akka conflicts...thriftserver on
the other hand is already integrated for JDBC access


On Tue, Feb 10, 2015 at 3:43 PM, Debasish Das 
wrote:

> Also I wanted to run get() and set() from mapPartitions (from spark
> workers and not master)...
>
> To be able to do that I think I have to create a separate spark context
> for the cache...
>
> But I am not sure how SparkContext from job1 can access SparkContext from
> job2 !
>
>
> On Tue, Feb 10, 2015 at 3:25 PM, Debasish Das 
> wrote:
>
>> Thanks...this is what I was looking for...
>>
>> It will be great if Ankur can give brief details about it...Basically how
>> does it contrast with memcached for example...
>>
>> On Tue, Feb 10, 2015 at 2:32 PM, Michael Armbrust > > wrote:
>>
>>> You should look at https://github.com/amplab/spark-indexedrdd
>>>
>>> On Tue, Feb 10, 2015 at 2:27 PM, Debasish Das 
>>> wrote:
>>>
 Hi Michael,

 I want to cache a RDD and define get() and set() operators on it.
 Basically like memcached. Is it possible to build a memcached like
 distributed cache using Spark SQL ? If not what do you suggest we should
 use for such operations...

 Thanks.
 Deb

 On Fri, Jul 18, 2014 at 1:00 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> You can do insert into.  As with other SQL on HDFS systems there is no
> updating of data.
> On Jul 17, 2014 1:26 AM, "Akhil Das" 
> wrote:
>
>> Is this what you are looking for?
>>
>>
>> https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html
>>
>> According to the doc, it says "Operator that acts as a sink for
>> queries on RDDs and can be used to store the output inside a directory of
>> Parquet files. This operator is similar to Hive's INSERT INTO TABLE
>> operation in the sense that one can choose to either overwrite or append 
>> to
>> a directory. Note that consecutive insertions to the same table must have
>> compatible (source) schemas."
>>
>> Thanks
>> Best Regards
>>
>>
>> On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo  wrote:
>>
>>>  Hi
>>>
>>>As for spark 1.0, can we insert and update a table with SPARK
>>> SQL, and how?
>>>
>>>
>>>
>>> Thanks
>>>
>>> Best Regard
>>>
>>
>>

>>>
>>
>


Re: Shuffle on joining two RDDs

2015-02-12 Thread Davies Liu
The feature works as expected in Scala/Java, but not implemented in Python.

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid  wrote:
> I wonder if the issue is that these lines just need to add
> preservesPartitioning = true
> ?
>
> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>
> I am getting the feeling this is an issue w/ pyspark
>
>
> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  wrote:
>>
>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.  It
>> could be that pyspark doesn't properly support narrow dependencies, or maybe
>> you need to be more explicit about the partitioner.  I am looking into the
>> pyspark api but you might have some better guesses here than I thought.
>>
>> My suggestion to do
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> was just to see if the partition was a NarrowCoGroupSplitDep or a
>> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields
>> are hidden deeper inside and are not user-visible.  But I think a better way
>> (in scala, anyway) is to look at rdd.dependencies.  its a little tricky,
>> though, you need to look deep into the lineage (example at the end).
>>
>> Sean -- yes it does require both RDDs have the same partitioner, but that
>> should happen naturally if you just specify the same number of partitions,
>> you'll get equal HashPartitioners.  There is a little difference in the
>> scala & python api that I missed here.  For partitionBy in scala, you
>> actually need to specify the partitioner, but not in python.  However I
>> thought it would work like groupByKey, which does just take an int.
>>
>>
>> Here's a code example in scala -- not sure what is available from python.
>> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>>
>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>> scala> d.partitioner == d2.partitioner
>>> res2: Boolean = true
>>> val joined = d.join(d2)
>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>> val badJoined = d.join(d3)
>>>
>>> d.setName("d")
>>> d2.setName("d2")
>>> d3.setName("d3")
>>> joined.setName("joined")
>>> badJoined.setName("badJoined")
>>>
>>>
>>> //unfortunatley, just looking at the immediate dependencies of joined &
>>> badJoined is misleading, b/c join actually creates
>>> // one more step after the shuffle
>>> scala> joined.dependencies
>>> res20: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@74751ac8)
>>> //even with the join that does require a shuffle, we still see a
>>> OneToOneDependency, but thats just a simple flatMap step
>>> scala> badJoined.dependencies
>>> res21: Seq[org.apache.spark.Dependency[_]] =
>>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>>
>>
>>>
>>>  //so lets make a helper function to get all the dependencies recursively
>>>
>>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>>   val deps = rdd.dependencies
>>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>>> }
>>>
>>>
>>> //full dependencies of the good join
>>>
>>> scala> flattenDeps(joined).foreach{println}
>>> (joined FlatMappedValuesRDD[9] at join at
>>> :16,org.apache.spark.OneToOneDependency@74751ac8)
>>> (MappedValuesRDD[8] at join at
>>> :16,org.apache.spark.OneToOneDependency@623264af)
>>> (CoGroupedRDD[7] at join at
>>> :16,org.apache.spark.OneToOneDependency@5a704f86)
>>> (CoGroupedRDD[7] at join at
>>> :16,org.apache.spark.OneToOneDependency@37514cd)
>>> (d ShuffledRDD[3] at groupByKey at
>>> :12,org.apache.spark.ShuffleDependency@7ba8a080)
>>> (MappedRDD[2] at map at
>>> :12,org.apache.spark.OneToOneDependency@7bc172ec)
>>> (d2 ShuffledRDD[6] at groupByKey at
>>> :12,org.apache.spark.ShuffleDependency@5960236d)
>>> (MappedRDD[5] at map at
>>> :12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>>
>>>
>>>
>>> //full dependencies of the bad join -- notice the ShuffleDependency!
>>>
>>> scala> flattenDeps(badJoined).foreach{println}
>>> (badJoined FlatMappedValuesRDD[15] at join at
>>> :16,org.apache.spark.OneToOneDependency@1cf356cc)
>>> (MappedValuesRDD[14] at join at
>>> :16,org.apache.spark.OneToOneDependency@5dea4db)
>>> (CoGroupedRDD[13] at join at
>>> :16,org.apache.spark.ShuffleDependency@5c1928df)
>>> (CoGroupedRDD[13] at join at
>>> :16,org.apache.spark.OneToOneDependency@77ca77b5)
>>> (d ShuffledRDD[3] at groupByKey at
>>> :12,org.apache.spark.ShuffleDependency@7ba8a080)
>>> (MappedRDD[2] at map at
>>> :12,org.apache.spark.OneToOneDependency@7bc172ec)
>>> (d3 ShuffledRDD[12] at groupByKey at
>>> :12,org.apache.spark.ShuffleDependency@d794984)
>>> (MappedRDD[11] at map at
>>> :12,org.apache.spark.OneToOneDependency@15c98005)
>>
>>
>>
>> On Thu, Feb 12, 2015 at 10:05 AM, Karlson  wrote:
>>>
>>> Hi Imran,
>>>
>>> thanks for your quick reply.
>>>
>>> Actually I am doing this:
>>>
>>> rddA = rddA.partitionBy(n).cache()

Re: Custom Kryo serializer

2015-02-12 Thread Corey Nolet
I was able to get this working by extending KryoRegistrator and setting the
"spark.kryo.registrator" property.

On Thu, Feb 12, 2015 at 12:31 PM, Corey Nolet  wrote:

> I'm trying to register a custom class that extends Kryo's Serializer
> interface. I can't tell exactly what Class the registerKryoClasses()
> function on the SparkConf is looking for.
>
> How do I register the Serializer class?
>


Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Sandy Ryza
I ran against 2.6, not 2.2.

For that yarn-client run, do you have the application master log?

On Thu, Feb 12, 2015 at 6:11 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> This is tricky to debug. Check logs of node and resource manager of YARN
> to see if you can trace the error. In the past I have to closely look at
> arguments getting passed to YARN container (they get logged before
> attempting to launch containers). If I still don't get a clue, I had to
> check the script generated by YARN to execute the container and even run
> manually to trace at what line the error has occurred.
>
> BTW are you submitting the job from windows?
>
> On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg  wrote:
>
>> Interesting to hear that it works for you. Are you using Yarn 2.2 as
>> well? No strange log message during startup, and can't see any other log
>> messages since no executer gets launched. Does not seems to work in
>> yarn-client mode either, failing with the exception below.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Yarn
>> application has already ended! It might have been killed or unable to
>> launch application master.
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>> at org.apache.spark.SparkContext.(SparkContext.scala:370)
>> at
>> com.spotify.analytics.AnalyticsSparkContext.(AnalyticsSparkContext.scala:8)
>> at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
>> at com.spotify.analytics.DataSampler.main(DataSampler.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> /Anders
>>
>>
>> On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza 
>> wrote:
>>
>>> Hi Anders,
>>>
>>> I just tried this out and was able to successfully acquire executors.
>>> Any strange log messages or additional color you can provide on your
>>> setup?  Does yarn-client mode work?
>>>
>>> -Sandy
>>>
>>> On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg 
>>> wrote:
>>>
 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the jobs fires up the AM, but after examining
 the "executors" page in the web ui, only the driver is listed, no
 executors are ever received, and the driver keep waiting forever. Has
 anyone seemed similar problems?

 Thanks for any insights,
 Anders

>>>
>>>
>>


Custom Kryo serializer

2015-02-12 Thread Corey Nolet
I'm trying to register a custom class that extends Kryo's Serializer
interface. I can't tell exactly what Class the registerKryoClasses()
function on the SparkConf is looking for.

How do I register the Serializer class?


Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  wrote:

> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.  It
> could be that pyspark doesn't properly support narrow dependencies, or
> maybe you need to be more explicit about the partitioner.  I am looking
> into the pyspark api but you might have some better guesses here than I
> thought.
>
> My suggestion to do
>
> joinedRdd.getPartitions.foreach{println}
>
> was just to see if the partition was a NarrowCoGroupSplitDep or a
> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
> fields are hidden deeper inside and are not user-visible.  But I think a
> better way (in scala, anyway) is to look at rdd.dependencies.  its a little
> tricky, though, you need to look deep into the lineage (example at the end).
>
> Sean -- yes it does require both RDDs have the same partitioner, but that
> should happen naturally if you just specify the same number of partitions,
> you'll get equal HashPartitioners.  There is a little difference in the
> scala & python api that I missed here.  For partitionBy in scala, you
> actually need to specify the partitioner, but not in python.  However I
> thought it would work like groupByKey, which does just take an int.
>
>
> Here's a code example in scala -- not sure what is available from python.
> Hopefully somebody knows a simpler way to confirm narrow dependencies??
>
> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>> scala> d.partitioner == d2.partitioner
>> res2: Boolean = true
>> val joined = d.join(d2)
>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>> val badJoined = d.join(d3)
>>
>> d.setName("d")
>> d2.setName("d2")
>> d3.setName("d3")
>> joined.setName("joined")
>> badJoined.setName("badJoined")
>
>
>> //unfortunatley, just looking at the immediate dependencies of joined &
>> badJoined is misleading, b/c join actually creates
>> // one more step after the shuffle
>> scala> joined.dependencies
>> res20: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@74751ac8)
>> //even with the join that does require a shuffle, we still see a
>> OneToOneDependency, but thats just a simple flatMap step
>> scala> badJoined.dependencies
>> res21: Seq[org.apache.spark.Dependency[_]] =
>> List(org.apache.spark.OneToOneDependency@1cf356cc)
>>
>
>
>
>>  //so lets make a helper function to get all the dependencies recursively
>>
> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>>   val deps = rdd.dependencies
>>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
>> }
>>
>>
>> //full dependencies of the good join
>
> scala> flattenDeps(joined).foreach{println}
>> (joined FlatMappedValuesRDD[9] at join at
>> :16,org.apache.spark.OneToOneDependency@74751ac8)
>> (MappedValuesRDD[8] at join at
>> :16,org.apache.spark.OneToOneDependency@623264af)
>>
>> *(CoGroupedRDD[7] at join at
>> :16,org.apache.spark.OneToOneDependency@5a704f86)*(CoGroupedRDD[7]
>> at join at :16,org.apache.spark.OneToOneDependency@37514cd)
>> (d ShuffledRDD[3] at groupByKey at
>> :12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at
>> :12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d2 ShuffledRDD[6] at groupByKey at
>> :12,org.apache.spark.ShuffleDependency@5960236d)
>> (MappedRDD[5] at map at
>> :12,org.apache.spark.OneToOneDependency@36b5f6f2)
>>
>>
>
>> //full dependencies of the bad join -- notice the ShuffleDependency!
>
> scala> flattenDeps(badJoined).foreach{println}
>> (badJoined FlatMappedValuesRDD[15] at join at
>> :16,org.apache.spark.OneToOneDependency@1cf356cc)
>> (MappedValuesRDD[14] at join at
>> :16,org.apache.spark.OneToOneDependency@5dea4db)
>>
>> *(CoGroupedRDD[13] at join at
>> :16,org.apache.spark.ShuffleDependency@5c1928df)*(CoGroupedRDD[13]
>> at join at :16,org.apache.spark.OneToOneDependency@77ca77b5)
>> (d ShuffledRDD[3] at groupByKey at
>> :12,org.apache.spark.ShuffleDependency@7ba8a080)
>> (MappedRDD[2] at map at
>> :12,org.apache.spark.OneToOneDependency@7bc172ec)
>> (d3 ShuffledRDD[12] at groupByKey at
>> :12,org.apache.spark.ShuffleDependency@d794984)
>> (MappedRDD[11] at map at
>> :12,org.apache.spark.OneToOneDependency@15c98005)
>
>
>
> On Thu, Feb 12, 2015 at 10:05 AM, Karlson  wrote:
>
>> Hi Imran,
>>
>> thanks for your quick reply.
>>
>> Actually I am doing this:
>>
>> rddA = rddA.partitionBy(n).cache()
>> rddB = rddB.partitionBy(n).cache()
>>
>> followed by
>>
>> rddA.count()
>> rddB.count()
>>
>> then joinedRDD = rddA.join(rddB)
>>
>> I thought that the count() would force the evaluation, so any subsequent
>> joins would be shuffleless. I was wrong about t

Re: 8080 port password protection

2015-02-12 Thread Arush Kharbanda
You could apply a password using a filter using a server. Though it dosnt
looks like the right grp for the question. It can be done for spark also
for Spark UI.

On Thu, Feb 12, 2015 at 10:19 PM, MASTER_ZION (Jairo Linux) <
master.z...@gmail.com> wrote:

> Hi everyone,
>
> Im creating a development machine in AWS and i would like to protect the
> port 8080 using a password.
>
> Is it possible?
>
>
> Best Regards
>
> *Jairo Moreno*
>



-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


spark mllib error when predict on linear regression model

2015-02-12 Thread Donbeo
Hi,
I have a model and I am trying to predict regPoints.  Here is the code that
I have used. 
A more detailed question is available at 
http://stackoverflow.com/questions/28482476/spark-mllib-predict-error-with-map

scala> model
res26: org.apache.spark.mllib.regression.LinearRegressionModel =
(weights=[-4.00245512323736E-15,-7.110058964543731E-15,2.0790436644401968E-15,1.7497510523275056E-15,6.593638326021273E-15],
intercept=0.0)

scala> regPoints
res27:
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
MappedRDD[32] at map at :54

//ERROR
scala> val y_predicted = regPoints map (point =>
model.predict(point.features))
15/02/12 16:14:45 INFO BlockManager: Removing broadcast 285
15/02/12 16:14:45 INFO BlockManager: Removing block broadcast_285_piece0
15/..


Thanks a lot,
Luca



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-mllib-error-when-predict-on-linear-regression-model-tp21629.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



8080 port password protection

2015-02-12 Thread MASTER_ZION (Jairo Linux)
Hi everyone,

Im creating a development machine in AWS and i would like to protect the
port 8080 using a password.

Is it possible?


Best Regards

*Jairo Moreno*


Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.  It
could be that pyspark doesn't properly support narrow dependencies, or
maybe you need to be more explicit about the partitioner.  I am looking
into the pyspark api but you might have some better guesses here than I
thought.

My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
fields are hidden deeper inside and are not user-visible.  But I think a
better way (in scala, anyway) is to look at rdd.dependencies.  its a little
tricky, though, you need to look deep into the lineage (example at the end).

Sean -- yes it does require both RDDs have the same partitioner, but that
should happen naturally if you just specify the same number of partitions,
you'll get equal HashPartitioners.  There is a little difference in the
scala & python api that I missed here.  For partitionBy in scala, you
actually need to specify the partitioner, but not in python.  However I
thought it would work like groupByKey, which does just take an int.


Here's a code example in scala -- not sure what is available from python.
Hopefully somebody knows a simpler way to confirm narrow dependencies??

val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
> scala> d.partitioner == d2.partitioner
> res2: Boolean = true
> val joined = d.join(d2)
> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
> val badJoined = d.join(d3)
>
> d.setName("d")
> d2.setName("d2")
> d3.setName("d3")
> joined.setName("joined")
> badJoined.setName("badJoined")


> //unfortunatley, just looking at the immediate dependencies of joined &
> badJoined is misleading, b/c join actually creates
> // one more step after the shuffle
> scala> joined.dependencies
> res20: Seq[org.apache.spark.Dependency[_]] =
> List(org.apache.spark.OneToOneDependency@74751ac8)
> //even with the join that does require a shuffle, we still see a
> OneToOneDependency, but thats just a simple flatMap step
> scala> badJoined.dependencies
> res21: Seq[org.apache.spark.Dependency[_]] =
> List(org.apache.spark.OneToOneDependency@1cf356cc)
>



>  //so lets make a helper function to get all the dependencies recursively
>
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>   val deps = rdd.dependencies
>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
> }
>
>
> //full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
> (joined FlatMappedValuesRDD[9] at join at
> :16,org.apache.spark.OneToOneDependency@74751ac8)
> (MappedValuesRDD[8] at join at
> :16,org.apache.spark.OneToOneDependency@623264af)
>
> *(CoGroupedRDD[7] at join at
> :16,org.apache.spark.OneToOneDependency@5a704f86)*(CoGroupedRDD[7]
> at join at :16,org.apache.spark.OneToOneDependency@37514cd)
> (d ShuffledRDD[3] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@7ba8a080)
> (MappedRDD[2] at map at
> :12,org.apache.spark.OneToOneDependency@7bc172ec)
> (d2 ShuffledRDD[6] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@5960236d)
> (MappedRDD[5] at map at
> :12,org.apache.spark.OneToOneDependency@36b5f6f2)
>
>

> //full dependencies of the bad join -- notice the ShuffleDependency!

scala> flattenDeps(badJoined).foreach{println}
> (badJoined FlatMappedValuesRDD[15] at join at
> :16,org.apache.spark.OneToOneDependency@1cf356cc)
> (MappedValuesRDD[14] at join at
> :16,org.apache.spark.OneToOneDependency@5dea4db)
>
> *(CoGroupedRDD[13] at join at
> :16,org.apache.spark.ShuffleDependency@5c1928df)*(CoGroupedRDD[13]
> at join at :16,org.apache.spark.OneToOneDependency@77ca77b5)
> (d ShuffledRDD[3] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@7ba8a080)
> (MappedRDD[2] at map at
> :12,org.apache.spark.OneToOneDependency@7bc172ec)
> (d3 ShuffledRDD[12] at groupByKey at
> :12,org.apache.spark.ShuffleDependency@d794984)
> (MappedRDD[11] at map at
> :12,org.apache.spark.OneToOneDependency@15c98005)



On Thu, Feb 12, 2015 at 10:05 AM, Karlson  wrote:

> Hi Imran,
>
> thanks for your quick reply.
>
> Actually I am doing this:
>
> rddA = rddA.partitionBy(n).cache()
> rddB = rddB.partitionBy(n).cache()
>
> followed by
>
> rddA.count()
> rddB.count()
>
> then joinedRDD = rddA.join(rddB)
>
> I thought that the count() would force the evaluation, so any subsequent
> joins would be shuffleless. I was wrong about the shuffle amounts however.
> The shuffle write is actually 2GB (i.e. the size of the bigger RDD) whil
> there is no Shuffle read. A joinedRdd.count() does a shuffle read of about
> 1GB in size, though.
>
> The getPartitions-method does not exist on the resulting RDD (I am using
> the Python API). There is however foreachPartition(). What is the line
>
> joinedRdd.getPartitions.foreach{println}
>
> supposed to do?
>
> Th

Re: Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi,

I believe that partitionBy will use the same (default) partitioner on 
both RDDs.


On 2015-02-12 17:12, Sean Owen wrote:

Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid  
wrote:

Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't 
require

any shuffling.  But its possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those 
RDDs
are actually materialized & cached somewhere?  eg., if you just did 
this:


val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the 
join.  So
though the join itself can happen without partitioning, 
joinedRdd.count()
will trigger the evaluation of rddA & rddB which will require 
shuffles.

Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to 
reshuffle

them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether 
or
not you are getting narrow dependencies, which would avoid the 
shuffle.

(Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:


Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the

other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified 
that
both RDDs have the same number of partitions and that equal keys 
reside on

the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is 
necessary.
Looking at the Web UI under http://driver:4040 however reveals that 
that

assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: obtain cluster assignment in K-means

2015-02-12 Thread Shi Yu
Thanks Robin, got it.

On Thu, Feb 12, 2015 at 2:21 AM, Robin East  wrote:

> KMeans.train actually returns a KMeansModel so you can use predict()
> method of the model
>
> e.g. clusters.predict(pointToPredict)
> or
>
> clusters.predict(pointsToPredict)
>
> first is a single Vector, 2nd is RDD[Vector]
>
> Robin
>
> On 12 Feb 2015, at 06:37, Shi Yu  wrote:
>
> Hi there,
>
> I am new to spark.  When training a model using K-means using the following 
> code, how do I obtain the cluster assignment in the next step?
>
>
> val clusters = KMeans.train(parsedData, numClusters, numIterations)
>
>  I searched around many examples but they mostly calculate the WSSSE. I am 
> still confused.
>
> Thanks!
>
>
> Eilian
>
>
>
>
>


Re: Shuffle on joining two RDDs

2015-02-12 Thread Sean Owen
Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid  wrote:
> Hi Karlson,
>
> I think your assumptions are correct -- that join alone shouldn't require
> any shuffling.  But its possible you are getting tripped up by lazy
> evaluation of RDDs.  After you do your partitionBy, are you sure those RDDs
> are actually materialized & cached somewhere?  eg., if you just did this:
>
> val rddA = someData.partitionBy(N)
> val rddB = someOtherData.partitionBy(N)
> val joinedRdd = rddA.join(rddB)
> joinedRdd.count() //or any other action
>
> then the partitioning isn't actually getting run until you do the join.  So
> though the join itself can happen without partitioning, joinedRdd.count()
> will trigger the evaluation of rddA & rddB which will require shuffles.
> Note that even if you have some intervening action on rddA & rddB that
> shuffles them, unless you persist the result, you will need to reshuffle
> them for the join.
>
> If this doesn't help explain things, for debugging
>
> joinedRdd.getPartitions.foreach{println}
>
> this is getting into the weeds, but at least this will tell us whether or
> not you are getting narrow dependencies, which would avoid the shuffle.
> (Does anyone know of a simpler way to check this?)
>
> hope this helps,
> Imran
>
>
>
>
> On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:
>>
>> Hi All,
>>
>> using Pyspark, I create two RDDs (one with about 2M records (~200MB), the
>> other with about 8M records (~2GB)) of the format (key, value).
>>
>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>> both RDDs have the same number of partitions and that equal keys reside on
>> the same partition (via mapPartitionsWithIndex).
>>
>> Now I'd expect that for a join on the two RDDs no shuffling is necessary.
>> Looking at the Web UI under http://driver:4040 however reveals that that
>> assumption is false.
>>
>> In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.
>>
>> What's the explanation for that behaviour? Where am I wrong with my
>> assumption?
>>
>> Thanks in advance,
>>
>> Karlson
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi Imran,

thanks for your quick reply.

Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent 
joins would be shuffleless. I was wrong about the shuffle amounts 
however. The shuffle write is actually 2GB (i.e. the size of the bigger 
RDD) whil there is no Shuffle read. A joinedRdd.count() does a shuffle 
read of about 1GB in size, though.


The getPartitions-method does not exist on the resulting RDD (I am using 
the Python API). There is however foreachPartition(). What is the line


joinedRdd.getPartitions.foreach{println}

supposed to do?

Thank you,

Karlson

PS: Sorry for sending this twice, I accidentally did not reply to the 
mailing list first.



On 2015-02-12 16:48, Imran Rashid wrote:

Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't 
require

any shuffling.  But its possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those 
RDDs
are actually materialized & cached somewhere?  eg., if you just did 
this:


val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the join. 
 So
though the join itself can happen without partitioning, 
joinedRdd.count()

will trigger the evaluation of rddA & rddB which will require shuffles.
Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to 
reshuffle

them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether 
or

not you are getting narrow dependencies, which would avoid the shuffle.
 (Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:


Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the

other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified that
both RDDs have the same number of partitions and that equal keys 
reside on

the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is 
necessary.
Looking at the Web UI under http://driver:4040 however reveals that 
that

assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Imran Rashid
You need to import the implicit conversions to PairRDDFunctions with

import org.apache.spark.SparkContext._

(note that this requirement will go away in 1.3:
https://issues.apache.org/jira/browse/SPARK-4397)

On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko 
wrote:

> Hi. I am stuck with how to save file to hdfs from spark.
>
> I have written MyOutputFormat extends FileOutputFormat,
> then in spark calling this:
>
>   rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
>   rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
> classOf[MyObject],
>classOf[MyOutputFormat])
>
> where rddres is RDD[(String, MyObject)] from up of transformation pipeline.
>
> Compilation error is: /value saveAsHadoopFile is not a member of
> org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]/.
>
> Could someone give me insights on what could be done here to make it
> working? Why it is not a member? Because of wrong types?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [hive context] Unable to query array once saved as parquet

2015-02-12 Thread Ayoub
Hi,

as I was trying to find a work around until this bug will be fixed, I
discovered an other bug posted here:
https://issues.apache.org/jira/browse/SPARK-5775

For those who might had the same issue, one could use the "LOAD" sql
command in a hive context to load the parquet file into the table as long
it not partitioned. The queries work fine after that.

Best,
Ayoub.

2015-01-31 4:05 GMT+01:00 Cheng Lian :

>  According to the Gist Ayoub provided, the schema is fine. I reproduced
> this issue locally, it should be bug, but I don't think it's related to
> SPARK-5236. Will investigate this soon.
>
> Ayoub - would you mind to help to file a JIRA for this issue? Thanks!
>
> Cheng
>
>
> On 1/30/15 11:28 AM, Michael Armbrust wrote:
>
> Is it possible that your schema contains duplicate columns or column with
> spaces in the name?  The parquet library will often give confusing error
> messages in this case.
>
> On Fri, Jan 30, 2015 at 10:33 AM, Ayoub 
> wrote:
>
>>  Hello,
>>
>> I have a problem when querying, with a hive context on spark
>> 1.2.1-snapshot, a column in my table which is nested data structure like an
>> array of struct.
>> The problems happens only on the table stored as parquet, while querying
>> the Schema RDD saved, as a temporary table, don't lead to any exception.
>>
>> my steps are:
>> 1) reading JSON file
>> 2) creating a schema RDD and saving it as a tmp table
>> 3) creating an external table in hive meta store saved as parquet file
>> 4) inserting the data from the tmp table to the persisted table
>> 5) queering the persisted table lead to this exception:
>>
>> "select data.field1 from persisted_table LATERAL VIEW explode(data_array)
>> nestedStuff AS data"
>>
>> parquet.io.ParquetDecodingException: Can not read value at 0 in block -1
>> in file hdfs://***/test_table/part-1
>> at
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>> at
>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>> at
>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at scala.collection.TraversableOnce$class.to
>> (TraversableOnce.scala:273)
>> at 
>> scala.collection.AbstractIterator.to(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>> at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>> at java.util.ArrayList.get(ArrayList.java:411)
>> at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
>> at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
>> at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
>> at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99)
>> at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94)
>> at
>> parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:274)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>> at
>> parquet.filter2.compat.F

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't require
any shuffling.  But its possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those RDDs
are actually materialized & cached somewhere?  eg., if you just did this:

val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the join.  So
though the join itself can happen without partitioning, joinedRdd.count()
will trigger the evaluation of rddA & rddB which will require shuffles.
Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to reshuffle
them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether or
not you are getting narrow dependencies, which would avoid the shuffle.
 (Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:

> Hi All,
>
> using Pyspark, I create two RDDs (one with about 2M records (~200MB), the
> other with about 8M records (~2GB)) of the format (key, value).
>
> I've done a partitionBy(num_partitions) on both RDDs and verified that
> both RDDs have the same number of partitions and that equal keys reside on
> the same partition (via mapPartitionsWithIndex).
>
> Now I'd expect that for a join on the two RDDs no shuffling is necessary.
> Looking at the Web UI under http://driver:4040 however reveals that that
> assumption is false.
>
> In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.
>
> What's the explanation for that behaviour? Where am I wrong with my
> assumption?
>
> Thanks in advance,
>
> Karlson
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


failing GraphX application ('GC overhead limit exceeded', 'Lost executor', 'Connection refused', etc.)

2015-02-12 Thread Matthew Cornell
Hi Folks,

I'm running a five-step path following-algorithm on a movie graph with 120K 
verticies and 400K edges. The graph has vertices for actors, directors, movies, 
users, and user ratings, and my Scala code is walking the path "rating > movie 
> rating > user > rating". There are 75K rating nodes and each has ~100 edges. 
My program iterates over each path item, calling aggregateMessages() then 
joinVertices() each time, and then processing that result on the next 
iteration. The program never finishes the second 'rating' step, which makes 
sense as, IIUC from my back-of-the-napkin estimate, the intermediate result 
would have ~4B active vertices.

Spark is version 1.2.0 and running in standalone mode on a small cluster of 
five hosts: four compute nodes and a head node where the computes have 4 cores 
and 32GB RAM each, and the head has 32 cores and 128GB RAM. After restarting 
Spark just now, the Master web UI shows 15 workers (5 dead), two per node, with 
cores and memory listed as "32 (0 Used)" and "125.0 GB (0.0 B Used)" on the two 
head node workers and "4 (0 Used)" and "30.5 GB (0.0 B Used)" for the 8 workers 
running on the compute nodes. (Note: I don't understand why it's configured to 
run two workers per node.) The small Spark example programs run to completion.

I've listed the console output at http://pastebin.com/DPECKgQ9 (I'm running in 
spark-shell).

I hope you can provide some advice on things to try next (e.g., configuration 
vars). My guess is the cluster is running out of memory, though I think it has 
adequate aggregate ram to handle this app.

Thanks very much -- matt



Matthew Cornell, Research Fellow, Computer Science Department, Umass Amherst


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Ted Yu
You can use JavaPairRDD which has:

  override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
JavaPairRDD.fromRDD(rdd)

Cheers

On Thu, Feb 12, 2015 at 7:36 AM, Vladimir Protsenko 
wrote:

> Hi. I am stuck with how to save file to hdfs from spark.
>
> I have written MyOutputFormat extends FileOutputFormat,
> then in spark calling this:
>
>   rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
>   rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
> classOf[MyObject],
>classOf[MyOutputFormat])
>
> where rddres is RDD[(String, MyObject)] from up of transformation pipeline.
>
> Compilation error is: /value saveAsHadoopFile is not a member of
> org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]/.
>
> Could someone give me insights on what could be done here to make it
> working? Why it is not a member? Because of wrong types?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Vladimir Protsenko
Hi. I am stuck with how to save file to hdfs from spark. 

I have written MyOutputFormat extends FileOutputFormat,
then in spark calling this:

  rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
  rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
classOf[MyObject], 
   classOf[MyOutputFormat])

where rddres is RDD[(String, MyObject)] from up of transformation pipeline.

Compilation error is: /value saveAsHadoopFile is not a member of
org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]/. 

Could someone give me insights on what could be done here to make it
working? Why it is not a member? Because of wrong types?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the other with about 8M records (~2GB)) of the format (key, value).


I've done a partitionBy(num_partitions) on both RDDs and verified that 
both RDDs have the same number of partitions and that equal keys reside 
on the same partition (via mapPartitionsWithIndex).


Now I'd expect that for a join on the two RDDs no shuffling is 
necessary. Looking at the Web UI under http://driver:4040 however 
reveals that that assumption is false.


In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my 
assumption?


Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is it possible to expose SchemaRDD’s from thrift server?

2015-02-12 Thread Todd Nist
I have a question with regards to accessing SchemaRDD’s and Spark SQL temp
tables via the thrift server.  It appears that a SchemaRDD when created is
only available in the local namespace / context and are unavailable to
external services accessing Spark through thrift server via ODBC; is this
correct?  Does the same apply to temp tables?

If we process data on Spark how is it exposed to the thrift server for
access by third party BI applications via ODBC?  Dose one need to have two
spark context, one for processing, then dump it to metastore from which a
third party application can fetch the data or is it possible to expose the
resulting SchemaRDD via the thrift server?

I am trying to do this with Tableau, Spark SQL Connector.  From what I can
see I need the spark context for processing and then dump to metastore.  Is
it possible to access the resulting SchemaRDD from doing something like
this:

create temporary table test
using org.apache.spark.sql.json
options (path ‘/data/json/*');

cache table test;

I am using Spark 1.2.1.  If not available now will it be in 1.3.x? Or is
the only way to achieve this is store into the metastore and does the imply
hive.

-Todd


Use of nscala-time within spark-shell

2015-02-12 Thread Hammam
Hi All,

Thanks in advance for your help. I have timestamp which I need to convert to
datetime using scala. A folder contains the three needed jar files:
"joda-convert-1.5.jar  joda-time-2.4.jar  nscala-time_2.11-1.8.0.jar"
Using scala REPL and adding the jars: scala -classpath "*.jar"
I can use nscala-time like following:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

scala> DateTime.now
res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00

But when i try to use spark-shell:
ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar
/usr/local/spark/bin/spark-shell --master local --driver-memory 2g
--executor-memory 2g --executor-cores 1

It successfully imports the jars:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

but fails using them
scala> DateTime.now
java.lang.NoSuchMethodError:
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at
com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)
at
com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)
at
com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)
at com.github.nscala_time.time.Imports$.(Imports.scala:20)
at com.github.nscala_time.time.Imports$.(Imports.scala)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:17)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:22)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:26)
at $iwC$$iwC$$iwC$$iwC.(:28)
at $iwC$$iwC$$iwC.(:30)
at $iwC$$iwC.(:32)
at $iwC.(:34)
at (:36)
at .(:40)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Your help is very aappreciated,

Regards,

Hammam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Use-of-nscala-time-within-spark-shell-tp21624.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  1   2   >