not able to write to cassandra table from spark

2016-05-10 Thread anandnilkal
I am trying to write incoming stream data to a database. Following is an
example program: this code creates a thread to listen to an incoming stream
of CSV data. The data needs to be split on the delimiter, and the resulting
array pushed to the database as separate columns in the table.

object dbwrite {
  case class records(id: Long, time: java.sql.Timestamp, rx: Int, tx: Int,
    total: Int, multi: Double)

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf()
      .set("spark.cassandra.connection.host", "localhost")
      .setAppName("dbwrite")
      .set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val sc = ssc.sparkContext

    // Create an input stream with the custom receiver on the target ip:port;
    // the input is \n-delimited CSV text (e.g. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val splitRdd = lines.map(line => line.split(","))
    //val wordCounts = splitRdd.map(x => (x, 1)).reduceByKey(_ + _)
    // DStream[Array[String]]

    val yourRdd = splitRdd.flatMap(arr => {
      val id = arr(0).toLong
      val rx = arr(2).toInt
      val tx = arr(3).toInt
      val total = arr(4).toInt
      val mul = arr(5).toDouble
      val parsedDate = new java.util.Date()
      val timestamp = new java.sql.Timestamp(parsedDate.getTime())
      Seq(records(id, timestamp, rx, tx, total, mul))
    })

    yourRdd.foreachRDD { rdd =>
      for (item <- rdd.collect())
        print(item)
    }
    val rec = sc.parallelize(Seq(yourRdd))
    rec.saveToCassandra("records", "record",
      SomeColumns("id", "time", "rx", "tx", "total", "multi"))

    ssc.start()
    ssc.awaitTermination()
  }
}
but Spark gives the following error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement
failed: Columns not found in
org.apache.spark.streaming.dstream.DStream[dbwrite.records]: [mdn, time, rx,
tx, total, multi]
at scala.Predef$.require(Predef.scala:233)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:29)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:20)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:272)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at dbwrite$.main(dbwrite.scala:63)
at dbwrite.main(dbwrite.scala)
I am using Spark 1.6.1 and Cassandra 3.5.
The table already created in Cassandra has the same column names (they
display in alphabetical order, but all columns are available).
Please help me with the error.
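
Note: the error lists [mdn, time, rx, tx, total, multi] as the expected
columns, which suggests the table's first column is named mdn rather than id.
A minimal sketch of a version that should line up, assuming that table
definition: the case class field names must match the Cassandra column names,
and each micro-batch should be saved from inside foreachRDD rather than
wrapping the DStream in sc.parallelize:

import com.datastax.spark.connector._

// assuming: CREATE TABLE records.record (mdn bigint PRIMARY KEY,
//   time timestamp, rx int, tx int, total int, multi double);
case class records(mdn: Long, time: java.sql.Timestamp, rx: Int, tx: Int,
  total: Int, multi: Double)

yourRdd.foreachRDD { rdd =>
  // the column names here must match the case class field names
  rdd.saveToCassandra("records", "record",
    SomeColumns("mdn", "time", "rx", "tx", "total", "multi"))
}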

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/not-able-to-write-to-cassandra-table-from-spark-tp26923.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark hanging forever when doing decision tree training

2016-05-10 Thread Loic Quertenmont
Hello,

I am new to Spark and I am currently learning how to use classification
algorithms with it.
For now I am playing with a rather small dataset, training a decision tree on
my laptop (running with --master local[1]).
However, I systematically see that my jobs hang forever at the training stage.
Looking at the application UI, I see that my ongoing stage still has 15 out of
16 tasks to do, but none are scheduled, and looking at the executors tab I see
that my single executor is indeed not processing anything (even though
executor memory is still fine).

If I ask the details of the hanging job I see:
org.apache.spark.rdd.RDD.count(RDD.scala:1125)
org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:114)
org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:65)
org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:77)
org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:40)

Any idea of what my problem could be?
My code is compiled with Spark 1.6.1.
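
For completeness, the training call is essentially the following (a trimmed
sketch of my code; the DataFrame and column names are the usual spark.ml
defaults):

import org.apache.spark.ml.classification.DecisionTreeClassifier

// `training` is a DataFrame with "label" and "features" columns
val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

val model = dt.fit(training)   // this is where the job hangs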

Thanks in advance,
Loic



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hanging-forever-when-doing-decision-tree-training-tp26922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re:Re: Re: Re: Re: Re: Re: How big the spark stream window could be ?

2016-05-10 Thread 李明伟



[root@ES01 test]# jps
10409 Master
12578 CoarseGrainedExecutorBackend
24089 NameNode
17705 Jps
24184 DataNode
10603 Worker
12420 SparkSubmit






[root@ES01 test]# ps -awx | grep -i spark | grep java
10409 ?Sl 1:52 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip 
ES01 --port 7077 --webui-port 8080
10603 ?Sl 6:50 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://ES01:7077
12420 ?Sl18:47 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit 
--master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 
--executor-memory 4G --num-executors 1 --total-executor-cores 1 
/opt/flowSpark/sparkStream/ForAsk01.py
12578 ?Sl38:18 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
spark://CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname 
10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url 
spark://Worker@10.79.148.184:52660





On 2016-05-11 13:18:10, "Mich Talebzadeh" wrote:

what does jps returning?


jps
16738 ResourceManager
14786 Worker
17059 JobHistoryServer
12421 QuorumPeerMain
9061 RunJar
9286 RunJar
5190 SparkSubmit
16806 NodeManager
16264 DataNode
16138 NameNode
16430 SecondaryNameNode
22036 SparkSubmit
9557 Jps
13240 Kafka
2522 Master


and


ps -awx | grep -i spark | grep java





Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 11 May 2016 at 03:01, 李明伟  wrote:

Hi Mich


From the ps command I can find four processes: 10409 is the master and 10603 is
the worker; 12420 is the driver program and 12578 should be the executor
(worker). Am I right?
So you mean 12420 is actually running both the driver and the worker role?


[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ?Sl 1:40 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip 
ES01 --port 7077 --webui-port 8080
10603 ?Sl 6:00 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://ES01:7077
12420 ?Sl 6:34 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit 
--master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 
--executor-memory 4G --num-executors 1 --total-executor-cores 1 
/opt/flowSpark/sparkStream/ForAsk01.py
12578 ?Sl13:16 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf

Re: Re: Re: Re: Re: Re: How big the spark stream window could be ?

2016-05-10 Thread Mich Talebzadeh
what does jps returning?

jps
16738 ResourceManager
14786 Worker
17059 JobHistoryServer
12421 QuorumPeerMain
9061 RunJar
9286 RunJar
5190 SparkSubmit
16806 NodeManager
16264 DataNode
16138 NameNode
16430 SecondaryNameNode
22036 SparkSubmit
9557 Jps
13240 Kafka
2522 Master

and

ps -awx | grep -i spark | grep java


Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 11 May 2016 at 03:01, 李明伟  wrote:

> Hi Mich
>
> From the ps command. I can find four process. 10409 is the master and
> 10603 is the worker. 12420 is the driver program and 12578 should be the
> executor (worker). Am I right?
> So you mean the 12420 is actually running both the driver and the worker
> role?
>
> [root@ES01 ~]# ps -awx | grep spark | grep java
> 10409 ?Sl 1:40 java -cp
> /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
> -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master
> --ip ES01 --port 7077 --webui-port 8080
> 10603 ?Sl 6:00 java -cp
> /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
> -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker
> --webui-port 8081 spark://ES01:7077
> 12420 ?Sl 6:34 java -cp
> /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
> -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit
> --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2
> --executor-memory 4G --num-executors 1 --total-executor-cores 1
> /opt/flowSpark/sparkStream/ForAsk01.py
> 12578 ?Sl13:16 java -cp
> /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
> -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://
> CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname
> 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url
> spark://Worker@10.79.148.184:52660
>
>
>
>
>
>
>
> At 2016-05-11 09:03:21, "Mich Talebzadeh" 
> wrote:
>
> hm,
>
> This is a standalone mode.
>
> When you are running Spark in Standalone mode, you only have one worker
> that lives within the driver JVM process that you start when you start
> spark-shell or spark-submit.
>
> However, since the driver-memory setting encapsulates the JVM, you will need
> to set the amount of driver memory for any non-default value before
> starting the JVM, by providing the new value:
>
>
>
>
> ${SPARK_HOME}/bin/spark-submit --driver-memory 5g
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 11 May 2016 at 01:22, 李明伟  wrote:
>
>> I actually provided them in submit command here:
>>
>> nohup ./bin/spark-submit   --master spark://ES01:7077 --executor-memory
>> 4G --num-executors 1 --total-executor-cores 1 --conf
>> "spark.storage.memoryFraction=0.2"  ./mycode.py 1>a.log 2>b.log &
>>
>>
>>
>>
>>
>>
>>
>> At 2016-05-10 21:19:06, "Mich Talebzadeh" 
>> wrote:
>>
>> Hi Mingwei,
>>
>> In your Spark conf setting, what are you providing for these parameters? Are
>> you capping them?
>>
>> For example
>>
>>   val conf = new SparkConf().
>>     setAppName("AppName").
>>     setMaster("local[2]").
>>     set("spark.executor.memory", "4G").
>>     set("spark.cores.max", "2").
>>     set("spark.driver.allowMultipleContexts", "true")
>>   val sc = new SparkContext(conf)
>>
>> I assume you are running in standalone mode so each worker/aka
>> slave grabs all the available cores and allocates the rem

Re:Re: Will the HiveContext cause memory leak ?

2016-05-10 Thread 李明伟
Hi  Ted


Spark version :  spark-1.6.0-bin-hadoop2.6
I tried increase the memory of executor. Still have the same problem.
I can use jmap to capture something, but the output is too difficult to
understand.

On 2016-05-11 11:50:14, "Ted Yu" wrote:

Which Spark release are you using ?


I assume executor crashed due to OOME.


Did you have a chance to capture jmap on the executor before it crashed ?


Have you tried giving more memory to the executor ?


Thanks


On Tue, May 10, 2016 at 8:25 PM, kramer2...@126.com wrote:
I submit my code to a Spark standalone cluster and find that the memory usage
of the executor process keeps growing, which causes the program to crash.

I modified the code and submitted it several times, and found that the 4 lines
below may be causing the issue:

dataframe = dataframe.groupBy(['router', 'interface']) \
                     .agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']) \
                   .orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'], dataframe['interface'],
                       dataframe['bits'], rank.alias('rank')) \
               .filter("rank<=2")

It looks a little complicated, but it is just some window functions on a
dataframe. I use the HiveContext because SQLContext does not support window
functions yet. Without these 4 lines my code can run all night; adding them
causes the memory leak, and the program crashes in a few hours.

I have provided the whole code (50 lines) here: ForAsk01.py

Please advise me if it is a bug.

Also here is the submit command

nohup ./bin/spark-submit  \
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2"  \
./ForAsk.py 1>a.log 2>b.log &





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Will-the-HiveContext-cause-memory-leak-tp26921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





How to resolve Scheduling delay in Spark streaming applications?

2016-05-10 Thread Hemalatha A
Hello,

We are facing a large scheduling delay in our Spark Streaming application and
are not sure how to debug why the delay is happening. We have applied all the
tuning possible on the Spark side.
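
For example, among the settings we have in place (a sketch; the app name and
rate value are illustrative):

val conf = new SparkConf()
  .setAppName("StreamingApp")
  // let Spark adapt the ingestion rate to the processing rate (Spark 1.5+)
  .set("spark.streaming.backpressure.enabled", "true")
  // hard cap per receiver, in records/sec -- an illustrative value
  .set("spark.streaming.receiver.maxRate", "10000")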

Can someone advise how to debug the cause of the delay, and share some tips
for resolving it, please?

-- 


Regards
Hemalatha


Re: Will the HiveContext cause memory leak ?

2016-05-10 Thread Ted Yu
Which Spark release are you using ?

I assume executor crashed due to OOME.

Did you have a chance to capture jmap on the executor before it crashed ?

Have you tried giving more memory to the executor ?

Thanks

On Tue, May 10, 2016 at 8:25 PM, kramer2...@126.com 
wrote:

> I submit my code to a spark stand alone cluster. Find the memory usage
> executor process keeps growing. Which cause the program to crash.
>
> I modified the code and submit several times. Find below 4 line may causing
> the issue
>
> dataframe =
>
> dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
> windowSpec =
> Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
> rank = func.dense_rank().over(windowSpec)
> ret =
>
> dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'],
> rank.alias('rank')).filter("rank<=2")
>
> It looks a little complicated but it is just some Window function on
> dataframe. I use the HiveContext because SQLContext do not support window
> function yet. Without the 4 line, my code can run all night. Adding them
> will cause the memory leak. Program will crash in a few hours.
>
> I will provided the whole code (50 lines)here.  ForAsk01.py
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n26921/ForAsk01.py
> >
> Please advice me if it is a bug..
>
> Also here is the submit command
>
> nohup ./bin/spark-submit  \
> --master spark://ES01:7077 \
> --executor-memory 4G \
> --num-executors 1 \
> --total-executor-cores 1 \
> --conf "spark.storage.memoryFraction=0.2"  \
> ./ForAsk.py 1>a.log 2>b.log &
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Will-the-HiveContext-cause-memory-leak-tp26921.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Will the HiveContext cause memory leak ?

2016-05-10 Thread kramer2...@126.com
I submit my code to a Spark standalone cluster and find that the memory usage
of the executor process keeps growing, which causes the program to crash.

I modified the code and submitted it several times, and found that the 4 lines
below may be causing the issue:

dataframe = dataframe.groupBy(['router', 'interface']) \
                     .agg(func.sum('bits').alias('bits'))
windowSpec = Window.partitionBy(dataframe['router']) \
                   .orderBy(dataframe['bits'].desc())
rank = func.dense_rank().over(windowSpec)
ret = dataframe.select(dataframe['router'], dataframe['interface'],
                       dataframe['bits'], rank.alias('rank')) \
               .filter("rank<=2")

It looks a little complicated, but it is just some window functions on a
dataframe. I use the HiveContext because SQLContext does not support window
functions yet. Without these 4 lines my code can run all night; adding them
causes the memory leak, and the program crashes in a few hours.

I have provided the whole code (50 lines) here: ForAsk01.py

Please advise me if it is a bug.

Also here is the submit command 

nohup ./bin/spark-submit  \  
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2"  \
./ForAsk.py 1>a.log 2>b.log &





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Will-the-HiveContext-cause-memory-leak-tp26921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What does the spark stand alone cluster do?

2016-05-10 Thread kramer2...@126.com
Hello.

My question here is what the Spark standalone cluster does. When we submit a
program like the one below:

./bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G
--num-executors 1 --total-executor-cores 1 --conf
"spark.storage.memoryFraction=0.2"

we specify the resource allocation manually, and we specify the config
manually.

So what does the cluster do here?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-does-the-spark-stand-alone-cluster-do-tp26920.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re:Re: Re: Re: Re: Re: How big the spark stream window could be ?

2016-05-10 Thread 李明伟
Hi Mich


From the ps command I can find four processes: 10409 is the master and 10603 is
the worker; 12420 is the driver program and 12578 should be the executor
(worker). Am I right?
So you mean 12420 is actually running both the driver and the worker role?


[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ?Sl 1:40 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip 
ES01 --port 7077 --webui-port 8080
10603 ?Sl 6:00 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://ES01:7077
12420 ?Sl 6:34 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit 
--master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 
--executor-memory 4G --num-executors 1 --total-executor-cores 1 
/opt/flowSpark/sparkStream/ForAsk01.py
12578 ?Sl13:16 java -cp 
/opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/
 -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
spark://CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname 
10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url 
spark://Worker@10.79.148.184:52660










At 2016-05-11 09:03:21, "Mich Talebzadeh"  wrote:

hm,


This is a standalone mode.


When you are running Spark in Standalone mode, you only have one worker that 
lives within the driver JVM process that you start when you start spark-shell 
or spark-submit.



However, since the driver-memory setting encapsulates the JVM, you will need to
set the amount of driver memory for any non-default value before starting the
JVM, by providing the new value:

${SPARK_HOME}/bin/spark-submit --driver-memory 5g

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 11 May 2016 at 01:22, 李明伟  wrote:

I actually provided them in submit command here:


nohup ./bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G 
--num-executors 1 --total-executor-cores 1 --conf
"spark.storage.memoryFraction=0.2"  ./mycode.py 1>a.log 2>b.log &










At 2016-05-10 21:19:06, "Mich Talebzadeh"  wrote:

Hi Mingwei,


In your Spark conf setting what are you providing for these parameters. Are you 
capping them?


For example


  val conf = new SparkConf().
    setAppName("AppName").
    setMaster("local[2]").
    set("spark.executor.memory", "4G").
    set("spark.cores.max", "2").
    set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)


I assume you are running in standalone mode, so each worker (aka slave) grabs
all the available cores and allocates the remaining memory on each host.

Do not provide new values for these parameters (i.e. do not overwrite them) in:

${SPARK_HOME}/bin/spark-submit --

HTH

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 10 May 2016 at 03:12, 李明伟  wrote:

Hi Mich


I added some more info (the spark-env.sh settings and top command output) in
that thread. Can you help to check, please?


Regards
Mingwei






At 2016-05-09 23:45:19, "Mich Talebzadeh"  wrote:

I had a look at the thread.


This is what you have, which I gather is a standalone box (in other words, one
worker node):


bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G 
--num-executors 1 --total-executor-cores 1

Unable to write stream record to cassandra table with multiple columns

2016-05-10 Thread Anand N Ilkal
I am trying to write incoming stream data to database. Following is the example 
program, this code creates a thread to listen to incoming stream of data which 
is csv data. this data needs to be split with delimiter and the array of data 
needs to be pushed to database as separate columns in the TABLE.

object dbwrite {
  case class records(id: Long, time: java.sql.Timestamp, rx: Int, tx: Int,
    total: Int, multi: Double)

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf()
      .set("spark.cassandra.connection.host", "localhost")
      .setAppName("dbwrite")
      .set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val sc = ssc.sparkContext

    // Create an input stream with the custom receiver on the target ip:port;
    // the input is \n-delimited CSV text (e.g. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val splitRdd = lines.map(line => line.split(","))
    //val wordCounts = splitRdd.map(x => (x, 1)).reduceByKey(_ + _)
    // DStream[Array[String]]

    val yourRdd = splitRdd.flatMap(arr => {
      val id = arr(0).toLong
      val rx = arr(2).toInt
      val tx = arr(3).toInt
      val total = arr(4).toInt
      val mul = arr(5).toDouble
      val parsedDate = new java.util.Date()
      val timestamp = new java.sql.Timestamp(parsedDate.getTime())
      Seq(records(id, timestamp, rx, tx, total, mul))
    })

    yourRdd.foreachRDD { rdd =>
      for (item <- rdd.collect())
        print(item)
    }
    val rec = sc.parallelize(Seq(yourRdd))
    rec.saveToCassandra("records", "record",
      SomeColumns("id", "time", "rx", "tx", "total", "multi"))

    ssc.start()
    ssc.awaitTermination()
  }
}
but Spark gives the following error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement
failed: Columns not found in
org.apache.spark.streaming.dstream.DStream[dbwrite.records]: [mdn, time, rx,
tx, total, multi]
at scala.Predef$.require(Predef.scala:233)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:29)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:20)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:272)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at dbwrite$.main(dbwrite.scala:63)
at dbwrite.main(dbwrite.scala)
I am using Spark 1.6.1 and Cassandra 3.5.
The table already created in Cassandra has the same column names (they display
in alphabetical order, but all columns are available).
Please help me with the error.

thanks.

RE: Accessing Cassandra data from Spark Shell

2016-05-10 Thread Mohammed Guller
Yes, it is very simple to access Cassandra data using Spark shell.

Step 1: Launch the spark-shell with the spark-cassandra-connector package
$SPARK_HOME/bin/spark-shell --packages 
com.datastax.spark:spark-cassandra-connector_2.10:1.5.0

Step 2: Create a DataFrame pointing to your Cassandra table
val dfCassTable = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "your_column_family", "keyspace" -> "your_keyspace"))
  .load()

From this point onward, you have complete access to the DataFrame API. You can 
even register it as a temporary table, if you would prefer to use SQL/HiveQL.
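
For example, a quick sketch (the table alias and query are illustrative):

dfCassTable.registerTempTable("cass_table")
// now query it with plain SQL through the same SQLContext
val result = sqlContext.sql("SELECT COUNT(*) FROM cass_table")
result.show()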

Mohammed
Author: Big Data Analytics with Spark

From: Ben Slater [mailto:ben.sla...@instaclustr.com]
Sent: Monday, May 9, 2016 9:28 PM
To: u...@cassandra.apache.org; user
Subject: Re: Accessing Cassandra data from Spark Shell

You can use SparkShell to access Cassandra via the Spark Cassandra connector. 
The getting started article on our support page will probably give you a good 
steer to get started even if you’re not using Instaclustr: 
https://support.instaclustr.com/hc/en-us/articles/213097877-Getting-Started-with-Instaclustr-Spark-Cassandra-

Cheers
Ben

On Tue, 10 May 2016 at 14:08 Cassa L  wrote:
Hi,
Has anyone tried accessing Cassandra data using SparkShell? How do you do it? 
Can you use HiveContext for Cassandra data? I'm using community version of 
Cassandra-3.0

Thanks,
LCassa
--

Ben Slater
Chief Product Officer, Instaclustr
+61 437 929 798


RE: Reading table schema from Cassandra

2016-05-10 Thread Mohammed Guller
You can create a DataFrame directly from a Cassandra table using something like 
this:

val dfCassTable = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "your_column_family", "keyspace" -> "your_keyspace"))
  .load()

Then, you can get schema:
val dfCassTableSchema = dfCassTable.schema
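
A quick sketch of putting the schema to use (just illustrative):

// inspect the Cassandra-derived schema
dfCassTable.printSchema()

// e.g. list the column names and types to drive the roll-up query
dfCassTableSchema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))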

Mohammed
Author: Big Data Analytics with Spark


-Original Message-
From: justneeraj [mailto:justnee...@gmail.com] 
Sent: Tuesday, May 10, 2016 2:22 AM
To: user@spark.apache.org
Subject: Reading table schema from Cassandra

Hi,

We are using Spark Cassandra connector for our app. 

And I am trying to create higher-level roll-up tables, e.g. a minutes table
from a seconds table.

If my tables are already defined, how can I read the schema of a table, so
that I can load them into a DataFrame and create the aggregates?

Any help will be really appreciated.

Thanks,
Neeraj 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-table-schema-from-Cassandra-tp26915.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re: Re: Re: Re: How big the spark stream window could be ?

2016-05-10 Thread Mich Talebzadeh
hm,

This is a standalone mode.

When you are running Spark in Standalone mode, you only have one worker
that lives within the driver JVM process that you start when you start
spark-shell or spark-submit.

However, since the driver-memory setting encapsulates the JVM, you will need to
set the amount of driver memory for any non-default value before starting the
JVM, by providing the new value:

${SPARK_HOME}/bin/spark-submit --driver-memory 5g

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 11 May 2016 at 01:22, 李明伟  wrote:

> I actually provided them in submit command here:
>
> nohup ./bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G
> --num-executors 1 --total-executor-cores 1 --conf
> "spark.storage.memoryFraction=0.2"  ./mycode.py 1>a.log 2>b.log &
>
>
>
>
>
>
>
> At 2016-05-10 21:19:06, "Mich Talebzadeh" 
> wrote:
>
> Hi Mingwei,
>
> In your Spark conf setting, what are you providing for these parameters? Are
> you capping them?
>
> For example
>
>   val conf = new SparkConf().
>     setAppName("AppName").
>     setMaster("local[2]").
>     set("spark.executor.memory", "4G").
>     set("spark.cores.max", "2").
>     set("spark.driver.allowMultipleContexts", "true")
>   val sc = new SparkContext(conf)
>
> I assume you are running in standalone mode, so each worker (aka slave)
> grabs all the available cores and allocates the remaining memory on each
> host.
>
> Do not provide new values for these parameters (i.e. do not overwrite them)
> in:
>
> ${SPARK_HOME}/bin/spark-submit --
>
>
> HTH
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 10 May 2016 at 03:12, 李明伟  wrote:
>
>> Hi Mich
>>
>> I added some more info (the spark-env.sh settings and top command output)
>> in that thread. Can you help to check, please?
>>
>> Regards
>> Mingwei
>>
>>
>>
>>
>>
>> At 2016-05-09 23:45:19, "Mich Talebzadeh" 
>> wrote:
>>
>> I had a look at the thread.
>>
>> This is what you have, which I gather is a standalone box (in other words,
>> one worker node):
>>
>> bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G
>> --num-executors 1 --total-executor-cores 1 ./latest5min.py 1>a.log 2>b.log
>>
>> But what I don't understand is why it is using 80% of your RAM, as opposed
>> to 25% of it (4GB/16GB), right?
>>
>> Where else have you set up these parameters, for example in
>> $SPARK_HOME/conf/spark-env.sh?
>>
>> Can you send the output of /usr/bin/free and top
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 9 May 2016 at 16:19, 李明伟  wrote:
>>
>>> Thanks for all the information guys.
>>>
>>> I wrote some code to do the test, not using window, so only calculating
>>> data for each batch interval. I set the interval to 30 seconds and also
>>> reduced the size of the data to about 30,000 lines of CSV.
>>> That means my code should do its calculation on 30,000 lines of CSV in 30
>>> seconds. I think it is not a very heavy workload, but my Spark Streaming
>>> code still crashes.
>>>
>>> I send another post to the user list here
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-I-have-memory-leaking-for-such-simple-spark-stream-code-td26904.html
>>>
>>> Is it possible for you to have a look please? Very appreciate.
>>>
>>>
>>>
>>>
>>>
>>> At 2016-05-09 17:49:22, "Saisai Shao"  wrote:
>>>
>>> Pease see the inline comments.
>>>
>>>
>>> On Mon, May 9, 2016 at 5:31 PM, Ashok Kumar 
>>> wrote:
>>>
 Thank you.

 So If I create spark streaming then


1. The streams will always need to be cached? It cannot be stored
in persistent storage

 You don't need to cache the stream explicitly if you don't have
>>> specific requirement, Spark will do it for you depends on different
>>> streaming sources (Kafka or socket).
>>>

1. The stream data cached will be distributed among all nodes of
Spark among executors
2. As I understand each Spark worker node has one executor that
includes cache. So the streaming data is distributed among these work 
 node
caches. For example if I have 4 worker nodes each cache will have a 
 quarter
of data (this assumes that cache size among worker nodes is the same.)

 Ideally, it will distributed evenly across the executors, also this is
>>> target for tuning. Normally it depen

Spark 1.6 Catalyst optimizer

2016-05-10 Thread Telmo Rodrigues
Hello,

I have a question about the Catalyst optimizer in Spark 1.6.

initial logical plan:

!'Project [unresolvedalias(*)]
!+- 'Filter ('t.id = 1)
!   +- 'Join Inner, Some(('t.id = 'u.id))
!  :- 'UnresolvedRelation `t`, None
!  +- 'UnresolvedRelation `u`, None


logical plan after optimizer execution:

Project [id#0L,id#1L]
!+- Filter (id#0L = cast(1 as bigint))
!   +- Join Inner, Some((id#0L = id#1L))
!  :- Subquery t
!  :  +- Relation[id#0L] JSONRelation
!  +- Subquery u
!  +- Relation[id#1L] JSONRelation


Shouldn't the optimizer push the predicate down into subquery t, so that the
filter is executed before the join?
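
For reference, the plans above can be reproduced and inspected with
explain(true); a sketch of the setup (file names hypothetical):

val t = sqlContext.read.json("t.json")
val u = sqlContext.read.json("u.json")
t.registerTempTable("t")
u.registerTempTable("u")

// prints the parsed, analyzed, optimized and physical plans
sqlContext.sql(
  "SELECT * FROM t JOIN u ON t.id = u.id WHERE t.id = 1").explain(true)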

Thanks


Re:Re: Re: Re: Re: How big the spark stream window could be ?

2016-05-10 Thread 李明伟
I actually provided them in submit command here:


nohup ./bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G 
--num-executors 1 --total-executor-cores 1 --conf
"spark.storage.memoryFraction=0.2"  ./mycode.py 1>a.log 2>b.log &










At 2016-05-10 21:19:06, "Mich Talebzadeh"  wrote:

Hi Mingwei,


In your Spark conf setting, what are you providing for these parameters? Are
you capping them?


For example


  val conf = new SparkConf().
    setAppName("AppName").
    setMaster("local[2]").
    set("spark.executor.memory", "4G").
    set("spark.cores.max", "2").
    set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)


I assume you are running in standalone mode, so each worker (aka slave) grabs
all the available cores and allocates the remaining memory on each host.

Do not provide new values for these parameters (i.e. do not overwrite them) in:

${SPARK_HOME}/bin/spark-submit --

HTH

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 10 May 2016 at 03:12, 李明伟  wrote:

Hi Mich


I added some more info (the spark-env.sh settings and top command output) in
that thread. Can you help to check, please?


Regards
Mingwei






At 2016-05-09 23:45:19, "Mich Talebzadeh"  wrote:

I had a look at the thread.


This is what you have, which I gather is a standalone box (in other words, one
worker node):


bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G 
--num-executors 1 --total-executor-cores 1 ./latest5min.py 1>a.log 2>b.log


But what I don't understand is why it is using 80% of your RAM, as opposed to
25% of it (4GB/16GB), right?


Where else have you set up these parameters, for example in
$SPARK_HOME/conf/spark-env.sh?


Can you send the output of /usr/bin/free and top


HTH



Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 9 May 2016 at 16:19, 李明伟  wrote:

Thanks for all the information guys. 


I wrote some code to do the test, not using window, so only calculating data
for each batch interval. I set the interval to 30 seconds and also reduced the
size of the data to about 30,000 lines of CSV.
That means my code should do its calculation on 30,000 lines of CSV in 30
seconds. I think it is not a very heavy workload, but my Spark Streaming code
still crashes.


I send another post to the user list here 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-I-have-memory-leaking-for-such-simple-spark-stream-code-td26904.html
 
Is it possible for you to have a look please? Very appreciate.






At 2016-05-09 17:49:22, "Saisai Shao"  wrote:

Please see the inline comments.




On Mon, May 9, 2016 at 5:31 PM, Ashok Kumar  wrote:

Thank you.


So If I create spark streaming then


> The streams will always need to be cached? It cannot be stored in
> persistent storage.
You don't need to cache the stream explicitly if you don't have a specific
requirement; Spark will do it for you, depending on the streaming source
(Kafka or socket).
> The stream data cached will be distributed among all nodes of Spark among
> executors.
> As I understand each Spark worker node has one executor that includes
> cache. So the streaming data is distributed among these worker node caches.
> For example if I have 4 worker nodes each cache will have a quarter of the
> data (this assumes that cache size among worker nodes is the same.)
Ideally, it will be distributed evenly across the executors; this is also a
target for tuning. Normally it depends on several conditions like receiver
distribution and partition distribution.

> What happens if the amount of streaming data does not fit into these 4
> caches? Will the job crash?



On Monday, 9 May 2016, 10:16, Saisai Shao  wrote:




No, each executor only stores part of data in memory (it depends on how the 
partition are distributed and how many receivers you have). 


For WindowedDStream, it will obviously cache the data in memory, from my 
understanding you don't need to call cache() again.


On Mon, May 9, 2016 at 5:06 PM, Ashok Kumar  wrote:

hi,


so if i have 10gb of streaming data coming in does it require 10gb of memory in 
each node?


also in that case why do we need using


dstream.cache()


thanks



On Monday, 9 May 2016, 9:58, Saisai Shao  wrote:




It depends on how you write the Spark application; normally, if data is
already on persistent storage, there's no need to put it into memory. The
reason why Spark Streaming data has to be stored in memory is that a streaming
source is not a persistent source, so you need to have a place to store the
data.


On Mon, May 9, 2016 at 4:43 PM, 李明伟  wrote:

Thanks.
What if I use batch calculation instead of stream computing? Do I still need 
that much memory? For example, if the 24 hour data set is 100 

Re: Cluster Migration

2016-05-10 Thread Ajay Chander
Never mind! I figured it out by saving it as a Hadoop file and passing the
codec to it. Thank you!
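
In case it helps anyone else, roughly what I ended up with (a sketch; the
paths are hypothetical and the exact job differs):

import org.apache.hadoop.io.compress.GzipCodec

// read each file under temp1 as a (path, content) pair
val files = sc.wholeTextFiles("hdfs://nn1:8020/temp1")

// write the pairs back out as gzip-compressed text under temp2
files.map { case (path, content) => path + "\t" + content }
  .saveAsTextFile("hdfs://nn2:8020/temp2", classOf[GzipCodec])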

On Tuesday, May 10, 2016, Ajay Chander  wrote:

> Hi, I have a folder temp1 in hdfs which have multiple format files
> test1.txt, test2.avsc (Avro file) in it. Now I want to compress these files
> together and store it under temp2 folder in hdfs. Expecting that temp2
> folder will have one file test_compress.gz which has test1.txt and
> test2.avsc under it. Is there any possible/effiencient way to achieve this?
>
> Thanks,
> Aj
>
> On Tuesday, May 10, 2016, Ajay Chander  > wrote:
>
>> I will try that out. Thank you!
>>
>> On Tuesday, May 10, 2016, Deepak Sharma  wrote:
>>
>>> Yes that's what I intended to say.
>>>
>>> Thanks
>>> Deepak
>>> On 10 May 2016 11:47 pm, "Ajay Chander"  wrote:
>>>
 Hi Deepak,
Thanks for your response. If I am correct, you suggest reading
 all of those files into an rdd on the cluster using wholeTextFiles then
 apply compression codec on it, save the rdd to another Hadoop cluster?

 Thank you,
 Ajay

 On Tuesday, May 10, 2016, Deepak Sharma  wrote:

> Hi Ajay
> You can look at wholeTextFiles method of rdd[string,string] and then
> map each of rdd  to saveAsTextFile .
> This will serve the purpose .
> I don't think if anything default like distcp exists in spark
>
> Thanks
> Deepak
> On 10 May 2016 11:27 pm, "Ajay Chander"  wrote:
>
>> Hi Everyone,
>>
>> we are planning to migrate the data between 2 clusters and I see
>> distcp doesn't support data compression. Is there any efficient way to
>> compress the data during the migration ? Can I implement any spark job to
>> do this ? Thanks.
>>
>


Re: SparkSQL with large result size

2016-05-10 Thread Buntu Dev
Thanks Chris for pointing out the issue. I think I was able to get over
this issue by:

- repartitioning to increase the number of partitions (about 6k partitions)
- applying sort() on the resulting dataframe to coalesce the result into a
  single sorted partition file
- reading the sorted file and then applying just limit() to get the desired
  number of rows, which seems to have worked

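Roughly, in code (a sketch; the paths and counts are illustrative):

// 1) repartition widely, sort, and materialize the sorted result
sqlContext.read.parquet("/data/t1")
  .repartition(6000)
  .sort("c1")
  .write.parquet("/data/t1_sorted")

// 2) read the sorted copy back and take only the head
val top = sqlContext.read.parquet("/data/t1_sorted").limit(1000000)
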
Thanks everyone for the input!

On Tue, May 10, 2016 at 1:20 AM, Christophe Préaud <
christophe.pre...@kelkoo.com> wrote:

> Hi,
>
> You may be hitting this bug: SPARK-9879
> 
>
> In other words: did you try without the LIMIT clause?
>
> Regards,
> Christophe.
>
>
> On 02/05/16 20:02, Gourav Sengupta wrote:
>
> Hi,
>
> I have worked on 300GB data by querying it  from CSV (using SPARK CSV)
>  and writing it to Parquet format and then querying parquet format to query
> it and partition the data and write out individual csv files without any
> issues on a single node SPARK cluster installation.
>
> Are you trying to cache in the entire data? What is that you are trying to
> achieve in your used case?
>
> Regards,
> Gourav
>
> On Mon, May 2, 2016 at 5:59 PM, Ted Yu  wrote:
>
>> That's my interpretation.
>>
>> On Mon, May 2, 2016 at 9:45 AM, Buntu Dev < 
>> buntu...@gmail.com> wrote:
>>
>>> Thanks Ted, I thought the avg. block size was already low and less than
>>> the usual 128mb. If I need to reduce it further via parquet.block.size, it
>>> would mean an increase in the number of blocks and that should increase the
>>> number of tasks/executors. Is that the correct way to interpret this?
>>>
>>> On Mon, May 2, 2016 at 6:21 AM, Ted Yu < 
>>> yuzhih...@gmail.com> wrote:
>>>
 Please consider decreasing block size.

 Thanks

 > On May 1, 2016, at 9:19 PM, Buntu Dev < 
 buntu...@gmail.com> wrote:
 >
 > I got a 10g limitation on the executors and operating on parquet
 dataset with block size 70M with 200 blocks. I keep hitting the memory
 limits when doing a 'select * from t1 order by c1 limit 100' (ie, 1M).
 It works if I limit to say 100k. What are the options to save a large
 dataset without running into memory issues?
 >
 > Thanks!

>>>
>>>
>>
>
>
> --
> Kelkoo SAS
> Société par Actions Simplifiée
> Au capital de € 4.168.964,30
> Siège social : 158 Ter Rue du Temple 75003 Paris
> 425 093 069 RCS Paris
>
> Ce message et les pièces jointes sont confidentiels et établis à
> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le
> destinataire de ce message, merci de le détruire et d'en avertir
> l'expéditeur.
>


Re: Cluster Migration

2016-05-10 Thread Ajay Chander
Hi, I have a folder temp1 in HDFS which has files in multiple formats,
test1.txt and test2.avsc (an Avro file), in it. Now I want to compress these
files together and store them under a temp2 folder in HDFS, expecting that the
temp2 folder will have one file, test_compress.gz, which has test1.txt and
test2.avsc under it. Is there any possible/efficient way to achieve this?

Thanks,
Aj

On Tuesday, May 10, 2016, Ajay Chander  wrote:

> I will try that out. Thank you!
>
> On Tuesday, May 10, 2016, Deepak Sharma  > wrote:
>
>> Yes that's what I intended to say.
>>
>> Thanks
>> Deepak
>> On 10 May 2016 11:47 pm, "Ajay Chander"  wrote:
>>
>>> Hi Deepak,
>>>Thanks for your response. If I am correct, you suggest reading
>>> all of those files into an rdd on the cluster using wholeTextFiles then
>>> apply compression codec on it, save the rdd to another Hadoop cluster?
>>>
>>> Thank you,
>>> Ajay
>>>
>>> On Tuesday, May 10, 2016, Deepak Sharma  wrote:
>>>
 Hi Ajay
 You can look at wholeTextFiles method of rdd[string,string] and then
 map each of rdd  to saveAsTextFile .
 This will serve the purpose .
 I don't think if anything default like distcp exists in spark

 Thanks
 Deepak
 On 10 May 2016 11:27 pm, "Ajay Chander"  wrote:

> Hi Everyone,
>
> we are planning to migrate the data between 2 clusters and I see
> distcp doesn't support data compression. Is there any efficient way to
> compress the data during the migration ? Can I implement any spark job to
> do this ? Thanks.
>



Reliability of JMS Custom Receiver in Spark Streaming JMS

2016-05-10 Thread Sourav Mazumder
Hi,

I need to get a bit more understanding of the reliability aspects of custom
receivers, in the context of the code in spark-streaming-jms:
https://github.com/mattf/spark-streaming-jms.

Based on the documentation in
http://spark.apache.org/docs/latest/streaming-custom-receivers.html#receiver-reliability,
I understand that if the store api is called with multiple records the
message is reliably stored as it is a blocking call. On the other hand if
the store api is called with a single record then it is not reliable as the
call is returned back to the calling program before the message is stored
appropriately.

Given that, I have a few questions:

1. Which are the store APIs that relate to multiple records? Are they the ones
which take scala.collection.mutable.ArrayBuffer, scala.collection.Iterator and
java.util.Iterator in the parameter signature?

2. Is there sample code which can show how to create multiple records like
that and send them to the appropriate store API? (A sketch of what I mean
follows question 3.)

3. If I take the example of spark-streaming-jms, the onMessage method of the
JMSReceiver class calls the store API with one JMSEvent. Does that mean that
this code does not guarantee reliable storage of the received message, even if
the storage level is specified as MEMORY_AND_DISK_SER_2?
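
For question 2, the kind of thing I have in mind (a sketch only; the onRecord
callback and the batch size are hypothetical):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class BatchingReceiver(batchSize: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  private var buffer = new ArrayBuffer[String]()

  def onStart(): Unit = { /* connect to the broker, register a listener */ }
  def onStop(): Unit  = { /* close the connection */ }

  // hypothetical callback invoked once per incoming message
  def onRecord(msg: String): Unit = synchronized {
    buffer += msg
    if (buffer.size >= batchSize) {
      val batch = buffer
      buffer = new ArrayBuffer[String]()
      store(batch)   // blocking variant: returns after the block is stored
    }
  }
}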

Regards,
Sourav


Re: Save DataFrame to HBase

2016-05-10 Thread Ted Yu
I think so.

Please refer to the table population tests in (master branch):
hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
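
From memory, the write path in those tests looks roughly like the sketch
below; treat the exact option names and catalog format as assumptions and
check DefaultSourceSuite for the authoritative API:

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

// hypothetical catalog mapping DataFrame columns to an HBase table
val catalog = s"""{
  "table":  {"namespace": "default", "name": "t1"},
  "rowkey": "key",
  "columns": {
    "col0": {"cf": "rowkey", "col": "key", "type": "string"},
    "col1": {"cf": "cf1",    "col": "c1",  "type": "string"}
  }
}"""

df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog,
               HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()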

Cheers

On Tue, May 10, 2016 at 2:53 PM, Benjamin Kim  wrote:

> Ted,
>
> Will the hbase-spark module allow for creating tables in Spark SQL that
> reference the hbase tables underneath? In this way, users can query using
> just SQL.
>
> Thanks,
> Ben
>
> On Apr 28, 2016, at 3:09 AM, Ted Yu  wrote:
>
> Hbase 2.0 release likely would come after Spark 2.0 release.
>
> There're other features being developed in hbase 2.0
> I am not sure when hbase 2.0 would be released.
>
> The refguide is incomplete.
> Zhan has assigned the doc JIRA to himself. The documentation would be done
> after fixing bugs in hbase-spark module.
>
> Cheers
>
> On Apr 27, 2016, at 10:31 PM, Benjamin Kim  wrote:
>
> Hi Ted,
>
> Do you know when the release will be? I also see some documentation for
> usage of the hbase-spark module at the hbase website. But, I don’t see an
> example on how to save data. There is only one for reading/querying data.
> Will this be added when the final version does get released?
>
> Thanks,
> Ben
>
> On Apr 21, 2016, at 6:56 AM, Ted Yu  wrote:
>
> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can
> do this.
>
> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim  wrote:
>
>> Has anyone found an easy way to save a DataFrame into HBase?
>>
>> Thanks,
>> Ben
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
>


Re: Save DataFrame to HBase

2016-05-10 Thread Benjamin Kim
Ted,

Will the hbase-spark module allow for creating tables in Spark SQL that 
reference the hbase tables underneath? In this way, users can query using just 
SQL.

Thanks,
Ben

> On Apr 28, 2016, at 3:09 AM, Ted Yu  wrote:
> 
> Hbase 2.0 release likely would come after Spark 2.0 release. 
> 
> There're other features being developed in hbase 2.0
> I am not sure when hbase 2.0 would be released. 
> 
> The refguide is incomplete. 
> Zhan has assigned the doc JIRA to himself. The documentation would be done 
> after fixing bugs in hbase-spark module. 
> 
> Cheers
> 
> On Apr 27, 2016, at 10:31 PM, Benjamin Kim  > wrote:
> 
>> Hi Ted,
>> 
>> Do you know when the release will be? I also see some documentation for 
>> usage of the hbase-spark module at the hbase website. But, I don’t see an 
>> example on how to save data. There is only one for reading/querying data. 
>> Will this be added when the final version does get released?
>> 
>> Thanks,
>> Ben
>> 
>>> On Apr 21, 2016, at 6:56 AM, Ted Yu >> > wrote:
>>> 
>>> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can 
>>> do this.
>>> 
>>> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim >> > wrote:
>>> Has anyone found an easy way to save a DataFrame into HBase?
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>> 
>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>> 
>>> 
>>> 
>> 



Re: Pyspark accumulator

2016-05-10 Thread Abi


On May 10, 2016 2:24:41 PM EDT, Abi  wrote:
>1. How come pyspark does not provide the localValue function like Scala?
>
>2. Why is pyspark more restrictive than Scala?


Re: Accumulator question

2016-05-10 Thread Abi


On May 9, 2016 8:24:06 PM EDT, Abi  wrote:
>I am splitting an integer array into 2 partitions and using an accumulator
>to sum the array. The problem is:
>
>1. I am not seeing the execution time become half of a linear sum.
>
>2. The second node (from looking at timestamps) takes 3 times as long as
>the first node. This gives the impression it is "waiting" for the first
>node to finish.
>
>Hence, I get the impression that using accumulator.sum() in the kernel and
>rdd.foreach(kernel) is making things sequential.
>
>Any API/setting suggestions for how I could make things parallel?
>
>
> 
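
Following up with a comparison sketch of the two approaches (sizes
illustrative):

val data = sc.parallelize(1 to 1000000, 2)

// accumulator-based sum: foreach runs in parallel across partitions,
// but the summed value is only visible on the driver afterwards
val acc = sc.accumulator(0L)
data.foreach(x => acc += x)
println(acc.value)

// usually simpler and just as parallel: a plain reduce
val total = data.map(_.toLong).reduce(_ + _)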


Re: pyspark mappartions ()

2016-05-10 Thread Abi


On May 10, 2016 2:20:25 PM EDT, Abi  wrote:
>Is there any example of this? I want to see how you write the iterable
>example.
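
A minimal sketch of the pattern (shown here in Scala; in pyspark the function
receives an iterator over the partition and returns an iterable, e.g. a
generator):

val rdd = sc.parallelize(1 to 10, 2)

// mapPartitions receives an Iterator for the whole partition and must
// return an Iterator; here each partition is summed into a single value
val perPartitionSums = rdd.mapPartitions { iter =>
  Iterator(iter.sum)
}

perPartitionSums.collect()   // Array(15, 40) for the two partitions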


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Ayman Khalil
Hi Xinh,

Thanks! Custom partitioner with partitionBy() did the job.
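
For the archives, the idea looks roughly like this (a sketch in Scala; my
actual version is the pyspark equivalent):

import org.apache.spark.Partitioner

// exact-size partitioner: the item with index i goes to partition
// i * partitions / elements
class ExactPartitioner(partitions: Int, elements: Long) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    val i = key.asInstanceOf[Long]
    (i * partitions / elements).toInt
  }
}

val n = 50000L
val balanced = sc.parallelize(1L to n)
  .zipWithIndex()                          // (value, index)
  .map(_.swap)                             // key by index
  .partitionBy(new ExactPartitioner(10, n))
  .values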


On Tue, May 10, 2016 at 11:36 PM, Xinh Huynh  wrote:

> Hi Ayman,
>
> Have you looked at this:
> http://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where
>
> It recommends defining a custom partitioner and (PairRDD) partitionBy
> method to accomplish this.
>
> Xinh
>
> On Tue, May 10, 2016 at 1:15 PM, Ayman Khalil 
> wrote:
>
>> And btw, I'm using the Python API if this makes any difference.
>>
>> On Tue, May 10, 2016 at 11:14 PM, Ayman Khalil 
>> wrote:
>>
>>> Hi Don,
>>>
>>> This didn't help. My original rdd is already created using 10
>>> partitions. As a matter of fact, after trying with rdd.coalesce(10,
>>> shuffle = true) out of curiosity, the rdd partitions became even more
>>> imbalanced:
>>> [(0, 5120), (1, 5120), (2, 5120), (3, 5120), (4, *3920*), (5, 4096),
>>> (6, 5120), (7, 5120), (8, 5120), (9, *6144*)]
>>>
>>>
>>> On Tue, May 10, 2016 at 10:16 PM, Don Drake  wrote:
>>>
 You can call rdd.coalesce(10, shuffle = true) and the returning rdd
 will be evenly balanced.  This obviously triggers a shuffle, so be advised
 it could be an expensive operation depending on your RDD size.

 -Don

 On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
 wrote:

> Hello,
>
> I have 50,000 items parallelized into an RDD with 10 partitions, I
> would like to evenly split the items over the partitions so:
> 50,000/10 = 5,000 in each RDD partition.
>
> What I get instead is the following (partition index, partition count):
> [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
> 5120), (7, 5120), (8, 5120), (9, 4944)]
>
> the total is correct (4096 + 4944 + 8*5120 = 50,000) but the
> partitions are imbalanced.
>
> Is there a way to do that?
>
> Thank you,
> Ayman
>



 --
 Donald Drake
 Drake Consulting
 http://www.drakeconsulting.com/
 https://twitter.com/dondrake 
 800-733-2143

>>>
>>>
>>
>


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Don Drake
Well, for Python, it should be rdd.coalesce(10, shuffle=True)

I have had good success with this using the Scala API in Spark 1.6.1.

-Don

On Tue, May 10, 2016 at 3:15 PM, Ayman Khalil  wrote:

> And btw, I'm using the Python API if this makes any difference.
>
> On Tue, May 10, 2016 at 11:14 PM, Ayman Khalil 
> wrote:
>
>> Hi Don,
>>
>> This didn't help. My original rdd is already created using 10 partitions.
>> As a matter of fact, after trying with rdd.coalesce(10, shuffle =
>> true) out of curiosity, the rdd partitions became even more imbalanced:
>> [(0, 5120), (1, 5120), (2, 5120), (3, 5120), (4, *3920*), (5, 4096), (6,
>> 5120), (7, 5120), (8, 5120), (9, *6144*)]
>>
>>
>> On Tue, May 10, 2016 at 10:16 PM, Don Drake  wrote:
>>
>>> You can call rdd.coalesce(10, shuffle = true) and the returning rdd will
>>> be evenly balanced.  This obviously triggers a shuffle, so be advised it
>>> could be an expensive operation depending on your RDD size.
>>>
>>> -Don
>>>
>>> On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
>>> wrote:
>>>
 Hello,

 I have 50,000 items parallelized into an RDD with 10 partitions, I
 would like to evenly split the items over the partitions so:
 50,000/10 = 5,000 in each RDD partition.

 What I get instead is the following (partition index, partition count):
 [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
 5120), (7, 5120), (8, 5120), (9, 4944)]

 the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
 are imbalanced.

 Is there a way to do that?

 Thank you,
 Ayman

>>>
>>>
>>>
>>> --
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/
>>> https://twitter.com/dondrake 
>>> 800-733-2143
>>>
>>
>>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Not able pass 3rd party jars to mesos executors

2016-05-10 Thread gpatcham
Hi All,

I'm using the --jars option in spark-submit to ship 3rd-party jars, but I
don't see them actually being passed to the Mesos slaves. I'm getting
"class not found" exceptions.

This is how I'm using the --jars option:

--jars hdfs://namenode:8082/user/path/to/jar

Am I missing something here, or what's the correct way to do this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-pass-3rd-party-jars-to-mesos-executors-tp26918.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Xinh Huynh
Hi Ayman,

Have you looked at this:
http://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where

It recommends defining a custom partitioner and (PairRDD) partitionBy
method to accomplish this.

Xinh

On Tue, May 10, 2016 at 1:15 PM, Ayman Khalil  wrote:

> And btw, I'm using the Python API if this makes any difference.
>
> On Tue, May 10, 2016 at 11:14 PM, Ayman Khalil 
> wrote:
>
>> Hi Don,
>>
>> This didn't help. My original rdd is already created using 10 partitions.
>> As a matter of fact, after trying with rdd.coalesce(10, shuffle =
>> true) out of curiosity, the rdd partitions became even more imbalanced:
>> [(0, 5120), (1, 5120), (2, 5120), (3, 5120), (4, *3920*), (5, 4096), (6,
>> 5120), (7, 5120), (8, 5120), (9, *6144*)]
>>
>>
>> On Tue, May 10, 2016 at 10:16 PM, Don Drake  wrote:
>>
>>> You can call rdd.coalesce(10, shuffle = true) and the returning rdd will
>>> be evenly balanced.  This obviously triggers a shuffle, so be advised it
>>> could be an expensive operation depending on your RDD size.
>>>
>>> -Don
>>>
>>> On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
>>> wrote:
>>>
 Hello,

 I have 50,000 items parallelized into an RDD with 10 partitions, I
 would like to evenly split the items over the partitions so:
 50,000/10 = 5,000 in each RDD partition.

 What I get instead is the following (partition index, partition count):
 [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
 5120), (7, 5120), (8, 5120), (9, 4944)]

 the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
 are imbalanced.

 Is there a way to do that?

 Thank you,
 Ayman

>>>
>>>
>>>
>>> --
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/
>>> https://twitter.com/dondrake 
>>> 800-733-2143
>>>
>>
>>
>


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Ayman Khalil
And btw, I'm using the Python API if this makes any difference.

On Tue, May 10, 2016 at 11:14 PM, Ayman Khalil 
wrote:

> Hi Don,
>
> This didn't help. My original rdd is already created using 10 partitions.
> As a matter of fact, after trying with rdd.coalesce(10, shuffle =
> true) out of curiosity, the rdd partitions became even more imbalanced:
> [(0, 5120), (1, 5120), (2, 5120), (3, 5120), (4, *3920*), (5, 4096), (6,
> 5120), (7, 5120), (8, 5120), (9, *6144*)]
>
>
> On Tue, May 10, 2016 at 10:16 PM, Don Drake  wrote:
>
>> You can call rdd.coalesce(10, shuffle = true) and the returning rdd will
>> be evenly balanced.  This obviously triggers a shuffle, so be advised it
>> could be an expensive operation depending on your RDD size.
>>
>> -Don
>>
>> On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
>> wrote:
>>
>>> Hello,
>>>
>>> I have 50,000 items parallelized into an RDD with 10 partitions, I would
>>> like to evenly split the items over the partitions so:
>>> 50,000/10 = 5,000 in each RDD partition.
>>>
>>> What I get instead is the following (partition index, partition count):
>>> [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
>>> 5120), (7, 5120), (8, 5120), (9, 4944)]
>>>
>>> the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
>>> are imbalanced.
>>>
>>> Is there a way to do that?
>>>
>>> Thank you,
>>> Ayman
>>>
>>
>>
>>
>> --
>> Donald Drake
>> Drake Consulting
>> http://www.drakeconsulting.com/
>> https://twitter.com/dondrake 
>> 800-733-2143
>>
>
>


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Ayman Khalil
Hi Don,

This didn't help. My original rdd is already created using 10 partitions.
As a matter of fact, after trying with rdd.coalesce(10, shuffle = true) out
of curiosity, the rdd partitions became even more imbalanced:
[(0, 5120), (1, 5120), (2, 5120), (3, 5120), (4, *3920*), (5, 4096), (6,
5120), (7, 5120), (8, 5120), (9, *6144*)]


On Tue, May 10, 2016 at 10:16 PM, Don Drake  wrote:

> You can call rdd.coalesce(10, shuffle = true) and the returning rdd will
> be evenly balanced.  This obviously triggers a shuffle, so be advised it
> could be an expensive operation depending on your RDD size.
>
> -Don
>
> On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
> wrote:
>
>> Hello,
>>
>> I have 50,000 items parallelized into an RDD with 10 partitions, I would
>> like to evenly split the items over the partitions so:
>> 50,000/10 = 5,000 in each RDD partition.
>>
>> What I get instead is the following (partition index, partition count):
>> [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
>> 5120), (7, 5120), (8, 5120), (9, 4944)]
>>
>> the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
>> are imbalanced.
>>
>> Is there a way to do that?
>>
>> Thank you,
>> Ayman
>>
>
>
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake 
> 800-733-2143
>


Spark crashes with Filesystem recovery

2016-05-10 Thread Imran Akbar
I have some Python code that consistently ends up in this state:

ERROR:py4j.java_gateway:An error occurred while trying to connect to the
Java server
Traceback (most recent call last):
  File
"/home/ubuntu/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
690, in start
self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the
Java server
Traceback (most recent call last):
  File
"/home/ubuntu/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
690, in start
self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
Traceback (most recent call last):
  File "", line 2, in 
  File "/home/ubuntu/spark/python/pyspark/sql/dataframe.py", line 280, in
collect
port = self._jdf.collectToPython()
  File "/home/ubuntu/spark/python/pyspark/traceback_utils.py", line 78, in
__exit__
self._context._jsc.setCallSite(None)
  File
"/home/ubuntu/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
811, in __call__
  File
"/home/ubuntu/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
624, in send_command
  File
"/home/ubuntu/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
579, in _get_connection
  File
"/home/ubuntu/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
585, in _create_connection
  File
"/home/ubuntu/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
697, in start
py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
to the Java server

Even though I start pyspark with these options:
./pyspark --master local[4] --executor-memory 14g --driver-memory 14g
--packages com.databricks:spark-csv_2.11:1.4.0
--spark.deploy.recoveryMode=FILESYSTEM

and this in my /conf/spark-env.sh file:
- SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM
-Dspark.deploy.recoveryDirectory=/user/recovery"

How can I get HA to work in Spark?

thanks,
imran


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Don Drake
You can call rdd.coalesce(10, shuffle = true) and the returned rdd will be
evenly balanced.  This obviously triggers a shuffle, so be advised it could
be an expensive operation depending on your RDD size.

-Don

On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
wrote:

> Hello,
>
> I have 50,000 items parallelized into an RDD with 10 partitions, I would
> like to evenly split the items over the partitions so:
> 50,000/10 = 5,000 in each RDD partition.
>
> What I get instead is the following (partition index, partition count):
> [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
> 5120), (7, 5120), (8, 5120), (9, 4944)]
>
> the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
> are imbalanced.
>
> Is there a way to do that?
>
> Thank you,
> Ayman
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Hi test

2016-05-10 Thread Abi
Hello test

Pyspark accumulator

2016-05-10 Thread Abi
1. How come pyspark does not provide the localvalue function like scala ?

2. Why is pyspark more restrictive than scala ?

Re: Cluster Migration

2016-05-10 Thread Ajay Chander
Hi Deepak,
   Thanks for your response. If I understand correctly, you suggest reading all
of those files into an rdd on the cluster using wholeTextFiles, then applying a
compression codec and saving the rdd to the other Hadoop cluster?

Thank you,
Ajay

On Tuesday, May 10, 2016, Deepak Sharma  wrote:

> Hi Ajay
> You can look at wholeTextFiles method of rdd[string,string] and then map
> each of rdd  to saveAsTextFile .
> This will serve the purpose .
> I don't think if anything default like distcp exists in spark
>
> Thanks
> Deepak
> On 10 May 2016 11:27 pm, "Ajay Chander" wrote:
>
>> Hi Everyone,
>>
>> we are planning to migrate the data between 2 clusters and I see distcp
>> doesn't support data compression. Is there any efficient way to compress
>> the data during the migration ? Can I implement any spark job to do this ?
>>  Thanks.
>>
>


Re: Cluster Migration

2016-05-10 Thread Ajay Chander
I will try that out. Thank you!

On Tuesday, May 10, 2016, Deepak Sharma  wrote:

> Yes that's what I intended to say.
>
> Thanks
> Deepak
> On 10 May 2016 11:47 pm, "Ajay Chander" wrote:
>
>> Hi Deepak,
>>Thanks for your response. If I am correct, you suggest reading all
>> of those files into an rdd on the cluster using wholeTextFiles then apply
>> compression codec on it, save the rdd to another Hadoop cluster?
>>
>> Thank you,
>> Ajay
>>
>> On Tuesday, May 10, 2016, Deepak Sharma wrote:
>>> Hi Ajay
>>> You can look at wholeTextFiles method of rdd[string,string] and then map
>>> each of rdd  to saveAsTextFile .
>>> This will serve the purpose .
>>> I don't think if anything default like distcp exists in spark
>>>
>>> Thanks
>>> Deepak
>>> On 10 May 2016 11:27 pm, "Ajay Chander"  wrote:
>>>
 Hi Everyone,

 we are planning to migrate the data between 2 clusters and I see distcp
 doesn't support data compression. Is there any efficient way to compress
 the data during the migration ? Can I implement any spark job to do this ?
  Thanks.

>>>


pyspark mappartions ()

2016-05-10 Thread Abi
Is there any example of this? I want to see how you write the iterable example.

Re: Cluster Migration

2016-05-10 Thread Deepak Sharma
Yes that's what I intended to say.

Thanks
Deepak
On 10 May 2016 11:47 pm, "Ajay Chander"  wrote:

> Hi Deepak,
>Thanks for your response. If I am correct, you suggest reading all
> of those files into an rdd on the cluster using wholeTextFiles then apply
> compression codec on it, save the rdd to another Hadoop cluster?
>
> Thank you,
> Ajay
>
> On Tuesday, May 10, 2016, Deepak Sharma  wrote:
>
>> Hi Ajay
>> You can look at wholeTextFiles method of rdd[string,string] and then map
>> each of rdd  to saveAsTextFile .
>> This will serve the purpose .
>> I don't think if anything default like distcp exists in spark
>>
>> Thanks
>> Deepak
>> On 10 May 2016 11:27 pm, "Ajay Chander"  wrote:
>>
>>> Hi Everyone,
>>>
>>> we are planning to migrate the data between 2 clusters and I see distcp
>>> doesn't support data compression. Is there any efficient way to compress
>>> the data during the migration ? Can I implement any spark job to do this ?
>>>  Thanks.
>>>
>>


Re: Cluster Migration

2016-05-10 Thread Deepak Sharma
Hi Ajay
You can look at the wholeTextFiles method, which gives an rdd[string,string],
and then map each rdd to saveAsTextFile.
This will serve the purpose.
I don't think anything like distcp exists in spark by default.

Thanks
Deepak
On 10 May 2016 11:27 pm, "Ajay Chander"  wrote:

> Hi Everyone,
>
> we are planning to migrate the data between 2 clusters and I see distcp
> doesn't support data compression. Is there any efficient way to compress
> the data during the migration ? Can I implement any spark job to do this ?
>  Thanks.
>
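
A minimal PySpark sketch of the suggestion above (paths and codec are
placeholders, and sc is an existing SparkContext; note that saving only the
contents drops the original file names):

files = sc.wholeTextFiles("hdfs://source-nn:8020/data/input")  # (path, content)
files.map(lambda pc: pc[1]).saveAsTextFile(
    "hdfs://dest-nn:8020/data/output",
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")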


Cluster Migration

2016-05-10 Thread Ajay Chander
Hi Everyone,

we are planning to migrate the data between 2 clusters and I see distcp
doesn't support data compression. Is there any efficient way to compress
the data during the migration ? Can I implement any spark job to do this ?
 Thanks.


Evenly balance the number of items in each RDD partition

2016-05-10 Thread Ayman Khalil
Hello,

I have 50,000 items parallelized into an RDD with 10 partitions, I would
like to evenly split the items over the partitions so:
50,000/10 = 5,000 in each RDD partition.

What I get instead is the following (partition index, partition count):
[(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
5120), (7, 5120), (8, 5120), (9, 4944)]

the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions are
imbalanced.

Is there a way to do that?

Thank you,
Ayman


Re: partitioner aware subtract

2016-05-10 Thread Raghava Mutharaju
Thank you for the response.

This does not work on the test case that I mentioned in the previous email.

val data1 = Seq((1 -> 2), (1 -> 5), (2 -> 3), (3 -> 20), (3 -> 16))
val data2 = Seq((1 -> 2), (3 -> 30), (3 -> 16), (5 -> 12))
val rdd1 = sc.parallelize(data1, 8)
val rdd2 = sc.parallelize(data2, 8)
val diff = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  leftItr.filter(p => !rightItr.contains(p))
}
diff.collect().foreach(println)
(1,5)
(2,3)
(3,20)
(3,16)

(3, 16) shouldn't be in the diff. I guess this shows up because rdd2 is
smaller than rdd1 and rdd2's iterator (rightItr) would have completed
before leftItr?

Anyway, we did the subtract in the following way:

Using mapPartitions, group the values by key into a set in rdd2. Then do a
left outer join of rdd1 with rdd2 and filter it. This preserves
partitioning and also takes advantage of the fact that both RDDs are already
hash partitioned.

Regards,
Raghava.
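
A sketch of that approach, written in Python for readability (it assumes both
RDDs are hash-partitioned on the key with the same number of partitions; the
narrow-join argument from the thread applies to the Scala API, where PySpark
is less aggressive about reusing partitioners). For what it's worth, the stale
(3, 16) above likely appears because Scala's Iterator.contains consumes
rightItr, leaving it exhausted for later elements of leftItr.

from collections import defaultdict

def to_key_sets(partition):
    # collect this partition's rdd2 values into one set per key
    d = defaultdict(set)
    for k, v in partition:
        d[k].add(v)
    return iter(d.items())

rdd2_sets = rdd2.mapPartitions(to_key_sets, preservesPartitioning=True)
diff = (rdd1.leftOuterJoin(rdd2_sets)
            # keep pairs whose key is absent from rdd2, or whose value is
            # not in rdd2's value set for that key
            .filter(lambda kv: kv[1][1] is None or kv[1][0] not in kv[1][1])
            .mapValues(lambda vs: vs[0]))  # back to plain (key, value)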


On Tue, May 10, 2016 at 11:44 AM, Rishi Mishra 
wrote:

> As you have same partitioner and number of partitions probably you can use
> zipPartition and provide a user defined function to substract .
>
> A very primitive  example being.
>
> val data1 = Seq(1->1,2->2,3->3,4->4,5->5,6->6,7->7)
> val data2 = Seq(1->1,2->2,3->3,4->4,5->5,6->6)
> val rdd1 = sc.parallelize(data1, 2)
> val rdd2 = sc.parallelize(data2, 2)
> val sum = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
>   leftItr.filter(p => !rightItr.contains(p))
> }
> sum.foreach(println)
>
>
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> We tried that but couldn't figure out a way to efficiently filter it.
>> Lets take two RDDs.
>>
>> rdd1:
>>
>> (1,2)
>> (1,5)
>> (2,3)
>> (3,20)
>> (3,16)
>>
>> rdd2:
>>
>> (1,2)
>> (3,30)
>> (3,16)
>> (5,12)
>>
>> rdd1.leftOuterJoin(rdd2) and get rdd1.subtract(rdd2):
>>
>> (1,(2,Some(2)))
>> (1,(5,Some(2)))
>> (2,(3,None))
>> (3,(20,Some(30)))
>> (3,(20,Some(16)))
>> (3,(16,Some(30)))
>> (3,(16,Some(16)))
>>
>> case (x, (y, z)) => Apart from allowing z == None and filtering on y ==
>> z, we also should filter out (3, (16, Some(30))). How can we do that
>> efficiently without resorting to broadcast of any elements of rdd2?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, May 9, 2016 at 6:27 AM, ayan guha  wrote:
>>
>>> How about outer join?
>>> On 9 May 2016 13:18, "Raghava Mutharaju" 
>>> wrote:
>>>
 Hello All,

 We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
 (number of partitions are same for both the RDDs). We would like to
 subtract rdd2 from rdd1.

 The subtract code at
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
 seems to group the elements of both the RDDs using (x, null) where x is the
 element of the RDD and partition them. Then it makes use of
 subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
 case, is both key and value combined). In our case, both the RDDs are
 already hash partitioned on the key of x. Can we take advantage of this by
 having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
 mapPartitions() for this?

 We tried to broadcast rdd2 and use mapPartitions. But this turns out to
 be memory consuming and inefficient. We tried to do a local set difference
 between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
 use destroy() on the broadcasted value, but it does not help.

 The current subtract method is slow for us. rdd1 and rdd2 are around
 700MB each and the subtract takes around 14 seconds.

 Any ideas on this issue is highly appreciated.

 Regards,
 Raghava.

>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread Cody Koeninger
Pretty much the same problems you'd expect any time you have skew in a
distributed system - some leaders are going to be working harder than
others & have more disk space used, some consumers are going to be
working harder than others.

It sounds like you're talking about differences in topics, not
partitions, although 3 partitions per topic may not be enough to
balance depending on the size of your cluster.  If your job has
a significantly higher number of topic-partitions than it does executors,
that reduces the chance that some executors will be idle though,
because once an executor finishes processing an rdd partition for a
small topic it will be assigned another one.

If you're worried about some particular topic monopolizing resources,
maxRatePerPartition will let you limit that.  If you have some more
complicated need, you may need to modify the code to suit your
purposes.




On Tue, May 10, 2016 at 10:50 AM, chandan prakash
 wrote:
> Hey Cody,
> What kind of problems exactly?
> ...data rate in kafka topics do vary significantly in my
> caseout of total 50 topics(with 3 partitions each),half of the
> topics generate data at very high speed say 1lakh/sec while other half
> generate at very low rate say 1k/sec...
> i have to process them together and insert into the same database
> table...will it be better to have 2 different spark streaming
> applications instead?
> I dont have control over kafka topics and partitions, they are a central
> system used by many other systems as well.
>
> Regards,
> Chandan
>
> On Tue, May 10, 2016 at 8:01 PM, Cody Koeninger  wrote:
>>
>> maxRate is not used by the direct stream.
>>
>> Significant skew in rate across different partitions for the same
>> topic is going to cause you all kinds of problems, not just with spark
>> streaming.
>>
>> You can turn on backpressure, but you're better off addressing the
>> underlying issue if you can.
>>
>> On Tue, May 10, 2016 at 8:08 AM, Soumitra Siddharth Johri
>>  wrote:
>> > Also look at back pressure enabled. Both of these can be used to limit
>> > the
>> > rate
>> >
>> > Sent from my iPhone
>> >
>> > On May 10, 2016, at 8:02 AM, chandan prakash 
>> > wrote:
>> >
>> > Hi,
>> > I am using Spark Streaming with Direct kafka approach.
>> > Want to limit number of event records coming in my batches.
>> > Have question regarding  following 2 parameters :
>> > 1. spark.streaming.receiver.maxRate
>> > 2. spark.streaming.kafka.maxRatePerPartition
>> >
>> >
>> > The documentation
>> >
>> > (http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
>> > ) says .
>> > " spark.streaming.receiver.maxRate for receivers and
>> > spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach "
>> >
>> > Does it mean that  spark.streaming.receiver.maxRate  is valid only for
>> > Receiver based approach only ?  (not the DirectKafkaApproach as well)
>> >
>> > If yes, then how do we control total number of records/sec in
>> > DirectKafka
>> > ?.because spark.streaming.kafka.maxRatePerPartition  only controls
>> > max
>> > rate per partition and not whole records. There might be many
>> > partitions
>> > some with very fast rate and some with very slow rate.
>> >
>> > Regards,
>> > Chandan
>> >
>> >
>> >
>> > --
>> > Chandan Prakash
>> >
>
>
>
>
> --
> Chandan Prakash
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread chandan prakash
Hey Cody,
What kind of problems exactly?
...data rates in the kafka topics do vary significantly in my
case...out of 50 topics in total (with 3 partitions each), half of the
topics generate data at a very high rate, say 1 lakh (100k)/sec, while the
other half generate at a very low rate, say 1k/sec...
I have to process them together and insert into the same database
table... will it be better to have 2 different spark streaming
applications instead?
I don't have control over the kafka topics and partitions; they are a central
system used by many other systems as well.

Regards,
Chandan

On Tue, May 10, 2016 at 8:01 PM, Cody Koeninger  wrote:

> maxRate is not used by the direct stream.
>
> Significant skew in rate across different partitions for the same
> topic is going to cause you all kinds of problems, not just with spark
> streaming.
>
> You can turn on backpressure, but you're better off addressing the
> underlying issue if you can.
>
> On Tue, May 10, 2016 at 8:08 AM, Soumitra Siddharth Johri
>  wrote:
> > Also look at back pressure enabled. Both of these can be used to limit
> the
> > rate
> >
> > Sent from my iPhone
> >
> > On May 10, 2016, at 8:02 AM, chandan prakash 
> > wrote:
> >
> > Hi,
> > I am using Spark Streaming with Direct kafka approach.
> > Want to limit number of event records coming in my batches.
> > Have question regarding  following 2 parameters :
> > 1. spark.streaming.receiver.maxRate
> > 2. spark.streaming.kafka.maxRatePerPartition
> >
> >
> > The documentation
> > (
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
> > ) says .
> > " spark.streaming.receiver.maxRate for receivers and
> > spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach "
> >
> > Does it mean that  spark.streaming.receiver.maxRate  is valid only for
> > Receiver based approach only ?  (not the DirectKafkaApproach as well)
> >
> > If yes, then how do we control total number of records/sec in DirectKafka
> > ?.because spark.streaming.kafka.maxRatePerPartition  only controls
> max
> > rate per partition and not whole records. There might be many
> partitions
> > some with very fast rate and some with very slow rate.
> >
> > Regards,
> > Chandan
> >
> >
> >
> > --
> > Chandan Prakash
> >
>



-- 
Chandan Prakash


Re: partitioner aware subtract

2016-05-10 Thread Rishi Mishra
As you have same partitioner and number of partitions probably you can use
zipPartition and provide a user defined function to substract .

A very primitive  example being.

val data1 = Seq(1->1,2->2,3->3,4->4,5->5,6->6,7->7)
val data2 = Seq(1->1,2->2,3->3,4->4,5->5,6->6)
val rdd1 = sc.parallelize(data1, 2)
val rdd2 = sc.parallelize(data2, 2)
val sum = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  leftItr.filter(p => !rightItr.contains(p))
}
sum.foreach(println)



Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju  wrote:

> We tried that but couldn't figure out a way to efficiently filter it. Lets
> take two RDDs.
>
> rdd1:
>
> (1,2)
> (1,5)
> (2,3)
> (3,20)
> (3,16)
>
> rdd2:
>
> (1,2)
> (3,30)
> (3,16)
> (5,12)
>
> rdd1.leftOuterJoin(rdd2) and get rdd1.subtract(rdd2):
>
> (1,(2,Some(2)))
> (1,(5,Some(2)))
> (2,(3,None))
> (3,(20,Some(30)))
> (3,(20,Some(16)))
> (3,(16,Some(30)))
> (3,(16,Some(16)))
>
> case (x, (y, z)) => Apart from allowing z == None and filtering on y == z,
> we also should filter out (3, (16, Some(30))). How can we do that
> efficiently without resorting to broadcast of any elements of rdd2?
>
> Regards,
> Raghava.
>
>
> On Mon, May 9, 2016 at 6:27 AM, ayan guha  wrote:
>
>> How about outer join?
>> On 9 May 2016 13:18, "Raghava Mutharaju" 
>> wrote:
>>
>>> Hello All,
>>>
>>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
>>> (number of partitions are same for both the RDDs). We would like to
>>> subtract rdd2 from rdd1.
>>>
>>> The subtract code at
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>> seems to group the elements of both the RDDs using (x, null) where x is the
>>> element of the RDD and partition them. Then it makes use of
>>> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
>>> case, is both key and value combined). In our case, both the RDDs are
>>> already hash partitioned on the key of x. Can we take advantage of this by
>>> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>>> mapPartitions() for this?
>>>
>>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>>> be memory consuming and inefficient. We tried to do a local set difference
>>> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
>>> use destroy() on the broadcasted value, but it does not help.
>>>
>>> The current subtract method is slow for us. rdd1 and rdd2 are around
>>> 700MB each and the subtract takes around 14 seconds.
>>>
>>> Any ideas on this issue is highly appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


Re: Spark-csv- partitionBy

2016-05-10 Thread Xinh Huynh
Hi Pradeep,

Here is a way to partition your data into different files, by calling
repartition() on the dataframe:
df.repartition(12, $"Month")
  .write
  .format(...)

This is assuming you want to partition by a "month" column where there are
12 different values. Each partition will be stored in a separate file (but
in the same folder).

Xinh

On Tue, May 10, 2016 at 2:10 AM, Mail.com  wrote:

> Hi,
>
> I don't want to reduce partitions. Should write files depending upon the
> column value.
>
> Trying to understand how reducing partition size will make it work.
>
> Regards,
> Pradeep
>
> On May 9, 2016, at 6:42 PM, Gourav Sengupta 
> wrote:
>
> Hi,
>
> its supported, try to use coalesce(1) (the spelling is wrong) and after
> that do the partitions.
>
> Regards,
> Gourav
>
> On Mon, May 9, 2016 at 7:12 PM, Mail.com  <
> pradeep.mi...@mail.com> wrote:
>
>> Hi,
>>
>> I have to write tab delimited file and need to have one directory for
>> each unique value of a column.
>>
>> I tried using spark-csv with partitionBy and seems it is not supported.
>> Is there any other option available for doing this?
>>
>> Regards,
>> Pradeep
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
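
If one directory per column value is the hard requirement (spark-csv's writer
did not support partitionBy at the time), a workaround is to loop over the
distinct values and write each subset separately; this is reasonable only when
the number of distinct values is small. A sketch, assuming an existing
DataFrame df with a "Month" column:

months = [r[0] for r in df.select("Month").distinct().collect()]
for m in months:
    (df.filter(df["Month"] == m)
       .write.format("com.databricks.spark.csv")
       .option("delimiter", "\t")        # tab-delimited, per the question
       .save("out/Month=%s" % m))        # one directory per value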


Re: Updating Values Inside Foreach Rdd loop

2016-05-10 Thread Rishi Mishra
Hi Harsh,
Probably you need to maintain some state for your values, as you are
updating some of the keys in a batch and check for a global state of your
equation.
Can you check the API mapWithState of DStream ?
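
For the Python API, the analogous call is updateStateByKey (mapWithState is
Scala/Java-only in Spark 1.6). A minimal sketch of keeping a running value per
key across batches, assuming pairs is a DStream of (key, value) and ssc an
existing StreamingContext:

def update_fn(new_values, running):
    # fold this batch's values for a key into its running state
    return sum(new_values) + (running or 0)

ssc.checkpoint("hdfs:///tmp/state-ckpt")  # stateful ops require checkpointing
running_totals = pairs.updateStateByKey(update_fn)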

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 8:40 PM, HARSH TAKKAR  wrote:

> Hi
>
> Please help.
>
> On Sat, 7 May 2016, 11:43 p.m. HARSH TAKKAR, 
> wrote:
>
>> Hi Ted
>>
>> Following is my use case.
>>
>> I have a prediction algorithm where i need to update some records to
>> predict the target.
>>
>> For eg.
>> I have an eq. Y=  mX +c
>> I need to change value of Xi of some records and calculate sum(Yi) if the
>> value of prediction is not close to target value then repeat the process.
>>
>> In each iteration different set of values are updated but result is
>> checked when we sum up the values.
>>
>> On Sat, 7 May 2016, 8:58 a.m. Ted Yu,  wrote:
>>
>>> Using RDDs requires some 'low level' optimization techniques.
>>> While using dataframes / Spark SQL allows you to leverage existing code.
>>>
>>> If you can share some more of your use case, that would help other
>>> people provide suggestions.
>>>
>>> Thanks
>>>
>>> On May 6, 2016, at 6:57 PM, HARSH TAKKAR  wrote:
>>>
>>> Hi Ted
>>>
>>> I am aware that rdd are immutable, but in my use case i need to update
>>> same data set after each iteration.
>>>
>>> Following are the points which i was exploring.
>>>
>>> 1. Generating rdd in each iteration.( It might use a lot of memory).
>>>
>>> 2. Using Hive tables and update the same table after each iteration.
>>>
>>> Please suggest,which one of the methods listed above will be good to use
>>> , or is there are more better ways to accomplish it.
>>>
>>> On Fri, 6 May 2016, 7:09 p.m. Ted Yu,  wrote:
>>>
 Please see the doc at the beginning of RDD class:

  * A Resilient Distributed Dataset (RDD), the basic abstraction in
 Spark. Represents an immutable,
  * partitioned collection of elements that can be operated on in
 parallel. This class contains the
  * basic operations available on all RDDs, such as `map`, `filter`, and
 `persist`. In addition,

 On Fri, May 6, 2016 at 5:25 AM, HARSH TAKKAR 
 wrote:

> Hi
>
> Is there a way i can modify a RDD, in for-each loop,
>
> Basically, i have a use case in which i need to perform multiple
> iteration over data and modify few values in each iteration.
>
>
> Please help.
>




Re: Accumulator question

2016-05-10 Thread Rishi Mishra
Your mail does not describe much, but won't a simple reduce function help
you?
Something like the example below:

val data = Seq(1,2,3,4,5,6,7)
val rdd = sc.parallelize(data, 2)
val sum = rdd.reduce((a,b) => a+b)



Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Tue, May 10, 2016 at 10:44 AM, Abi  wrote:

> I am splitting an integer array in 2 partitions and using an accumulator
> to sum the array. problem is
>
> 1. I am not seeing execution time becoming half of a linear summing.
>
> 2. The second node (from looking at timestamps) takes 3 times as long as
> the first node. This gives the impression it is "waiting" for the first
> node to finish.
>
> Hence, I am given the impression using accumulator.sum () in the kernel
> and rdd.foreach (kernel) is making things sequential.
>
> Any api/setting suggestions where I could make things parallel ?
>
> On Mon, May 9, 2016 at 8:24 PM, Abi  wrote:
>
>> I am splitting an integer array in 2 partitions and using an accumulator
>> to sum the array. problem is
>>
>> 1. I am not seeing execution time becoming half of a linear summing.
>>
>> 2. The second node (from looking at timestamps) takes 3 times as long as
>> the first node. This gives the impression it is "waiting" for the first
>> node to finish.
>>
>> Hence, I am given the impression using accumulator.sum () in the kernel
>> and rdd.foreach (kernel) is making things sequential.
>>
>> Any api/setting suggestions where I could make things parallel ?
>>
>>
>>
>
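
The same idea in PySpark (a sketch, assuming an existing SparkContext sc):
reduce aggregates within each partition in parallel and only combines the
small per-partition results on the driver, so no accumulator is needed for a
sum.

rdd = sc.parallelize(range(1, 8), 2)
print(rdd.reduce(lambda a, b: a + b))  # 28

# the accumulator version also runs foreach in parallel per partition;
# only the side-effect aggregation happens back on the driver
acc = sc.accumulator(0)
rdd.foreach(lambda x: acc.add(x))
print(acc.value)  # 28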


Re: Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread Soumitra Johri
I think a better partitioning scheme can help you too.
On Tue, May 10, 2016 at 10:31 AM Cody Koeninger  wrote:

> maxRate is not used by the direct stream.
>
> Significant skew in rate across different partitions for the same
> topic is going to cause you all kinds of problems, not just with spark
> streaming.
>
> You can turn on backpressure, but you're better off addressing the
> underlying issue if you can.
>
> On Tue, May 10, 2016 at 8:08 AM, Soumitra Siddharth Johri
>  wrote:
> > Also look at back pressure enabled. Both of these can be used to limit
> the
> > rate
> >
> > Sent from my iPhone
> >
> > On May 10, 2016, at 8:02 AM, chandan prakash 
> > wrote:
> >
> > Hi,
> > I am using Spark Streaming with Direct kafka approach.
> > Want to limit number of event records coming in my batches.
> > Have question regarding  following 2 parameters :
> > 1. spark.streaming.receiver.maxRate
> > 2. spark.streaming.kafka.maxRatePerPartition
> >
> >
> > The documentation
> > (
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
> > ) says .
> > " spark.streaming.receiver.maxRate for receivers and
> > spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach "
> >
> > Does it mean that  spark.streaming.receiver.maxRate  is valid only for
> > Receiver based approach only ?  (not the DirectKafkaApproach as well)
> >
> > If yes, then how do we control total number of records/sec in DirectKafka
> > ?.because spark.streaming.kafka.maxRatePerPartition  only controls
> max
> > rate per partition and not whole records. There might be many
> partitions
> > some with very fast rate and some with very slow rate.
> >
> > Regards,
> > Chandan
> >
> >
> >
> > --
> > Chandan Prakash
> >
>


Re: Init/Setup worker

2016-05-10 Thread Natu Lauchande
Hi,

Not sure if this might be helpful to you :
https://github.com/ondra-m/ruby-spark .

Regards,
Natu

On Tue, May 10, 2016 at 4:37 PM, Lionel PERRIN 
wrote:

> Hello,
>
>
>
> I’m looking for a solution to use jruby on top of spark. The only tricky
> point is that I need that every worker thread has a ruby interpreter
> initialized. *Basically, I need to register a function to be called when
> each worker thread is created* : a thread local variable must be set for
> the ruby interpreter so that ruby object can be deserialized.
>
>
>
> Is there any solution to setup the worker threads before any spark call is
> made using this thread ?
>
>
> Regards,
>
>
> Lionel
>


Init/Setup worker

2016-05-10 Thread Lionel PERRIN
Hello,

I'm looking for a solution to use jruby on top of spark. The only tricky point
is that I need every worker thread to have a ruby interpreter initialized.
Basically, I need to register a function to be called when each worker thread
is created: a thread-local variable must be set for the ruby interpreter so
that ruby objects can be deserialized.

Is there any solution to set up the worker threads before any spark call is
made using this thread?

Regards,
Lionel
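
There is no public per-worker startup hook in Spark itself; a common
workaround is lazy per-executor initialization, sketched here in Python (the
JVM analogue is a lazily initialized singleton object). init_ruby_interpreter
and handle are hypothetical stand-ins, and the state persists across tasks
when worker reuse is enabled (the default):

_interp = None  # one instance per worker process

def get_interpreter():
    global _interp
    if _interp is None:
        _interp = init_ruby_interpreter()  # hypothetical init, runs once
    return _interp

def process(partition):
    interp = get_interpreter()  # first call per worker pays the init cost
    for record in partition:
        yield interp.handle(record)  # hypothetical per-record call

out = rdd.mapPartitions(process)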

Re: Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread Cody Koeninger
maxRate is not used by the direct stream.

Significant skew in rate across different partitions for the same
topic is going to cause you all kinds of problems, not just with spark
streaming.

You can turn on backpressure, but you're better off addressing the
underlying issue if you can.

On Tue, May 10, 2016 at 8:08 AM, Soumitra Siddharth Johri
 wrote:
> Also look at back pressure enabled. Both of these can be used to limit the
> rate
>
> Sent from my iPhone
>
> On May 10, 2016, at 8:02 AM, chandan prakash 
> wrote:
>
> Hi,
> I am using Spark Streaming with Direct kafka approach.
> Want to limit number of event records coming in my batches.
> Have question regarding  following 2 parameters :
> 1. spark.streaming.receiver.maxRate
> 2. spark.streaming.kafka.maxRatePerPartition
>
>
> The documentation
> (http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
> ) says .
> " spark.streaming.receiver.maxRate for receivers and
> spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach "
>
> Does it mean that  spark.streaming.receiver.maxRate  is valid only for
> Receiver based approach only ?  (not the DirectKafkaApproach as well)
>
> If yes, then how do we control total number of records/sec in DirectKafka
> ?.because spark.streaming.kafka.maxRatePerPartition  only controls max
> rate per partition and not whole records. There might be many partitions
> some with very fast rate and some with very slow rate.
>
> Regards,
> Chandan
>
>
>
> --
> Chandan Prakash
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re: Re: Re: How big the spark stream window could be ?

2016-05-10 Thread Mich Talebzadeh
Hi Mingwei,

In your Spark conf settings, what are you providing for these parameters? *Are
you capping them?*

For example

  val conf = new SparkConf().
   setAppName("AppName").
   setMaster("local[2]").
   set("spark.executor.memory", "4G").
   set("spark.cores.max", "2").
   set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)

I assume you are running in standalone mode, so each worker (aka slave) grabs
all the available cores and allocates the remaining memory on each host. Do
not provide new values for these parameters (i.e. do not overwrite them) in

*${SPARK_HOME}/bin/spark-submit --*


HTH
Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 10 May 2016 at 03:12, 李明伟  wrote:

> Hi Mich
>
> I added some more infor (the spark-env.sh setting and top command output
> in that thread.) Can you help to check pleas?
>
> Regards
> Mingwei
>
>
>
>
>
> At 2016-05-09 23:45:19, "Mich Talebzadeh" 
> wrote:
>
> I had a look at the thread.
>
> This is what you have which I gather a standalone box in other words one
> worker node
>
> bin/spark-submit   --master spark://ES01:7077 --executor-memory 4G
> --num-executors 1 --total-executor-cores 1 ./latest5min.py 1>a.log 2>b.log
>
> But what I don't understand why is using 80% of your RAM as opposed to 25%
> of it (4GB/16GB) right?
>
> Where else have you set up these parameters for example in
> $SPARK_HOME/con/spark-env.sh?
>
> Can you send the output of /usr/bin/free and top
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 May 2016 at 16:19, 李明伟  wrote:
>
>> Thanks for all the information guys.
>>
>> I wrote some code to do the test. Not using window. So only calculating
>> data for each batch interval. I set the interval to 30 seconds also reduce
>> the size of data to about 30 000 lines of csv.
>> Means my code should calculation on 30 000 lines of CSV in 30 seconds. I
>> think it is not a very heavy workload. But my spark stream code still crash.
>>
>> I send another post to the user list here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-I-have-memory-leaking-for-such-simple-spark-stream-code-td26904.html
>>
>> Is it possible for you to have a look please? Very appreciate.
>>
>>
>>
>>
>>
>> At 2016-05-09 17:49:22, "Saisai Shao"  wrote:
>>
>> Pease see the inline comments.
>>
>>
>> On Mon, May 9, 2016 at 5:31 PM, Ashok Kumar  wrote:
>>
>>> Thank you.
>>>
>>> So If I create spark streaming then
>>>
>>>
>>>1. The streams will always need to be cached? It cannot be stored in
>>>persistent storage
>>>
>>> You don't need to cache the stream explicitly if you don't have specific
>> requirement, Spark will do it for you depends on different streaming
>> sources (Kafka or socket).
>>
>>>
>>>1. The stream data cached will be distributed among all nodes of
>>>Spark among executors
>>>2. As I understand each Spark worker node has one executor that
>>>includes cache. So the streaming data is distributed among these work 
>>> node
>>>caches. For example if I have 4 worker nodes each cache will have a 
>>> quarter
>>>of data (this assumes that cache size among worker nodes is the same.)
>>>
>>> Ideally, it will distributed evenly across the executors, also this is
>> target for tuning. Normally it depends on several conditions like receiver
>> distribution, partition distribution.
>>
>>
>>>
>>> The issue raises if the amount of streaming data does not fit into these
>>> 4 caches? Will the job crash?
>>>
>>>
>>> On Monday, 9 May 2016, 10:16, Saisai Shao 
>>> wrote:
>>>
>>>
>>> No, each executor only stores part of data in memory (it depends on how
>>> the partition are distributed and how many receivers you have).
>>>
>>> For WindowedDStream, it will obviously cache the data in memory, from my
>>> understanding you don't need to call cache() again.
>>>
>>> On Mon, May 9, 2016 at 5:06 PM, Ashok Kumar 
>>> wrote:
>>>
>>> hi,
>>>
>>> so if i have 10gb of streaming data coming in does it require 10gb of
>>> memory in each node?
>>>
>>> also in that case why do we need using
>>>
>>> dstream.cache()
>>>
>>> thanks
>>>
>>>
>>> On Monday, 9 May 2016, 9:58, Saisai Shao  wrote:
>>>
>>>
>>> It depends on you to write the Spark application, normally if data is
>>> already on the persistent storage, there's no need to be put into memory.
>>> The reason why Spark Streaming has to be stored in memory is that streaming
>>> source is not persistent source, so you need to have a place to store the
>>> data.
>>>
>>> On Mon, May

Re: Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread Soumitra Siddharth Johri
Also look at back pressure enabled. Both of these can be used to limit the rate

Sent from my iPhone

> On May 10, 2016, at 8:02 AM, chandan prakash  
> wrote:
> 
> Hi,
> I am using Spark Streaming with Direct kafka approach.
> Want to limit number of event records coming in my batches.
> Have question regarding  following 2 parameters : 
> 1. spark.streaming.receiver.maxRate
> 2. spark.streaming.kafka.maxRatePerPartition
> 
> 
> The documentation 
> (http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
>  ) says .
> " spark.streaming.receiver.maxRate for receivers and 
> spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach "
> 
> Does it mean that  spark.streaming.receiver.maxRate  is valid only for 
> Receiver based approach only ?  (not the DirectKafkaApproach as well)
> 
> If yes, then how do we control total number of records/sec in DirectKafka 
> ?.because spark.streaming.kafka.maxRatePerPartition  only controls max 
> rate per partition and not whole records. There might be many partitions 
> some with very fast rate and some with very slow rate.
> 
> Regards,
> Chandan
> 
> 
> 
> -- 
> Chandan Prakash
> 


Re: best fit - Dataframe and spark sql use cases

2016-05-10 Thread Mathieu Longtin
Spark SQL is translated to DataFrame operations by the SQL engine. Use
whichever is more comfortable for the task. Unless I'm doing something very
straightforward, I go with SQL, since any improvement to the SQL engine
will improve the resulting DataFrame operations. A hard-coded DataFrame
operation won't change even if a better operation becomes available.
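
For example, both of the following compile to equivalent plans through the
same Catalyst optimizer (illustrative names, assuming an existing sqlContext
and DataFrame df in Spark 1.x):

df.registerTempTable("people")
via_sql = sqlContext.sql(
    "SELECT dept, count(*) AS cnt FROM people WHERE age > 21 GROUP BY dept")
via_df = df.filter(df["age"] > 21).groupBy("dept").count()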

On Mon, May 9, 2016 at 10:37 PM Divya Gehlot 
wrote:

> Hi,
> I would like to know the uses cases where data frames is best fit and use
> cases where Spark SQL is best fit based on the one's  experience .
>
>
> Thanks,
> Divya
>
>
>
>
>
> --
Mathieu Longtin
1-514-803-8977


Re: spark 2.0 issue with yarn?

2016-05-10 Thread Steve Loughran

On 9 May 2016, at 21:24, Jesse F Chen wrote:


I had been running fine until builds around 05/07/2016

If I used the "--master yarn" in builds after 05/07, I got the following 
error... sounds like some jars are missing.

I am using YARN 2.7.2 and Hive 1.2.1.

Do I need something new to deploy related to YARN?

bin/spark-sql -driver-memory 10g --verbose --master yarn --packages 
com.databricks:spark-csv_2.10:1.3.0 --executor-memory 4g --num-executors 20 
--executor-cores 2

16/05/09 13:15:21 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/05/09 13:15:21 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:4041
16/05/09 13:15:21 INFO util.Utils: Successfully started service 'SparkUI' on 
port 4041.
16/05/09 13:15:21 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at 
http://bigaperf116.svl.ibm.com:4041
Exception in thread "main" java.lang.NoClassDefFoundError: 
com/sun/jersey/api/client/config/ClientConfig
at 
org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:45)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:163)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:150)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:148)


Looks like Jersey client isn't on the classpath.

1. Consider filing a JIRA
2. Set spark.hadoop.yarn.timeline-service.enabled to false to turn off ATS
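
For example (a hedged sketch; the other options as in the original command):

bin/spark-sql --master yarn \
  --conf spark.hadoop.yarn.timeline-service.enabled=false ...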


at org.apache.spark.SparkContext.(SparkContext.scala:502)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2246)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:762)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:57)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.(SparkSQLCLIDriver.scala:281)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:138)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:122)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: 
com.sun.jersey.api.client.config.ClientConfig
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 22 more
16/05/09 13:15:21 INFO storage.DiskBlockManager: Shutdown hook called
16/05/09 13:15:21 INFO util.ShutdownHookManager: Shutdown hook called
16/05/09 13:15:21 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-ac33b501-b9c3-47a3-93c8-fa02720bf4bb
16/05/09 13:15:21 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-65cb43d9-c122-4106-a0a8-ae7d92d9e19c
16/05/09 13:15:21 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-65cb43d9-c122-4106-a0a8-ae7d92d9e19c/userFiles-46dde536-29e5-46b3-a530-e5ad6640f8b2





JESSE CHEN
Big Data Performance | IBM Analytics

Office: 408 463 2296
Mobile: 408 828 9068
Email: jfc...@us.ibm.com






Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread chandan prakash
Hi,
I am using Spark Streaming with the direct Kafka approach.
I want to limit the number of event records coming in my batches.
I have a question regarding the following 2 parameters:
1. spark.streaming.receiver.maxRate
2. spark.streaming.kafka.maxRatePerPartition


The documentation (
http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
) says:
" spark.streaming.receiver.maxRate for receivers and
spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach "

*Does it mean that spark.streaming.receiver.maxRate is valid only for the
receiver-based approach (and not the direct Kafka approach as well)?*

*If yes, then how do we control the total number of records/sec in direct
Kafka? spark.streaming.kafka.maxRatePerPartition only controls the max
rate per partition, not the total record count, and there might be many
partitions, some with a very fast rate and some with a very slow rate.*

Regards,
Chandan



-- 
Chandan Prakash
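
For reference, a sketch of the direct-stream knobs discussed here (values are
placeholders). The effective per-batch cap works out to roughly
maxRatePerPartition x number of topic-partitions x batch interval in seconds:

from pyspark import SparkConf

conf = (SparkConf()
        # direct stream: cap records read per Kafka partition per second
        .set("spark.streaming.kafka.maxRatePerPartition", "1000")
        # let Spark adapt the ingestion rate to processing speed
        .set("spark.streaming.backpressure.enabled", "true"))
# e.g. 150 topic-partitions * 1000 rec/sec * 2 s batches ~ 300k records/batch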


Reading table schema from Cassandra

2016-05-10 Thread justneeraj
Hi,

We are using the Spark Cassandra connector for our app,
and I am trying to create higher-level roll-up tables, e.g. a minutes table
from a seconds table.

If my tables are already defined, how can I read the schema of a table,
so that I can load it into a DataFrame and create the aggregates?

Any help will be really appreciated.

Thanks,
Neeraj 
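
For what it's worth, the connector's DataFrame source reads the table schema
from Cassandra for you (a sketch; the keyspace/table names are placeholders,
assuming an existing sqlContext and the connector on the classpath):

df = (sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(keyspace="my_ks", table="seconds_table")
      .load())
df.printSchema()  # schema comes from Cassandra, not declared by hand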



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-table-schema-from-Cassandra-tp26915.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-csv- partitionBy

2016-05-10 Thread Mail.com
Hi,

I don't want to reduce the number of partitions; the files should be written
depending upon the column value.

I'm trying to understand how reducing the partition count will make it work.

Regards,
Pradeep

> On May 9, 2016, at 6:42 PM, Gourav Sengupta  wrote:
> 
> Hi,
> 
> its supported, try to use coalesce(1) (the spelling is wrong) and after that 
> do the partitions.
> 
> Regards,
> Gourav
> 
>> On Mon, May 9, 2016 at 7:12 PM, Mail.com  wrote:
>> Hi,
>> 
>> I have to write tab delimited file and need to have one directory for each 
>> unique value of a column.
>> 
>> I tried using spark-csv with partitionBy and seems it is not supported. Is 
>> there any other option available for doing this?
>> 
>> Regards,
>> Pradeep
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re:Re: Re: spark uploading resource error

2016-05-10 Thread 朱旻
Thanks!
I solved the problem.
spark-submit changed HADOOP_CONF_DIR to spark/conf, which was correct,
but launching with java * ... didn't change HADOOP_CONF_DIR; it still used
hadoop/etc/hadoop.






At 2016-05-10 16:39:47, "Saisai Shao"  wrote:

The code is in Client.scala under yarn sub-module (see the below link). Maybe 
you need to check the vendor version about their changes to the Apache Spark 
code.


https://github.com/apache/spark/blob/branch-1.3/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala



Thanks
Saisai


On Tue, May 10, 2016 at 4:17 PM, 朱旻  wrote:




it was a product sold by huawei . name is FusionInsight. it says spark was 1.3 
with hadoop 2.7.1


where can i find the code or config file which define the files to be uploaded?



At 2016-05-10 16:06:05, "Saisai Shao"  wrote:

What is the version of Spark are you using? From my understanding, there's no 
code in yarn#client will upload "__hadoop_conf__" into distributed cache.






On Tue, May 10, 2016 at 3:51 PM, 朱旻  wrote:

hi all:
I found a problem using spark .
WHEN I use spark-submit to launch a task. it works


spark-submit --num-executors 8 --executor-memory 8G --class 
com.icbc.nss.spark.PfsjnlSplit  --master yarn-cluster 
/home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar 
/user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join 
/user/nss/output_join2


but when i use the command created by spark-class  as below


/home/nssbatch/huaweiclient/hadoopclient/JDK/jdk/bin/java 
-Djava.security.krb5.conf=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/var/krb5kdc/krb5.conf
 -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com 
-Djava.security.auth.login.config=/home/nssbatch/huaweiclient/hadoopclient/Spark/adapter/client/controller/jaas.conf
 
-Dzookeeper.kinit=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/bin/kinit
 -cp 
/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/spark-assembly-1.3.0-hadoop2.7.1.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-core-3.2.10.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Yarn/config/
 org.apache.spark.deploy.SparkSubmit --master yarn-cluster --class 
com.icbc.nss.spark.PfsjnlSplit --num-executors 8 --executor-memory 8G 
/home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar 
/user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join 
/user/nss/output_join2


it didn't work. 
i compare the log.and found that: 


16/05/10 22:23:24 INFO Client: Uploading resource 
file:/tmp/spark-a4457754-7183-44ce-bd0d-32a071757c92/__hadoop_conf__4372868703234608846.zip
 -> 
hdfs://hacluster/user/nss/.sparkStaging/application_1462442311990_0057/__hadoop_conf__4372868703234608846.zip
  


the conf_file uploaded into hdfs was different.


why is this happened?
where can i find the resource file to be uploading?


Re: Re: spark uploading resource error

2016-05-10 Thread Saisai Shao
The code is in Client.scala under yarn sub-module (see the below link).
Maybe you need to check the vendor version about their changes to the
Apache Spark code.

https://github.com/apache/spark/blob/branch-1.3/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Thanks
Saisai

On Tue, May 10, 2016 at 4:17 PM, 朱旻  wrote:

>
>
> it was a product sold by huawei . name is FusionInsight. it says spark was
> 1.3 with hadoop 2.7.1
>
> where can i find the code or config file which define the files to be
> uploaded?
>
>
> At 2016-05-10 16:06:05, "Saisai Shao"  wrote:
>
> What is the version of Spark are you using? From my understanding, there's
> no code in yarn#client will upload "__hadoop_conf__" into distributed cache.
>
>
>
> On Tue, May 10, 2016 at 3:51 PM, 朱旻  wrote:
>
>> Hi all:
>> I found a problem using Spark.
>> When I use spark-submit to launch a task, it works:
>>
>> *spark-submit --num-executors 8 --executor-memory 8G --class
>> com.icbc.nss.spark.PfsjnlSplit  --master yarn-cluster
>> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
>> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
>> /user/nss/output_join2*
>>
>> But when I use the command created by spark-class, as below:
>>
>> */home/nssbatch/huaweiclient/hadoopclient/JDK/jdk/bin/java
>> -Djava.security.krb5.conf=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/var/krb5kdc/krb5.conf
>> -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com
>> 
>> -Djava.security.auth.login.config=/home/nssbatch/huaweiclient/hadoopclient/Spark/adapter/client/controller/jaas.conf
>> -Dzookeeper.kinit=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/bin/kinit
>> -cp
>> /home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/spark-assembly-1.3.0-hadoop2.7.1.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-core-3.2.10.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Yarn/config/
>> org.apache.spark.deploy.SparkSubmit --master yarn-cluster --class
>> com.icbc.nss.spark.PfsjnlSplit --num-executors 8 --executor-memory 8G
>> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
>> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
>> /user/nss/output_join2*
>>
>> It didn't work.
>> I compared the logs and found that:
>>
>> 16/05/10 22:23:24 INFO Client: Uploading resource
>> file:/tmp/spark-a4457754-7183-44ce-bd0d-32a071757c92/__hadoop_conf__4372868703234608846.zip
>> ->
>> hdfs://hacluster/user/nss/.sparkStaging/application_1462442311990_0057/__hadoop_conf__4372868703234608846.zip
>>
>>
>> The conf file uploaded into HDFS was different.
>>
>> Why did this happen?
>> Where can I find the resource file that is being uploaded?


Re: SparkSQL with large result size

2016-05-10 Thread Christophe Préaud
Hi,

You may be hitting this bug: 
SPARK-9879 (https://issues.apache.org/jira/browse/SPARK-9879)

In other words: did you try without the LIMIT clause?
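If you do need the sorted output, writing it to distributed storage avoids
funneling a huge LIMIT through the driver. A minimal sketch (Spark 1.6 API;
the output path is made up, and t1 is assumed to be a registered table):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("largeResult"))
val sqlContext = new SQLContext(sc)

// write the full sorted result out instead of collecting it on the driver
sqlContext.sql("SELECT * FROM t1 ORDER BY c1")
  .write.parquet("hdfs:///tmp/t1_sorted")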

Regards,
Christophe.

On 02/05/16 20:02, Gourav Sengupta wrote:
Hi,

I have worked with 300 GB of data by querying it from CSV (using spark-csv),
writing it out in Parquet format, and then querying the Parquet data to
partition it and write out individual CSV files, all without any issues on a
single-node Spark installation.
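
A rough sketch of that pipeline, in case it helps (Spark 1.x with the
Databricks spark-csv package; every path and column name below is made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("csvToParquet"))
val sqlContext = new SQLContext(sc)

// 1) read the raw CSV once
val raw = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs:///data/raw.csv")

// 2) persist it as Parquet for efficient repeated querying
raw.write.parquet("hdfs:///data/raw_parquet")

// 3) query the Parquet copy and write a slice back out as CSV
sqlContext.read.parquet("hdfs:///data/raw_parquet")
  .filter("some_key = 'A'")
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("hdfs:///data/slice_A_csv")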

Are you trying to cache the entire dataset? What are you trying to
achieve in your use case?

Regards,
Gourav

On Mon, May 2, 2016 at 5:59 PM, Ted Yu  wrote:
That's my interpretation.

On Mon, May 2, 2016 at 9:45 AM, Buntu Dev  wrote:
Thanks Ted, I thought the average block size was already low, less than the
usual 128 MB. If I need to reduce it further via parquet.block.size, that
would mean an increase in the number of blocks, which should increase the
number of tasks/executors. Is that the correct way to interpret this?
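
For what it's worth, a sketch of rewriting a dataset with smaller row groups
(parquet.block.size is the standard Parquet writer key, in bytes; whether
more row groups actually yields more read tasks depends on the input split
logic, so treat this as an experiment; paths are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("smallerRowGroups"))
val sqlContext = new SQLContext(sc)

// ask the Parquet writer for ~32 MB row groups instead of the default
sc.hadoopConfiguration.setInt("parquet.block.size", 32 * 1024 * 1024)

// rewrite the dataset so subsequent reads see more, smaller row groups
sqlContext.read.parquet("hdfs:///data/t1")
  .write.parquet("hdfs:///data/t1_small_blocks")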

On Mon, May 2, 2016 at 6:21 AM, Ted Yu  wrote:
Please consider decreasing block size.

Thanks

> On May 1, 2016, at 9:19 PM, Buntu Dev  wrote:
>
> I got a 10 GB limit on the executors and am operating on a Parquet dataset
> with a 70 MB block size and 200 blocks. I keep hitting the memory limits
> when doing 'select * from t1 order by c1 limit 1000000' (i.e., 1M rows). It
> works if I limit to, say, 100k. What are the options for saving a large
> dataset without running into memory issues?
>
> Thanks!


Re: Re: spark uploading resource error

2016-05-10 Thread 朱旻



It is a product sold by Huawei, named FusionInsight. It says Spark is 1.3
with Hadoop 2.7.1.


Where can I find the code or config file which defines the files to be uploaded?



At 2016-05-10 16:06:05, "Saisai Shao"  wrote:

What version of Spark are you using? From my understanding, there's no
code in yarn#client that uploads "__hadoop_conf__" into the distributed cache.






On Tue, May 10, 2016 at 3:51 PM, 朱旻  wrote:

Hi all:
I found a problem using Spark.
When I use spark-submit to launch a task, it works:


spark-submit --num-executors 8 --executor-memory 8G --class 
com.icbc.nss.spark.PfsjnlSplit  --master yarn-cluster 
/home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar 
/user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join 
/user/nss/output_join2


But when I use the command created by spark-class, as below:


/home/nssbatch/huaweiclient/hadoopclient/JDK/jdk/bin/java 
-Djava.security.krb5.conf=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/var/krb5kdc/krb5.conf
 -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com 
-Djava.security.auth.login.config=/home/nssbatch/huaweiclient/hadoopclient/Spark/adapter/client/controller/jaas.conf
 
-Dzookeeper.kinit=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/bin/kinit
 -cp 
/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/spark-assembly-1.3.0-hadoop2.7.1.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-core-3.2.10.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Yarn/config/
 org.apache.spark.deploy.SparkSubmit --master yarn-cluster --class 
com.icbc.nss.spark.PfsjnlSplit --num-executors 8 --executor-memory 8G 
/home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar 
/user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join 
/user/nss/output_join2


It didn't work.
I compared the logs and found that:


16/05/10 22:23:24 INFO Client: Uploading resource 
file:/tmp/spark-a4457754-7183-44ce-bd0d-32a071757c92/__hadoop_conf__4372868703234608846.zip
 -> 
hdfs://hacluster/user/nss/.sparkStaging/application_1462442311990_0057/__hadoop_conf__4372868703234608846.zip
  


The conf file uploaded into HDFS was different.


Why did this happen?
Where can I find the resource file that is being uploaded?


Re: spark uploading resource error

2016-05-10 Thread Saisai Shao
What version of Spark are you using? From my understanding, there's
no code in yarn#client that uploads "__hadoop_conf__" into the distributed cache.



On Tue, May 10, 2016 at 3:51 PM, 朱旻  wrote:

> Hi all:
> I found a problem using Spark.
> When I use spark-submit to launch a task, it works:
>
> *spark-submit --num-executors 8 --executor-memory 8G --class
> com.icbc.nss.spark.PfsjnlSplit  --master yarn-cluster
> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
> /user/nss/output_join2*
>
> But when I use the command created by spark-class, as below:
>
> */home/nssbatch/huaweiclient/hadoopclient/JDK/jdk/bin/java
> -Djava.security.krb5.conf=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/var/krb5kdc/krb5.conf
> -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com
> 
> -Djava.security.auth.login.config=/home/nssbatch/huaweiclient/hadoopclient/Spark/adapter/client/controller/jaas.conf
> -Dzookeeper.kinit=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/bin/kinit
> -cp
> /home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/spark-assembly-1.3.0-hadoop2.7.1.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-core-3.2.10.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Yarn/config/
> org.apache.spark.deploy.SparkSubmit --master yarn-cluster --class
> com.icbc.nss.spark.PfsjnlSplit --num-executors 8 --executor-memory 8G
> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
> /user/nss/output_join2*
>
> It didn't work.
> I compared the logs and found that:
>
> 16/05/10 22:23:24 INFO Client: Uploading resource
> file:/tmp/spark-a4457754-7183-44ce-bd0d-32a071757c92/__hadoop_conf__4372868703234608846.zip
> ->
> hdfs://hacluster/user/nss/.sparkStaging/application_1462442311990_0057/__hadoop_conf__4372868703234608846.zip
>
>
> The conf file uploaded into HDFS was different.
>
> Why did this happen?
> Where can I find the resource file that is being uploaded?


spark uploading resource error

2016-05-10 Thread 朱旻
Hi all:
I found a problem using Spark.
When I use spark-submit to launch a task, it works:


spark-submit --num-executors 8 --executor-memory 8G --class 
com.icbc.nss.spark.PfsjnlSplit  --master yarn-cluster 
/home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar 
/user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join 
/user/nss/output_join2


But when I use the command created by spark-class, as below:


/home/nssbatch/huaweiclient/hadoopclient/JDK/jdk/bin/java 
-Djava.security.krb5.conf=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/var/krb5kdc/krb5.conf
 -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com 
-Djava.security.auth.login.config=/home/nssbatch/huaweiclient/hadoopclient/Spark/adapter/client/controller/jaas.conf
 
-Dzookeeper.kinit=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/bin/kinit
 -cp 
/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/spark-assembly-1.3.0-hadoop2.7.1.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-core-3.2.10.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Yarn/config/
 org.apache.spark.deploy.SparkSubmit --master yarn-cluster --class 
com.icbc.nss.spark.PfsjnlSplit --num-executors 8 --executor-memory 8G 
/home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar 
/user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join 
/user/nss/output_join2


It didn't work.
I compared the logs and found that:


16/05/10 22:23:24 INFO Client: Uploading resource 
file:/tmp/spark-a4457754-7183-44ce-bd0d-32a071757c92/__hadoop_conf__4372868703234608846.zip
 -> 
hdfs://hacluster/user/nss/.sparkStaging/application_1462442311990_0057/__hadoop_conf__4372868703234608846.zip
  


The conf file uploaded into HDFS was different.


Why did this happen?
Where can I find the resource file that is being uploaded?


Re: sqlCtx.read.parquet yields lots of small tasks

2016-05-10 Thread Johnny W.
Thanks, Ashish. I've created a JIRA:
https://issues.apache.org/jira/browse/SPARK-15247
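
One data point that may be worth adding to the ticket: with
spark.sql.parquet.mergeSchema set to true, Spark 1.5+ merges Parquet footers
with a distributed job, which could explain a stage whose task count scales
with the number of executors. A quick experiment (the path below is a
stand-in for the real file):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("mergeSchemaTest"))
val sqlContext = new SQLContext(sc)

// read the same single file with schema merging disabled and check
// whether the extra "parquet" stage still appears
val df = sqlContext.read
  .option("mergeSchema", "false")   // per-read override of the global setting
  .parquet("hdfs:///data/single_file.parquet")
df.count()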

Best,
J.

On Sun, May 8, 2016 at 7:07 PM, Ashish Dubey  wrote:

> I see the behavior: it always goes with the minimum total number of tasks
> possible for your settings (num-executors * num-cores). However, if you use
> a huge amount of data, you will see more tasks, which suggests some kind of
> lower bound on the number of tasks. It may require some digging; other
> formats did not seem to have this issue.
>
> On Sun, May 8, 2016 at 12:10 AM, Johnny W.  wrote:
>
>> The file size is very small (< 1 MB). The stage launches every time I call:
>> --
>> sqlContext.read.parquet(path_to_file)
>>
>> These are the parquet specific configurations I set:
>> --
>> spark.sql.parquet.filterPushdown: true
>> spark.sql.parquet.mergeSchema: true
>>
>> Thanks,
>> J.
>>
>> On Sat, May 7, 2016 at 4:20 PM, Ashish Dubey 
>> wrote:
>>
>>> How big is your file, and can you also share the code snippet?
>>>
>>>
>>> On Saturday, May 7, 2016, Johnny W.  wrote:
>>>
 hi spark-user,

 I am using Spark 1.6.0. When I call sqlCtx.read.parquet to create a
 dataframe from a parquet data source with a single parquet file, it yields
 a stage with lots of small tasks. It seems the number of tasks depends on
 how many executors I have instead of how many parquet files/partitions I
 have. Actually, it launches 5 tasks on each executor.

 This behavior is quite strange, and may cause problems if there
 is a slow executor. What is this "parquet" stage for, and why does it launch 5
 tasks on each executor?

 Thanks,
 J.

>>>
>>
>