Re: in joins, does one side stream?

2015-09-19 Thread Rishitesh Mishra
Got it..thnx Reynold..
On 20 Sep 2015 07:08, "Reynold Xin"  wrote:

> The RDDs themselves are not materialized, but the implementations can
> materialize.
>
> E.g. in cogroup (which is used by RDD.join), it materializes all the data
> during grouping.
>
> In SQL/DataFrame join, depending on the join:
>
> 1. For broadcast join, only the smaller side is materialized in memory as
> a hash table.
>
> 2. For sort-merge join, both sides are sorted & streamed through --
> however, one of the sides needs to buffer all the rows having the same join
> key in order to perform the join.
>
>
>
> On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra <
> rishi80.mis...@gmail.com> wrote:
>
>> Hi Reynold,
>> Can you please elaborate on this. I thought RDD also opens only an
>> iterator. Does it get materialized for joins?
>>
>> Rishi
>>
>> On Saturday, September 19, 2015, Reynold Xin  wrote:
>>
>>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
>>> streams.
>>>
>>>
>>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers 
>>> wrote:
>>>
 in scalding we join with the smaller side on the left, since the
 smaller side will get buffered while the bigger side streams through the
 join.

 looking at CoGroupedRDD i do not get the impression such a distinction
 is made. it seems both sides are put into a map that can spill to disk. is
 this correct?

 thanks

>>>
>>>
>


Re: Using Spark for portfolio manager app

2015-09-19 Thread Jörn Franke
I think generally the way forward would be to put aggregate statistics into
an external store (e.g. HBase) - it should not have that much influence on
latency. You will probably need it anyway if you need to store historical
information. With regard to deltas - always a tricky topic. You may want to
work with absolute values and have the application calculate the deltas when
it queries the external datastore. Once this works you can decide whether you
still need the delta approach at all.
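
Roughly, the shape could look like this (a minimal sketch; ExternalStore is a
made-up stand-in for an HBase/Phoenix client, and positionsByUser is assumed
to be a DStream of absolute position values keyed by user):

import org.apache.spark.streaming.dstream.DStream

// Hypothetical stand-in for an HBase/Phoenix client.
object ExternalStore {
  def put(user: String, absoluteValue: Double): Unit =
    println(s"store: $user -> $absoluteValue")
}

// Write absolute values on every micro-batch; deltas are computed later,
// when the application queries the external datastore.
def writeAbsolutePositions(positionsByUser: DStream[(String, Double)]): Unit =
  positionsByUser.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      partition.foreach { case (user, value) => ExternalStore.put(user, value) }
    }
  }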

Le dim. 20 sept. 2015 à 6:26, Thúy Hằng Lê  a écrit :

> Thanks Adrian and Jorn for the answers.
>
> Yes, you're right, there are a lot of things I need to consider if I want to
> use Spark for my app.
>
> I still have a few concerns/questions from your information:
>
> 1/ I need to combine the trading stream with the tick stream; I am planning
> to use Kafka for that.
> If I am using approach #2 (Direct Approach) in this tutorial
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> will I receive exactly-once semantics? Or do I have to add some logic in my
> code to achieve that?
> Given your suggestion of using delta updates, exactly-once semantics are
> required for this application.
>
> 2/ For ad-hoc queries, I must write the output of Spark to external storage
> and query on that, right?
> Is there any way to do ad-hoc queries on Spark? My application could have
> 50k updates per second at peak time.
> Persisting to external storage leads to high latency in my app.
>
> 3/ How do I get real-time statistics from Spark?
> In most of the Spark Streaming examples, the statistics are echoed to
> stdout.
> However, I want to display those statistics on a GUI; is there any way to
> retrieve data from Spark directly without using external storage?
>
>
> 2015-09-19 16:23 GMT+07:00 Jörn Franke :
>
>> If you want to be able to let your users query their portfolios then you
>> may want to think about storing the current state of the portfolios in
>> HBase/Phoenix; alternatively, a cluster of relational databases can make
>> sense. For the rest you may use Spark.
>>
>> Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê  a
>> écrit :
>>
>>> Hi all,
>>>
>>> I am going to build a financial application for a Portfolio Manager, where
>>> each portfolio contains a list of stocks, the number of shares purchased,
>>> and the purchase price.
>>> Another source of information is stock prices from market data. The
>>> application needs to calculate the real-time gain or loss of each stock in
>>> each portfolio (compared to the purchase price).
>>>
>>> I am new to Spark; I know that using Spark Streaming I can aggregate
>>> portfolio positions in real time, for example:
>>> user A contains:
>>>   - 100 IBM stock with transactionValue=$15000
>>>   - 500 AAPL stock with transactionValue=$11400
>>>
>>> Now, given that stock prices change in real time too, e.g. if the IBM price
>>> is at 151, I want to update its gain or loss: gainOrLost(IBM) = 151*100 -
>>> 15000 = $100
>>>
>>> My questions are:
>>>
>>>  * What is the best method to combine 2 real-time streams (transactions
>>> made by the user and market pricing data) in Spark?
>>>  * How can I run real-time ad-hoc SQL against portfolio positions? Is
>>> there any way I can do SQL on the output of Spark Streaming?
>>>  For example,
>>>   select sum(gainOrLost) from portfolio where user='A';
>>>  * What are the preferred external storages for Spark in this use
>>> case?
>>>  * Is Spark the right choice for my use case?
>>>
>>>
>>
>


Re: Using Spark for portfolio manager app

2015-09-19 Thread Huy Banh
Hi Thuy,

You can check RDD.lookup(). It requires that the RDD is partitioned and, of
course, cached in memory. Or you may consider a distributed cache like
Ehcache or AWS ElastiCache.
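
For illustration, a minimal sketch of that (made-up user -> gain pairs; the
local master is only for the example):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("LookupSketch").setMaster("local[2]"))

// Partition by key and cache, so lookup() only touches the partition that
// owns the key instead of scanning the whole RDD.
val gainsByUser = sc.parallelize(Seq(("A", 100.0), ("B", -42.0)))
  .partitionBy(new HashPartitioner(8))
  .cache()

gainsByUser.lookup("A")   // returns Seq(100.0)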

I think external storage is an option, too - especially NoSQL databases,
which can handle updates at high speed, in constant time.

Cheers,
Huy.

On Sun, Sep 20, 2015 at 11:26 AM Thúy Hằng Lê  wrote:

> Thanks Adrian and Jorn for the answers.
>
> Yes, you're right, there are a lot of things I need to consider if I want to
> use Spark for my app.
>
> I still have a few concerns/questions from your information:
>
> 1/ I need to combine the trading stream with the tick stream; I am planning
> to use Kafka for that.
> If I am using approach #2 (Direct Approach) in this tutorial
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> will I receive exactly-once semantics? Or do I have to add some logic in my
> code to achieve that?
> Given your suggestion of using delta updates, exactly-once semantics are
> required for this application.
>
> 2/ For ad-hoc queries, I must write the output of Spark to external storage
> and query on that, right?
> Is there any way to do ad-hoc queries on Spark? My application could have
> 50k updates per second at peak time.
> Persisting to external storage leads to high latency in my app.
>
> 3/ How do I get real-time statistics from Spark?
> In most of the Spark Streaming examples, the statistics are echoed to
> stdout.
> However, I want to display those statistics on a GUI; is there any way to
> retrieve data from Spark directly without using external storage?
>
>
> 2015-09-19 16:23 GMT+07:00 Jörn Franke :
>
>> If you want to be able to let your users query their portfolios then you
>> may want to think about storing the current state of the portfolios in
>> HBase/Phoenix; alternatively, a cluster of relational databases can make
>> sense. For the rest you may use Spark.
>>
>> Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê  a
>> écrit :
>>
>>> Hi all,
>>>
>>> I am going to build a financial application for a Portfolio Manager, where
>>> each portfolio contains a list of stocks, the number of shares purchased,
>>> and the purchase price.
>>> Another source of information is stock prices from market data. The
>>> application needs to calculate the real-time gain or loss of each stock in
>>> each portfolio (compared to the purchase price).
>>>
>>> I am new to Spark; I know that using Spark Streaming I can aggregate
>>> portfolio positions in real time, for example:
>>> user A contains:
>>>   - 100 IBM stock with transactionValue=$15000
>>>   - 500 AAPL stock with transactionValue=$11400
>>>
>>> Now, given that stock prices change in real time too, e.g. if the IBM price
>>> is at 151, I want to update its gain or loss: gainOrLost(IBM) = 151*100 -
>>> 15000 = $100
>>>
>>> My questions are:
>>>
>>>  * What is the best method to combine 2 real-time streams (transactions
>>> made by the user and market pricing data) in Spark?
>>>  * How can I run real-time ad-hoc SQL against portfolio positions? Is
>>> there any way I can do SQL on the output of Spark Streaming?
>>>  For example,
>>>   select sum(gainOrLost) from portfolio where user='A';
>>>  * What are the preferred external storages for Spark in this use
>>> case?
>>>  * Is Spark the right choice for my use case?
>>>
>>>
>>
>


Re: question building spark in a virtual machine

2015-09-19 Thread Eyal Altshuler
I allocated almost 6GB of RAM to the Ubuntu virtual machine and got the
same problem.
I will go over this post and try to zoom in on the Java VM settings.

Meanwhile - could someone with a working Ubuntu machine specify their JVM
settings?

Thanks,
Eyal

On Sat, Sep 19, 2015 at 7:49 PM, Ted Yu  wrote:

> Please read this article:
>
> http://blogs.vmware.com/apps/2011/06/taking-a-closer-look-at-sizing-the-java-process.html
>
> Can you increase the memory given to the ubuntu virtual machine ?
>
> Cheers
>
> On Sat, Sep 19, 2015 at 9:30 AM, Eyal Altshuler 
> wrote:
>
>> Hi,
>>
>> I allocated 4GB for the Ubuntu virtual machine; how do I check the
>> maximum available for a JVM process?
>> Regarding the thread - I see it's related to building on windows.
>>
>> Thanks,
>> Eyal
>>
>> On Sat, Sep 19, 2015 at 6:54 PM, Ted Yu  wrote:
>>
>>> See also this thread:
>>>
>>> https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/
>>>
>>> Cheers
>>>
>>> On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar <
>>> aniket.bhatna...@gmail.com> wrote:
>>>
 Hi Eyal

 Can you check if your Ubuntu VM has enough RAM allocated to run JVM of
 size 3gb?

 thanks,
 Aniket

 On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler 
 wrote:

> Hi,
>
> I had configured the MAVEN_OPTS environment variable the same as you
> wrote.
> My java version is 1.7.0_75.
> I didn't customize the JVM heap size specifically. Is there an
> additional configuration I have to run besides the MAVEN_OPTS
> configuration?
>
> Thanks,
> Eyal
>
> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:
>
>> Can you tell us how you configured the JVM heap size ?
>> Which version of Java are you using ?
>>
>> When I build Spark, I do the following:
>>
>> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
>> -XX:ReservedCodeCacheSize=512m"
>>
>> Cheers
>>
>> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler <
>> eyal.altshu...@gmail.com> wrote:
>>
>>> Hi,
>>> Trying to build spark in my ubuntu virtual machine, I am getting the
>>> following error:
>>>
>>> "Error occurred during initialization of VM
>>> Could not reserve enough space for object heap
>>> Error: could not create the Java Virtual Machine.
>>> Error: A fatal exception has occurred. Program will exit".
>>>
>>> I have configured the JVM heap size correctly.
>>>
>>> How can I fix it?
>>>
>>> Thanks,
>>> Eyal
>>>
>>
>>
>
>>>
>>
>


DataGenerator for streaming application

2015-09-19 Thread Saiph Kappa
Hi,

I am trying to build a data generator that feeds a streaming application.
This data generator just reads a file and sends its lines through a socket.
I get no errors in the logs, and the benchmark below always prints
"Received 0 records". Am I doing something wrong?


import java.io.{IOException, PrintWriter}
import java.net.ServerSocket

import scala.io.Source

object MyDataGenerator {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: RawTextSender <port> <file> <sleepMillis>")
      System.exit(1)
    }
    // Parse the arguments using a pattern match
    val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)

    val serverSocket = new ServerSocket(port)
    println("Listening on port " + port)

    while (true) {
      val socket = serverSocket.accept()
      println("Got a new connection")

      val out = new PrintWriter(socket.getOutputStream)
      try {
        var count = 0
        var startTimestamp = -1
        for (line <- Source.fromFile(file).getLines()) {
          val ts = line.substring(2, line.indexOf(',', 2)).toInt
          if (startTimestamp < 0)
            startTimestamp = ts

          if (ts - startTimestamp <= 30) {
            out.println(line)
            count += 1
          } else {
            println(s"Emitted reports: $count")
            count = 0
            out.flush()
            startTimestamp = ts
            Thread.sleep(sleepMillis)
          }
        }
      } catch {
        case e: IOException =>
          println("Client disconnected")
          socket.close()
      }
    }
  }
}



import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}

object Benchmark {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>")
      System.exit(1)
    }

    val (numStreams, host, port, batchMillis) =
      (args(0).toInt, args(1), args(2).toInt, args(3).toInt)
    val sparkConf = new SparkConf()
    sparkConf.setAppName("BenchMark")
    sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.executor.extraJavaOptions",
      " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts" +
      " -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
    if (sparkConf.getOption("spark.master").isEmpty) {
      // Master not set, as this was not launched through spark-submit. Setting master as local.
      sparkConf.setMaster("local[*]")
    }

    // Create the context
    val ssc = new StreamingContext(sparkConf, Duration(batchMillis))

    val rawStreams = (1 to numStreams).map(_ =>
      ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
    val union = ssc.union(rawStreams)
    union.count().map(c => s"Received $c records").print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks.


Re: Using Spark for portfolio manager app

2015-09-19 Thread Thúy Hằng Lê
Thanks Adrian and Jorn for the answers.

Yes, you're right, there are a lot of things I need to consider if I want to
use Spark for my app.

I still have a few concerns/questions from your information:

1/ I need to combine the trading stream with the tick stream; I am planning
to use Kafka for that.
If I am using approach #2 (Direct Approach) in this tutorial
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
will I receive exactly-once semantics? Or do I have to add some logic in my
code to achieve that?
Given your suggestion of using delta updates, exactly-once semantics are
required for this application.

2/ For ad-hoc queries, I must write the output of Spark to external storage
and query on that, right?
Is there any way to do ad-hoc queries on Spark? My application could have 50k
updates per second at peak time.
Persisting to external storage leads to high latency in my app.

3/ How do I get real-time statistics from Spark?
In most of the Spark Streaming examples, the statistics are echoed to
stdout.
However, I want to display those statistics on a GUI; is there any way to
retrieve data from Spark directly without using external storage?


2015-09-19 16:23 GMT+07:00 Jörn Franke :

> If you want to be able to let your users query their portfolios then you
> may want to think about storing the current state of the portfolios in
> HBase/Phoenix; alternatively, a cluster of relational databases can make
> sense. For the rest you may use Spark.
>
> Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê  a
> écrit :
>
>> Hi all,
>>
>> I am going to build a financial application for a Portfolio Manager, where
>> each portfolio contains a list of stocks, the number of shares purchased,
>> and the purchase price.
>> Another source of information is stock prices from market data. The
>> application needs to calculate the real-time gain or loss of each stock in
>> each portfolio (compared to the purchase price).
>>
>> I am new to Spark; I know that using Spark Streaming I can aggregate
>> portfolio positions in real time, for example:
>> user A contains:
>>   - 100 IBM stock with transactionValue=$15000
>>   - 500 AAPL stock with transactionValue=$11400
>>
>> Now, given that stock prices change in real time too, e.g. if the IBM price
>> is at 151, I want to update its gain or loss: gainOrLost(IBM) = 151*100 -
>> 15000 = $100
>>
>> My questions are:
>>
>>  * What is the best method to combine 2 real-time streams (transactions
>> made by the user and market pricing data) in Spark?
>>  * How can I run real-time ad-hoc SQL against portfolio positions? Is
>> there any way I can do SQL on the output of Spark Streaming?
>>  For example,
>>   select sum(gainOrLost) from portfolio where user='A';
>>  * What are the preferred external storages for Spark in this use case?
>>  * Is Spark the right choice for my use case?
>>
>>
>


Re: PrunedFilteredScan does not work for UDTs and Struct fields

2015-09-19 Thread Zhan Zhang
Hi Richard,


I am not sure how to support user-defined types. But regarding your second
question, you can use a workaround like the following.


Suppose you have a struct a and want to filter on a.c with a.c > X. You can
define an alias C for a.c, add an extra column C to the schema of the relation,
and make your query C > X instead of a.c > X. In this way, in buildScan you
would get GreaterThan(C, X), which you can then programmatically convert back
to a.c. Note that the required columns passed to buildScan would also include
the extra column C, which you need to return in the buildScan RDD.
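
As a rough sketch of that idea (all names here are made up, and the dummy rows
only illustrate the shape; a real relation would also prune requiredColumns
and push the translated a.c filter down to its source):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types._

class StructAliasRelation(@transient override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(Seq(
    StructField("a", StructType(Seq(StructField("c", IntegerType)))),
    StructField("C", IntegerType)   // alias column mirroring a.c
  ))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // filters now contains e.g. GreaterThan("C", x); translate it back to a.c here.
    val rows = Seq(Row(Row(1), 1), Row(Row(5), 5))   // dummy data: C duplicates a.c
    sqlContext.sparkContext.parallelize(rows)
  }
}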


It looks complicated, but I think it would work.


Thanks.


Zhan Zhang


From: Richard Eggert 
Sent: Saturday, September 19, 2015 3:59 PM
To: User
Subject: PrunedFilteredScan does not work for UDTs and Struct fields

I defined my own relation (extending BaseRelation) and implemented the 
PrunedFilteredScan interface, but discovered that if the column referenced in a 
WHERE = clause is a user-defined type or a field of a struct column, then Spark 
SQL passes NO filters to the PrunedFilteredScan.buildScan method, rendering the 
interface useless. Is there really no way to implement a relation to optimize 
on such fields?

--
Rich


Re: in joins, does one side stream?

2015-09-19 Thread Reynold Xin
The RDDs themselves are not materialized, but the implementations can
materialize.

E.g. in cogroup (which is used by RDD.join), it materializes all the data
during grouping.

In SQL/DataFrame join, depending on the join:

1. For broadcast join, only the smaller side is materialized in memory as a
hash table.

2. For sort-merge join, both sides are sorted & streamed through --
however, one of the sides needs to buffer all the rows having the same join
key in order to perform the join.
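
For illustration, a small sketch of the two DataFrame cases (Spark 1.5-style
API assumed; the data and names are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

val sc = new SparkContext(new SparkConf().setAppName("JoinSketch").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val small = Seq((1, "a"), (2, "b")).toDF("id", "tag")
val large = Seq((1, 10.0), (1, 11.0), (2, 12.0)).toDF("id", "value")

// 1. Broadcast join: only `small` is materialized as an in-memory hash table.
large.join(broadcast(small), "id").explain()

// 2. Without the hint, and when both inputs are large, a sort-merge join sorts
//    and streams both sides, buffering only the rows that share the current
//    join key.
large.join(small, "id").explain()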



On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra  wrote:

> Hi Reynold,
> Can you please elaborate on this. I thought RDD also opens only an
> iterator. Does it get materialized for joins?
>
> Rishi
>
> On Saturday, September 19, 2015, Reynold Xin  wrote:
>
>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
>> streams.
>>
>>
>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers 
>> wrote:
>>
>>> in scalding we join with the smaller side on the left, since the smaller
>>> side will get buffered while the bigger side streams through the join.
>>>
>>> looking at CoGroupedRDD i do not get the impression such a distinction is
>>> made. it seems both sides are put into a map that can spill to disk. is
>>> this correct?
>>>
>>> thanks
>>>
>>
>>


PrunedFilteredScan does not work for UDTs and Struct fields

2015-09-19 Thread Richard Eggert
I defined my own relation (extending BaseRelation) and implemented the
PrunedFilteredScan interface, but discovered that if the column referenced
in a WHERE = clause is a user-defined type or a field of a struct column,
then Spark SQL passes NO filters to the PrunedFilteredScan.buildScan
method, rendering the interface useless. Is there really no way to
implement a relation to optimize on such fields?

-- 
Rich


Re: Unable to see my kafka spark streaming output

2015-09-19 Thread kali.tumm...@gmail.com
Hi All,

Figured it out - I forgot to set the master to local[2]; at least two
cores/threads are required.

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import  org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils


object SparkStreamingKafka {

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local[2]").setAppName("KafkaStreaming").set("spark.executor.memory",
"1g")
val sc=new SparkContext(conf)
val ssc= new StreamingContext(sc,Seconds(2))

val zkQuorm="localhost:2181"
val group="test-group"
val topics="first"
val numThreads=1

val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap

val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

val lines=lineMap.map(_._2)

lines.print

//lines.print()

//val words=lines.flatMap(_.split(" "))

   // val pair=words.map( x => (x,1))

//val
wordcount=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(1),Seconds(2),2)

//wordcount.print

//ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")

ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")

//C:\scalatutorials\sparkstreaming_checkpoint_folder

ssc.start()

ssc.awaitTermination()





  }

}
Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-my-kafka-spark-streaming-output-tp24750p24751.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unable to see my kafka spark streaming output

2015-09-19 Thread kali.tumm...@gmail.com
Hi All,
I am unable to see the output getting printed to the console; can anyone
help?

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import  org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils


object SparkStreamingKafka {

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local").setAppName("KafkaStreaming").set("spark.executor.memory",
"1g")
val sc=new SparkContext(conf)
val ssc= new StreamingContext(sc,Seconds(2))

val zkQuorm="localhost:2181"
val group="test-group"
val topics="first"
val numThreads=1

val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap

val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

val lines=lineMap.map(_._2)

//lines.print()

val words=lines.flatMap(_.split(" "))

val pair=words.map( x => (x,1))

val wordcount=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(1),Seconds(2),2)

wordcount.print

//ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")

ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")

//C:\scalatutorials\sparkstreaming_checkpoint_folder

ssc.start()

ssc.awaitTermination()





  }

}


Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-my-kafka-spark-streaming-output-tp24750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka createDirectStream ​issue

2015-09-19 Thread kali.tumm...@gmail.com
Hi ,

I am trying to develop the same code in IntelliJ IDEA and I am having the
same issue; is there any workaround?

Error in intellij:- cannot resolve symbol createDirectStream

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import  org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import  org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.kafka._

object SparkKafkaOffsetTest {

  def main(args: Array[String]): Unit = {
//Logger.getLogger("org").setLevel(Level.WARN)
//Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local").setAppName("KafkaOffsetStreaming").set("spark.executor.memory",
"1g")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val zkQuorm="localhost:2181"
val group="test-group"
val topics="first"
val numThreads=1
val broker="localhost:9091"
val kafkaParams = Map[String, String]("metadata.broker.list" -> broker)
//val kafkaParams = Map[String, String]("metadata.broker.list" )
val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap

//val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

//val directKafkaStream=KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val messages= KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics)



//val directKafkaStream = KafkaUtils.createDirectStream[
  //[key class], [value class], [key decoder class], [value decoder
class] ](
  //streamingContext, [map of Kafka parameters], [set of topics to
consume])

  }

}

Thanks
Sri




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456p24749.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: in joins, does one side stream?

2015-09-19 Thread Rishitesh Mishra
Hi Reynold,
Can you please elaborate on this. I thought RDD also opens only an
iterator. Does it get materialized for joins?

Rishi

On Saturday, September 19, 2015, Reynold Xin  wrote:

> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
> streams.
>
>
> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers  > wrote:
>
>> in scalding we join with the smaller side on the left, since the smaller
>> side will get buffered while the bigger side streams through the join.
>>
>> looking at CoGroupedRDD i do not get the impression such a distinction is
>> made. it seems both sides are put into a map that can spill to disk. is
>> this correct?
>>
>> thanks
>>
>
>


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-19 Thread Timothy Chen
You can still provide properties through the Docker container by putting
configuration in the conf directory, but we try to pass through all properties
submitted via spark-submit from the driver, which I believe will override
the defaults.

This is not what you are seeing?

Tim


> On Sep 19, 2015, at 9:01 AM, Alan Braithwaite  wrote:
> 
> The assumption that the executor has no default properties set in its
> environment through the docker container.  Correct me if I'm wrong, but any
> properties which are unset in the SparkContext will come from the environment
> of the executor, will they not?
> 
> Thanks,
> - Alan
> 
>> On Sat, Sep 19, 2015 at 1:09 AM, Tim Chen  wrote:
>> I guess I need a bit more clarification, what kind of assumptions was the 
>> dispatcher making?
>> 
>> Tim
>> 
>> 
>>> On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite  
>>> wrote:
>>> Hi Tim,
>>> 
>>> Thanks for the follow up.  It's not so much that I expect the executor to 
>>> inherit the configuration of the dispatcher as I don't expect the 
>>> dispatcher to make assumptions about the system environment of the executor 
>>> (since it lives in a docker).  I could potentially see a case where you 
>>> might want to explicitly forbid the defaults, but I can't think of any 
>>> right now.
>>> 
>>> Otherwise, I'm confused as to why the defaults in the docker image for the 
>>> executor are just ignored.  I suppose that it's the dispatchers job to 
>>> ensure the exact configuration of the executor, regardless of the defaults 
>>> set on the executors machine?  Is that the assumption being made?  I can 
>>> understand that in contexts which aren't docker driven since jobs could be 
>>> rolling out in the middle of a config update.  Trying to think of this 
>>> outside the terms of just mesos/docker (since I'm fully aware that docker 
>>> doesn't rule the world yet).
>>> 
>>> So I can see this from both perspectives now and passing in the properties 
>>> file will probably work just fine for me, but for my better understanding: 
>>> When the executor starts, will it read any of the environment that it's 
>>> executing in or will it just take only the properties given to it by the 
>>> dispatcher and nothing more?
>>> 
>>> Lemme know if anything needs more clarification and thanks for your mesos 
>>> contribution to spark!
>>> 
>>> - Alan
>>> 
 On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen  wrote:
 Hi Alan,
 
 If I understand correctly, you are setting executor home when you launch 
 the dispatcher and not on the configuration when you submit job, and 
 expect it to inherit that configuration?
 
 When I worked on the dispatcher I was assuming all configuration is passed 
 to the dispatcher to launch the job exactly how you will need to launch it 
 with client mode.
 
 But indeed it shouldn't crash dispatcher, I'll take a closer look when I 
 get a chance.
 
 Can you recommend changes on the documentation, either in email or a PR?
 
 Thanks!
 
 Tim
 
 Sent from my iPhone
 
> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite  
> wrote:
> 
> Hey All,
> 
> To bump this thread once again, I'm having some trouble using the 
> dispatcher as well.
> 
> I'm using Mesos Cluster Manager with Docker Executors.  I've deployed the 
> dispatcher as Marathon job.  When I submit a job using spark submit, the 
> dispatcher writes back that the submission was successful and then 
> promptly dies in marathon.  Looking at the logs reveals it was hitting 
> the following line:
> 
> 398:  throw new SparkException("Executor Spark home 
> `spark.mesos.executor.home` is not set!")
> 
> Which is odd because it's set in multiple places (SPARK_HOME, 
> spark.mesos.executor.home, spark.home, etc).  Reading the code, it 
> appears that the driver desc pulls only from the request and disregards 
> any other properties that may be configured.  Testing by passing --conf 
> spark.mesos.executor.home=/usr/local/spark on the command line to 
> spark-submit confirms this.  We're trying to isolate the number of places 
> where we have to set properties within spark and were hoping that it will 
> be possible to have this pull in the spark-defaults.conf from somewhere, 
> or at least allow the user to inform the dispatcher through spark-submit 
> that those properties will be available once the job starts. 
> 
> Finally, I don't think the dispatcher should crash in this event.  It 
> seems not exceptional that a job is misconfigured when submitted.
> 
> Please direct me on the right path if I'm headed in the wrong direction.  
> Also let me know if I should open some tickets for these issues.
> 
> Thanks,
> - Alan
> 
>> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:
>> Yes you can create an issue, or actually contribute a patch

Re: question building spark in a virtual machine

2015-09-19 Thread Ted Yu
Please read this article:
http://blogs.vmware.com/apps/2011/06/taking-a-closer-look-at-sizing-the-java-process.html

Can you increase the memory given to the ubuntu virtual machine ?

Cheers

On Sat, Sep 19, 2015 at 9:30 AM, Eyal Altshuler 
wrote:

> Hi,
>
> I allocated 4GB for the Ubuntu virtual machine; how do I check the
> maximum available for a JVM process?
> Regarding the thread - I see it's related to building on windows.
>
> Thanks,
> Eyal
>
> On Sat, Sep 19, 2015 at 6:54 PM, Ted Yu  wrote:
>
>> See also this thread:
>>
>> https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/
>>
>> Cheers
>>
>> On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Hi Eyal
>>>
>>> Can you check if your Ubuntu VM has enough RAM allocated to run JVM of
>>> size 3gb?
>>>
>>> thanks,
>>> Aniket
>>>
>>> On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler 
>>> wrote:
>>>
 Hi,

 I had configured the MAVEN_OPTS environment variable the same as you
 wrote.
 My java version is 1.7.0_75.
 I didn't customize the JVM heap size specifically. Is there an
 additional configuration I have to run besides the MAVEN_OPTS configuration?

 Thanks,
 Eyal

 On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:

> Can you tell us how you configured the JVM heap size ?
> Which version of Java are you using ?
>
> When I build Spark, I do the following:
>
> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
> -XX:ReservedCodeCacheSize=512m"
>
> Cheers
>
> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler <
> eyal.altshu...@gmail.com> wrote:
>
>> Hi,
>> Trying to build spark in my ubuntu virtual machine, I am getting the
>> following error:
>>
>> "Error occurred during initialization of VM
>> Could not reserve enough space for object heap
>> Error: could not create the Java Virtual Machine.
>> Error: A fatal exception has occurred. Program will exit".
>>
>> I have configured the JVM heap size correctly.
>>
>> How can I fix it?
>>
>> Thanks,
>> Eyal
>>
>
>

>>
>


Re: question building spark in a virtual machine

2015-09-19 Thread Eyal Altshuler
Hi,

I allocated 4GB for the Ubuntu virtual machine; how do I check the
maximum available for a JVM process?
Regarding the thread - I see it's related to building on windows.

Thanks,
Eyal

On Sat, Sep 19, 2015 at 6:54 PM, Ted Yu  wrote:

> See also this thread:
>
> https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/
>
> Cheers
>
> On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Hi Eyal
>>
>> Can you check if your Ubuntu VM has enough RAM allocated to run JVM of
>> size 3gb?
>>
>> thanks,
>> Aniket
>>
>> On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler 
>> wrote:
>>
>>> Hi,
>>>
>>> I had configured the MAVEN_OPTS environment variable the same as you
>>> wrote.
>>> My java version is 1.7.0_75.
>>> I didn't customize the JVM heap size specifically. Is there an
>>> additional configuration I have to run besides the MAVEN_OPTS configuration?
>>>
>>> Thanks,
>>> Eyal
>>>
>>> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:
>>>
 Can you tell us how you configured the JVM heap size ?
 Which version of Java are you using ?

 When I build Spark, I do the following:

 export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
 -XX:ReservedCodeCacheSize=512m"

 Cheers

 On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler <
 eyal.altshu...@gmail.com> wrote:

> Hi,
> Trying to build spark in my ubuntu virtual machine, I am getting the
> following error:
>
> "Error occurred during initialization of VM
> Could not reserve enough space for object heap
> Error: could not create the Java Virtual Machine.
> Error: A fatal exception has occurred. Program will exit".
>
> I have configured the JVM heap size correctly.
>
> How can I fix it?
>
> Thanks,
> Eyal
>


>>>
>


Re: word count (group by users) in spark

2015-09-19 Thread Aniket Bhatnagar
Using the Scala API, you can first group by user and then use combineByKey.
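
For illustration, a rough sketch of that approach (assuming input lines like
"kali A,B,A,B,B", i.e. a user name followed by comma-separated tokens):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("UserWordCount").setMaster("local[2]"))
val lines = sc.parallelize(Seq("kali A,B,A,B,B", "james B,A,A,A,B"))

val countsByUser = lines
  .map { line =>
    val Array(user, comments) = line.split("\\s+", 2)
    (user, comments.split(","))
  }
  .combineByKey(
    (ws: Array[String]) => ws.groupBy(identity).map { case (w, occ) => (w, occ.length) },
    (acc: Map[String, Int], ws: Array[String]) =>
      ws.foldLeft(acc) { (m, w) => m.updated(w, m.getOrElse(w, 0) + 1) },
    (a: Map[String, Int], b: Map[String, Int]) =>
      b.foldLeft(a) { (m, kv) => m.updated(kv._1, m.getOrElse(kv._1, 0) + kv._2) }
  )

// Prints e.g. "kali A [2] B [3]" and "james A [3] B [2]".
countsByUser.collect().foreach { case (user, counts) =>
  println(user + " " + counts.map { case (w, c) => s"$w [$c]" }.mkString(" "))
}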

Thanks,
Aniket

On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com 
wrote:

> Hi All,
> I would like to achieve the output below using Spark. I managed to write
> it in Hive and call it from Spark, but not in plain Spark (Scala). How do I
> group word counts for a particular user (column), for example?
> Imagine users and their tweets; I want to do a word count based on user
> name.
>
> Input:-
> kali A,B,A,B,B
> james B,A,A,A,B
>
> Output:-
> kali A [Count] B [Count]
> James A [Count] B [Count]
>
> My Hive Answer:-
> CREATE EXTERNAL TABLE  TEST
> (
>  user_name string   ,
>  COMMENTS  STRING
>
> )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
> LOCATION '/data/kali/test';    HDFS FOLDER (create hdfs folder and
> create a text file with data mentioned in the email)
>
> use default;select user_name,COLLECT_SET(text) from (select
> user_name,concat(sub,' ',count(comments)) as text  from test LATERAL VIEW
> explode(split(comments,',')) subView AS sub group by user_name,sub)w group
> by user_name;
>
> Spark With Hive:-
> package com.examples
>
> /**
>  * Created by kalit_000 on 17/09/2015.
>  */
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.SparkContext._
>
>
> object HiveWordCount {
>
>   def main(args: Array[String]): Unit =
>   {
> Logger.getLogger("org").setLevel(Level.WARN)
> Logger.getLogger("akka").setLevel(Level.WARN)
>
> val conf = new
>
> SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory",
> "1g")
> val sc = new SparkContext(conf)
> val sqlContext= new SQLContext(sc)
>
> val hc=new HiveContext(sc)
>
> hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST  (user_name
> string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001'
> STORED AS TEXTFILE LOCATION '/data/kali/test' ")
>
> val op=hc.sql("select user_name,COLLECT_SET(text) from (select
> user_name,concat(sub,' ',count(comments)) as text  from default.test
> LATERAL
> VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w
> group by user_name")
>
> op.collect.foreach(println)
>
>
>   }
>
>
>
>
> Thanks
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-19 Thread Alan Braithwaite
The assumption that the executor has no default properties set in its
environment through the docker container.  Correct me if I'm wrong, but any
properties which are unset in the SparkContext will come from the
environment of the executor, will they not?

Thanks,
- Alan

On Sat, Sep 19, 2015 at 1:09 AM, Tim Chen  wrote:

> I guess I need a bit more clarification, what kind of assumptions was the
> dispatcher making?
>
> Tim
>
>
> On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite 
> wrote:
>
>> Hi Tim,
>>
>> Thanks for the follow up.  It's not so much that I expect the executor to
>> inherit the configuration of the dispatcher as I *don't* expect the
>> dispatcher to make assumptions about the system environment of the executor
>> (since it lives in a docker).  I could potentially see a case where you
>> might want to explicitly forbid the defaults, but I can't think of any
>> right now.
>>
>> Otherwise, I'm confused as to why the defaults in the docker image for
>> the executor are just ignored.  I suppose that it's the dispatchers job to
>> ensure the *exact* configuration of the executor, regardless of the
>> defaults set on the executors machine?  Is that the assumption being made?
>> I can understand that in contexts which aren't docker driven since jobs
>> could be rolling out in the middle of a config update.  Trying to think of
>> this outside the terms of just mesos/docker (since I'm fully aware that
>> docker doesn't rule the world yet).
>>
>> So I can see this from both perspectives now and passing in the
>> properties file will probably work just fine for me, but for my better
>> understanding: When the executor starts, will it read any of the
>> environment that it's executing in or will it just take only the properties
>> given to it by the dispatcher and nothing more?
>>
>> Lemme know if anything needs more clarification and thanks for your mesos
>> contribution to spark!
>>
>> - Alan
>>
>> On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen  wrote:
>>
>>> Hi Alan,
>>>
>>> If I understand correctly, you are setting executor home when you launch
>>> the dispatcher and not on the configuration when you submit job, and expect
>>> it to inherit that configuration?
>>>
>>> When I worked on the dispatcher I was assuming all configuration is
>>> passed to the dispatcher to launch the job exactly how you will need to
>>> launch it with client mode.
>>>
>>> But indeed it shouldn't crash dispatcher, I'll take a closer look when I
>>> get a chance.
>>>
>>> Can you recommend changes on the documentation, either in email or a PR?
>>>
>>> Thanks!
>>>
>>> Tim
>>>
>>> Sent from my iPhone
>>>
>>> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite 
>>> wrote:
>>>
>>> Hey All,
>>>
>>> To bump this thread once again, I'm having some trouble using the
>>> dispatcher as well.
>>>
>>> I'm using Mesos Cluster Manager with Docker Executors.  I've deployed
>>> the dispatcher as Marathon job.  When I submit a job using spark submit,
>>> the dispatcher writes back that the submission was successful and then
>>> promptly dies in marathon.  Looking at the logs reveals it was hitting the
>>> following line:
>>>
>>> 398:  throw new SparkException("Executor Spark home
>>> `spark.mesos.executor.home` is not set!")
>>>
>>> Which is odd because it's set in multiple places (SPARK_HOME,
>>> spark.mesos.executor.home, spark.home, etc).  Reading the code, it
>>> appears that the driver desc pulls only from the request and disregards any
>>> other properties that may be configured.  Testing by passing --conf
>>> spark.mesos.executor.home=/usr/local/spark on the command line to
>>> spark-submit confirms this.  We're trying to isolate the number of places
>>> where we have to set properties within spark and were hoping that it will
>>> be possible to have this pull in the spark-defaults.conf from somewhere, or
>>> at least allow the user to inform the dispatcher through spark-submit that
>>> those properties will be available once the job starts.
>>>
>>> Finally, I don't think the dispatcher should crash in this event.  It
>>> seems not exceptional that a job is misconfigured when submitted.
>>>
>>> Please direct me on the right path if I'm headed in the wrong
>>> direction.  Also let me know if I should open some tickets for these issues.
>>>
>>> Thanks,
>>> - Alan
>>>
>>> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:
>>>
 Yes you can create an issue, or actually contribute a patch to update
 it :)

 Sorry the docs is a bit light, I'm going to make it more complete along
 the way.

 Tim


 On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) <
 tomwa...@cisco.com> wrote:

> Tim,
>
> Thank you for the explanation.  You are correct, my Mesos experience
> is very light, and I haven’t deployed anything via Marathon yet.  What you
> have stated here makes sense, I will look into doing this.
>
> Adding this info to the docs would be great.  Is the appropri

Re: question building spark in a virtual machine

2015-09-19 Thread Ted Yu
See also this thread:
https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/

Cheers

On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Hi Eyal
>
> Can you check if your Ubuntu VM has enough RAM allocated to run JVM of
> size 3gb?
>
> thanks,
> Aniket
>
> On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler 
> wrote:
>
>> Hi,
>>
>> I had configured the MAVEN_OPTS environment variable the same as you
>> wrote.
>> My java version is 1.7.0_75.
>> I didn't customize the JVM heap size specifically. Is there an
>> additional configuration I have to run besides the MAVEN_OPTS configuration?
>>
>> Thanks,
>> Eyal
>>
>> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:
>>
>>> Can you tell us how you configured the JVM heap size ?
>>> Which version of Java are you using ?
>>>
>>> When I build Spark, I do the following:
>>>
>>> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
>>> -XX:ReservedCodeCacheSize=512m"
>>>
>>> Cheers
>>>
>>> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler <
>>> eyal.altshu...@gmail.com> wrote:
>>>
 Hi,
 Trying to build spark in my ubuntu virtual machine, I am getting the
 following error:

 "Error occurred during initialization of VM
 Could not reserve enough space for object heap
 Error: could not create the Java Virtual Machine.
 Error: A fatal exception has occurred. Program will exit".

 I have configured the JVM heap size correctly.

 How can I fix it?

 Thanks,
 Eyal

>>>
>>>
>>


Re: question building spark in a virtual machine

2015-09-19 Thread Aniket Bhatnagar
Hi Eyal

Can you check if your Ubuntu VM has enough RAM allocated to run JVM of size
3gb?

thanks,
Aniket

On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler 
wrote:

> Hi,
>
> I had configured the MAVEN_OPTS environment variable the same as you wrote.
> My java version is 1.7.0_75.
> I didn't customize the JVM heap size specifically. Is there an additional
> configuration I have to run besides the MAVEN_OPTS configuration?
>
> Thanks,
> Eyal
>
> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:
>
>> Can you tell us how you configured the JVM heap size ?
>> Which version of Java are you using ?
>>
>> When I build Spark, I do the following:
>>
>> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
>> -XX:ReservedCodeCacheSize=512m"
>>
>> Cheers
>>
>> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler > > wrote:
>>
>>> Hi,
>>> Trying to build spark in my ubuntu virtual machine, I am getting the
>>> following error:
>>>
>>> "Error occurred during initialization of VM
>>> Could not reserve enough space for object heap
>>> Error: could not create the Java Virtual Machine.
>>> Error: A fatal exception has occurred. Program will exit".
>>>
>>> I have configured the JVM heap size correctly.
>>>
>>> How can I fix it?
>>>
>>> Thanks,
>>> Eyal
>>>
>>
>>
>


Re: question building spark in a virtual machine

2015-09-19 Thread Eyal Altshuler
Hi,

I had configured the MAVEN_OPTS environment variable the same as you wrote.
My java version is 1.7.0_75.
I didn't customize the JVM heap size specifically. Is there an additional
configuration I have to run besides the MAVEN_OPTS configuration?

Thanks,
Eyal

On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:

> Can you tell us how you configured the JVM heap size ?
> Which version of Java are you using ?
>
> When I build Spark, I do the following:
>
> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
> -XX:ReservedCodeCacheSize=512m"
>
> Cheers
>
> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler 
> wrote:
>
>> Hi,
>> Trying to build spark in my ubuntu virtual machine, I am getting the
>> following error:
>>
>> "Error occurred during initialization of VM
>> Could not reserve enough space for object heap
>> Error: could not create the Java Virtual Machine.
>> Error: A fatal exception has occurred. Program will exit".
>>
>> I have configured the JVM heap size correctly.
>>
>> How can I fix it?
>>
>> Thanks,
>> Eyal
>>
>
>


Re: question building spark in a virtual machine

2015-09-19 Thread Ted Yu
Can you tell us how you configured the JVM heap size ?
Which version of Java are you using ?

When I build Spark, I do the following:

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
-XX:ReservedCodeCacheSize=512m"

Cheers

On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler 
wrote:

> Hi,
> Trying to build spark in my ubuntu virtual machine, I am getting the
> following error:
>
> "Error occurred during initialization of VM
> Could not reserve enough space for object heap
> Error: could not create the Java Virtual Machine.
> Error: A fatal exception has occurred. Program will exit".
>
> I have configured the JVM heap size correctly.
>
> How can I fix it?
>
> Thanks,
> Eyal
>


word count (group by users) in spark

2015-09-19 Thread kali.tumm...@gmail.com
Hi All, 
I would like to achieve the output below using Spark. I managed to write it
in Hive and call it from Spark, but not in plain Spark (Scala). How do I
group word counts for a particular user (column), for example?
Imagine users and their tweets; I want to do a word count based on user
name.

Input:-
kali A,B,A,B,B
james B,A,A,A,B

Output:-
kali A [Count] B [Count]
James A [Count] B [Count]

My Hive Answer:-
CREATE EXTERNAL TABLE  TEST
(
 user_name string   ,
 COMMENTS  STRING 
   
)  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
LOCATION '/data/kali/test';    HDFS FOLDER (create hdfs folder and
create a text file with data mentioned in the email)

use default;select user_name,COLLECT_SET(text) from (select
user_name,concat(sub,' ',count(comments)) as text  from test LATERAL VIEW
explode(split(comments,',')) subView AS sub group by user_name,sub)w group
by user_name;

Spark With Hive:-
package com.examples

/**
 * Created by kalit_000 on 17/09/2015.
 */
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._


object HiveWordCount {

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory",
"1g")
val sc = new SparkContext(conf)
val sqlContext= new SQLContext(sc)

val hc=new HiveContext(sc)

hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST  (user_name
string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' 
STORED AS TEXTFILE LOCATION '/data/kali/test' ")

val op=hc.sql("select user_name,COLLECT_SET(text) from (select
user_name,concat(sub,' ',count(comments)) as text  from default.test LATERAL
VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w
group by user_name")

op.collect.foreach(println)


  }




Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Docker/Mesos with Spark

2015-09-19 Thread John Omernik
I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and
just found you CAN run it this way.  Are there any user posts, blog posts,
etc on why and how you'd do this?

Basically, at first I was questioning why you'd run Spark in a Docker
container, i.e., if you run with a tarballed executor, what are you really
gaining?  And in this setup, are you losing out on performance somehow? (I
am guessing smarter people than I have figured that out.)

Then I came along a situation where I wanted to use a python library with
spark, and it had to be installed on every node, and I realized one big
advantage of dockerized spark would be that spark apps that needed other
libraries could be contained and built well.

OK, that's huge, let's do that.  For my next question, there are a lot of
"questions" I have on how this actually works.  Does cluster mode/client mode
apply here? If so, how?  Is there a good walk-through on getting this
setup? Limitations? Gotchas?  Should I just dive in and start working with
it? Has anyone done any stories/rough documentation? This seems like a
really helpful feature to scaling out spark, and letting developers truly
build what they need without tons of admin overhead, so I really want to
explore.

Thanks!

John


question building spark in a virtual machine

2015-09-19 Thread Eyal Altshuler
Hi,
Trying to build spark in my ubuntu virtual machine, I am getting the
following error:

"Error occurred during initialization of VM
Could not reserve enough space for object heap
Error: could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit".

I have configured the JVM heap size correctly.

How can I fix it?

Thanks,
Eyal


Re: Using Spark for portfolio manager app

2015-09-19 Thread Jörn Franke
If you want to be able to let your users query their portfolios then you may
want to think about storing the current state of the portfolios in
HBase/Phoenix; alternatively, a cluster of relational databases can make
sense. For the rest you may use Spark.

Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê  a écrit :

> Hi all,
>
> I am going to build a financial application for a Portfolio Manager, where
> each portfolio contains a list of stocks, the number of shares purchased,
> and the purchase price.
> Another source of information is stock prices from market data. The
> application needs to calculate the real-time gain or loss of each stock in
> each portfolio (compared to the purchase price).
>
> I am new to Spark; I know that using Spark Streaming I can aggregate
> portfolio positions in real time, for example:
> user A contains:
>   - 100 IBM stock with transactionValue=$15000
>   - 500 AAPL stock with transactionValue=$11400
>
> Now, given that stock prices change in real time too, e.g. if the IBM price
> is at 151, I want to update its gain or loss: gainOrLost(IBM) = 151*100 -
> 15000 = $100
>
> My questions are:
>
>  * What is the best method to combine 2 real-time streams (transactions
> made by the user and market pricing data) in Spark?
>  * How can I run real-time ad-hoc SQL against portfolio positions? Is
> there any way I can do SQL on the output of Spark Streaming?
>  For example,
>   select sum(gainOrLost) from portfolio where user='A';
>  * What are the preferred external storages for Spark in this use case?
>  * Is Spark the right choice for my use case?
>
>


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-19 Thread Tim Chen
I guess I need a bit more clarification, what kind of assumptions was the
dispatcher making?

Tim


On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite 
wrote:

> Hi Tim,
>
> Thanks for the follow up.  It's not so much that I expect the executor to
> inherit the configuration of the dispatcher as I *don't* expect the
> dispatcher to make assumptions about the system environment of the executor
> (since it lives in a docker).  I could potentially see a case where you
> might want to explicitly forbid the defaults, but I can't think of any
> right now.
>
> Otherwise, I'm confused as to why the defaults in the docker image for the
> executor are just ignored.  I suppose that it's the dispatchers job to
> ensure the *exact* configuration of the executor, regardless of the
> defaults set on the executors machine?  Is that the assumption being made?
> I can understand that in contexts which aren't docker driven since jobs
> could be rolling out in the middle of a config update.  Trying to think of
> this outside the terms of just mesos/docker (since I'm fully aware that
> docker doesn't rule the world yet).
>
> So I can see this from both perspectives now and passing in the properties
> file will probably work just fine for me, but for my better understanding:
> When the executor starts, will it read any of the environment that it's
> executing in or will it just take only the properties given to it by the
> dispatcher and nothing more?
>
> Lemme know if anything needs more clarification and thanks for your mesos
> contribution to spark!
>
> - Alan
>
> On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen  wrote:
>
>> Hi Alan,
>>
>> If I understand correctly, you are setting executor home when you launch
>> the dispatcher and not on the configuration when you submit job, and expect
>> it to inherit that configuration?
>>
>> When I worked on the dispatcher I was assuming all configuration is
>> passed to the dispatcher to launch the job exactly how you will need to
>> launch it with client mode.
>>
>> But indeed it shouldn't crash dispatcher, I'll take a closer look when I
>> get a chance.
>>
>> Can you recommend changes on the documentation, either in email or a PR?
>>
>> Thanks!
>>
>> Tim
>>
>> Sent from my iPhone
>>
>> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite 
>> wrote:
>>
>> Hey All,
>>
>> To bump this thread once again, I'm having some trouble using the
>> dispatcher as well.
>>
>> I'm using Mesos Cluster Manager with Docker Executors.  I've deployed the
>> dispatcher as Marathon job.  When I submit a job using spark submit, the
>> dispatcher writes back that the submission was successful and then promptly
>> dies in marathon.  Looking at the logs reveals it was hitting the following
>> line:
>>
>> 398:  throw new SparkException("Executor Spark home
>> `spark.mesos.executor.home` is not set!")
>>
>> Which is odd because it's set in multiple places (SPARK_HOME,
>> spark.mesos.executor.home, spark.home, etc).  Reading the code, it
>> appears that the driver desc pulls only from the request and disregards any
>> other properties that may be configured.  Testing by passing --conf
>> spark.mesos.executor.home=/usr/local/spark on the command line to
>> spark-submit confirms this.  We're trying to isolate the number of places
>> where we have to set properties within spark and were hoping that it will
>> be possible to have this pull in the spark-defaults.conf from somewhere, or
>> at least allow the user to inform the dispatcher through spark-submit that
>> those properties will be available once the job starts.
>>
>> Finally, I don't think the dispatcher should crash in this event.  It
>> seems not exceptional that a job is misconfigured when submitted.
>>
>> Please direct me on the right path if I'm headed in the wrong direction.
>> Also let me know if I should open some tickets for these issues.
>>
>> Thanks,
>> - Alan
>>
>> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:
>>
>>> Yes you can create an issue, or actually contribute a patch to update it
>>> :)
>>>
>>> Sorry the docs is a bit light, I'm going to make it more complete along
>>> the way.
>>>
>>> Tim
>>>
>>>
>>> On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) <
>>> tomwa...@cisco.com> wrote:
>>>
 Tim,

 Thank you for the explanation.  You are correct, my Mesos experience is
 very light, and I haven’t deployed anything via Marathon yet.  What you
 have stated here makes sense, I will look into doing this.

 Adding this info to the docs would be great.  Is the appropriate action
 to create an issue regarding improvement of the docs?  For those of us who
 are gaining the experience having such a pointer is very helpful.

 Tom

 From: Tim Chen 
 Date: Thursday, September 10, 2015 at 10:25 AM
 To: Tom Waterhouse 
 Cc: "user@spark.apache.org" 
 Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation

 Hi Tom,

 Sorry the documentation isn't rea

Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submi

2015-09-19 Thread Ewan Leith
yarn-client still runs the executor tasks on the cluster; the main difference
is where the driver runs.
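
For illustration, here is how the same choice looks when building a
SparkContext directly (with Zeppelin the master is normally set through the
interpreter configuration rather than in code; Spark 1.x master strings
assumed):

import org.apache.spark.{SparkConf, SparkContext}

// "yarn-client": the driver runs in this JVM (e.g. the notebook process),
// while executors are still scheduled on the YARN cluster.
val sc = new SparkContext(new SparkConf().setAppName("notebook").setMaster("yarn-client"))

// "yarn-cluster" cannot be used by constructing a SparkContext like this;
// the driver itself has to be submitted to YARN with spark-submit, which is
// what the SparkException in this thread is pointing out.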


Thanks,

Ewan


-- Original message--

From: shahab

Date: Fri, 18 Sep 2015 13:11

To: Aniket Bhatnagar;

Cc: user@spark.apache.org;

Subject:Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected 
yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not 
supported directly by SparkContext. Please use spark-submit.


It works using yarn-client, but I want to make it run in cluster mode. Is
there any way to do so?

best,
/Shahab

On Fri, Sep 18, 2015 at 12:54 PM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

Can you try yarn-client mode?

On Fri, Sep 18, 2015, 3:38 PM shahab <shahab.mok...@gmail.com> wrote:
Hi,

Probably I have wrong zeppelin  configuration, because I get the following 
error when I execute spark statements in Zeppelin:

org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running 
on a cluster. Deployment to YARN is not supported directly by SparkContext. 
Please use spark-submit.


Anyone knows What's the solution to this?

best,
/Shahab