Re: Dynamic partition pruning

2015-10-16 Thread Michael Armbrust
We don't support dynamic partition pruning yet.

On Fri, Oct 16, 2015 at 10:20 AM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Hi all
>
>
>
> I’m running sqls on spark 1.5.1 and using tables based on parquets.
>
> My tables are not pruned when joined on partition columns.
>
> Ex:
>
> Select  from tab where partcol=1 will prune on value 1
>
> Select  from tab join dim on (dim.partcol=tab.partcol) where
> dim.partcol=1 will scan all partitions.
>
>
>
> Any ideas or workarounds?
>
>
>
>
>
> *Thanks,*
>
> *Younes*
>
>
>


PySpark + Streaming + DataFrames

2015-10-16 Thread Jason White
I'm trying to create a DStream of DataFrames using PySpark. I receive data
from Kafka in the form of a JSON string, and I'm parsing these RDDs of
Strings into DataFrames.

My code is:


I get the following error at pyspark/streaming/util.py, line 64:


I've verified that the sqlContext is properly creating a DataFrame. The
issue is in the return value in the callback. Am I doing something wrong in
the DStream transform? I suspect it may be a problem in the DStream
implementation, given that it's expecting a `_jrdd` attribute.






Re: Turn off logs in spark-sql shell

2015-10-16 Thread Jakob Odersky
[repost to mailing list, ok I gotta really start hitting that
reply-to-all-button]

Hi,
Spark uses Log4j, which unfortunately does not support fine-grained
configuration over the command line. Therefore some configuration file
editing will have to be done (unless you want to configure loggers
programmatically, which would require editing spark-sql itself).
Nevertheless, there seems to be a kind of "trick" where you can substitute
java environment variables in the log4j configuration file. See this
stackoverflow answer for details http://stackoverflow.com/a/31208461/917519.
After editing the properties file, you can then start spark-sql with:

bin/spark-sql --conf
"spark.driver.extraJavaOptions=-Dmy.logger.threshold=OFF"

This is untested, but I hope it helps,
--Jakob

On 15 October 2015 at 22:56, Muhammad Ahsan 
wrote:

> Hello Everyone!
>
> I want to know how to turn off logging during starting *spark-sql shell*
> without changing log4j configuration files. In normal spark-shell I can use
> the following commands
>
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> Logger.getLogger("org").setLevel(Level.OFF)
> Logger.getLogger("akka").setLevel(Level.OFF)
>
>
> Thanks
>
> --
> Thanks
>
> Muhammad Ahsan
>
>


Dynamic partition pruning

2015-10-16 Thread Younes Naguib
Hi all

I'm running sqls on spark 1.5.1 and using tables based on parquets.
My tables are not pruned when joined on partition columns.
Ex:
Select  from tab where partcol=1 will prune on value 1
Select  from tab join dim on (dim.partcol=tab.partcol) where dim.partcol=1 
will scan all partitions.

Any ideas or workarounds?


Thanks,
Younes



Question of RDD in calculation

2015-10-16 Thread Shepherd
Hi all,

I am new in Spark, and I have a question in dealing with RDD.
I’ve converted RDD to DataFrame. So there are two DF: DF1 and DF2
DF1 contains: userID, time, dataUsage, duration
DF2 contains: userID

Each userID has multiple rows in DF1.
DF2 has distinct userID, and I would like to compute the average, max and min
value of both dataUsage and duration for each userID in DF1?
And store the results in a new dataframe.
How can I do that?
Thanks a lot.

Best
Frank




Re: PySpark + Streaming + DataFrames

2015-10-16 Thread Jason White
Hi Ken, thanks for replying.

Unless I'm misunderstanding something, I don't believe that's correct.
Dstream.transform() accepts a single argument, func. func should be a
function that accepts a single RDD, and returns a single RDD. That's what
transform_to_df does, except the RDD it returns is a DF.

I've used Dstream.transform() successfully in the past when transforming
RDDs, so I don't think my problem is there.

I haven't tried this in Scala yet, and all of the examples I've seen on the
website seem to use foreach instead of transform. Does this approach work in
Scala?







Problems w/YARN Spark Streaming app reading from Kafka

2015-10-16 Thread Robert Towne
I have a Spark Streaming app that reads using a receiver-less connection (
KafkaUtils.createDirectStream) with an interval of 1 minute.
For about 15 hours it was running fine, ranging in input size of 3,861,758 to 
16,836 events.

Then about 3 hours ago, every minute batch brought in the same number of 
records = 5,760 (2 topics, topic 1 = 64 partitions, topic 2 = 32 partitions).

I know there is more data than the 5,760 records that are being piped in, and 
eventually we’ll fall so far behind that our kafka offsets will not be 
available.
It seems odd that 5760/96 (partitions) = 60 – or my interval in seconds.

I do have spark.streaming.backpressure.enabled = true and even though the 
current documentation states it isn’t used I have a value set for 
spark.streaming.kafka.maxRatePerPartition.



Has anyone else seen this issue where the rate seems capped even though it 
should be pulling more data?

Thanks,
Robert


Problem of RDD in calculation

2015-10-16 Thread ChengBo
Hi all,

I am new in Spark, and I have a question in dealing with RDD.
I've converted RDD to DataFrame. So there are two DF: DF1 and DF2
DF1 contains: userID, time, dataUsage, duration
DF2 contains: userID

Each userID has multiple rows in DF1.
DF2 has distinct userID, and I would like to compute the average, max and min 
value of both dataUsage and duration for each userID in DF1?
And store the results in a new dataframe.
How can I do that?
Thanks a lot.

Best
Frank
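
For reference, one way to express this aggregation with the DataFrame API is
sketched below; df1 and df2 stand for DF1 and DF2 from the question, and the
sketch is untested:

    import org.apache.spark.sql.functions.{avg, max, min}

    // keep only the userIDs present in DF2, then aggregate per user
    val stats = df1.join(df2, "userID")
      .groupBy("userID")
      .agg(
        avg("dataUsage"), max("dataUsage"), min("dataUsage"),
        avg("duration"), max("duration"), min("duration"))

The result is itself a DataFrame with one row per userID.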


Re: Convert SchemaRDD to RDD

2015-10-16 Thread Ted Yu
bq. type mismatch found String required Serializable

See line 110:
http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/lang/String.java#109

Can you pastebin the complete stack trace for the error you encountered ?

Cheers

On Fri, Oct 16, 2015 at 8:01 AM, satish chandra j 
wrote:

> Hi Ted,
> I have implemented the below snippet but am getting the error "type mismatch:
> found String, required Serializable" as mentioned in the mail chain
>
> class MyRecord(val val1: String, val val2: String, ... more than 22,
> in this case e.g. 26)
>   extends Product with Serializable {
>
>   def canEqual(that: Any): Boolean = that.isInstanceOf[MyRecord]
>
>   def productArity: Int = 26 // example value, it is amount of arguments
>
>   def productElement(n: Int): Serializable = n match {
> case  1 => val1
> case  2 => val2
> //... cases up to 26
>   }
> }
> ​
> hence expecting an approach to convert SchemaRDD to RDD without using
> Tuple or Case Class as we have restrictions in Scala 2.10
>
> Regards
> Satish Chandra
>


Re: Problems w/YARN Spark Streaming app reading from Kafka

2015-10-16 Thread Cody Koeninger
What do you mean by "the current documentation states it isn’t used"?
http://spark.apache.org/docs/latest/configuration.html  still lists the
value and its meaning.

As far as the issue you're seeing, are you measuring records by looking at
logs, the spark ui, or actual downstream sinks of data?  I don't think the
backpressure code does any logging, but KafkaRDD will log at info level the
offsets for each topicpartition that is computed (message starts with
"Computing topic")




On Fri, Oct 16, 2015 at 1:52 PM, Robert Towne 
wrote:

> I have a Spark Streaming app that reads using a reciever-less connection
> ( KafkaUtils.createDirectStream) with an interval of 1 minute.
> For about 15 hours it was running fine, ranging in input size of 3,861,758
> to 16,836 events.
>
> Then about 3 hours ago, every minute batch brought in the same number of
> records = 5,760 (2 topics, topic 1 = 64 partitions, topic 2 = 32
> partitions).
>
> I know there is more data than the 5,760 records that being piped in, and
> eventually we’ll fall so far behind that our kafka offsets will not be
> available.
> It seems odd that 5760/96 (partitions) = 60 – or my interval in seconds.
>
> I do have spark.streaming.backpressure.enabled = true and even though the
> current documentation states it isn’t used I have a value set
> for spark.streaming.kafka.maxRatePerPartition.
>
>
>
> Has anyone else seen this issue where the rate seems capped even though it
> should be pulling more data?
>
> Thanks,
> Robert
>


RE: Dynamic partition pruning

2015-10-16 Thread Younes Naguib
Thanks,
Do you have a Jira I can follow for this?

y

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: October-16-15 2:18 PM
To: Younes Naguib
Cc: user@spark.apache.org
Subject: Re: Dynamic partition pruning

We don't support dynamic partition pruning yet.

On Fri, Oct 16, 2015 at 10:20 AM, Younes Naguib 
> wrote:
Hi all

I’m running sqls on spark 1.5.1 and using tables based on parquets.
My tables are not pruned when joined on partition columns.
Ex:
Select  from tab where partcol=1 will prune on value 1
Select  from tab join dim on (dim.partcol=tab.partcol) where dim.partcol=1 
will scan all partitions.

Any ideas or workarounds?


Thanks,
Younes




RE: Spark SQL running totals

2015-10-16 Thread Stefan Panayotov
Thanks Deenar.
This works perfectly.
I can't test the solution with window functions because I am still on Spark 
1.3.1
Hopefully will move to 1.5 soon.

Stefan Panayotov
Sent from my Windows Phone

From: Deenar Toraskar
Sent: ‎10/‎15/‎2015 2:35 PM
To: Stefan Panayotov
Cc: user@spark.apache.org
Subject: Re: Spark SQL running totals

you can do a self join of the table, with the join clause being
a.col1 >= b.col1

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work; doing it with RDDs might
be more efficient, see
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
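
For readers on Spark 1.4 or later (where window functions became available; in
those releases they needed a HiveContext rather than a plain SQLContext), the
running total can also be written with a window specification. A rough, untested
sketch, with df, col_1 and col_2 standing for the table in the quoted question:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum

    val w = Window.orderBy("col_1").rowsBetween(Long.MinValue, 0)
    val withRunningTotal = df.withColumn("col_3", sum("col_2").over(w))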

On 15 October 2015 at 18:48, Stefan Panayotov  wrote:

> Hi,
>
> I need help with Spark SQL. I need to achieve something like the following.
> If I have data like:
>
> col_1  col_2
> 1 10
> 2 30
> 3 15
> 4 20
> 5 25
>
> I need to get col_3 to be the running total of the sum of the previous
> rows of col_2, e.g.
>
> col_1  col_2  col_3
> 1      10     10
> 2      30     40
> 3      15     55
> 4      20     75
> 5      25     100
>
> Is there a way to achieve this in Spark SQL or maybe with Data frame
> transformations?
>
> Thanks in advance,
>
>
> Stefan Panayotov, PhD
> Home: 610-355-0919
> Cell: 610-517-5586
> email: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
>
>


Clustering KMeans error in 1.5.1

2015-10-16 Thread robin_up
We upgraded from 1.4.0 to 1.5.1 (skipped 1.5.0) and one of our clustering jobs
hit the error below. Does anyone know what this is about or if it is a bug?


Traceback (most recent call last):
  File "user_clustering.py", line 137, in 
uig_model = KMeans.train(uigs,i,nIter, runs = nRuns)
  File
"/mnt/yarn/nm/usercache/ds/appcache/application_1444086959272_0238/container_1444086959272_0238_01_01/pyspark.zip/pyspark/mllib/clustering.py",
line 150, in train
  File
"/mnt/yarn/nm/usercache/ds/appcache/application_1444086959272_0238/container_1444086959272_0238_01_01/pyspark.zip/pyspark/mllib/common.py",
line 130, in callMLlibFunc
  File
"/mnt/yarn/nm/usercache/ds/appcache/application_1444086959272_0238/container_1444086959272_0238_01_01/pyspark.zip/pyspark/mllib/common.py",
line 123, in callJavaFunc
  File
"/mnt/yarn/nm/usercache/ds/appcache/application_1444086959272_0238/container_1444086959272_0238_01_01/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File
"/mnt/yarn/nm/usercache/ds/appcache/application_1444086959272_0238/container_1444086959272_0238_01_01/pyspark.zip/pyspark/sql/utils.py",
line 36, in deco
  File
"/mnt/yarn/nm/usercache/ds/appcache/application_1444086959272_0238/container_1444086959272_0238_01_01/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
o220.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 73
in stage 14.0 failed 4 times, most recent failure: Lost task 73.3 in stage
14.0 (TID 1357, hadoop-sandbox-dn07): ExecutorLostFailure (executor 13 lost)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at org.apache.spark.rdd.RDD.takeSample(RDD.scala:485)
at
org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:376)
at 
org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:249)
at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:213)
at
org.apache.spark.mllib.api.python.PythonMLLibAPI.trainKMeansModel(PythonMLLibAPI.scala:341)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)



-
-- Robin Li

How to speed up reading from file?

2015-10-16 Thread Saif.A.Ellafi
Hello,

Is there an optimal number of partitions per number of rows when writing to
disk, so we can later re-read from the source in a distributed way?
Any thoughts?

Thanks
Saif
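
There is no single right answer, but one common approach is to fix the partition
count explicitly before writing, so the data lands in a predictable number of
files and a later read is already spread across the cluster. A hedged sketch
(the count and the path are placeholders, not recommendations):

    // aim for partitions of a manageable size (often quoted as ~100-200 MB each)
    val numPartitions = 200
    df.repartition(numPartitions)
      .write
      .parquet("/path/to/output")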



In-memory computing and cache() in Spark

2015-10-16 Thread Jia Zhan
Hi all,

I am running Spark locally in one node and trying to sweep the memory size
for performance tuning. The machine has 8 CPUs and 16G main memory, the
dataset in my local disk is about 10GB. I have several quick questions and
appreciate any comments.

1. Spark performs in-memory computing, but without using RDD.cache(), will
anything be cached in memory at all? My guess is that, without RDD.cache(),
only a small amount of data will be stored in OS buffer cache, and every
iteration of computation will still need to fetch most data from disk every
time, is that right?

2. To evaluate how caching helps with iterative computation, I wrote a
simple program as shown below, which basically consists of one saveAsText()
and three reduce() actions/stages. I specify "spark.driver.memory" to
"15g", others by default. Then I run three experiments.

   val conf = new SparkConf().setAppName("wordCount")
   val sc = new SparkContext(conf)
   val input = sc.textFile("/InputFiles")

   val words = input.flatMap(line => line.split(" ")).map(word => (word, 1))
     .reduceByKey(_ + _).saveAsTextFile("/OutputFiles")

   val ITERATIONS = 3

   for (i <- 1 to ITERATIONS) {
     val totallength = input.filter(line => line.contains("the"))
       .map(s => s.length).reduce((a, b) => a + b)
   }

(I) The first run: no caching at all. The application finishes in ~12
minutes (2.6min+3.3min+3.2min+3.3min)

(II) The second run, I modified the code so that the input will be cached:
 val input = sc.textFile("/InputFiles").cache()
 The application finishes in ~11 mins!! (5.4min+1.9min+1.9min+2.0min)!
 The storage page in Web UI shows 48% of the dataset  is cached, which
makes sense due to large java object overhead, and
spark.storage.memoryFraction is 0.6 by default.

(III) However, the third run, same program as the second one, but I changed
"spark.driver.memory" to be "2g".
   The application finishes in just 3.6 minutes (3.0min + 9s + 9s + 9s)!!
And UI shows 6% of the data is cached.

From the results we can see the reduce stages finish in seconds, how could
that happen with only 6% cached? Can anyone explain?

I am new to Spark and would appreciate any help on this. Thanks!

Jia


Streaming of COAP Resources

2015-10-16 Thread Sadaf
I am currently working with the IoT CoAP protocol. I accessed the server on
localhost through the Copper Firefox plugin, then added a resource with "GET"
functionality on the server. After that I made its client a streaming source.
Here is the code of the client streaming:

 class customReceiver(test:String) extends 
Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging with
Serializable { 
   @volatile private var stopped = false
   override def onStart() {

  val client = new CoapClient("ip/resource")
  var text = client.get().getResponseText();  
  store(text)
   }
   override def onStop(): Unit = synchronized { 
  try
  {
 stopped = true
  }
  catch
  {
 case e: Exception => println("exception caught: " + e);
  }
   }
 }
But I am facing a problem: during streaming it reads the resource only once;
after that it fetches empty RDDs and completes its batches. Meanwhile, if the
resource changes its value it doesn't read it. Am I doing something wrong? Or
does there exist any other functionality to read whenever the resource changes
that I can handle in my custom receiver? Or any idea how to GET the value
continuously during streaming?

Any help is much awaited and appreciated. Thanks
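
Not an authoritative answer, but a common pattern for custom receivers is to
keep reading the resource from a background thread started in onStart(), calling
store() for every response, instead of reading it a single time. A rough sketch
reusing the fields from the code above (the URI and poll interval are
placeholders; Californium's observe API may be a better fit if the resource
supports it):

    override def onStart(): Unit = {
      new Thread("coap-poller") {
        override def run(): Unit = {
          val client = new CoapClient("ip/resource")
          while (!stopped) {
            val text = client.get().getResponseText()
            if (text != null) store(text)   // push each response into the stream
            Thread.sleep(1000)              // poll interval
          }
        }
      }.start()
    }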






Re: Best practices to handle corrupted records

2015-10-16 Thread Erwan ALLAIN
Either[FailureResult[T], Either[SuccessWithWarnings[T], SuccessResult[T]]]
maybe ?


On Thu, Oct 15, 2015 at 5:31 PM, Antonio Murgia <
antonio.murg...@studio.unibo.it> wrote:

> 'Either' does not cover the case where the outcome was successful but
> generated warnings. I already looked into it and also at 'Try' from which I
> got inspired. Thanks for pointing it out anyway!
>
> #A.M.
>
> Il giorno 15 ott 2015, alle ore 16:19, Erwan ALLAIN <
> eallain.po...@gmail.com> ha scritto:
>
> What about http://www.scala-lang.org/api/2.9.3/scala/Either.html ?
>
>
> On Thu, Oct 15, 2015 at 2:57 PM, Roberto Congiu 
> wrote:
>
>> I came to a similar solution to a similar problem. I deal with a lot of
>> CSV files from many different sources and they are often malformed.
>> However, I just have success/failure. Maybe you should make
>> SuccessWithWarnings a subclass of success, or getting rid of it altogether
>> making the warnings optional.
>> I was thinking of making this cleaning/conforming library open source if
>> you're interested.
>>
>> R.
>>
>> 2015-10-15 5:28 GMT-07:00 Antonio Murgia > >:
>>
>>> Hello,
>>> I looked around on the web and I couldn’t find any way to deal in a
>>> structured way with malformed/faulty records during computation. All I was
>>> able to find was the flatMap/Some/None technique + logging.
>>> I’m facing this problem because I have a processing algorithm that
>>> extracts more than one value from each record, but can fail in extracting
>>> one of those multiple values, and I want to keep track of them. Logging is
>>> not feasible because this “warning” happens so frequently that the logs
>>> would become overwhelming and impossible to read.
>>> Since I have 3 different possible outcomes from my processing I modeled
>>> it with this class hierarchy:
>>> That holds result and/or warnings.
>>> Since Result implements Traversable it can be used in a flatMap,
>>> discarding all warnings and failure results, in the other hand, if we want
>>> to keep track of warnings, we can elaborate them and output them if we need.
>>>
>>> Kind Regards
>>> #A.M.
>>>
>>
>>
>>
>> --
>> --
>> "Good judgment comes from experience.
>> Experience comes from bad judgment"
>> --
>>
>
>


Re: Get the previous state string in Spark streaming

2015-10-16 Thread Tathagata Das
Its hard to help without any stacktrace associated
with UnsupportedOperationException.

On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan <
ananda.muru...@honeywell.com> wrote:

> One of my co-worker(Yogesh) was trying to get this posted in spark mailing
> and it seems it did not get posted. So I am reposting it here. Please help.
>
>
>
>
>
> Hi,
>
> I am new to Spark and was trying to do some experiments with it.
>
>
>
> I had a JavaPairDStream RDD.
>
> I want to get the list of string from its previous state. For that I use
> updateStateByKey function as follows:
>
>
>
> final Function2,
> Optional> updateFunc =
>
>new Function2,
>
> Optional>() {
>
>
>
> public Optional call(List arg0,
> Optional arg1) throws Exception {
>
> // TODO Auto-generated method stub
>
> if(arg1.toString()==null)
>
>return Optional.of(arg0);
>
> else {
>
>arg0.add(arg1.toString());
>
>return Optional.of(arg0);
>
> }
>
>}
>
> };
>
>
>
> I want the function to append the new list of string to the previous list
> and return the new list. But I am not able to do so. I am getting the "
> java.lang.UnsupportedOperationException" error.
>
> Can anyone which help me out in getting the desired output?
>
>
>


Re: Best practices to handle corrupted records

2015-10-16 Thread Ravindra
+1 Erwan..

Maybe a trivial solution like this -

class Result(msg: String, record: Record)

class Success(msgSuccess: String, val msg: String, val record: Record)
  extends Result(msg, record)

class Failure(msgFailure: String, val msg: String, val record: Record)
  extends Result(msg, record)

trait Warning {
}

class SuccessWithWarning(msgWaring: String, val msgSuccess: String,
    override val msg: String, override val record: Record)
  extends Success(msgSuccess, msg, record) with Warning

val record1 = new Record("k1", "val11", "val21")
val record2 = new Record("k2", "val12", "val22")
val record3 = new Record("k3", "val13", "val23")
val record4 = new Record("k4", "val14", "val24")
val records: List[Record] = List(record1, record2, record3, record4)

def processRecord(record: Record): Either[Result, Result] = {
  val result: Either[Result, Result] = {
    if (record.key.equals("k1"))
      Left(new Failure("failed", "result", record))
    else
      successHandler(record)
  }
  result
}

def successHandler(record: Record): Either[Result, Result] = {
  val result: Either[Result, Result] = {
    if (record.key.equals("k2"))
      Left(new Success("success", "result", record))
    else Right(new SuccessWithWarning("warning", "success", "result", record))
  }
  result
}

for (record <- records) {
  println(processRecord(record))
}


On Fri, Oct 16, 2015 at 1:45 PM Erwan ALLAIN 
wrote:

> Either[FailureResult[T], Either[SuccessWithWarnings[T],
> SuccessResult[T]]]  maybe ?
>
>
> On Thu, Oct 15, 2015 at 5:31 PM, Antonio Murgia <
> antonio.murg...@studio.unibo.it> wrote:
>
>> 'Either' does not cover the case where the outcome was successful but
>> generated warnings. I already looked into it and also at 'Try' from which I
>> got inspired. Thanks for pointing it out anyway!
>>
>> #A.M.
>>
>> Il giorno 15 ott 2015, alle ore 16:19, Erwan ALLAIN <
>> eallain.po...@gmail.com> ha scritto:
>>
>> What about http://www.scala-lang.org/api/2.9.3/scala/Either.html ?
>>
>>
>> On Thu, Oct 15, 2015 at 2:57 PM, Roberto Congiu > > wrote:
>>
>>> I came to a similar solution to a similar problem. I deal with a lot of
>>> CSV files from many different sources and they are often malformed.
>>> However, I just have success/failure. Maybe you should make
>>> SuccessWithWarnings a subclass of success, or getting rid of it altogether
>>> making the warnings optional.
>>> I was thinking of making this cleaning/conforming library open source if
>>> you're interested.
>>>
>>> R.
>>>
>>> 2015-10-15 5:28 GMT-07:00 Antonio Murgia <
>>> antonio.murg...@studio.unibo.it>:
>>>
 Hello,
 I looked around on the web and I couldn’t find any way to deal in a
 structured way with malformed/faulty records during computation. All I was
 able to find was the flatMap/Some/None technique + logging.
 I’m facing this problem because I have a processing algorithm that
 extracts more than one value from each record, but can fail in extracting
 one of those multiple values, and I want to keep track of them. Logging is
 not feasible because this “warning” happens so frequently that the logs
 would become overwhelming and impossible to read.
 Since I have 3 different possible outcomes from my processing I modeled
 it with this class hierarchy:
 That holds result and/or warnings.
 Since Result implements Traversable it can be used in a flatMap,
 discarding all warnings and failure results, in the other hand, if we want
 to keep track of warnings, we can elaborate them and output them if we 
 need.

 Kind Regards
 #A.M.

>>>
>>>
>>>
>>> --
>>> --
>>> "Good judgment comes from experience.
>>> Experience comes from bad judgment"
>>> --
>>>
>>
>>
>


Ensuring eager evaluation inside mapPartitions

2015-10-16 Thread alberskib
Hi all,

I am wondering whether there is a way to ensure that two consecutive maps
inside mapPartitions will not be chained together.

To illustrate my question I prepared short example:

rdd.mapPartitions(it => {
it.map(x => foo(x)).map(y => y.getResult)
}

I would like to ensure that the foo method is applied to all records (from the
partition) and only after that is getResult invoked on each record. It could be
beneficial in situations where foo is some kind of time-consuming IO operation,
i.e. a request to an external service for data (data that couldn't be prefetched).

I know that converting the iterator into a list will do the job, but maybe there is
a more clever way of doing it.
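
For illustration, materializing the intermediate results is enough to force the
first map over the whole partition before the second one runs; a minimal sketch
with foo and getResult as in the example above:

    rdd.mapPartitions { it =>
      val intermediate = it.map(x => foo(x)).toList   // runs foo for every record first
      intermediate.map(y => y.getResult).iterator     // only then extract the results
    }

The cost is that the whole partition's intermediate results must fit in memory at once.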






RE: Get the previous state string in Spark streaming

2015-10-16 Thread Chandra Mohan, Ananda Vel Murugan
Hi,

Thanks for the response. We are trying to implement something similar to what is
discussed in the following Stack Overflow post.

http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation

We are doing it in Java while the accepted answer (the second answer) in this post
is in Scala.

We wrote our Java code taking this Scala code as a reference, but we are getting an
exception at the highlighted line, i.e. in  return
Optional.of(events.add(state.toString());). Specifically, it happens when we
call events.add()



final Function2, Optional> 
updateFunc =

   new Function2,

Optional>() {



public Optional call(List events, 
Optional state) throws Exception {

// TODO Auto-generated method stub

if(state.toString()==null)

   return Optional.of(events);

else {

//UnsupportedOperationException here

   return Optional.of(events.add(state.toString()););

}

   }

};

Please let us know if you need more details. Unfortunately we are not in a 
position to share whole code.

Thanks



Regards,

Anand/Yogesh
From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, October 16, 2015 1:22 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: Get the previous state string in Spark streaming

Its hard to help without any stacktrace associated with 
UnsupportedOperationException.

On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan 
> wrote:

One of my co-worker(Yogesh) was trying to get this posted in spark mailing and 
it seems it did not get posted. So I am reposting it here. Please help.





Hi,

I am new to Spark and was trying to do some experiments with it.



I had a JavaPairDStream RDD.

I want to get the list of string from its previous state. For that I use 
updateStateByKey function as follows:



final Function2, Optional> 
updateFunc =

   new Function2,

Optional>() {



public Optional call(List arg0, 
Optional arg1) throws Exception {

// TODO Auto-generated method stub

if(arg1.toString()==null)

   return Optional.of(arg0);

else {

   arg0.add(arg1.toString());

   return Optional.of(arg0);

}

   }

};



I want the function to append the new list of string to the previous list and 
return the new list. But I am not able to do so. I am getting the 
java.lang.UnsupportedOperationException error.

Can anyone which help me out in getting the desired output?




Re: s3a file system and spark deployment mode

2015-10-16 Thread Steve Loughran

> On 15 Oct 2015, at 19:04, Scott Reynolds  wrote:
> 
> List,
> 
> Right now we build our spark jobs with the s3a hadoop client. We do this 
> because our machines are only allowed to use IAM access to the s3 store. We 
> can build our jars with the s3a filesystem and the aws sdk just fine and this 
> jars run great in *client mode*. 
> 
> We would like to move from client mode to cluster mode as that will allow us 
> to be more resilient to driver failure. In order to do this either:
> 1. the jar file has to be on worker's local disk
> 2. the jar file is in shared storage (s3a)
> 
> We would like to put the jar file in s3 storage, but when we give the jar 
> path as s3a://.., the worker node doesn't have the hadoop s3a and aws sdk 
> in its classpath / uber jar.
> 
> Other then building spark with those two dependencies, what other options do 
> I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a thing.
> 
> Need to get s3a access to both the master (so that we can log spark event log 
> to s3) and to the worker processes (driver, executor).
> 
> Looking for ideas before just adding the dependencies to our spark build and 
> calling it a day.


you can use --jars to add these, e.g.

--jars hadoop-aws.jar,aws-java-sdk-s3


As others have warned, you need Hadoop 2.7.1 for s3a to work properly.




Re: Best practices to handle corrupted records

2015-10-16 Thread Antonio Murgia
Unfortunately Either doesn’t accept 3 type parameters but only 2, so the Either 
solution is not viable.
My solution is pretty similar to Ravindra’s. This “post” was to find out whether 
there is a common and established solution to this problem in the Spark “world”.
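
For what it's worth, a minimal sketch of the kind of three-outcome hierarchy being
discussed (the names are illustrative, not an established library):

    sealed trait Outcome[+A]
    case class Succeeded[A](value: A) extends Outcome[A]
    case class SucceededWithWarnings[A](value: A, warnings: Seq[String]) extends Outcome[A]
    case class Failed(error: String) extends Outcome[Nothing]

    // keeping only the values, in the spirit of the flatMap/Some/None technique:
    // rdd.map(process).flatMap {
    //   case Succeeded(v)                => Some(v)
    //   case SucceededWithWarnings(v, _) => Some(v)
    //   case Failed(_)                   => None
    // }
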
On Oct 16, 2015, at 11:05 AM, Ravindra 
> wrote:

+1 Erwan..

May be a trivial solution like this -
class Result (msg: String, record: Record)

class Success (msgSuccess: String, val msg: String, val record: Record)
extends Result(msg, record)

class Failure (msgFailure: String, val msg: String, val record: Record)
extends Result (msg, record)

trait Warning {

}
class SuccessWithWarning(msgWaring: String, val msgSuccess:String,
override val msg: String, override val record: Record)
extends Success(msgSuccess, msg, record) with Warning


val record1 = new Record("k1", "val11", "val21")
val record2 = new Record("k2", "val12", "val22")
val record3 = new Record("k3", "val13", "val23")
val record4 = new Record("k4", "val14", "val24")
val records : List[Record] = List (record1, record2, record3, record4 )

def processRecord(record: Record) : Either[Result,Result] = {
//(record, new Result)
val result: Either[Result,Result] = {
if (record.key.equals("k1"))
Left(new Failure("failed", "result", record))
else
successHandler(record)
}
result
}

def successHandler (record: Record): Either[Result, Result] = {
val result: Either[Result, Result] = {
if (record.key.equals("k2"))
Left(new Success("success", "result", record))
else Right(new SuccessWithWarning("warning", "success", "result", record))
}
result
}

for(record <- records) {
println (processRecord(record))
}


On Fri, Oct 16, 2015 at 1:45 PM Erwan ALLAIN 
> wrote:
Either[FailureResult[T], Either[SuccessWithWarnings[T], SuccessResult[T]]]  
maybe ?


On Thu, Oct 15, 2015 at 5:31 PM, Antonio Murgia 
> wrote:
'Either' does not cover the case where the outcome was successful but generated 
warnings. I already looked into it and also at 'Try' from which I got inspired. 
Thanks for pointing it out anyway!

#A.M.

Il giorno 15 ott 2015, alle ore 16:19, Erwan ALLAIN 
> ha scritto:

What about http://www.scala-lang.org/api/2.9.3/scala/Either.html ?


On Thu, Oct 15, 2015 at 2:57 PM, Roberto Congiu 
> wrote:
I came to a similar solution to a similar problem. I deal with a lot of CSV 
files from many different sources and they are often malformed.
However, I just have success/failure. Maybe you should make
SuccessWithWarnings a subclass of success, or getting rid of it altogether 
making the warnings optional.
I was thinking of making this cleaning/conforming library open source if you're 
interested.

R.

2015-10-15 5:28 GMT-07:00 Antonio Murgia 
>:
Hello,
I looked around on the web and I couldn’t find any way to deal in a structured 
way with malformed/faulty records during computation. All I was able to find 
was the flatMap/Some/None technique + logging.
I’m facing this problem because I have a processing algorithm that extracts 
more than one value from each record, but can fail in extracting one of those 
multiple values, and I want to keep track of them. Logging is not feasible 
because this “warning” happens so frequently that the logs would become 
overwhelming and impossible to read.
Since I have 3 different possible outcomes from my processing I modeled it with 
this class hierarchy:

That holds result and/or warnings.
Since Result implements Traversable it can be used in a flatMap, discarding all 
warnings and failure results, in the other hand, if we want to keep track of 
warnings, we can elaborate them and output them if we need.

Kind Regards
#A.M.



--
--
"Good judgment comes from experience.
Experience comes from bad judgment"
--





Re: Get the previous state string in Spark streaming

2015-10-16 Thread Tathagata Das
A simple Javadoc lookup of Java's List shows
that List.add can throw UnsupportedOperationException if the implementation
of the List interface does not support that operation (example, does not
support adding null). A good place to start would be to print the parameter
to the List.add(), and the type of class used for List.add(). Even better
if you can reproduce it locally, then you will get the prints in the driver
logs. Otherwise you will have to hunt for that print statement in the
executor logs.

Also, I am not sure what you are trying to do, but using state.toString()
(which is Optional.toString) to compare with null, and then trying to add
that to a list, both seem pretty weird to me. Are you sure what you want to do
works as you intend? It might be worth trying this out locally first with
println() to see whether the logic actually works as you think it should.

Hope this helps.

On Fri, Oct 16, 2015 at 2:23 AM, Chandra Mohan, Ananda Vel Murugan <
ananda.muru...@honeywell.com> wrote:

> Hi,
>
>
>
> Thanks for the response. We are trying to implement something similar as
> discussed in the following SFO post.
>
>
>
>
> http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation
>
>
>
> We are doing it in java while accepted answer(second answer) in this post
> is in Scala.
>
>
>
> We wrote our java code taking this scala code as reference. But we are
> getting exception in highlighted line i.e. in  return Optional.of(
> events.add(state.toString());); Specifically, it happens when we call
> events.add()
>
>
>
> final Function2,
> Optional> updateFunc =
>
>new Function2,
>
> Optional>() {
>
>
>
> public Optional call(List events,
> Optional state) throws Exception {
>
> // TODO Auto-generated method stub
>
> if(state.toString()==null)
>
>return Optional.of(events);
>
> else {
>
> //UnsupportedOperationException here
>
>return Optional.of(events.add(state.toString()););
>
> }
>
>}
>
> };
>
> Please let us know if you need more details. Unfortunately we are not in a
> position to share whole code.
>
> Thanks
>
>
>
> Regards,
>
> Anand/Yogesh
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Friday, October 16, 2015 1:22 PM
> *To:* Chandra Mohan, Ananda Vel Murugan
> *Cc:* user
> *Subject:* Re: Get the previous state string in Spark streaming
>
>
>
> Its hard to help without any stacktrace associated
> with UnsupportedOperationException.
>
>
>
> On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan <
> ananda.muru...@honeywell.com> wrote:
>
> One of my co-worker(Yogesh) was trying to get this posted in spark mailing
> and it seems it did not get posted. So I am reposting it here. Please help.
>
>
>
>
>
> Hi,
>
> I am new to Spark and was trying to do some experiments with it.
>
>
>
> I had a JavaPairDStream RDD.
>
> I want to get the list of string from its previous state. For that I use
> updateStateByKey function as follows:
>
>
>
> final Function2,
> Optional> updateFunc =
>
>new Function2,
>
> Optional>() {
>
>
>
> public Optional call(List arg0,
> Optional arg1) throws Exception {
>
> // TODO Auto-generated method stub
>
> if(arg1.toString()==null)
>
>return Optional.of(arg0);
>
> else {
>
>arg0.add(arg1.toString());
>
>return Optional.of(arg0);
>
> }
>
>}
>
> };
>
>
>
> I want the function to append the new list of string to the previous list
> and return the new list. But I am not able to do so. I am getting the
> java.lang.UnsupportedOperationException error.
>
> Can anyone which help me out in getting the desired output?
>
>
>
>
>


Re: Ensuring eager evaluation inside mapPartitions

2015-10-16 Thread Sean Owen
If you mean, getResult is called on the result of foo for each record, then
that already happens. If you mean getResults is called only after foo has
been called on all records, then you have to collect to a list, yes.

Why does it help with foo being slow in either case though?
You can try to consume the iterator in parallel with ".par" if that's what
you're getting at.

On Fri, Oct 16, 2015 at 10:47 AM, alberskib  wrote:

> Hi all,
>
> I am wondering whether there is way to ensure that two consecutive maps
> inside mapPartition will not be chained together.
>
> To illustrate my question I prepared short example:
>
> rdd.mapPartitions(it => {
> it.map(x => foo(x)).map(y => y.getResult)
> }
>
> I would like to ensure that foo method will be applied to all records (from
> partition) and only after that method getResult invoked on each record. It
> could be beneficial in situation that foo method is some kind of time
> consuming IO operation i.e. request to external service for data (data that
> couldn't be prefetched).
>
> I know that converting iterator into list will do the job but maybe there
> is
> more clever way for doing it.
>
>
>


Re: Does Spark use more memory than MapReduce?

2015-10-16 Thread Gylfi
By default Spark will actually not keep the data at all, it will just store
"how" to recreate the data. 
The programmer can however choose to keep the data once instantiated by
calling "/.persist()/" or "/.cache()/" on the RDD. 
/.cache/ will store the data in-memory only and fail if it will not fit. 
/.persist/ will by default use memory but spill to disk if needed. 
/.persist(StorageLevel)/ allows you to write it all to disk (no in-memory
overhead). 

See:
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

In addition, you can define your own StorageLevel and thus if you have
magnetic and SSD disks you can choose to persist the data to the disk-level
you want (depending on how "hot" you consider the data). 

Essentially, you have full freedom to do what you will with the data in
Spark :)  
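
A minimal sketch of the API being described (the input path is a placeholder):

    import org.apache.spark.storage.StorageLevel

    val data = sc.textFile("/some/input")

    // pick one level per RDD; it cannot be changed once set
    data.persist(StorageLevel.MEMORY_AND_DISK)   // memory first, spill partitions to disk
    // data.cache()                              // same as persist(StorageLevel.MEMORY_ONLY)
    // data.persist(StorageLevel.DISK_ONLY)      // keep it entirely on disk

    data.count()                                 // the first action materializes and stores it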

Hope this helps. 






Re: Ensuring eager evaluation inside mapPartitions

2015-10-16 Thread Bartłomiej Alberski
I mean getResults is called only after foo has been called on all records.

It could be useful if foo is an asynchronous call to an external service
returning a Future that provides some additional data, e.g. a REST API (IO
operations).
If such an API has a latency of 100ms, sending all requests (for 1000 records)
before waiting on the first result will give you a total latency of around 100ms.
If you sequentially invoke foo (hit the REST API), wait for its result
and only then process the second record, you will lose around 100ms on
each record.

Ideally (from the external service's point of view) one would not only use
asynchronous requests but also batch them.

2015-10-16 12:08 GMT+02:00 Sean Owen :

> If you mean, getResult is called on the result of foo for each record,
> then that already happens. If you mean getResults is called only after foo
> has been called on all records, then you have to collect to a list, yes.
>
> Why does it help with foo being slow in either case though?
> You can try to consume the iterator in parallel with ".par" if that's what
> you're getting at.
>
> On Fri, Oct 16, 2015 at 10:47 AM, alberskib  wrote:
>
>> Hi all,
>>
>> I am wondering whether there is way to ensure that two consecutive maps
>> inside mapPartition will not be chained together.
>>
>> To illustrate my question I prepared short example:
>>
>> rdd.mapPartitions(it => {
>> it.map(x => foo(x)).map(y => y.getResult)
>> }
>>
>> I would like to ensure that foo method will be applied to all records
>> (from
>> partition) and only after that method getResult invoked on each record. It
>> could be beneficial in situation that foo method is some kind of time
>> consuming IO operation i.e. request to external service for data (data
>> that
>> couldn't be prefetched).
>>
>> I know that converting iterator into list will do the job but maybe there
>> is
>> more clever way for doing it.
>>
>>
>>
>


HBase Spark Streaming giving error after restore

2015-10-16 Thread Amit Singh Hora
Hi All,

I am using the code below to stream data from Kafka to HBase. Everything works
fine until I restart the job so that it can restore the state from the
checkpoint directory, but while trying to restore the state it gives me the
error below:

stage 0.0 (TID 0, localhost): java.lang.ClassCastException:
scala.runtime.BoxedUnit cannot be cast to
org.apache.hadoop.hbase.client.Mutation

please find below code

tweetsRDD.foreachRDD(rdd=>{
  val hconf = HBaseConfiguration.create();
hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
hconf.set("zookeeper.session.timeout",
conf.getString("hbase.zookeepertimeout"));
hconf.set("hbase.client.retries.number", Integer.toString(1));
hconf.set("zookeeper.recovery.retry", Integer.toString(1));
hconf.set("hbase.master", conf.getString("hbase.hbase_master"));
   
hconf.set("hbase.zookeeper.quorum",conf.getString("hbase.hbase_zkquorum"));
hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
hconf.set("hbase.zookeeper.property.clientPort",
conf.getString("hbase.hbase_zk_port"));
 hconf.setClass("mapreduce.outputformat.class",
classOf[TableOutputFormat[String]], classOf[OutputFormat[String,
BoxedUnit]])
 
 rdd.map ( record =>(new ImmutableBytesWritable,{
 

var maprecord = new HashMap[String, String];
  val mapper = new ObjectMapper();

  //convert JSON string to Map

  maprecord = mapper.readValue(record.toString(),
new TypeReference[HashMap[String, String]]() {});

  
  var ts:Long= maprecord.get("ts").toLong
  var tweetID:Long= maprecord.get("id").toLong
  val key=ts+"_"+tweetID;
  val   put=new Put(Bytes.toBytes(key))
   maprecord.foreach(kv => {
//  println(kv._1+" - "+kv._2)
 
put.add(Bytes.toBytes(colfamily.value),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
  

  }
   )
   put
  
}  
 ) 
 ).saveAsNewAPIHadoopDataset(hconf)
 
  })
 


help me out in solving this as it is urgent for me






HBase Spark Streaming giving error after restore

2015-10-16 Thread Amit Singh Hora
Hi All,

I am using the code below to stream data from Kafka to HBase. Everything works
fine until I restart the job so that it can restore the state from the
checkpoint directory, but while trying to restore the state it gives me the
error below:

stage 0.0 (TID 0, localhost): java.lang.ClassCastException:
scala.runtime.BoxedUnit cannot be cast to
org.apache.hadoop.hbase.client.Mutation

please find below code

tweetsRDD.foreachRDD(rdd=>{
  val hconf = HBaseConfiguration.create();
hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
hconf.set("zookeeper.session.timeout",
conf.getString("hbase.zookeepertimeout"));
hconf.set("hbase.client.retries.number", Integer.toString(1));
hconf.set("zookeeper.recovery.retry", Integer.toString(1));
hconf.set("hbase.master", conf.getString("hbase.hbase_master"));
   
hconf.set("hbase.zookeeper.quorum",conf.getString("hbase.hbase_zkquorum"));
hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
hconf.set("hbase.zookeeper.property.clientPort",
conf.getString("hbase.hbase_zk_port"));
 hconf.setClass("mapreduce.outputformat.class",
classOf[TableOutputFormat[String]], classOf[OutputFormat[String,
BoxedUnit]])

//i have also tried using 
 // hconf.setClass("mapreduce.outputformat.class",
classOf[TableOutputFormat[String]], //classOf[OutputFormat[String,
Mutation]])
 rdd.map ( record =>(new ImmutableBytesWritable,{
 

var maprecord = new HashMap[String, String];
  val mapper = new ObjectMapper();

  //convert JSON string to Map

  maprecord = mapper.readValue(record.toString(),
new TypeReference[HashMap[String, String]]() {});

  
  var ts:Long= maprecord.get("ts").toLong
  var tweetID:Long= maprecord.get("id").toLong
  val key=ts+"_"+tweetID;
  val   put=new Put(Bytes.toBytes(key))
   maprecord.foreach(kv => {
//  println(kv._1+" - "+kv._2)
 
put.add(Bytes.toBytes(colfamily.value),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
  

  }
   )
   put
  
}  
 ) 
 ).saveAsNewAPIHadoopDataset(hconf)
 
  })
 


help me out in solving this as it is urgent for me






issue of tableau connect to spark sql 1.5

2015-10-16 Thread Wangfei (X)

  Hi all!

  I tested Tableau (9.1.0, 32-bit) reading tables from Spark SQL (built from
branch-1.5) using ODBC, and found the following issue:

#

# "[Simba][SQLEngine] (31740) Table or view not found: SPARK.default.src
# table "[default].[src]"  not exist"


And I found a very strange issue: if I run the SQL "select count(1) from src"
from Tableau, the Spark UI shows that it ran the SQL "SELECT 1".



Has anyone tested Tableau with Spark SQL 1.5 successfully?

Thanks Fei.









Convert SchemaRDD to RDD

2015-10-16 Thread satish chandra j
Hi All,
To convert a SchemaRDD to an RDD, the snippet below works if the SQL statement
has fewer than 22 columns in a row, as per the tuple restriction:

rdd.map(row => row.toString)

But if the SQL statement has more than 22 columns, the above snippet fails
with the error "object Tuple27 is not a member of package scala".

Could anybody please provide inputs to convert SchemaRDD to RDD without
using Tuple in the implementation approach

Thanks for your valuable inputs in advance

Regards,
Satish Chandra
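
One possibility, not necessarily what was asked for: Row.toSeq sidesteps the
tuple limit because it returns the column values as a Seq regardless of arity.
A rough sketch, with df standing for the SchemaRDD/DataFrame in question:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    val plainRdd: RDD[Seq[Any]] = df.map((row: Row) => row.toSeq)

    // or a delimited String per row, similar in spirit to row.toString
    val stringRdd: RDD[String] =
      df.map(_.toSeq.map(v => if (v == null) "" else v.toString).mkString("|"))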


Issue of jar dependency in yarn-cluster mode

2015-10-16 Thread Rex Xiong
Hi folks,

In my spark application, executor task depends on snakeyaml-1.10.jar
I build it with Maven and it works fine:
spark-submit --master local --jars d:\snakeyaml-1.10.jar ...

But when I try to run it in YARN, I have an issue; it seems the Spark executor
cannot find the jar file:
spark-submit --master yarn-cluster --jars hdfs://../snakeyaml-1.10.jar
  ..

java.lang.NoSuchMethodError:
org.yaml.snakeyaml.Yaml.<init>(Lorg/yaml/snakeyaml/constructor/BaseConstructor;)V

I checked one executor container folder: snakeyaml-1.10.jar has been
successfully downloaded, and on the Spark driver page, in the Environment tab,
spark.yarn.secondary.jars also contains snakeyaml-1.10.jar.

I have no idea why it doesn't work. Could someone help take a look?

Thanks


Re: HBase Spark Streaming giving error after restore

2015-10-16 Thread Ted Yu
Can you show the complete stack trace ?

Subclass of Mutation is expected. Put is a subclass.

Have you tried replacing BoxedUnit with Put in your code ?

Cheers
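
For reference, Ted's suggestion would amount to something like the following,
keeping the configuration key from the original code (untested):

    import org.apache.hadoop.hbase.client.Mutation
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.OutputFormat

    hconf.setClass("mapreduce.outputformat.class",
      classOf[TableOutputFormat[String]],
      classOf[OutputFormat[String, Mutation]])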

On Fri, Oct 16, 2015 at 6:02 AM, Amit Singh Hora 
wrote:

> Hi All,
>
> I am using below code to stream data from kafka to hbase ,everything works
> fine until i restart the job so that it can restore the state from
> checkpoint directory ,but while trying to restore the state it give me
> below
> error
>
> ge 0.0 (TID 0, localhost): java.lang.ClassCastException:
> scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
>
> please find below code
>
> tweetsRDD.foreachRDD(rdd=>{
>   val hconf = HBaseConfiguration.create();
> hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
> hconf.set("zookeeper.session.timeout",
> conf.getString("hbase.zookeepertimeout"));
> hconf.set("hbase.client.retries.number", Integer.toString(1));
> hconf.set("zookeeper.recovery.retry", Integer.toString(1));
> hconf.set("hbase.master", conf.getString("hbase.hbase_master"));
>
> hconf.set("hbase.zookeeper.quorum",conf.getString("hbase.hbase_zkquorum"));
> hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
> hconf.set("hbase.zookeeper.property.clientPort",
> conf.getString("hbase.hbase_zk_port"));
>  hconf.setClass("mapreduce.outputformat.class",
> classOf[TableOutputFormat[String]], classOf[OutputFormat[String,
> BoxedUnit]])
>
>  rdd.map ( record =>(new ImmutableBytesWritable,{
>
>
> var maprecord = new HashMap[String, String];
>   val mapper = new ObjectMapper();
>
>   //convert JSON string to Map
>
>   maprecord = mapper.readValue(record.toString(),
> new TypeReference[HashMap[String, String]]() {});
>
>
>   var ts:Long= maprecord.get("ts").toLong
>   var tweetID:Long= maprecord.get("id").toLong
>   val key=ts+"_"+tweetID;
>   val   put=new Put(Bytes.toBytes(key))
>maprecord.foreach(kv => {
> //  println(kv._1+" - "+kv._2)
>
>
> put.add(Bytes.toBytes(colfamily.value),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
>
>
>   }
>)
>put
>
> }
>  )
>  ).saveAsNewAPIHadoopDataset(hconf)
>
>   })
>
>
>
> help me out in solving this as it is urgent for me
>
>
>


HTTP 500 if try to access Spark UI in yarn-cluster (only)

2015-10-16 Thread Sebastian YEPES FERNANDEZ
Hello,

I am wondering if anyone else is also facing this issue:
https://issues.apache.org/jira/browse/SPARK-11147


Re: Convert SchemaRDD to RDD

2015-10-16 Thread Ted Yu
Have you seen this thread ?
http://search-hadoop.com/m/q3RTt9YBFr17u8j8=Scala+Limitation+Case+Class+definition+with+more+than+22+arguments

On Fri, Oct 16, 2015 at 7:41 AM, satish chandra j 
wrote:

> Hi All,
> To convert SchemaRDD to RDD below snipped is working if SQL statement has
> columns in a row are less than 22 as per tuple restriction
>
> rdd.map(row => row.toString)
>
> But if SQL statement has columns more than 22 than the above snippet will
> error "*object Tuple27 is not a member of package scala*"
>
> Could anybody please provide inputs to convert SchemaRDD to RDD without
> using Tuple in the implementation approach
>
> Thanks for your valuable inputs in advance
>
> Regards,
> Satish Chandra
>


Compiling spark 1.5.1 fails with scala.reflect.internal.Types$TypeError: bad symbolic reference.

2015-10-16 Thread Simon Hafner
Fresh clone of spark 1.5.1, java version "1.7.0_85"

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

[error] bad symbolic reference. A signature in WebUI.class refers to
term eclipse
[error] in package org which is not available.
[error] It may be completely missing from the current classpath, or
the version on
[error] the classpath might be incompatible with the version used when
compiling WebUI.class.
[error] bad symbolic reference. A signature in WebUI.class refers to term jetty
[error] in value org.eclipse which is not available.
[error] It may be completely missing from the current classpath, or
the version on
[error] the classpath might be incompatible with the version used when
compiling WebUI.class.
[error]
[error]  while compiling:
/root/spark/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
[error] during phase: erasure
[error]  library version: version 2.10.4
[error] compiler version: version 2.10.4
[error]   reconstructed args: -deprecation -classpath
/root/spark/sql/core/target/scala-2.10/classes:/root/.m2/repository/org/apache/spark/spark-core_2.10/1.6.0-SNAPSHOT/spark-core_2.10-1.6.0-SNAPSHOT.jar:/root/.m
2/repository/org/apache/avro/avro-mapred/1.7.7/avro-mapred-1.7.7-hadoop2.jar:/root/.m2/repository/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.7.jar:/root/.m2/repository/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.
7-tests.jar:/root/.m2/repository/com/twitter/chill_2.10/0.5.0/chill_2.10-0.5.0.jar:/root/.m2/repository/com/esotericsoftware/kryo/kryo/2.21/kryo-2.21.jar:/root/.m2/repository/com/esotericsoftware/reflectasm/reflec
tasm/1.07/reflectasm-1.07-shaded.jar:/root/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/root/.m2/repository/com/twitter/chill-java/0.5.0/chill-java-0.5.0.jar:/root/.m2/repository/org/apach
e/hadoop/hadoop-client/2.4.0/hadoop-client-2.4.0.jar:/root/.m2/repository/org/apache/hadoop/hadoop-common/2.4.0/hadoop-common-2.4.0.jar:/root/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/root/.m
2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/root/.m2/repository/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar:/root/.m2/repository/commons-collections/commons-collections/3.2.1/commons-
collections-3.2.1.jar:/root/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:/root/.m2/repository/commons-digester/commons-digester/1.8/commons-digester-1.8.jar:/root/.m
2/repository/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar:/root/.m2/repository/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar:/root/.m2/repository/org/apac
he/hadoop/hadoop-auth/2.4.0/hadoop-auth-2.4.0.jar:/root/.m2/repository/org/apache/hadoop/hadoop-hdfs/2.4.0/hadoop-hdfs-2.4.0.jar:/root/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/root
/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-app/2.4.0/hadoop-mapreduce-client-app-2.4.0.jar:/root/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-common/2.4.0/hadoop-mapreduce-client-common-
2.4.0.jar:/root/.m2/repository/org/apache/hadoop/hadoop-yarn-client/2.4.0/hadoop-yarn-client-2.4.0.jar:/root/.m2/repository/com/sun/jersey/jersey-client/1.9/jersey-client-1.9.jar:/root/.m2/repository/org/apache/ha
doop/hadoop-yarn-server-common/2.4.0/hadoop-yarn-server-common-2.4.0.jar:/root/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/2.4.0/hadoop-mapreduce-client-shuffle-2.4.0.jar:/root/.m2/repository/
org/apache/hadoop/hadoop-yarn-api/2.4.0/hadoop-yarn-api-2.4.0.jar:/root/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-core/2.4.0/hadoop-mapreduce-client-core-2.4.0.jar:/root/.m2/repository/org/apache/ha
doop/hadoop-yarn-common/2.4.0/hadoop-yarn-common-2.4.0.jar:/root/.m2/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/root/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/root/.m2/re
pository/org/apache/hadoop/hadoop-mapreduce-client-jobclient/2.4.0/hadoop-mapreduce-client-jobclient-2.4.0.jar:/root/.m2/repository/org/apache/hadoop/hadoop-annotations/2.4.0/hadoop-annotations-2.4.0.jar:/root/.m2
/repository/org/apache/spark/spark-launcher_2.10/1.6.0-SNAPSHOT/spark-launcher_2.10-1.6.0-SNAPSHOT.jar:/root/.m2/repository/org/apache/spark/spark-network-common_2.10/1.6.0-SNAPSHOT/spark-network-common_2.10-1.6.0
-SNAPSHOT.jar:/root/.m2/repository/org/apache/spark/spark-network-shuffle_2.10/1.6.0-SNAPSHOT/spark-network-shuffle_2.10-1.6.0-SNAPSHOT.jar:/root/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldb
jni-all-1.8.jar:/root/.m2/repository/org/apache/spark/spark-unsafe_2.10/1.6.0-SNAPSHOT/spark-unsafe_2.10-1.6.0-SNAPSHOT.jar:/root/.m2/repository/net/java/dev/jets3t/jets3t/0.9.3/jets3t-0.9.3.jar:/root/.m2/reposito

Re: Convert SchemaRDD to RDD

2015-10-16 Thread satish chandra j
Hi Ted,
I have implemented the snippet below, but I am getting the error "type mismatch:
found String, required Serializable", as mentioned earlier in the mail chain

class MyRecord(val val1: String, val val2: String, ... more than 22 arguments,
in this case e.g. 26)
  extends Product with Serializable {

  def canEqual(that: Any): Boolean = that.isInstanceOf[MyRecord]

  def productArity: Int = 26 // example value: the number of constructor arguments

  def productElement(n: Int): Serializable = n match {
    case 1 => val1
    case 2 => val2
    // ... cases up to 26
  }
}

Hence I am expecting an approach to convert a SchemaRDD to an RDD without using a
Tuple or a case class, given the 22-argument restriction in Scala 2.10.

Regards
Satish Chandra
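
A minimal sketch of one alternative, for reference: keep the generic Row instead of
mapping each row into a Tuple or case class at all. This assumes Spark 1.3+ where
sql() returns a DataFrame (on older versions the SchemaRDD itself already is an
RDD[Row]); the query, table name and delimiter below are made-up examples, not from
this thread:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val wide = sqlContext.sql("select * from wide_table")   // hypothetical 26-column query
val rowRDD: RDD[Row] = wide.rdd                         // RDD[Row], no tuple involved
val seqRDD: RDD[Seq[Any]] = rowRDD.map(_.toSeq)         // keeps every column, no 22-column limit
val strRDD: RDD[String]  = rowRDD.map(_.mkString("|"))  // or flatten each row to a delimited String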


Re: s3a file system and spark deployment mode

2015-10-16 Thread Scott Reynolds
hmm I tried using --jars and that got passed to MasterArguments and that
doesn't work :-(

https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala

Same with Worker:
https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Both Master and Worker have to start with these two jars because
a.) the Master has to serve the event log in s3
b.) the Worker runs the Driver and has to download the jar from s3

And yes I am using these deps:



<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>2.7.1</version>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk</artifactId>
  <version>1.7.4</version>
</dependency>


I think I have settled on just modifying the java command line that starts
up the worker and master. Just seems easier. Currently launching them with
spark-class bash script

/mnt/services/spark/bin/spark-class org.apache.spark.deploy.master.Master \
--ip `hostname -i` --port 7077 --webui-port 8080

If all else fails I will update the spark pom and and include it in the
shaded spark jar.

On Fri, Oct 16, 2015 at 2:25 AM, Steve Loughran 
wrote:

>
> > On 15 Oct 2015, at 19:04, Scott Reynolds  wrote:
> >
> > List,
> >
> > Right now we build our spark jobs with the s3a hadoop client. We do this
> because our machines are only allowed to use IAM access to the s3 store. We
> can build our jars with the s3a filesystem and the aws sdk just fine and
> this jars run great in *client mode*.
> >
> > We would like to move from client mode to cluster mode as that will
> allow us to be more resilient to driver failure. In order to do this either:
> > 1. the jar file has to be on worker's local disk
> > 2. the jar file is in shared storage (s3a)
> >
> > We would like to put the jar file in s3 storage, but when we give the
> jar path as s3a://.., the worker node doesn't have the hadoop s3a and
> aws sdk in its classpath / uber jar.
> >
> > Other then building spark with those two dependencies, what other
> options do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a
> thing.
> >
> > Need to get s3a access to both the master (so that we can log spark
> event log to s3) and to the worker processes (driver, executor).
> >
> > Looking for ideas before just adding the dependencies to our spark build
> and calling it a day.
>
>
> you can use --jars to add these, e.g
>
> -jars hadoop-aws.jar,aws-java-sdk-s3
>
>
> as others have warned, you need Hadoop 2.7.1 for s3a to work proplery
>


Re: Issue of jar dependency in yarn-cluster mode

2015-10-16 Thread Rex Xiong
I finally resolved this issue by adding --conf spark.executor.extraClassPath=
snakeyaml-1.10.jar

2015-10-16 22:57 GMT+08:00 Rex Xiong :

> Hi folks,
>
> In my spark application, executor task depends on snakeyaml-1.10.jar
> I build it with Maven and it works fine:
> spark-submit --master local --jars d:\snakeyaml-1.10.jar ...
>
> But when I try to run it in yarn, I have issue, it seems spark executor
> cannot find the jar file:
> spark-submit --master yarn-cluster --jars hdfs://../snakeyaml-1.10.jar
>   ..
>
> java.lang.NoSuchMethodError:
> org.yaml.snakeyaml.Yaml.<init>(Lorg/yaml/snakeyaml/constructor/BaseConstructor;)V
>
> I check one executor container folder, snakeyaml-1.10.jar has been
> successfully downloaded, and in spark driver page, in environment tab, 
> spark.yarn.secondary.jars
> also contains snakeyaml-1.10.jar
>
> I have no idea why it doesn't work. Could some one help to take a look?
>
> Thanks
>
>


Accessing HDFS HA from spark job (UnknownHostException error)

2015-10-16 Thread kyarovoy
I have Apache Mesos 0.22.1 cluster (3 masters & 5 slaves), running Cloudera
HDFS (2.5.0-cdh5.3.1) in HA configuration and Spark 1.5.1 framework. 

When I try to spark-submit compiled HdfsTest.scala example app (from Spark
1.5.1 sources) - it fails with "java.lang.IllegalArgumentException:
java.net.UnknownHostException: hdfs" error in executor logs. This error is
only observed when I pass HDFS HA Path as an argument ("hdfs://hdfs/"), when
I pass "hdfs://namenode1.hdfs.mesos:50071/tesfile" - everything works fine. 

What I've found after enabling TRACE logging is that Spark driver actually
reads hdfs://hdfs URL correctly, but Spark executor - doesn't. 

My Scala app code: 

import org.apache.spark._

object HdfsTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HdfsTest")
    val sc = new SparkContext(sparkConf)
    val file = sc.textFile(args(0))
    val mapped = file.map(s => s.length).cache()
    for (iter <- 1 to 10) {
      val start = System.currentTimeMillis()
      for (x <- mapped) { x + 2 }
      val end = System.currentTimeMillis()
      println("Iteration " + iter + " took " + (end - start) + " ms")
    }
    sc.stop()
  }
}
I compile this code and submit jar file to Spark in cluster mode: 
/opt/spark/bin/spark-submit --deploy-mode cluster --class
com.cisco.hdfs.HdfsTest http://1.2.3.4/HdfsTest-0.0.1.jar
hdfs://hdfs/testfile 

My spark-defaults.conf file: 

spark.master spark://1.2.3.4:7077 
spark.eventLog.enabled   true 
spark.driver.memory  1g 

My spark-env.sh file: 

export HADOOP_HOME=/opt/spark 
export HADOOP_CONF_DIR=/opt/spark/conf 

I have spark deployed on each slave in /opt/spark directory. 

I can accesses HDFS using "hdfs dfs -ls hdfs://hdfs/" command in console,
without the need to specify active namenode address and port. 

core-site.xml:
--

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://hdfs</value>
  </property>
</configuration>

hdfs-site.xml:
--

<configuration>
  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.nameservice.id</name>
    <value>hdfs</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>hdfs</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.hdfs</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.hdfs.nn1</name>
    <value>namenode1.hdfs.mesos:50071</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.hdfs.nn1</name>
    <value>namenode1.hdfs.mesos:50070</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.hdfs.nn2</name>
    <value>namenode2.hdfs.mesos:50071</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.hdfs.nn2</name>
    <value>namenode2.hdfs.mesos:50070</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.hdfs</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://journalnode1.hdfs.mesos:8485;journalnode2.hdfs.mesos:8485;journalnode3.hdfs.mesos:8485/hdfs</value>
  </property>
  <property>
    <name>ha.zookeeper.quorum</name>
    <value>master.mesos:2181</value>
  </property>
  <property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/var/lib/hdfs/data/jn</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///var/lib/hdfs/data/name</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:///var/lib/hdfs/data/data</value>
  </property>
  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>shell(/bin/true)</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.datanode.du.reserved</name>
    <value>10485760</value>
  </property>
  <property>
    <name>dfs.datanode.balance.bandwidthPerSec</name>
    <value>41943040</value>
  </property>
  <property>
    <name>dfs.namenode.safemode.threshold-pct</name>
    <value>0.90</value>
  </property>
  <property>
    <name>dfs.namenode.heartbeat.recheck-interval</name>
    <value>6</value>
  </property>
  <property>
    <name>dfs.datanode.handler.count</name>
    <value>10</value>
  </property>
  <property>
    <name>dfs.namenode.handler.count</name>
    <value>20</value>
  </property>
  <property>
    <name>dfs.image.compress</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.image.compression.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>
  <property>
    <name>dfs.namenode.invalidate.work.pct.per.iteration</name>
    <value>0.35f</value>
  </property>
  <property>
    <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
    <value>4</value>
  </property>
  <property>
    <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit.streams.cache.size</name>
    <value>1000</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit.streams.cache.size.expiry.ms</name>
    <value>1000</value>
  </property>
  <property>
    <name>dfs.domain.socket.path</name>
    <value>/var/run/hadoop-hdfs/dn._PORT</value>
  </property>
</configuration>


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accessing-HDFS-HA-from-spark-job-UnknownHostException-error-tp25092.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dynamic partition pruning

2015-10-16 Thread Xiao Li
Hi, Younes,

Maybe you can open a JIRA?

Thanks,

Xiao Li

2015-10-16 12:43 GMT-07:00 Younes Naguib :

> Thanks,
>
> Do you have a Jira I can follow for this?
>
>
>
> y
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com]
> *Sent:* October-16-15 2:18 PM
> *To:* Younes Naguib
> *Cc:* user@spark.apache.org
> *Subject:* Re: Dynamic partition pruning
>
>
>
> We don't support dynamic partition pruning yet.
>
>
>
> On Fri, Oct 16, 2015 at 10:20 AM, Younes Naguib <
> younes.nag...@tritondigital.com> wrote:
>
> Hi all
>
>
>
> I’m running sqls on spark 1.5.1 and using tables based on parquets.
>
> My tables are not pruned when joined on partition columns.
>
> Ex:
>
> Select  from tab where partcol=1 will prune on value 1
>
> Select  from tab join dim on (dim.partcol=tab.partcol) where
> dim.partcol=1 will scan all partitions.
>
>
>
> Any ideas or workarounds?
>
>
>
>
>
> *Thanks,*
>
> *Younes*
>
>
>
>
>


Multiple joins in Spark

2015-10-16 Thread Shyam Parimal Katti
Hello All,

I have a following SQL query like this:

select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
b.a_id join table_c c on b.b_id = c.b_id

In scala i have done this so far:

table_a_rdd = sc.textFile(...)
table_b_rdd = sc.textFile(...)
table_c_rdd = sc.textFile(...)

val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line => (line(0), line))
val table_b_rowRDD = table_b_rdd.map(_.split("\\x07")).map(line => (line(0), line))
val table_c_rowRDD = table_c_rdd.map(_.split("\\x07")).map(line => (line(0), line))

Each line has its first value as the primary key.

While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to join,
is it possible to join multiple RDDs in a single expression? like
table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
specify the column on which I can join multiple RDDs?


Re: Problem of RDD in calculation

2015-10-16 Thread Xiao Li
Hi, Frank,

After registering these DF as a temp table (via the API registerTempTable),
you can do it using SQL. I believe this should be much easier.
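
For example, a rough sketch using the table and column names from your question
(df1/df2 being the two DataFrames):

df1.registerTempTable("df1")
df2.registerTempTable("df2")
// stats is a new DataFrame with one row per userID
val stats = sqlContext.sql(
  """SELECT d1.userID,
    |       AVG(d1.dataUsage) AS avgUsage, MAX(d1.dataUsage) AS maxUsage, MIN(d1.dataUsage) AS minUsage,
    |       AVG(d1.duration)  AS avgDuration, MAX(d1.duration) AS maxDuration, MIN(d1.duration) AS minDuration
    |FROM df1 d1 JOIN df2 d2 ON d1.userID = d2.userID
    |GROUP BY d1.userID""".stripMargin)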

Good luck,

Xiao Li

2015-10-16 12:10 GMT-07:00 ChengBo :

> Hi all,
>
>
>
> I am new in Spark, and I have a question in dealing with RDD.
>
> I’ve converted RDD to DataFrame. So there are two DF: DF1 and DF2
>
> DF1 contains: userID, time, dataUsage, duration
>
> DF2 contains: userID
>
>
>
> Each userID has multiple rows in DF1.
>
> DF2 has distinct userID, and I would like to compute the average, max and
> min value of both dataUsage and duration for each userID in DF1?
>
> And store the results in a new dataframe.
>
> How can I do that?
>
> Thanks a lot.
>
>
>
> Best
>
> Frank
>


driver ClassNotFoundException when MySQL JDBC exceptions are thrown on executor

2015-10-16 Thread Hurshal Patel
Hi all,

I've been struggling with a particularly puzzling issue after upgrading to
Spark 1.5.1 from Spark 1.4.1.

When I use the MySQL JDBC connector and an exception (e.g.
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on the
executor, I get a ClassNotFoundException on the driver, which results in
this error (logs are abbreviated):

15/10/16 17:20:59 INFO SparkContext: Starting job: collect at repro.scala:73
...
15/10/16 17:20:59 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
15/10/16 17:20:59 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
at repro.Repro$$anonfun$main$3.apply$mcZI$sp(repro.scala:69)
...
15/10/16 17:20:59 WARN ThrowableSerializationWrapper: Task exception could
not be deserialized
java.lang.ClassNotFoundException:
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
...
15/10/16 17:20:59 ERROR TaskResultGetter: Could not deserialize
TaskEndReason: ClassNotFound with classloader
org.apache.spark.util.MutableURLClassLoader@7f08a6b1
15/10/16 17:20:59 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3,
localhost): UnknownReason
15/10/16 17:20:59 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1 times;
aborting job
15/10/16 17:20:59 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks
have all completed, from pool
15/10/16 17:20:59 INFO TaskSchedulerImpl: Cancelling stage 3
15/10/16 17:20:59 INFO DAGScheduler: ResultStage 3 (collect at
repro.scala:73) failed in 0.012 s
15/10/16 17:20:59 INFO DAGScheduler: Job 3 failed: collect at
repro.scala:73, took 0.018694 s

 In Spark 1.4.1, I get the following (logs are abbreviated):
15/10/16 17:42:41 INFO SparkContext: Starting job: collect at repro.scala:53
...
15/10/16 17:42:41 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
15/10/16 17:42:41 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
...
15/10/16 17:42:41 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
localhost): com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
...

15/10/16 17:42:41 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times;
aborting job
15/10/16 17:42:41 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks
have all completed, from pool
15/10/16 17:42:41 INFO TaskSchedulerImpl: Cancelling stage 2
15/10/16 17:42:41 INFO DAGScheduler: ResultStage 2 (collect at
repro.scala:53) failed in 0.016 s
15/10/16 17:42:41 INFO DAGScheduler: Job 2 failed: collect at
repro.scala:53, took 0.024584 s


I have seriously screwed up somewhere or this is a change in behavior that
I have not been able to find in the documentation. For those that are
interested, a full repro and logs follow.

Hurshal

---

I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in various
combinations of
 * local/standalone mode
 * putting mysql on the classpath with --jars/building a fat jar with mysql
in it/manually running sc.addJar on the mysql jar
 * --deploy-mode client/--deploy-mode cluster
but nothing seems to change.



Here is an example invocation, and the accompanying source code:

$ ./bin/spark-submit --master local --deploy-mode client --class
repro.Repro /home/nix/repro/target/scala-2.10/repro-assembly-0.0.1.jar
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/10/16 17:40:53 INFO SparkContext: Running Spark version 1.5.1
15/10/16 17:40:53 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/16 17:40:53 WARN Utils: Your hostname, choochootrain resolves to a
loopback address: 127.0.1.1; using 10.0.1.97 instead (on interface wlan0)
15/10/16 17:40:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
15/10/16 17:40:53 INFO SecurityManager: Changing view acls to: root
15/10/16 17:40:53 INFO SecurityManager: Changing modify acls to: root
15/10/16 17:40:53 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
15/10/16 17:40:54 INFO Slf4jLogger: Slf4jLogger started
15/10/16 17:40:54 INFO Remoting: Starting remoting
15/10/16 17:40:54 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@10.0.1.97:48116]
15/10/16 17:40:54 INFO Utils: Successfully started service 'sparkDriver' on
port 48116.
15/10/16 17:40:54 INFO SparkEnv: Registering MapOutputTracker
15/10/16 17:40:54 INFO SparkEnv: Registering BlockManagerMaster
15/10/16 17:40:54 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-7e7cf2b0-397e-4c44-97e9-508f5c6ec5ab
15/10/16 17:40:54 INFO MemoryStore: MemoryStore started with capacity 530.3
MB
15/10/16 17:40:54 INFO HttpFileServer: HTTP File server 

Location preferences in pyspark?

2015-10-16 Thread Philip Weaver
I believe what I want is the exact functionality provided by
SparkContext.makeRDD in Scala. For each element in the RDD, I want to specify
a list of preferred hosts for processing that element.

It looks like this method only exists in Scala, and as far as I can tell
there is no similar functionality available in python. Is this true?
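
(For reference, a minimal sketch of what that Scala overload looks like; the element
values and hostnames below are made up:)

// Each element is paired with the list of hosts preferred for processing it.
val rdd = sc.makeRDD(Seq(
  ("record-1", Seq("host1", "host2")),
  ("record-2", Seq("host3"))
))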

- Philip


How to have Single reference of a class in Spark Streaming?

2015-10-16 Thread swetha
Hi,

How to have a single reference of a class across all the executors in Spark
Streaming? The contents of the class will be updated at all the executors. 
Would using it as a variable inside updateStateByKey guarantee that the
reference is updated across all the executors with no
ConcurrentModificationException? The following is how I am trying to use a
Tracker class across all the JVMs.

val trackerClass = new TrackerClass()

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  getMergedSession(this.trackerClass)
  Some(newCount)
}



Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Single-refernce-of-a-class-in-Spark-Streaming-tp25103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to put an object in cache for ever in Streaming

2015-10-16 Thread swetha kasireddy
What about cleaning up the temp data that gets generated by shuffles? We
have a lot of temp data generated by shuffles in the /tmp folder;
that's why we are using the ttl. Also, if I keep an RDD in cache, is it available
across all the executors or just the same executor?

On Fri, Oct 16, 2015 at 5:49 PM, Tathagata Das  wrote:

> Setting a ttl is not recommended any more as Spark works with Java GC to
> clean up stuff (RDDs, shuffles, broadcasts,etc.) that are not in reference
> any more.
>
> So you can keep an RDD cached in Spark, and every minute uncache the
> previous one, and cache a new one.
>
> TD
>
> On Fri, Oct 16, 2015 at 12:02 PM, swetha 
> wrote:
>
>> Hi,
>>
>> How to put a changing object in Cache for ever in Streaming. I know that
>> we
>> can do rdd.cache but I think .cache would be cleaned up if we set ttl in
>> Streaming. Our requirement is to have an object in memory. The object
>> would
>> be updated every minute  based on the records that we get in our Streaming
>> job.
>>
>>  Currently I am keeping that in updateStateByKey. But, my updateStateByKey
>> is tracking the realtime Session information as well. So, my
>> updateStateByKey has 4 arguments that tracks session information and  this
>> object  that tracks the performance info separately. I was thinking it may
>> be too much to keep so much data in updateStateByKey.
>>
>> Is it recommended to hold a lot of data using updateStateByKey?
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-put-an-object-in-cache-for-ever-in-Streaming-tp25098.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Multiple joins in Spark

2015-10-16 Thread Xiao Li
Hi, Shyam,

You still can use SQL to do the same thing in Spark:

For example,

val df1 = sqlContext.createDataFrame(rdd)
val df2 = sqlContext.createDataFrame(rdd2)
val df3 = sqlContext.createDataFrame(rdd3)
df1.registerTempTable("tab1")
df2.registerTempTable("tab2")
df3.registerTempTable("tab3")
val exampleSQL = sqlContext.sql("select * from tab1, tab2, tab3 where
tab1.name = tab2.name and tab2.id = tab3.id")
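
The same three-way join can also be written with the DataFrame API, which lets you
name the join columns explicitly; a rough sketch, with column names borrowed from
your original query:

val joined = df1
  .join(df2, df1("a_id") === df2("a_id"))
  .join(df3, df2("b_id") === df3("b_id"))
  .select(df1("a_id"), df2("b_id"), df3("c_id"))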

Good luck,

Xiao Li

2015-10-16 17:01 GMT-07:00 Shyam Parimal Katti :

> Hello All,
>
> I have a following SQL query like this:
>
> select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
> b.a_id join table_c c on b.b_id = c.b_id
>
> In scala i have done this so far:
>
> table_a_rdd = sc.textFile(...)
> table_b_rdd = sc.textFile(...)
> table_c_rdd = sc.textFile(...)
>
> val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line => (line(0), line))
> val table_b_rowRDD = table_b_rdd.map(_.split("\\x07")).map(line => (line(0), line))
> val table_c_rowRDD = table_c_rdd.map(_.split("\\x07")).map(line => (line(0), line))
>
> Each line has its first value as the primary key.
>
> While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to join,
> is it possible to join multiple RDDs in a single expression? like
> table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
> specify the column on which I can join multiple RDDs?
>
>
>
>
>


Re: How to put an object in cache for ever in Streaming

2015-10-16 Thread Tathagata Das
Setting a ttl is not recommended any more as Spark works with Java GC to
clean up stuff (RDDs, shuffles, broadcasts,etc.) that are not in reference
any more.

So you can keep an RDD cached in Spark, and every minute uncache the
previous one, and cache a new one.
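
A rough sketch of that pattern (the DStream name, the element type and the
buildSnapshot helper are placeholders for illustration, not actual APIs):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

var current: RDD[(String, Long)] = null
dstream.foreachRDD { batch =>
  val next = buildSnapshot(batch)        // hypothetical: derive the new in-memory snapshot
  next.persist(StorageLevel.MEMORY_ONLY)
  next.count()                           // materialize before swapping
  val previous = current
  current = next
  if (previous != null) previous.unpersist()
}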

TD

On Fri, Oct 16, 2015 at 12:02 PM, swetha  wrote:

> Hi,
>
> How to put a changing object in Cache for ever in Streaming. I know that we
> can do rdd.cache but I think .cache would be cleaned up if we set ttl in
> Streaming. Our requirement is to have an object in memory. The object would
> be updated every minute  based on the records that we get in our Streaming
> job.
>
>  Currently I am keeping that in updateStateByKey. But, my updateStateByKey
> is tracking the realtime Session information as well. So, my
> updateStateByKey has 4 arguments that tracks session information and  this
> object  that tracks the performance info separately. I was thinking it may
> be too much to keep so much data in updateStateByKey.
>
> Is it recommended to hold a lot of data using updateStateByKey?
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-put-an-object-in-cache-for-ever-in-Streaming-tp25098.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Problem of RDD in calculation

2015-10-16 Thread Xiao Li
For most programmers, DataFrames are preferred thanks to their flexibility,
but SQL syntax is a great option for users who feel more comfortable
with SQL. : )

2015-10-16 18:22 GMT-07:00 Ali Tajeldin EDU :

> Since DF2 only has the userID, I'm assuming you are using DF2 to filter
> for desired userIDs.
> You can just use the join() and groupBy operations on DataFrame to do what
> you desire.  For example:
>
> scala> val df1=app.createDF("id:String; v:Integer", "X,1;X,2;Y,3;Y,4;Z,10")
> df1: org.apache.spark.sql.DataFrame = [id: string, v: int]
>
> scala> df1.show
> +---+---+
> | id|  v|
> +---+---+
> |  X|  1|
> |  X|  2|
> |  Y|  3|
> |  Y|  4|
> |  Z| 10|
> +---+---+
>
> scala> val df2=app.createDF("id:String", "X;Y")
> df2: org.apache.spark.sql.DataFrame = [id: string]
>
> scala> df2.show
> +---+
> | id|
> +---+
> |  X|
> |  Y|
> +---+
>
> scala> df1.join(df2, "id").groupBy("id").agg(avg("v") as "avg_v", min("v")
> as "min_v").show
> +---+-----+-----+
> | id|avg_v|min_v|
> +---+-----+-----+
> |  X|  1.5|    1|
> |  Y|  3.5|    3|
> +---+-----+-----+
>
>
> Notes:
> * The above uses createDF method in SmvApp from SMV package, but the rest
> of the code is just standard Spark DataFrame ops.
> * One advantage of doing this using DataFrame rather than SQL is that you
> can build the expressions programmatically (e.g. imagine doing this for 100
> columns instead of 2).
>
> ---
> Ali
>
>
> On Oct 16, 2015, at 12:10 PM, ChengBo  wrote:
>
> Hi all,
>
> I am new in Spark, and I have a question in dealing with RDD.
> I’ve converted RDD to DataFrame. So there are two DF: DF1 and DF2
> DF1 contains: userID, time, dataUsage, duration
> DF2 contains: userID
>
> Each userID has multiple rows in DF1.
> DF2 has distinct userID, and I would like to compute the average, max and
> min value of both dataUsage and duration for each userID in DF1?
> And store the results in a new dataframe.
> How can I do that?
> Thanks a lot.
>
> Best
> Frank
>
>
>


Re: Multiple joins in Spark

2015-10-16 Thread Xiao Li
Hi, Shyam,

The method registerTempTable registers a DataFrame as a temporary
table in the Catalog under the given table name.

In the Catalog, Spark maintains a concurrent hashmap, which maps each
table name to its logical plan.

For example, when we submit the following query,

SELECT * FROM inMemoryDF

The concurrent hashmap contains one map from name to the Logical Plan:

"inMemoryDF" -> "LogicalRDD [c1#0,c2#1,c3#2,c4#3], MapPartitionsRDD[1] at
createDataFrame at SimpleApp.scala:42

Therefore, using SQL will not hurt your performance. The actual physical
plan to execute your SQL query is generated by the result of Catalyst
optimizer.
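
For example, you can check what Catalyst actually produced for the query above:

// Prints the parsed, analyzed, optimized and physical plans
exampleSQL.explain(true)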

Good luck,

Xiao Li



2015-10-16 20:53 GMT-07:00 Shyam Parimal Katti :

> Thanks Xiao! A question about the internals: would you know what happens
> when registerTempTable() is called? I.e., does it create an RDD internally
> or some internal representation that lets it join with an RDD?
>
> Again, thanks for the answer.
> On Oct 16, 2015 8:15 PM, "Xiao Li"  wrote:
>
>> Hi, Shyam,
>>
>> You still can use SQL to do the same thing in Spark:
>>
>> For example,
>>
>> val df1 = sqlContext.createDataFrame(rdd)
>> val df2 = sqlContext.createDataFrame(rdd2)
>> val df3 = sqlContext.createDataFrame(rdd3)
>> df1.registerTempTable("tab1")
>> df2.registerTempTable("tab2")
>> df3.registerTempTable("tab3")
>> val exampleSQL = sqlContext.sql("select * from tab1, tab2, tab3 where
>> tab1.name = tab2.name and tab2.id = tab3.id")
>>
>> Good luck,
>>
>> Xiao Li
>>
>> 2015-10-16 17:01 GMT-07:00 Shyam Parimal Katti :
>>
>>> Hello All,
>>>
>>> I have a following SQL query like this:
>>>
>>> select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
>>> b.a_id join table_c c on b.b_id = c.b_id
>>>
>>> In scala i have done this so far:
>>>
>>> table_a_rdd = sc.textFile(...)
>>> table_b_rdd = sc.textFile(...)
>>> table_c_rdd = sc.textFile(...)
>>>
>>> val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line => (line(0), line))
>>> val table_b_rowRDD = table_b_rdd.map(_.split("\\x07")).map(line => (line(0), line))
>>> val table_c_rowRDD = table_c_rdd.map(_.split("\\x07")).map(line => (line(0), line))
>>>
>>> Each line has its first value as the primary key.
>>>
>>> While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD) to
>>> join, is it possible to join multiple RDDs in a single expression? like
>>> table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD) ? Also, how can I
>>> specify the column on which I can join multiple RDDs?
>>>
>>>
>>>
>>>
>>>
>>


Re: Problem of RDD in calculation

2015-10-16 Thread Ali Tajeldin EDU
Since DF2 only has the userID, I'm assuming you are using DF2 to filter for 
desired userIDs.
You can just use the join() and groupBy operations on DataFrame to do what you 
desire.  For example:

scala> val df1=app.createDF("id:String; v:Integer", "X,1;X,2;Y,3;Y,4;Z,10")
df1: org.apache.spark.sql.DataFrame = [id: string, v: int]

scala> df1.show
+---+---+
| id|  v|
+---+---+
|  X|  1|
|  X|  2|
|  Y|  3|
|  Y|  4|
|  Z| 10|
+---+---+

scala> val df2=app.createDF("id:String", "X;Y")
df2: org.apache.spark.sql.DataFrame = [id: string]

scala> df2.show
+---+
| id|
+---+
|  X|
|  Y|
+---+

scala> df1.join(df2, "id").groupBy("id").agg(avg("v") as "avg_v", min("v") as 
"min_v").show
+---+-----+-----+
| id|avg_v|min_v|
+---+-----+-----+
|  X|  1.5|    1|
|  Y|  3.5|    3|
+---+-----+-----+


Notes:
* The above uses createDF method in SmvApp from SMV package, but the rest of 
the code is just standard Spark DataFrame ops.
* One advantage of doing this using DataFrame rather than SQL is that you can 
build the expressions programmatically (e.g. imagine doing this for 100 columns 
instead of 2; see the sketch below).
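
For instance, a small sketch with the df1/df2 above (in practice the column list
would be much longer):

import org.apache.spark.sql.functions.{avg, max, min}

// One aggregate expression per (column, function) pair, built in code rather than typed out
val cols = Seq("v")
val aggExprs = cols.flatMap(c => Seq(avg(c).as("avg_" + c), max(c).as("max_" + c), min(c).as("min_" + c)))
df1.join(df2, "id").groupBy("id").agg(aggExprs.head, aggExprs.tail: _*).show()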

---
Ali


On Oct 16, 2015, at 12:10 PM, ChengBo  wrote:

> Hi all,
>  
> I am new in Spark, and I have a question in dealing with RDD.
> I’ve converted RDD to DataFrame. So there are two DF: DF1 and DF2
> DF1 contains: userID, time, dataUsage, duration
> DF2 contains: userID
>  
> Each userID has multiple rows in DF1.
> DF2 has distinct userID, and I would like to compute the average, max and min 
> value of both dataUsage and duration for each userID in DF1?
> And store the results in a new dataframe.
> How can I do that?
> Thanks a lot.
>  
> Best
> Frank



Re: Spark on Mesos / Executor Memory

2015-10-16 Thread Bharath Ravi Kumar
Can someone respond if you're aware of the reason for such a memory
footprint? It seems unintuitive and hard to reason about.

Thanks,
Bharath

On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar 
wrote:

> Resending since user@mesos bounced earlier. My apologies.
>
> On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar 
> wrote:
>
>> (Reviving this thread since I ran into similar issues...)
>>
>> I'm running two spark jobs (in mesos fine grained mode), each belonging
>> to a different mesos role, say low and high. The low:high mesos weights are
>> 1:10. On expected lines, I see that the low priority job occupies cluster
>> resources to the maximum extent when running alone. However, when the high
>> priority job is submitted, it does not start and continues to await cluster
>> resources (as seen in the logs). Since the jobs run in fine grained mode
>> and the low priority tasks begin to finish, the high priority job should
>> ideally be able to start and gradually take over cluster resources as per
>> the weights. However, I noticed that while the "low" job gives up CPU cores
>> with each completing task (e.g. reduction from 72 -> 12 with default
>> parallelism set to 72), the memory resources are held on (~500G out of
>> 768G). The spark.executor.memory setting appears to directly impact the
>> amount of memory that the job holds on to. In this case, it was set to 200G
>> in the low priority task and 100G in the high priority task. The nature of
>> these jobs is such that setting the numbers to smaller values (say 32g)
>> resulted in job failures with OutOfMemoryError. It appears that the Spark
>> framework is retaining memory (across tasks)  proportional to
>> spark.executor.memory for the duration of the job and not releasing memory
>> as tasks complete. This defeats the purpose of fine grained mode execution
>> as the memory occupancy is preventing the high priority job from accepting
>> the prioritized cpu offers and beginning execution. Can this be explained /
>> documented better please?
>>
>> Thanks,
>> Bharath
>>
>> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen  wrote:
>>
>>> (Adding spark user list)
>>>
>>> Hi Tom,
>>>
>>> If I understand correctly you're saying that you're running into memory
>>> problems because the scheduler is allocating too many CPUs and not enough
>>> memory to accommodate them, right?
>>>
>>> In the case of fine grain mode I don't think that's a problem since we
>>> have a fixed amount of CPU and memory per task.
>>> However, in coarse grain you can run into that problem if you're within
>>> the spark.cores.max limit, and memory is a fixed number.
>>>
>>> I have a patch out to configure the max number of CPUs a coarse grain
>>> executor should use, and it also allows multiple executors in coarse grain mode.
>>> So you could say try to launch multiples of max 4 cores with
>>> spark.executor.memory (+ overhead and etc) in a slave. (
>>> https://github.com/apache/spark/pull/4027)
>>>
>>> It also might be interesting to include a cores to memory multiplier so
>>> that with a larger amount of cores we try to scale the memory with some
>>> factor, but I'm not entirely sure that's intuitive to use and whether people
>>> will know what to set it to, as that can likely change with different workloads.
>>>
>>> Tim
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld  wrote:
>>>
 We're running Spark 1.3.0 (with a couple of patches over the top for
 docker related bits).

 I don't think SPARK-4158 is related to what we're seeing, things do run
 fine on the cluster, given a ridiculously large executor memory
 configuration. As for SPARK-3535, although that looks useful I think we're
 seeing something else.

 Put a different way, the amount of memory required at any given time by
 the spark JVM process is directly proportional to the amount of CPU it has,
 because more CPU means more tasks and more tasks means more memory. Even if
 we're using coarse mode, the amount of executor memory should be
 proportionate to the amount of CPUs in the offer.

 On 11 April 2015 at 17:39, Brenden Matthews 
 wrote:

> I ran into some issues with it a while ago, and submitted a couple PRs
> to fix it:
>
> https://github.com/apache/spark/pull/2401
> https://github.com/apache/spark/pull/3024
>
> Do these look relevant? What version of Spark are you running?
>
> On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld  wrote:
>
>> Hey,
>>
>> Not sure whether it's best to ask this on the spark mailing list or
>> the mesos one, so I'll try here first :-)
>>
>> I'm having a bit of trouble with out of memory errors in my spark
>> jobs... it seems fairly odd to me that memory resources can only be set 
>> at
>> the executor level, and not also at the task 

Re: How to speed up reading from file?

2015-10-16 Thread Xiao Li
Hi, Saif,

The optimal number of rows per partition depends on many factors, right?
For example: your row size, your file system configuration, your
replication configuration, and the performance of your underlying hardware.
The best way is to do performance testing and tune your
configurations. Generally, if each batch contains just a few MB, the
performance is bad compared with a bigger batch.
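
As a rough rule of thumb, it is often easier to size partitions by bytes than by row
count; a minimal sketch (the 128 MB target, the size estimate and the paths are
assumptions):

val targetBytes   = 128L * 1024 * 1024
val numPartitions = math.max(1, (estimatedInputBytes / targetBytes).toInt)  // estimatedInputBytes assumed known
df.repartition(numPartitions).write.parquet("hdfs:///tmp/example_out")      // hypothetical output path
val reloaded = sqlContext.read.parquet("hdfs:///tmp/example_out")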

Check the following paper regarding the performance of Spark and MR,
http://www.vldb.org/pvldb/vol8/p2110-shi.pdf. It might help you understand
your use case. For example, caching can be used in your system.

Good luck,

Xiao Li

2015-10-16 14:08 GMT-07:00 :

> Hello,
>
> Is there an optimal number of partitions per number of rows, when writing
> into disk, so we can re-read later from source in a distributed way?
> Any  thoughts?
>
> Thanks
> Saif
>
>