NoSuchMethodError : org.apache.spark.streaming.scheduler.StreamingListenerBus.start()V

2015-08-04 Thread Deepesh Maheshwari
Hi,

I am trying to read data from Kafka and process it using Spark.
I have attached my source code and error log.

For integrating Kafka, I have added the following dependencies in pom.xml:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.3.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.3.0</version>
</dependency>

I have attached the full error log. Please check why it is giving this error;
the class exists in my classpath.
I am running Spark and Kafka locally from a Java class.

SparkConf conf = new SparkConf().setAppName("Spark Demo").setMaster(
"local[2]").set("spark.executor.memory", "1g");


[image: Inline image 2]


spark-error.log
Description: Binary data


kafka-spark.java
Description: Binary data


Unable to load native-hadoop library for your platform

2015-08-04 Thread Deepesh Maheshwari
Hi,

When I run Spark locally on Windows, it gives the Hadoop library warning below.
I am using this Spark version:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
</dependency>



2015-08-04 12:22:23,463  WARN (org.apache.hadoop.util.NativeCodeLoader:62)
- Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable

I tried to find information on the internet but could not find the exact root cause.
Please let me know what it is, why it gives this warning, and how I can
resolve it.

Thanks,
Deepesh


Re: large scheduler delay in pyspark

2015-08-04 Thread Davies Liu
On Mon, Aug 3, 2015 at 9:00 AM, gen tang  wrote:
> Hi,
>
> Recently, I met some problems about scheduler delay in pyspark. I worked
> several days on this problem, but not success. Therefore, I come to here to
> ask for help.
>
> I have a key_value pair rdd like rdd[(key, list[dict])] and I tried to merge
> value by "adding" two list
>
> if I do reduceByKey as follows:
>rdd.reduceByKey(lambda a, b: a+b)
> It works fine, scheduler delay is less than 10s. However if I do
> reduceByKey:
>def f(a, b):
>for i in b:
> if i not in a:
>a.append(i)
>return a
>   rdd.reduceByKey(f)

Is it possible that you have a large object that is also named `i`, `a`, or `b`?

Btw, the second one could be slower than the first one, because you try to look up
an object in a list, which is O(N), especially when the object is large (a dict).

> It will cause very large scheduler delay, about 15-20 mins.(The data I deal
> with is about 300 mb, and I use 5 machine with 32GB memory)

If you see scheduler delay, it means there may be a large broadcast involved.

> I know the second code is not the same as the first. In fact, my purpose is
> to implement the second, but not work. So I try the first one.
> I don't know whether this is related to the data(with long string) or Spark
> on Yarn. But the first code works fine on the same data.
>
> Is there any way to find out the log when spark stall in scheduler delay,
> please? Or any ideas about this problem?
>
> Thanks a lot in advance for your help.
>
> Cheers
> Gen
>
>




Re: Unable to load native-hadoop library for your platform

2015-08-04 Thread Sean Owen
You can ignore it entirely. It just means you haven't installed and
configured native libraries for things like accelerated compression,
but it has no negative impact otherwise.
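
If the extra WARN line is noisy, one option is simply to raise the log level of that
one logger. A minimal sketch, assuming the stock log4j 1.x setup that Spark 1.x ships
with (run it before creating the SparkContext, or put the equivalent line in
conf/log4j.properties so it applies from startup):

import org.apache.log4j.{Level, Logger}

// Silence only the benign native-hadoop warning; everything else logs as before.
Logger.getLogger("org.apache.hadoop.util.NativeCodeLoader").setLevel(Level.ERROR)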

On Tue, Aug 4, 2015 at 8:11 AM, Deepesh Maheshwari
 wrote:
> Hi,
>
> When i run the spark locally on windows it gives below hadoop library error.
> I am using below spark version.
>
> <dependency>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-core_2.10</artifactId>
> <version>1.4.1</version>
> </dependency>
>
>
> 2015-08-04 12:22:23,463  WARN (org.apache.hadoop.util.NativeCodeLoader:62) -
> Unable to load native-hadoop library for your platform... using builtin-java
> classes where applicable
>
> Tried to find it on internet but not able to find exact root cause.
> Please let me know what is it, why it is giving warning and how can i
> resolve it.
>
> Thanks,
> Deepesh




Re: Unable to load native-hadoop library for your platform

2015-08-04 Thread Deepesh Maheshwari
Can you elaborate on what this native library covers?
One thing you mentioned is accelerated compression.

It would be very helpful if you could give a useful link to read more
about it.

On Tue, Aug 4, 2015 at 12:56 PM, Sean Owen  wrote:

> You can ignore it entirely. It just means you haven't installed and
> configured native libraries for things like accelerated compression,
> but it has no negative impact otherwise.
>
> On Tue, Aug 4, 2015 at 8:11 AM, Deepesh Maheshwari
>  wrote:
> > Hi,
> >
> > When i run the spark locally on windows it gives below hadoop library
> error.
> > I am using below spark version.
> >
> > <dependency>
> > <groupId>org.apache.spark</groupId>
> > <artifactId>spark-core_2.10</artifactId>
> > <version>1.4.1</version>
> > </dependency>
> >
> >
> > 2015-08-04 12:22:23,463  WARN
> (org.apache.hadoop.util.NativeCodeLoader:62) -
> > Unable to load native-hadoop library for your platform... using
> builtin-java
> > classes where applicable
> >
> > Tried to find it on internet but not able to find exact root cause.
> > Please let me know what is it, why it is giving warning and how can i
> > resolve it.
> >
> > Thanks,
> > Deepesh
>


Re: Unable to load native-hadoop library for your platform

2015-08-04 Thread Sean Owen
It won't affect you if you're not actually running Hadoop. But it's
mainly things like Snappy/LZO compression which are implemented as
native libraries under the hood. Spark doesn't necessarily use these
anyway; it's from the Hadoop libs.

On Tue, Aug 4, 2015 at 8:30 AM, Deepesh Maheshwari
 wrote:
> Can you elaborate about the things this native library covering.
> One you mentioned accelerated compression.
>
> It would be very helpful if you can give any useful to link to read more
> about it.
>
> On Tue, Aug 4, 2015 at 12:56 PM, Sean Owen  wrote:
>>
>> You can ignore it entirely. It just means you haven't installed and
>> configured native libraries for things like accelerated compression,
>> but it has no negative impact otherwise.
>>
>> On Tue, Aug 4, 2015 at 8:11 AM, Deepesh Maheshwari
>>  wrote:
>> > Hi,
>> >
>> > When i run the spark locally on windows it gives below hadoop library
>> > error.
>> > I am using below spark version.
>> >
>> > <dependency>
>> > <groupId>org.apache.spark</groupId>
>> > <artifactId>spark-core_2.10</artifactId>
>> > <version>1.4.1</version>
>> > </dependency>
>> >
>> >
>> > 2015-08-04 12:22:23,463  WARN
>> > (org.apache.hadoop.util.NativeCodeLoader:62) -
>> > Unable to load native-hadoop library for your platform... using
>> > builtin-java
>> > classes where applicable
>> >
>> > Tried to find it on internet but not able to find exact root cause.
>> > Please let me know what is it, why it is giving warning and how can i
>> > resolve it.
>> >
>> > Thanks,
>> > Deepesh
>
>




Re: Unable to load native-hadoop library for your platform

2015-08-04 Thread Deepesh Maheshwari
Hi Sean,

Thanks for the information and clarification.


On Tue, Aug 4, 2015 at 1:04 PM, Sean Owen  wrote:

> It won't affect you if you're not actually running Hadoop. But it's
> mainly things like Snappy/LZO compression which are implemented as
> native libraries under the hood. Spark doesn't necessarily use these
> anyway; it's from the Hadoop libs.
>
> On Tue, Aug 4, 2015 at 8:30 AM, Deepesh Maheshwari
>  wrote:
> > Can you elaborate about the things this native library covering.
> > One you mentioned accelerated compression.
> >
> > It would be very helpful if you can give any useful to link to read more
> > about it.
> >
> > On Tue, Aug 4, 2015 at 12:56 PM, Sean Owen  wrote:
> >>
> >> You can ignore it entirely. It just means you haven't installed and
> >> configured native libraries for things like accelerated compression,
> >> but it has no negative impact otherwise.
> >>
> >> On Tue, Aug 4, 2015 at 8:11 AM, Deepesh Maheshwari
> >>  wrote:
> >> > Hi,
> >> >
> >> > When i run the spark locally on windows it gives below hadoop library
> >> > error.
> >> > I am using below spark version.
> >> >
> >> > <dependency>
> >> > <groupId>org.apache.spark</groupId>
> >> > <artifactId>spark-core_2.10</artifactId>
> >> > <version>1.4.1</version>
> >> > </dependency>
> >> >
> >> >
> >> > 2015-08-04 12:22:23,463  WARN
> >> > (org.apache.hadoop.util.NativeCodeLoader:62) -
> >> > Unable to load native-hadoop library for your platform... using
> >> > builtin-java
> >> > classes where applicable
> >> >
> >> > Tried to find it on internet but not able to find exact root cause.
> >> > Please let me know what is it, why it is giving warning and how can i
> >> > resolve it.
> >> >
> >> > Thanks,
> >> > Deepesh
> >
> >
>


Twitter live Streaming

2015-08-04 Thread Sadaf
Hi
Is there any way to get all old tweets posted since the account was created,
using Spark Streaming and Twitter's API? Currently my connector only shows
those tweets that get posted after the program runs. I've done this task
using Spark Streaming and a custom receiver based on the "twitter user api".

Thanks in anticipation.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-live-Streaming-tp24124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Twitter live Streaming

2015-08-04 Thread Enno Shioji
If you want to do it through streaming API you have to pay Gnip; it's not free. 
You can go through non-streaming Twitter API and convert it to stream yourself 
though.
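
For reference, a rough sketch of that second approach: a custom receiver that polls the
REST user_timeline through twitter4j and feeds the results into a DStream. This is
illustrative only: paging simply stops when a short page comes back, rate limiting is
ignored, and `auth` stands for the twitter4j Authorization object built elsewhere in this
digest.

import scala.collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import twitter4j.{Paging, Status, TwitterFactory}
import twitter4j.auth.Authorization

// Polls the authenticated account's timeline page by page over the REST API
// and pushes each Status into Spark Streaming.
class RestTimelineReceiver(auth: Authorization)
  extends Receiver[Status](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("rest-timeline-poller") {
      override def run(): Unit = {
        val twitter = new TwitterFactory().getInstance(auth)
        var page = 1
        var done = false
        while (!done && !isStopped()) {
          // 200 is the per-page maximum of the user_timeline endpoint
          val statuses = twitter.getUserTimeline(new Paging(page, 200))
          statuses.asScala.foreach(store)
          if (statuses.size() < 200) done = true else page += 1
        }
      }
    }.start()
  }

  def onStop(): Unit = {} // the polling loop checks isStopped()
}

// Usage: val oldTweets = ssc.receiverStream(new RestTimelineReceiver(auth))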



> On 4 Aug 2015, at 09:29, Sadaf  wrote:
> 
> Hi
> Is there any way to get all old tweets since when the account was created
> using spark streaming and twitters api? Currently my connector is showing
> those tweets that get posted after the program runs. I've done this task
> using spark streaming and a custom receiver using "twitter user api".
> 
> Thanks in anticipation.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-live-Streaming-tp24124.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> Q




Re: How do I Process Streams that span multiple lines?

2015-08-04 Thread Akhil Das
If you are using Kafka, then you can basically push an entire file as a
message to Kafka. In that case, your DStream will receive a single
message containing the contents of the file, and it can of course span
multiple lines.
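
A minimal sketch of that pattern, assuming the Kafka 0.8.2+ producer API and the
receiver-based KafkaUtils.createStream. The topic name, broker/ZooKeeper addresses, and
file path are made up for illustration, `ssc` is an existing StreamingContext, and
Kafka's maximum message size still applies to very large files.

import java.util.Properties
import scala.io.Source
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.kafka.KafkaUtils

// Producer side: read the whole file and send it as ONE Kafka message.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

val wholeFile = Source.fromFile("/tmp/multi-line-record.txt").mkString
producer.send(new ProducerRecord[String, String]("files", wholeFile))
producer.close()

// Streaming side: every element of the DStream is now a complete file,
// so one record can span as many lines as it needs.
val files = KafkaUtils.createStream(ssc, "localhost:2181", "file-consumer", Map("files" -> 1))
  .map(_._2) // drop the key, keep the message body
files.foreachRDD(rdd =>
  rdd.foreach(record => println(s"received a record spanning ${record.split("\n").length} lines")))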

Thanks
Best Regards

On Mon, Aug 3, 2015 at 8:27 PM, Spark Enthusiast 
wrote:

> All  examples of Spark Stream programming that I see assume streams of
> lines that are then tokenised and acted upon (like the WordCount example).
>
> How do I process Streams that span multiple lines? Are there examples that
> I can use?
>


Re: Twitter Connector-Spark Streaming

2015-08-04 Thread Akhil Das
You will have to write your own consumer for pulling your custom feeds, and
then you can do a union (customfeedDstream.union(twitterStream)) with the
twitter stream api.
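
A skeletal illustration of that union. The RestTimelineReceiver class and the `auth`
and `ssc` values below are assumptions for illustration (a REST-polling receiver is
sketched earlier in this digest), not code from this thread.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils

// Live tweets from the streaming API
val liveStream = TwitterUtils.createStream(ssc, Some(auth), Nil, StorageLevel.MEMORY_AND_DISK_2)

// Historical/custom feed pulled through a custom receiver (e.g. the REST poller above)
val customFeed = ssc.receiverStream(new RestTimelineReceiver(auth))

// One DStream carrying both sources
val allTweets = customFeed.union(liveStream)
allTweets.map(_.getText).print()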

Thanks
Best Regards

On Tue, Aug 4, 2015 at 2:28 PM, Sadaf Khan  wrote:

> Thanks a lot :)
>
> One more thing I want to ask: I have used Twitter's streaming
> API, and it seems that the above solution uses the REST API. How can I use both
> simultaneously?
>
> Any response will be much appreciated :)
> Regards
>
> On Tue, Aug 4, 2015 at 1:51 PM, Akhil Das 
> wrote:
>
>> Yes you can, when you start the application in the first batch you just
>> need to pull all the tweets from your account. You need to look into the
>> API for that. Have a look at this
>> https://dev.twitter.com/rest/reference/get/statuses/user_timeline
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Aug 4, 2015 at 1:47 PM, Sadaf Khan  wrote:
>>
>>> Hi.
>>> You were really helpful for me last time :) and I am done with the
>>> last problem.
>>> I want to ask you one more question. Now my connector is showing the
>>> tweets that occur after running the program. Is there any way to fetch all
>>> old tweets since the account was created?
>>>
>>> I will be thankful to you for you kind response.
>>>
>>> On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan 
>>> wrote:
>>>
 thanks alot for this help :)

 On Thu, Jul 30, 2015 at 6:41 PM, Akhil Das 
 wrote:

> You can create a custom receiver and then inside it you can write
> yourown piece of code to receive data, filter them etc before giving it to
> spark.
>
> Thanks
> Best Regards
>
> On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan 
> wrote:
>
>> okay :)
>>
>> then is there anyway to fetch the tweets specific to my account?
>>
>> Thanks in anticipation :)
>>
>> On Thu, Jul 30, 2015 at 6:17 PM, Akhil Das <
>> ak...@sigmoidanalytics.com> wrote:
>>
>>> Owh, this one fetches the public tweets, not the one specific to
>>> your account.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, Jul 30, 2015 at 6:11 PM, Sadaf Khan 
>>> wrote:
>>>
 Yes, but can you please tell me how to mention a specific user
 account in the filter?
 I want to fetch my tweets, the tweets of my followers, and the tweets of
 those whom I follow.
 So in short, I want to fetch the tweets of my account only.

 Recently i have used
val tweets
 =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2)


 Any response will be very much appreciated. :)

 Thanks.


 On Thu, Jul 30, 2015 at 5:20 PM, Akhil Das <
 ak...@sigmoidanalytics.com> wrote:

> TwitterUtils.createStream takes a 3rd argument which is a filter,
> once you provide these, it will only fetch tweets of such.
>
> Thanks
> Best Regards
>
> On Thu, Jul 30, 2015 at 4:19 PM, Sadaf 
> wrote:
>
>> Hi.
>> I am writing a Twitter connector using Spark Streaming, but it
>> fetches
>> random tweets.
>> Is there any way to receive the tweets of a particular account?
>>
>> I made an app on twitter and used the credentials as given below.
>>
>>  def managingCredentials(): Option[twitter4j.auth.Authorization]=
>>   {
>>   object auth{
>>   val config = new twitter4j.conf.ConfigurationBuilder()
>> .setOAuthConsumerKey("")
>> .setOAuthConsumerSecret("")
>> .setOAuthAccessToken("")
>> .setOAuthAccessTokenSecret("")
>> .build
>> }
>> val twitter_auth = new TwitterFactory(auth.config)
>> val a = new twitter4j.auth.OAuthAuthorization(auth.config)
>> val atwitter : Option[twitter4j.auth.Authorization] =
>> Some(twitter_auth.getInstance(a).getAuthorization())
>>  atwitter
>>  }
>>
>> Thanks :)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-Connector-Spark-Streaming-tp24078.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

>>>
>>
>

>>>
>>
>


Re: Writing to HDFS

2015-08-04 Thread Akhil Das
Just to add rdd.take(1) won't trigger the entire computation, it will just
pull out the first record. You need to do a rdd.count() or rdd.saveAs*Files
to trigger the complete pipeline. How many partitions do you see in the
last stage?
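
A toy illustration of that laziness (not the original job):

val rdd = sc.parallelize(1 to 1000000, numSlices = 100).map(_ * 2)

rdd.take(1)                    // cheap: evaluates only as many partitions as needed for one record
rdd.count()                    // forces the whole lineage across all 100 partitions
rdd.saveAsTextFile("/tmp/out") // also forces the whole lineage, plus the actual write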

Thanks
Best Regards

On Tue, Aug 4, 2015 at 7:10 AM, ayan guha  wrote:

> Is your data skewed? What happens if you do rdd.count()?
> On 4 Aug 2015 05:49, "Jasleen Kaur"  wrote:
>
>> I am executing a spark job on a cluster as a yarn-client(Yarn cluster not
>> an option due to permission issues).
>>
>>- num-executors 800
>>- spark.akka.frameSize=1024
>>- spark.default.parallelism=25600
>>- driver-memory=4G
>>- executor-memory=32G.
>>- My input size is around 1.5TB.
>>
>> My problem is when I execute rdd.saveAsTextFile(outputPath,
>> classOf[org.apache.hadoop.io.compress.SnappyCodec])(Saving as avro also not
>> an option, I have tried saveAsSequenceFile with GZIP,
>> saveAsNewAPIHadoopFile with same result), I get heap space issue. On the
>> other hand if I execute rdd.take(1). I get no such issue. So I am assuming
>> that issue is due to write.
>>
>


Transform MongoDB Aggregation into Spark Job

2015-08-04 Thread Deepesh Maheshwari
Hi,
I am new to Apache Spark and am exploring Spark + Kafka integration to process
data using Spark, something I did earlier with MongoDB aggregation.

I am not able to figure out how to handle my use case.

Mongo Document :
{
"_id" : ObjectId("55bfb3285e90ecbfe37b25c3"),
"url" : "
http://www.z.com/new_car_search.php?bycity=Mumbai&sortfield=price&sortdirection=desc
",
"ip" : "27.5.107.65",
"pgDownloadTime" : NumberLong(2526),
"agentType" : "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5 Build/LMY48B)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.93 Mobile
Safari/537.36",
"referer" : "z.com",
"domain" : "z.com",
"channel" : "z",
"host" : "z.com",
"pgAccessTime" : NumberLong("1438626600021"),
"pgAccessMin" : NumberLong(1438626600),
"pgAccessHour" : NumberLong(1438626600),
"p5Min" : NumberLong(1438626600),
"contentType" : 1,
"articleId" : "4b1ad5357a6987bbc611ff92dcf9cb50",
"location" : 1,
"action" : 1,
"cat" : "Home",
"subcat" : [
""
],
"tags" : [
""
],
"catIds" : [
"Home"
],
"catIdHash" : NumberLong("7115745069349994427"),
"isIndia" : 1,
"geoLocation" : "Mumbai",
"publishTime" : NumberLong(0),
"author" : "",
"pagePosition" : "",
"group" : 0,
"ssoId" : null,
"isAPP" : 0,
"sessionId" : "17a95722-5a48-459f-afd8-78f7edb84897"
}

I am putting data into Kafka in the above JSON format.
Now, when I read it in Spark, I need to group the above documents using
two keys, get the total count per key, and put it in Mongo along with
the whole document details.

Mongo Aggregation Job :

{
"$match": {
"pgAccessMin": {
"$gte": 1438679100,
"$lt": 1438679400
}
}
},
{
"$project": {
"articleId": 1,
"host": 1,
"isAPP": 1,
"cat": 1,
"subcat": 1,
"publishTime": 1,
"channel": 1,
"author": 1,
"tags": 1,
"url": 1,
"catIds": 1,
"catIdHash": 1,
"count": 1,
"contentType": 1,
"_id": 0
}
},
{
"$group": {
"_id": {
"host": "$host",
"isAPP": "$isAPP",
"articleId": "$articleId"
},
"count": {
"$sum": 1
},
"url": {
"$first": "$url"
},
"subcat": {
"$first": "$subcat"
},
"cat": {
"$first": "$cat"
},
"publishTime": {
"$first": "$publishTime"
},
"channel": {
"$first": "$channel"
},
"author": {
"$first": "$author"
},
"tags": {
"$first": "$tags"
},
"catIdHash": {
"$first": "$catIdHash"
},
"catIds": {
"$first": "$catIds"
},
"contentType": {
"$first": "$contentType"
}
}
}

Please suggest how to write the equivalent job in Spark so that I can get
the view count along with the other fields and save it in Mongo.

Regards,
Deepesh
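
For illustration, here is one rough way the $match/$group above could be expressed on a
DStream of those JSON strings. Everything below is a sketch under assumptions, not a
vetted solution: `messages` is the DStream read from Kafka, JSON parsing uses the json4s
library that Spark bundles, counting is done with reduceByKey while keeping the first
document seen per key (like Mongo's $first), and the actual Mongo write is left as a stub.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// messages: DStream[String] of the JSON documents consumed from Kafka
val counted = messages.transform { rdd =>
  rdd.flatMap { json =>
      implicit val formats = DefaultFormats
      val doc = parse(json)
      val accessMin = (doc \ "pgAccessMin").extract[Long]
      // $match stage: keep only the five-minute window of interest
      if (accessMin >= 1438679100L && accessMin < 1438679400L) {
        val key = ((doc \ "host").extract[String],
                   (doc \ "isAPP").extract[Int],
                   (doc \ "articleId").extract[String])
        Some(key -> (doc, 1L))
      } else None
    }
    // $group stage: count per (host, isAPP, articleId) and keep the first document
    // seen for each key so its url/cat/tags/... travel along, like Mongo's $first
    .reduceByKey { case ((d1, c1), (d2, c2)) => (d1, c1 + c2) }
}

counted.foreachRDD { rdd =>
  rdd.foreach { case ((host, isApp, articleId), (doc, count)) =>
    // Write to Mongo here (mongo-hadoop connector, a Mongo client, etc.); printed for the sketch.
    println(s"$host / $isApp / $articleId -> $count views")
  }
}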


Re: About memory leak in spark 1.4.1

2015-08-04 Thread Sea
How many machines are there in your standalone cluster?

I am not using tachyon.


GC cannot help me... Can anyone help?


my configuration:


spark.deploy.spreadOut false
spark.eventLog.enabled true
spark.executor.cores 24


spark.ui.retainedJobs 10
spark.ui.retainedStages 10
spark.history.retainedApplications 5
spark.deploy.retainedApplications 10
spark.deploy.retainedDrivers  10
spark.streaming.ui.retainedBatches 10
spark.sql.thriftserver.ui.retainedSessions 10
spark.sql.thriftserver.ui.retainedStatements 100



spark.file.transferTo false
spark.driver.maxResultSize 4g
spark.sql.hive.metastore.jars=/spark/spark-1.4.1/hive/*


spark.eventLog.dir hdfs://mycluster/user/spark/historylog
spark.history.fs.logDirectory hdfs://mycluster/user/spark/historylog



spark.driver.extraClassPath=/spark/spark-1.4.1/extlib/*
spark.executor.extraClassPath=/spark/spark-1.4.1/extlib/*



spark.sql.parquet.binaryAsString true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer 32
spark.kryoserializer.buffer.max 256
spark.shuffle.consolidateFiles true
spark.io.compression.codec org.apache.spark.io.LZ4CompressionCodec











------------------ Original Message ------------------
From: "Igor Berman"
Sent: Monday, August 3, 2015, 7:56 PM
To: "Sea" <261810...@qq.com>
Cc: "Barak Gitsis"; "Ted Yu"; "user@spark.apache.org"; "rxin"; "joshrosen"; "davies"
Subject: Re: About memory leak in spark 1.4.1



in general, what is your configuration? use --conf "spark.logConf=true"



we have 1.4.1 in production standalone cluster and haven't experienced what you
are describing. Can you verify in web-ui that indeed spark got your 50g per
executor limit? I mean in configuration page..


might be you are using offheap storage(Tachyon)?




On 3 August 2015 at 04:58, Sea <261810...@qq.com> wrote:
"spark uses a lot more than heap memory, it is the expected behavior."  It 
didn't exist in spark 1.3.x
What does "a lot more than" means?  It means that I lose control of it!
I try to  apply 31g, but it still grows to 55g and continues to grow!!! That is 
the point!
I have tried setting memoryFraction to 0.2, but it didn't help.
I don't know whether it will still exist in the next release 1.5, I wish not.






------------------ Original Message ------------------
From: "Barak Gitsis"
Sent: Sunday, August 2, 2015, 9:55 PM
To: "Sea" <261810...@qq.com>; "Ted Yu"
Cc: "user@spark.apache.org"; "rxin"; "joshrosen"; "davies"
Subject: Re: About memory leak in spark 1.4.1





spark uses a lot more than heap memory, it is the expected behavior. In 1.4,
off-heap memory usage is supposed to grow in comparison to 1.3.


Better use as little memory as you can for heap, and since you are not 
utilizing it already, it is safe for you to reduce it.
memoryFraction helps you optimize heap usage for your data/application profile 
while keeping it tight.



 






On Sun, Aug 2, 2015 at 12:54 PM Sea <261810...@qq.com> wrote:

spark.storage.memoryFraction is in heap memory, but my situation is that the 
memory is more than heap memory !  


Anyone else use spark 1.4.1 in production? 




------------------ Original Message ------------------
From: "Ted Yu"
Sent: Sunday, August 2, 2015, 5:45 PM
To: "Sea" <261810...@qq.com>
Cc: "Barak Gitsis"; "user@spark.apache.org"; "rxin"; "joshrosen"; "davies"
Subject: Re: About memory leak in spark 1.4.1




http://spark.apache.org/docs/latest/tuning.html does mention 
spark.storage.memoryFraction in two places.
One is under Cache Size Tuning section.


FYI


On Sun, Aug 2, 2015 at 2:16 AM, Sea <261810...@qq.com> wrote:
Hi, Barak
It is ok with spark 1.3.0, the problem is with spark 1.4.1.
I don't think spark.storage.memoryFraction will make any sense, because it 
is still in heap memory. 




------------------ Original Message ------------------
From: "Barak Gitsis"
Sent: Sunday, August 2, 2015, 4:11 PM
To: "Sea" <261810...@qq.com>; "user"
Cc: "rxin"; "joshrosen"; "davies"
Subject: Re: About memory leak in spark 1.4.1



Hi, reducing spark.storage.memoryFraction did the trick for me. Heap doesn't get
filled because it is reserved.
My reasoning is:
I give the executor all the memory I can, so that makes it a boundary.
From there I try to make the best use of the memory I can. storage.memoryFraction is,
in a sense, user data space. The rest can be used by the system.
If you don't have so much data that you MUST store in memory for performance,
better to give Spark more space.
I ended up setting it to 0.3.


All that said, it is on spark 1.3 on cluster


hope that helps


On Sat, Aug 1, 2015 at 5:43 PM Sea <261810...@qq.com> wrote:

Hi, all
I upgraded Spark to 1.4.1, and many applications failed... I find the heap memory is
not full, but the CoarseGrainedExecutorBackend process takes more memory than I
expect, and it increases as time goes on; finally it exceeds the max limit of the
server and the worker dies.


Can anyone help?


Mo

Re: How to control Spark Executors from getting Lost when using YARN client mode?

2015-08-04 Thread Jeff Zhang
Please check the node manager logs to see why the container is killed.

On Mon, Aug 3, 2015 at 11:59 PM, Umesh Kacha  wrote:

> Hi all, any help will be much appreciated. My spark job runs fine, but in the
> middle it starts losing executors because of a MetadataFetchFailedException
> saying the shuffle was not found at the location since the executor is lost.
> On Jul 31, 2015 11:41 PM, "Umesh Kacha"  wrote:
>
>> Hi, thanks for the response. It looks like the YARN container is getting
>> killed, but I don't know why I see the shuffle MetadataFetchFailedException
>> mentioned in the following SO link. I have enough memory: 8 nodes, 8 cores, 30
>> GB memory each. And because of this exception YARN kills the container running
>> the executor. How can it overrun memory? I tried to give each executor 25 GB,
>> but it is still not sufficient and it fails. Please guide me; I don't understand
>> what is going on. I am using Spark 1.4.0 with spark.shuffle.memory as
>> 0.0 and spark.storage.memory as 0.5. I have almost all optimal properties set,
>> like the Kryo serializer, a 500 akka frame size, and 20 akka threads. I don't
>> know; I am trapped. It's been two days and I am trying to recover from this issue.
>>
>>
>> http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept
>>
>>
>>
>> On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan <
>> ashwin.fo...@gmail.com> wrote:
>>
>>> What is your cluster configuration ( size and resources) ?
>>>
>>> If you do not have enough resources, then your executor will not run.
>>> Moreover allocating 8 cores to an executor is too much.
>>>
>>> If you have a cluster with four nodes running NodeManagers, each
>>> equipped with 4 cores and 8GB of memory,
>>> then an optimal configuration would be,
>>>
>>> --num-executors 8 --executor-cores 2 --executor-memory 2G
>>>
>>> Thanks,
>>> Ashwin
>>>
>>> On Thu, Jul 30, 2015 at 12:08 PM, unk1102  wrote:
>>>
 Hi I have one Spark job which runs fine locally with less data but when
 I
 schedule it on YARN to execute I keep on getting the following ERROR and
 slowly all executors gets removed from UI and my job fails

 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
 myhost1.com: remote Rpc client disassociated
 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
 myhost2.com: remote Rpc client disassociated
 I use the following command to schedule spark job in yarn-client mode

  ./spark-submit --class com.xyz.MySpark --conf
 "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M"
 --driver-java-options
 -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
 --executor-memory 2G --executor-cores 8 --num-executors 12
 /home/myuser/myspark-1.0.jar

 I dont know what is the problem please guide. I am new to Spark. Thanks
 in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Ashwin Giridharan
>>>
>>
>>


-- 
Best Regards

Jeff Zhang


Re: Extremely poor predictive performance with RF in mllib

2015-08-04 Thread Yanbo Liang
It looks like the predicted result is just the opposite of what you expected, so could
you check whether the labels are right?
Or could you share several data which can help to reproduce this output?

2015-08-03 19:36 GMT+08:00 Barak Gitsis :

> hi,
> I've run into some poor RF behavior, although not as pronounced as you..
> would be great to get more insight into this one
>
> Thanks!
>
> On Mon, Aug 3, 2015 at 8:21 AM pkphlam  wrote:
>
>> Hi,
>>
>> This might be a long shot, but has anybody run into very poor predictive
>> performance using RandomForest with Mllib? Here is what I'm doing:
>>
>> - Spark 1.4.1 with PySpark
>> - Python 3.4.2
>> - ~30,000 Tweets of text
>> - 12289 1s and 15956 0s
>> - Whitespace tokenization and then hashing trick for feature selection
>> using
>> 10,000 features
>> - Run RF with 100 trees and maxDepth of 4 and then predict using the
>> features from all the 1s observations.
>>
>> So in theory, I should get predictions of close to 12289 1s (especially if
>> the model overfits). But I'm getting exactly 0 1s, which sounds ludicrous
>> to
>> me and makes me suspect something is wrong with my code or I'm missing
>> something. I notice similar behavior (although not as extreme) if I play
>> around with the settings. But I'm getting normal behavior with other
>> classifiers, so I don't think it's my setup that's the problem.
>>
>> For example:
>>
>> >>> lrm = LogisticRegressionWithSGD.train(lp, iterations=10)
>> >>> logit_predict = lrm.predict(predict_feat)
>> >>> logit_predict.sum()
>> 9077
>>
>> >>> nb = NaiveBayes.train(lp)
>> >>> nb_predict = nb.predict(predict_feat)
>> >>> nb_predict.sum()
>> 10287.0
>>
>> >>> rf = RandomForest.trainClassifier(lp, numClasses=2,
>> >>> categoricalFeaturesInfo={}, numTrees=100, seed=422)
>> >>> rf_predict = rf.predict(predict_feat)
>> >>> rf_predict.sum()
>> 0.0
>>
>> This code was all run back to back so I didn't change anything in between.
>> Does anybody have a possible explanation for this?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Extremely-poor-predictive-performance-with-RF-in-mllib-tp24112.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>> --
> *-Barak*
>


Total delay per batch in a CSV file

2015-08-04 Thread allonsy
Hi everyone,

I'm working with Spark Streaming, and I need to perform some offline
performance measures.

What I'd like to have is a CSV file that reports something like this:

*Batch number/timestamp | Input Size | Total Delay*

which is in fact similar to what the UI outputs.

I tried to get some metrics (metrics.properties), but I'm having a hard time
getting precise information on every single batch, since they only have
entries concerning the /last/ (completed/received) batch, and the values are
often different from those appearing in the UI.

Can anybody give me some advice on how to get metrics that are close to
those of the UI? 

Thanks!
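
One approach that stays close to the UI numbers is to register a StreamingListener and
write a CSV row per completed batch yourself; the UI is driven by the same BatchInfo
objects. A rough sketch (the output path is arbitrary, and the exact BatchInfo fields
available can vary a little between Spark versions):

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Appends one CSV row per completed batch: batch timestamp, input size, total delay (ms)
class CsvBatchListener(path: String) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val row = Seq(
      info.batchTime.milliseconds,   // batch timestamp
      info.numRecords,               // input size (records)
      info.totalDelay.getOrElse(-1L) // total delay in ms, -1 if not yet known
    ).mkString(",")
    val out = new PrintWriter(new FileWriter(path, true)) // append mode
    try out.println(row) finally out.close()
  }
}

// Register it before ssc.start():
// ssc.addStreamingListener(new CsvBatchListener("/tmp/batch-metrics.csv"))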



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Total-delay-per-batch-in-a-CSV-file-tp24129.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Fwd: Writing streaming data to cassandra creates duplicates

2015-08-04 Thread Priya Ch
Yes...union would be one solution. I am not doing any aggregation hence
reduceByKey would not be useful. If I use groupByKey, messages with same
key would be obtained in a partition. But groupByKey is very expensive
operation as it involves shuffle operation. My ultimate goal is to write
the messages to cassandra. if the messages with same key are handled by
different streams...there would be concurrency issues. To resolve this i
can union dstreams and apply hash parttioner so that it would bring all the
same keys to a single partition or do a groupByKey which does the same.

As groupByKey is expensive, is there any work around for this ?

On Thu, Jul 30, 2015 at 2:33 PM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> Just my two cents. I understand your problem is that your problem is that
> you have messages with the same key in two different dstreams. What I would
> do would be making a union of all the dstreams with StreamingContext.union
> or several calls to DStream.union, and then I would create a pair dstream
> with the primary key as key, and then I'd use groupByKey or reduceByKey (or
> combineByKey etc) to combine the messages with the same primary key.
>
> Hope that helps.
>
> Greetings,
>
> Juan
>
>
> 2015-07-30 10:50 GMT+02:00 Priya Ch :
>
>> Hi All,
>>
>>  Can someone throw insights on this ?
>>
>> On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch 
>> wrote:
>>
>>>
>>>
>>> Hi TD,
>>>
>>>  Thanks for the info. I have the scenario like this.
>>>
>>>  I am reading the data from kafka topic. Let's say kafka has 3
>>> partitions for the topic. In my streaming application, I would configure 3
>>> receivers with 1 thread each such that they would receive 3 dstreams (from
>>> 3 partitions of kafka topic) and also I implement partitioner. Now there is
>>> a possibility of receiving messages with same primary key twice or more,
>>> one is at the time message is created and other times if there is an update
>>> to any fields for same message.
>>>
>>> If two messages M1 and M2 with same primary key are read by 2 receivers
>>> then even the partitioner in spark would still end up in parallel
>>> processing as there are altogether in different dstreams. How do we address
>>> in this situation ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>> On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das 
>>> wrote:
>>>
 You have to partition that data on the Spark Streaming by the primary
 key, and then make sure insert data into Cassandra atomically per key, or
 per set of keys in the partition. You can use the combination of the (batch
 time, and partition Id) of the RDD inside foreachRDD as the unique id for
 the data you are inserting. This will guard against multiple attempts to
 run the task that inserts into Cassandra.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations

 TD

 On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch <
 learnings.chitt...@gmail.com> wrote:

> Hi All,
>
>  I have a problem when writing streaming data to cassandra. Or
> existing product is on Oracle DB in which while wrtiting data, locks are
> maintained such that duplicates in the DB are avoided.
>
> But as spark has parallel processing architecture, if more than 1
> thread is trying to write same data i.e with same primary key, is there as
> any scope to created duplicates? If yes, how to address this problem 
> either
> from spark or from cassandra side ?
>
> Thanks,
> Padma Ch
>


>>>
>>>
>>
>


Re: TFIDF Transformation

2015-08-04 Thread Yanbo Liang
It cannot translate the number back to the word unless you store the mapping
yourself.
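
For completeness, a minimal sketch of keeping that map yourself: HashingTF.indexOf is
deterministic, so you can hash your own vocabulary and invert the result (because of
hash collisions, one index may map to several words):

import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.rdd.RDD

// documents: RDD[Seq[String]], the same tokenized input fed to hashingTF.transform
def buildIndexToTerms(documents: RDD[Seq[String]], hashingTF: HashingTF): Map[Int, Iterable[String]] =
  documents
    .flatMap(identity)                            // all terms
    .distinct()
    .map(term => (hashingTF.indexOf(term), term)) // same hash used by transform
    .groupByKey()                                 // collisions: several terms per index
    .collectAsMap()
    .toMap

// Example: look up which term(s) produced feature index 603314
// buildIndexToTerms(documents, hashingTF).get(603314)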

2015-07-31 1:45 GMT+08:00 hans ziqiu li :

> Hello spark users!
>
> I am having some troubles with the TFIDF in MLlib and was wondering if
> anyone can point me to the right direction.
>
> The data ingestion and the initial term frequency count code taken from the
> example works fine (I am using the first example from this page:
> https://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html).
>
> Below is my input data:
>
> WrappedArray((Frank,  spent,  Friday,  afternoon,  at,  labs,  test,  test,
> test,  test,  test,  test,  test,  test,  test))
> WrappedArray((we,  are,  testing,  the,  algorithm,  with,  us,  test,
> test,  test,  test,  test,  test,  test,  test))
> WrappedArray((hello,  my,  name,  is,  Hans,  and,  I,  am,  testing,
> TFIDF,  test,  test,  test,  test,  test))
> WrappedArray((TFIDF,  is,  an,  amazing,  algorithm,  that,  is,  used,
> for,  spam,  filtering,  and,  search,  test,  test))
> WrappedArray((Accenture,  is,  doing,  great,  test,  test,  test,  test,
> test,  test,  test,  test,  test,  test,  test))
>
> Here’s the output:
>
>
> (1048576,[1065,1463,33868,34122,34252,337086,420523,603314,717226,767673,839152,876983],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,1.0,1.0,1.0,1.0])
>
> (1048576,[1463,6313,33869,34122,118216,147517,162737,367946,583529,603314,605639,646109,876983,972879],[1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])
>
> (1048576,[20311,34122,340246,603314,778861,876983],[1.0,1.0,1.0,10.0,1.0,1.0])
>
> (1048576,[33875,102986,154015,267598,360614,603314,690972,876983],[1.0,1.0,1.0,1.0,1.0,8.0,1.0,1.0])
>
> (1048576,[1588,19537,34494,42230,603314,696550,839152,876983,972879],[1.0,1.0,1.0,1.0,7.0,1.0,1.0,1.0,1.0])
>
> The problem I am having here is that the output from HashingTF is not
> ordered like the original sentence, I understand that the integer “603314”
> in the output stands for the word “ test” in the input. But how would I
> programmatically translate the number back to the word so I know which
> words
> are most common? Please let me know your thoughts!
>
> I am not sure how helpful these are going to be but here are the things
> I’ve
> noticed when I was looking into the source code of TFIDF:
>
> 1. def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
> ---> This line of code hashes the term into its ASCII value
> and calculates 'ASCII' modulo 'numberFeatures' (which is defaulted to 2^20)
> 2. Then def transform(document: Iterable[_]): Vector = { blah blah blah }
> ---> This part of the code does the counting and spreads
> the current array into two separate ones using Vectors.sparse.
>
>
> Thanks in advance and I hope to hear from you soon!
> Best,
> Hans
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/TFIDF-Transformation-tp24086.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: About memory leak in spark 1.4.1

2015-08-04 Thread Igor Berman
sorry, can't disclose info about my prod cluster

nothing jumps into my mind regarding your config
we don't use lz4 compression, don't know what is spark.deploy.spreadOut(there
is no documentation regarding this)

If you are sure that you don't have memory leak in your business logic I
would try to reset each property to default(or just remove it from your
config) and try to run your job to see if it's not
somehow connected

my config(nothing special really)
spark.shuffle.consolidateFiles true
spark.speculation false
spark.executor.extraJavaOptions -XX:+UseStringCache
-XX:+UseCompressedStrings -XX:+PrintGC -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -Xloggc:gc.log -verbose:gc
spark.executor.logs.rolling.maxRetainedFiles 1000
spark.executor.logs.rolling.strategy time
spark.worker.cleanup.enabled true
spark.logConf true
spark.rdd.compress true





On 4 August 2015 at 12:59, Sea <261810...@qq.com> wrote:

> How many machines are there in your standalone cluster?
> I am not using tachyon.
>
> GC cannot help me... Can anyone help?
>
> my configuration:
>
> spark.deploy.spreadOut false
> spark.eventLog.enabled true
> spark.executor.cores 24
>
> spark.ui.retainedJobs 10
> spark.ui.retainedStages 10
> spark.history.retainedApplications 5
> spark.deploy.retainedApplications 10
> spark.deploy.retainedDrivers  10
> spark.streaming.ui.retainedBatches 10
> spark.sql.thriftserver.ui.retainedSessions 10
> spark.sql.thriftserver.ui.retainedStatements 100
>
> spark.file.transferTo false
> spark.driver.maxResultSize 4g
> spark.sql.hive.metastore.jars=/spark/spark-1.4.1/hive/*
>
> spark.eventLog.dir hdfs://mycluster/user/spark/historylog
> spark.history.fs.logDirectory hdfs://mycluster/user/spark/historylog
>
> spark.driver.extraClassPath=/spark/spark-1.4.1/extlib/*
> spark.executor.extraClassPath=/spark/spark-1.4.1/extlib/*
>
> spark.sql.parquet.binaryAsString true
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.kryoserializer.buffer 32
> spark.kryoserializer.buffer.max 256
> spark.shuffle.consolidateFiles true
> spark.io.compression.codec org.apache.spark.io.LZ4CompressionCodec
>
>
>
>
>
> -- Original Message --
> *From:* "Igor Berman";;
> *Sent:* Monday, August 3, 2015, 7:56 PM
> *To:* "Sea"<261810...@qq.com>;
> *Cc:* "Barak Gitsis"; "Ted Yu";
> "user@spark.apache.org"; "rxin";
> "joshrosen"; "davies";
> *Subject:* Re: About memory leak in spark 1.4.1
>
> in general, what is your configuration? use --conf "spark.logConf=true"
>
> we have 1.4.1 in production standalone cluster and haven't experienced
> what you are describing
> can you verify in web-ui that indeed spark got your 50g per executor
> limit? I mean in configuration page..
>
> might be you are using offheap storage(Tachyon)?
>
>
> On 3 August 2015 at 04:58, Sea <261810...@qq.com> wrote:
>
>> "spark uses a lot more than heap memory, it is the expected behavior."
>>  It didn't exist in spark 1.3.x
>> What does "a lot more than" means?  It means that I lose control of it!
>> I try to  apply 31g, but it still grows to 55g and continues to grow!!!
>> That is the point!
>> I have tried set memoryFraction to 0.2,but it didn't help.
>> I don't know whether it will still exist in the next release 1.5, I wish
>> not.
>>
>>
>>
>> -- Original Message --
>> *From:* "Barak Gitsis";;
>> *Sent:* Sunday, August 2, 2015, 9:55 PM
>> *To:* "Sea"<261810...@qq.com>; "Ted Yu";
>> *Cc:* "user@spark.apache.org"; "rxin"<
>> r...@databricks.com>; "joshrosen"; "davies"<
>> dav...@databricks.com>;
>> *Subject:* Re: About memory leak in spark 1.4.1
>>
>> spark uses a lot more than heap memory, it is the expected behavior.
>> in 1.4 off-heap memory usage is supposed to grow in comparison to 1.3
>>
>> Better use as little memory as you can for heap, and since you are not
>> utilizing it already, it is safe for you to reduce it.
>> memoryFraction helps you optimize heap usage for your data/application
>> profile while keeping it tight.
>>
>>
>>
>>
>>
>>
>> On Sun, Aug 2, 2015 at 12:54 PM Sea <261810...@qq.com> wrote:
>>
>>> spark.storage.memoryFraction is in heap memory, but my situation is that
>>> the memory is more than heap memory !
>>>
>>> Anyone else use spark 1.4.1 in production?
>>>
>>>
>>> -- Original Message --
>>> *From:* "Ted Yu";;
>>> *Sent:* Sunday, August 2, 2015, 5:45 PM
>>> *To:* "Sea"<261810...@qq.com>;
>>> *Cc:* "Barak Gitsis"; "user@spark.apache.org"<
>>> user@spark.apache.org>; "rxin"; "joshrosen"<
>>> joshro...@databricks.com>; "davies";
>>> *Subject:* Re: About memory leak in spark 1.4.1
>>>
>>> http://spark.apache.org/docs/latest/tuning.html does mention 
>>> spark.storage.memoryFraction
>>> in two places.
>>> One is under Cache Size Tuning section.
>>>
>>> FYI
>>>
>>> On Sun, Aug 2, 2015 at 2:16 AM, Sea <261810...@qq.com> wrote:
>>>
 Hi, Barak
 It is ok with spark 1.3.0, the problem is with spark 1.4.1.
 I don't think spark.storage.memoryFraction will make any sense,
 because it is

Re: Writing streaming data to cassandra creates duplicates

2015-08-04 Thread Gerard Maas
(removing dev from the to: as not relevant)

it would be good to see some sample data and the cassandra schema to have a
more concrete idea of the problem space.

Some thoughts: reduceByKey could still be used to 'pick' one element.
example of arbitrarily choosing the first one: reduceByKey{case (e1,e2) =>
e1}

The question to be answered is: what should happen to the multiple values
that arrive for 1 key?

And why are they creating duplicates in cassandra? if they have the same
key, they will result in an overwrite (that's not desirable due to
tombstones anyway)

-kr, Gerard.
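
To make that concrete, a minimal sketch of the union-then-pick-one-per-key approach
before the Cassandra write. The Event type, its fields, and the keyspace/table names
are placeholders, and the spark-cassandra-connector is assumed to be on the classpath.

import com.datastax.spark.connector._          // adds saveToCassandra to RDDs
import org.apache.spark.streaming.dstream.DStream

case class Event(id: String, payload: String, updatedAt: Long)

def writeDeduplicated(streams: Seq[DStream[Event]]): Unit = {
  val all = streams.reduce(_ union _)          // one DStream from all receivers

  all.map(e => (e.id, e))
    // keep a single record per primary key within each batch,
    // here the most recently updated one instead of an arbitrary "first"
    .reduceByKey((a, b) => if (a.updatedAt >= b.updatedAt) a else b)
    .map(_._2)
    .foreachRDD(rdd => rdd.saveToCassandra("my_keyspace", "events"))
}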



On Tue, Aug 4, 2015 at 1:03 PM, Priya Ch 
wrote:

>
>
>
> Yes...union would be one solution. I am not doing any aggregation hence
> reduceByKey would not be useful. If I use groupByKey, messages with same
> key would be obtained in a partition. But groupByKey is very expensive
> operation as it involves shuffle operation. My ultimate goal is to write
> the messages to cassandra. if the messages with same key are handled by
> different streams...there would be concurrency issues. To resolve this i
> can union dstreams and apply hash parttioner so that it would bring all the
> same keys to a single partition or do a groupByKey which does the same.
>
> As groupByKey is expensive, is there any work around for this ?
>
> On Thu, Jul 30, 2015 at 2:33 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> Just my two cents. I understand your problem is that your problem is that
>> you have messages with the same key in two different dstreams. What I would
>> do would be making a union of all the dstreams with StreamingContext.union
>> or several calls to DStream.union, and then I would create a pair dstream
>> with the primary key as key, and then I'd use groupByKey or reduceByKey (or
>> combineByKey etc) to combine the messages with the same primary key.
>>
>> Hope that helps.
>>
>> Greetings,
>>
>> Juan
>>
>>
>> 2015-07-30 10:50 GMT+02:00 Priya Ch :
>>
>>> Hi All,
>>>
>>>  Can someone throw insights on this ?
>>>
>>> On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch 
>>> wrote:
>>>


 Hi TD,

  Thanks for the info. I have the scenario like this.

  I am reading the data from kafka topic. Let's say kafka has 3
 partitions for the topic. In my streaming application, I would configure 3
 receivers with 1 thread each such that they would receive 3 dstreams (from
 3 partitions of kafka topic) and also I implement partitioner. Now there is
 a possibility of receiving messages with same primary key twice or more,
 one is at the time message is created and other times if there is an update
 to any fields for same message.

 If two messages M1 and M2 with same primary key are read by 2 receivers
 then even the partitioner in spark would still end up in parallel
 processing as there are altogether in different dstreams. How do we address
 in this situation ?

 Thanks,
 Padma Ch

 On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das 
 wrote:

> You have to partition that data on the Spark Streaming by the primary
> key, and then make sure insert data into Cassandra atomically per key, or
> per set of keys in the partition. You can use the combination of the 
> (batch
> time, and partition Id) of the RDD inside foreachRDD as the unique id for
> the data you are inserting. This will guard against multiple attempts to
> run the task that inserts into Cassandra.
>
> See
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
>
> TD
>
> On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>> Hi All,
>>
>>  I have a problem when writing streaming data to cassandra. Or
>> existing product is on Oracle DB in which while wrtiting data, locks are
>> maintained such that duplicates in the DB are avoided.
>>
>> But as spark has parallel processing architecture, if more than 1
>> thread is trying to write same data i.e with same primary key, is there 
>> as
>> any scope to created duplicates? If yes, how to address this problem 
>> either
>> from spark or from cassandra side ?
>>
>> Thanks,
>> Padma Ch
>>
>
>


>>>
>>
>
>


Re: TFIDF Transformation

2015-08-04 Thread clark djilo kuissu
Hi, I had the same problem and I didn't find a solution, so I used Word2Vec
instead.
I am interested in a solution to this problem of how to go back from the
TF-IDF hash to the word.
Regards,
Clark
 


On Tuesday, August 4, 2015 at 1:03 PM, Yanbo Liang wrote:
   

It cannot translate the number back to the word unless you store the mapping
yourself.
2015-07-31 1:45 GMT+08:00 hans ziqiu li :

Hello spark users!

I am having some troubles with the TFIDF in MLlib and was wondering if
anyone can point me to the right direction.

The data ingestion and the initial term frequency count code taken from the
example works fine (I am using the first example from this page:
https://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html).

Below is my input data:

WrappedArray((Frank,  spent,  Friday,  afternoon,  at,  labs,  test,  test,
test,  test,  test,  test,  test,  test,  test))
WrappedArray((we,  are,  testing,  the,  algorithm,  with,  us,  test,
test,  test,  test,  test,  test,  test,  test))
WrappedArray((hello,  my,  name,  is,  Hans,  and,  I,  am,  testing,
TFIDF,  test,  test,  test,  test,  test))
WrappedArray((TFIDF,  is,  an,  amazing,  algorithm,  that,  is,  used,
for,  spam,  filtering,  and,  search,  test,  test))
WrappedArray((Accenture,  is,  doing,  great,  test,  test,  test,  test,
test,  test,  test,  test,  test,  test,  test))

Here’s the output:

(1048576,[1065,1463,33868,34122,34252,337086,420523,603314,717226,767673,839152,876983],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,1.0,1.0,1.0,1.0])
(1048576,[1463,6313,33869,34122,118216,147517,162737,367946,583529,603314,605639,646109,876983,972879],[1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])
(1048576,[20311,34122,340246,603314,778861,876983],[1.0,1.0,1.0,10.0,1.0,1.0])
(1048576,[33875,102986,154015,267598,360614,603314,690972,876983],[1.0,1.0,1.0,1.0,1.0,8.0,1.0,1.0])
(1048576,[1588,19537,34494,42230,603314,696550,839152,876983,972879],[1.0,1.0,1.0,1.0,7.0,1.0,1.0,1.0,1.0])

The problem I am having here is that the output from HashingTF is not
ordered like the original sentence, I understand that the integer “603314”
in the output stands for the word “ test” in the input. But how would I
programmatically translate the number back to the word so I know which words
are most common? Please let me know your thoughts!

I am not sure how helpful these are going to be but here are the things I’ve
noticed when I was looking into the source code of TFIDF:

1. def
indexOf(term:
Any):
Int
=
Utils.nonNegativeMod(term.##,
 numFeatures) > This line of code hashes the term into it’s ASCII value
and calculates ‘ASCII’ modulo ‘numberFeatures’(which is defaulted 2^20)
2. Then def
transform(document:
Iterable[_]):
Vector
=
 { blah blah blah} ———> This part of the code does the counting and spreads
the current array into two separate ones using Vectors.sparse.


Thanks in advance and I hope to hear from you soon!
Best,
Hans




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TFIDF-Transformation-tp24086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





  

Re: Setting a stage timeout

2015-08-04 Thread William Kinney
Yes I upgraded but I would still like to set an overall stage timeout. Does
that exist?

On Fri, Jul 31, 2015 at 1:13 PM, Ted Yu  wrote:

> The referenced bug has been fixed in 1.4.0, are you able to upgrade ?
>
> Cheers
>
> On Fri, Jul 31, 2015 at 10:01 AM, William Kinney  > wrote:
>
>> Hi,
>>
>> I had a job that got stuck on yarn due to
>> https://issues.apache.org/jira/browse/SPARK-6954
>> It never exited properly.
>>
>> Is there a way to set a timeout for a stage or all stages?
>>
>
>


Re: Safe to write to parquet at the same time?

2015-08-04 Thread Cheng Lian

It should be safe for Spark 1.4.1 and later versions.

Now Spark SQL adds a job-wise UUID to output file names to distinguish 
files written by different write jobs. So those two write jobs you gave 
should play well with each other. And the job committed later will 
generate a summary file for all Parquet data files it sees. (However, 
Parquet summary file generation may fail due to various reasons and is 
generally not reliable.)


Cheng

On 8/4/15 10:37 AM, Philip Weaver wrote:
I think this question applies regardless if I have two completely 
separate Spark jobs or tasks on different machines, or two cores that 
are part of the same task on the same machine.


If two jobs/tasks/cores/stages both save to the same parquet directory 
in parallel like this:


df1.write.mode(SaveMode.Append).partitionBy(a, b).parquet(dir)

df2.write.mode(SaveMode.Append).partitionBy(a, b).parquet(dir)


Will the result be equivalent to this?

df1.unionAll(df2).write.mode(SaveMode.Append).partitionBy(a,
b).parquet(dir)


What if we ensure that 'dir' does not exist first?

- Philip





Re: Parquet SaveMode.Append Trouble.

2015-08-04 Thread Cheng Lian

You need to import org.apache.spark.sql.SaveMode

Cheng
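
For reference, both forms compile once the import is in place; note that the target
should be a directory, not an individual part-*.parquet file (paths shortened here):

import org.apache.spark.sql.SaveMode

// 1.3-style API used in the original mail
df.save("/data/LM/Parquet/Segment/pages2", "parquet", SaveMode.Append)

// 1.4+ DataFrameWriter equivalent
df.write.mode(SaveMode.Append).parquet("/data/LM/Parquet/Segment/pages2")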

On 7/31/15 6:26 AM, satyajit vegesna wrote:

Hi,

I am new to using Spark and Parquet files,

Below is what I am trying to do in spark-shell:

val df = 
sqlContext.parquetFile("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet") 


Have also tried below command,

val 
df=sqlContext.read.format("parquet").load("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")


Now I have another existing Parquet file to which I want to append
this Parquet data from df.


so i use,

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet","parquet", 
SaveMode.Append )


also tried below command,

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", 
SaveMode.Append )



and it throws me below error,

:26: error: not found: value SaveMode
df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet","parquet", 
SaveMode.Append )


Please help me, in case i am doing something wrong here.

Regards,
Satyajit.









Delete NA in a dataframe

2015-08-04 Thread clark djilo kuissu
Hello, 

I am trying to manage NA values in this dataset. I import my dataset with the
com.databricks.spark.csv package.

When I do this: allyears2k.na.drop(), I get no result.
Can you help me please ?
Regards,
---   The dataset -

dataset:  https://s3.amazonaws.com/h2o-airlines-unpacked/allyears2k.csv

---   The code -
// Prepare environment
import sys.process._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._


val allyears2k = 
sqlContext.read.format("com.databricks.spark.csv").option("header", 
"true").load("/home/clark/allyears2k.csv")
allyears2k.registerTempTable("allyears2k")
val rv = allyears2k.na.drop()
 


Re: Schedule lunchtime today for a free webinar "IoT data ingestion in Spark Streaming using Kaa" 11 a.m. PDT (2 p.m. EDT)

2015-08-04 Thread orozvadovskyy
Hi there! 

If you missed our webinar on "IoT data ingestion in Spark with KaaIoT", see the 
video and slides here: http://goo.gl/VMyQ1M 

We recorded our webinar on “IoT data ingestion in Spark Streaming using Kaa” 
for those who couldn’t see it live or who would like to refresh what they have 
learned. During the webinar, we explained and illustrated how Kaa and Spark can 
be effectively used together to address the challenges of IoT data gathering 
and analysis. In this video, you will find highly crystallized, practical 
instruction on setting up your own stream analytics solution with Kaa and 
Spark. 

Best wishes, 
Oleh Rozvadovskyy 
CyberVision Inc 

- Original Message -

From: "Oleh Rozvadovskyy"
To: user@spark.apache.org
Sent: Thursday, July 23, 2015, 17:48:11
Subject: Schedule lunchtime today for a free webinar "IoT data ingestion in Spark
Streaming using Kaa" 11 a.m. PDT (2 p.m. EDT)

Hi there! 

Only a couple of hours left until our first webinar on IoT data ingestion in Spark
Streaming using Kaa.



During the webinar we will build a solution that ingests real-time data from 
Intel Edison into Apache Spark for stream processing. This solution includes a 
client, middleware, and analytics components. All of these software components 
are 100% open-source, therefore, the solution described in this tutorial can be 
used as a prototype for even a commercial product. 

Those who are interested, please feel free to sign up here.

Best wishes, 
Oleh Rozvadovskyy 
CyberVision Inc. 

​ 



AW: Twitter live Streaming

2015-08-04 Thread Filli Alem
Hi Sadaf,

I'm currently struggling with Twitter Streaming as well. I can't get it working
using the simple setup below. I use Spark 1.2 and I replaced twitter4j v3 with
v4. Am I doing something wrong? How are you doing this?

twitter4j.conf.Configuration conf = new twitter4j.conf.ConfigurationBuilder()

.setOAuthConsumerKey("")

.setOAuthConsumerSecret("*")

.setOAuthAccessToken("*")

.setOAuthAccessTokenSecret("**").build();

TwitterFactory tf =new TwitterFactory(conf);
Authorization a = new OAuthAuthorization(conf);
Authorization a2 = tf.getInstance(a).getAuthorization();

SparkConf sparkConf = new 
SparkConf().setAppName("TwitterStreamJob");

JavaStreamingContext jssc = new 
JavaStreamingContext(sparkConf,Durations.seconds(30));

String[] filter = new String[]{"football"};

JavaReceiverInputDStream<Status> receiverStream =
TwitterUtils.createStream(jssc, a2, filter);

JavaDStream<String> tweets = receiverStream.map(new Function<Status, String>() {

@Override
public String call(Status tweet) throws Exception {
return tweet.getUser().getName() +"_" + 
tweet.getText() +"_" + tweet.getCreatedAt().getTime();
}

});
tweets.foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override
public Void call(JavaRDD<String> arg0) throws Exception
{

arg0.saveAsTextFile("hdfs://myhost/results/twitter_" + 
UUID.randomUUID().toString());
return null;
}
});

 jssc.start();
 jssc.awaitTermination();

-Ursprüngliche Nachricht-
Von: Sadaf [mailto:sa...@platalytics.com]
Gesendet: Dienstag, 4. August 2015 10:29
An: user@spark.apache.org
Betreff: Twitter live Streaming

Hi
Is there any way to get all old tweets since when the account was created using 
spark streaming and twitters api? Currently my connector is showing those 
tweets that get posted after the program runs. I've done this task using spark 
streaming and a custom receiver using "twitter user api".

Thanks in anticipation.
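
For the question above about old tweets: the streaming API only delivers live
statuses, so history has to come from the Twitter4j REST timeline calls. A
minimal sketch, assuming `twitterConf` is a twitter4j Configuration built with
your OAuth keys and `sc` is the SparkContext; Twitter rate-limits these calls
and only exposes a limited amount of history:

import twitter4j.{Paging, TwitterFactory}
import scala.collection.JavaConverters._

// REST client built from the same OAuth configuration used for streaming.
val twitter = new TwitterFactory(twitterConf).getInstance()

// Page backwards through the authenticating user's timeline; 10 pages of
// 200 statuses is purely illustrative.
val history = (1 to 10).flatMap { page =>
  twitter.getUserTimeline(new Paging(page, 200)).asScala
}

// Hand the collected tweets to Spark as a plain RDD.
val oldTweets = sc.parallelize(history.map(_.getText))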



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-live-Streaming-tp24124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Delete NA in a dataframe

2015-08-04 Thread Peter Rudenko

Hi Clark,
the problem is that in this dataset null values are represented by the "NA" 
marker. spark-csv doesn't have a configurable null-value marker (I made a 
PR for it some time ago: 
https://github.com/databricks/spark-csv/pull/76).


So one option for you is to do post-filtering, something like this:

val rv = allyears2k.filter("COLUMN != 'NA'")

Thanks,
Peter Rudenko
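
For reference, a variant of the same post-filtering idea that cleans several
columns before dropping nulls. This is only a sketch; the column names
("DepTime", "ArrTime", "ArrDelay") are just examples from the airline data:

val naColumns = Seq("DepTime", "ArrTime", "ArrDelay")
val noNa = naColumns.foldLeft(allyears2k) { (df, c) =>
  // keep only rows where the column is not the literal string "NA"
  df.filter(df(c) !== "NA")
}
val rv = noNa.na.drop()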
On 2015-08-04 15:03, clark djilo kuissu wrote:

Hello,

I am trying to manage NA values in this dataset. I import my dataset with the 
com.databricks.spark.csv package.


When I do this: allyears2k.na.drop(), I get no result.

Can you help me please ?

Regards,

--- The dataset -

dataset: https://s3.amazonaws.com/h2o-airlines-unpacked/allyears2k.csv

---   The code -

// Prepare environment
import sys.process._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._


val allyears2k = 
sqlContext.read.format("com.databricks.spark.csv").option("header", 
"true").load("/home/clark/allyears2k.csv")

allyears2k.registerTempTable("allyears2k")

val rv = allyears2k.na.drop()





Re: Delete NA in a dataframe

2015-08-04 Thread clark djilo kuissu
Thank you, Peter.
I'll try this.
 



  

Re: About memory leak in spark 1.4.1

2015-08-04 Thread Barak Gitsis
Maybe try reducing spark.executor.cores.
Perhaps your tasks have a large off-heap overhead and it would be better to have
fewer tasks running in parallel.
Is it a streaming job?
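
As an illustration of that suggestion (the numbers are only examples, not a
recommendation):

import org.apache.spark.SparkConf

// Fewer cores per executor means fewer tasks sharing the same JVM and its
// off-heap overhead at any one time.
val conf = new SparkConf()
  .set("spark.executor.cores", "8")     // e.g. down from 24 in the config below
  .set("spark.executor.memory", "16g")  // illustrative value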


On Tue, Aug 4, 2015 at 2:14 PM Igor Berman  wrote:

> sorry, can't disclose info about my prod cluster
>
> nothing jumps into my mind regarding your config
> we don't use lz4 compression, don't know what is spark.deploy.spreadOut(there
> is no documentation regarding this)
>
> If you are sure that you don't have memory leak in your business logic I
> would try to reset each property to default(or just remove it from your
> config) and try to run your job to see if it's not
> somehow connected
>
> my config(nothing special really)
> spark.shuffle.consolidateFiles true
> spark.speculation false
> spark.executor.extraJavaOptions -XX:+UseStringCache
> -XX:+UseCompressedStrings -XX:+PrintGC -XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps -Xloggc:gc.log -verbose:gc
> spark.executor.logs.rolling.maxRetainedFiles 1000
> spark.executor.logs.rolling.strategy time
> spark.worker.cleanup.enabled true
> spark.logConf true
> spark.rdd.compress true
>
>
>
>
>
> On 4 August 2015 at 12:59, Sea <261810...@qq.com> wrote:
>
>> How many machines are there in your standalone cluster?
>> I am not using tachyon.
>>
>> GC can not help me... Can anyone help ?
>>
>> my configuration:
>>
>> spark.deploy.spreadOut false
>> spark.eventLog.enabled true
>> spark.executor.cores 24
>>
>> spark.ui.retainedJobs 10
>> spark.ui.retainedStages 10
>> spark.history.retainedApplications 5
>> spark.deploy.retainedApplications 10
>> spark.deploy.retainedDrivers  10
>> spark.streaming.ui.retainedBatches 10
>> spark.sql.thriftserver.ui.retainedSessions 10
>> spark.sql.thriftserver.ui.retainedStatements 100
>>
>> spark.file.transferTo false
>> spark.driver.maxResultSize 4g
>> spark.sql.hive.metastore.jars=/spark/spark-1.4.1/hive/*
>>
>> spark.eventLog.dirhdfs://mycluster/user/spark/historylog
>> spark.history.fs.logDirectory hdfs://mycluster/user/spark/historylog
>>
>> spark.driver.extraClassPath=/spark/spark-1.4.1/extlib/*
>> spark.executor.extraClassPath=/spark/spark-1.4.1/extlib/*
>>
>> spark.sql.parquet.binaryAsString true
>> spark.serializerorg.apache.spark.serializer.KryoSerializer
>> spark.kryoserializer.buffer 32
>> spark.kryoserializer.buffer.max 256
>> spark.shuffle.consolidateFiles true
>> spark.io.compression.codec org.apache.spark.io.LZ4CompressionCodec
>>
>>
>>
>>
>>
>> ------------------ Original message ------------------
>> *From:* "Igor Berman";;
>> *Sent:* Monday, August 3, 2015, 7:56 PM
>> *To:* "Sea"<261810...@qq.com>;
>> *Cc:* "Barak Gitsis"; "Ted Yu";
>> "user@spark.apache.org"; "rxin"<
>> r...@databricks.com>; "joshrosen"; "davies"<
>> dav...@databricks.com>;
>> *Subject:* Re: About memory leak in spark 1.4.1
>>
>> in general, what is your configuration? use --conf "spark.logConf=true"
>>
>> we have 1.4.1 in production standalone cluster and haven't experienced
>> what you are describing
>> can you verify in web-ui that indeed spark got your 50g per executor
>> limit? I mean in configuration page..
>>
>> might be you are using offheap storage(Tachyon)?
>>
>>
>> On 3 August 2015 at 04:58, Sea <261810...@qq.com> wrote:
>>
>>> "spark uses a lot more than heap memory, it is the expected behavior."
>>>  It didn't exist in spark 1.3.x
>>> What does "a lot more than" mean?  It means that I lose control of it!
>>> I try to  apply 31g, but it still grows to 55g and continues to grow!!!
>>> That is the point!
>>> I have tried set memoryFraction to 0.2,but it didn't help.
>>> I don't know whether it will still exist in the next release 1.5, I wish
>>> not.
>>>
>>>
>>>
>>> ------------------ Original message ------------------
>>> *From:* "Barak Gitsis";;
>>> *Sent:* Sunday, August 2, 2015, 9:55 PM
>>> *To:* "Sea"<261810...@qq.com>; "Ted Yu";
>>> *Cc:* "user@spark.apache.org"; "rxin"<
>>> r...@databricks.com>; "joshrosen"; "davies"<
>>> dav...@databricks.com>;
>>> *Subject:* Re: About memory leak in spark 1.4.1
>>>
>>> spark uses a lot more than heap memory, it is the expected behavior.
>>> in 1.4 off-heap memory usage is supposed to grow in comparison to 1.3
>>>
>>> Better use as little memory as you can for heap, and since you are not
>>> utilizing it already, it is safe for you to reduce it.
>>> memoryFraction helps you optimize heap usage for your data/application
>>> profile while keeping it tight.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Aug 2, 2015 at 12:54 PM Sea <261810...@qq.com> wrote:
>>>
 spark.storage.memoryFraction is in heap memory, but my situation is
 that the memory is more than heap memory !

 Anyone else use spark 1.4.1 in production?


 ------------------ Original message ------------------
 *From:* "Ted Yu";;
 *Sent:* Sunday, August 2, 2015, 5:45 PM
 *To:* "Sea"<261810...@qq.com>;
 *Cc:* "Barak Gitsis"; "user@spark.apache.org"<
 user@spark.apache.org>; "rxin"; "joshrosen"<
 joshro...@databricks.com>; "davies";
 *Subject:* Re: About memory leak in spark 1.4.1


Re: spark streaming max receiver rate doubts

2015-08-04 Thread Cody Koeninger
Those jobs will still be created for each valid time; they just may not
have many messages in them.
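
For reference, both rate caps mentioned in the question are plain SparkConf
settings; a sketch with illustrative values:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Direct (receiver-less) API, Spark 1.3+: max records per Kafka partition per batch.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // Receiver-based API: max records per second per receiver.
  .set("spark.streaming.receiver.maxRate", "1000")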

On Mon, Aug 3, 2015 at 11:11 PM, Shushant Arora 
wrote:

> 1.In spark 1.3(Non receiver)  - If my batch interval is 1 sec and I don't
> set spark.streaming.kafka.maxRatePerPartition - so the default behaviour is to
> bring all messages from kafka from the last offset to the current offset ?
>
> Say no of messages were large and it took 5 sec to process those so will
> all jobs for interval 2-5 sec be queued and created afterwards or should
> not be created since all messages are already processed for those interval
> also?
>
> 2.In spark streaming 1.2(Receiver based) if I don't set 
> spark.streaming.receiver.maxRate
> - will it consume all messages from last offset or it will just consume
> messages whatever it can consume in this batch interval of 1 sec.
>


giving offset in spark sql

2015-08-04 Thread Hafiz Mujadid
Hi all!

I want to skip the first n rows of a DataFrame. In normal SQL this is done
using the OFFSET keyword. How can we achieve this in Spark SQL?

Thanks
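
Spark SQL (as of 1.4) has no OFFSET clause; one workaround is to number the
rows and filter. A sketch only, assuming `df` is the DataFrame and `sqlContext`
is in scope; note that "first n rows" is only well defined if the data has an
explicit ordering:

val n = 10
val remaining = df.rdd
  .zipWithIndex()
  .filter { case (_, idx) => idx >= n }   // drop the first n rows
  .map { case (row, _) => row }
val skipped = sqlContext.createDataFrame(remaining, df.schema)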



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/giving-offset-in-spark-sql-tp24130.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: About memory leak in spark 1.4.1

2015-08-04 Thread Ted Yu
w.r.t. spark.deploy.spreadOut, here is the scaladoc:

  // As a temporary workaround before better ways of configuring memory, we
allow users to set
  // a flag that will perform round-robin scheduling across the nodes
(spreading out each app
  // among all the nodes) instead of trying to consolidate each app onto a
small # of nodes.
  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut",
true)

Cheers

Re: Does RDD.cartesian involve shuffling?

2015-08-04 Thread Richard Marscher
Yes, it does; in fact it's probably going to be one of the more expensive
shuffles you could trigger.

On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu 
wrote:

> Does RDD.cartesian involve shuffling?
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics



Re: Topology.py -- Cannot run on Spark Gateway on Cloudera 5.4.4.

2015-08-04 Thread Upen N
Hi Guru,
It was a no brainer issue. I had to create HDFS user ec2-user to make it
work. It worked like a charm after that.

Thanks
Upender

On Mon, Aug 3, 2015 at 10:27 PM, Guru Medasani  wrote:

> Hi Upen,
>
> Did you deploy the client configs after assigning the gateway roles? You
> should be able to do this from Cloudera Manager.
>
> Can you try this and let us know what you see when you run spark-shell?
>
> Guru Medasani
> gdm...@gmail.com
>
>
>
> On Aug 3, 2015, at 9:10 PM, Upen N  wrote:
>
> Hi,
> I recently installed Cloudera CDH 5.4.4. Spark comes shipped with this
> version. I created Spark gateways, but I get the following error when I run
> spark-shell from the gateway. Does anyone have any similar experience? If
> so, please share the solution. Google suggests copying the conf files from
> data nodes to gateway nodes, but I highly doubt that is the right fix.
>
> Thanks
> Upender
>
> etc/hadoop/conf.cloudera.yarn/topology.py
> java.io.IOException: Cannot run program
> "/etc/hadoop/conf.cloudera.yarn/topology.py"
>
>
>


Re: How to increase parallelism of a Spark cluster?

2015-08-04 Thread Richard Marscher
I think you did a good job of summarizing terminology and describing
spark's operation. However #7 is inaccurate if I am interpreting correctly.
The scheduler schedules X tasks from the current stage across all
executors, where X is the the number of cores assigned to the application
(assuming only this stage is running). `resourceOfferSingleTaskSet` in
TaskSchedulerImpl gives an idea of how it's launching tasks from a stage's
task set based on the current available cores across all executors:
https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L220.
This is what I have observed in all of our Spark Standalone clusters. In
fact I just ran a job against my laptop "cluster" of 1 executor with 8
partitions in a stage. I have my `spark.cores.max` set to 4 and it ran 4
tasks concurrently, running new tasks after a previous one finished.

Also, #8 is only true if you setup fair scheduling at the sub-job level
(there are two kinds of fair scheduling that I've seen, intra-job and
sub-job. `spark.scheduler.mode` will set intra-job but you need to provide
a configuration file to `spark.scheduler.pool` to get sub-job fair
scheduling).
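
A small sketch of the scheduling behavior described in the first paragraph
(app-wide core cap, more partitions than cores; the numbers are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallelism-demo")
  .set("spark.cores.max", "4")   // caps concurrent tasks across the application
val sc = new SparkContext(conf)

// 8 partitions, but only 4 tasks run at a time; the rest start as slots free
// up (visible in the web UI because each task sleeps for a while).
sc.parallelize(1 to 8, 8).map { i => Thread.sleep(5000); i }.count()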


On Mon, Aug 3, 2015 at 4:16 PM, Ajay Singal  wrote:

> Hi Sujit,
>
>
>
> From experimenting with Spark (and other documentation), my understanding
> is as follows:
>
> 1.   Each application consists of one or more Jobs
>
> 2.   Each Job has one or more Stages
>
> 3.   Each Stage creates one or more Tasks (normally, one Task per
> Partition)
>
> 4.   Master allocates one Executor per Worker (that contains
> Partition) per Application
>
> 5.   The Executor stays up for the lifetime of the Application (and
> dies when the Application ends)
>
> 6.   Each Executor can run multiple Tasks in parallel (normally, the
> parallelism depends on the number of cores per Executor).
>
> 7.   The Scheduler schedules only one Task from each Stage to one
> Executor.
>
> 8.   If there are multiple Stages (from a Job) and these Stages could
> be run asynchronously (i.e., in parallel), one Task from each Stage could
> be scheduled on the same Executor (thus this Executor runs multiple Tasks
> in parallel: see #6 above).
>
>
>
> Of course, there could be many exception/exclusions to what I explained
> above.  I expect that Spark community will confirm or correct my
> observations/understanding above.
>
>
>
> Now, let’s come back to your situation.  You have a cluster of 4 Workers
> with 10 Partitions.  All of these 10 Partitions are distributed among these
> 4 Workers.  Also, from the information provided by you, your Application
> has just one Job with a two Stages (repartition and mapPartition).  The
> mapPartition Stage will have 10 Tasks.  Assuming my
> observations/understanding is correct, by virtue of #7 above, only 4 Tasks
> can be executed in parallel.  The subsequent Jobs will have to wait.
>
>
>
> However, if you had 10 or more Workers, all Tasks would have been executed
> in parallel.  BTW, I believe, you can have multiple Workers on one Physical
> Node.  So, one of the solution to your problem would be to increase the
> number of Workers.
>
>
>
> Having said so, I believe #7 above is the bottleneck.  If there is no good
> reason for keeping this bottleneck, this could be a good area of
> improvement (and needs to be addressed by Spark community).  I will wait
> for the community response, and if needed, I will open a JIRA item.
>
>
>
> I hope it helps.
>
>
>
> Regards,
>
> Ajay
>
> On Mon, Aug 3, 2015 at 1:16 PM, Sujit Pal  wrote:
>
>> @Silvio: the mapPartitions instantiates a HttpSolrServer, then for each
>> query string in the partition, sends the query to Solr using SolrJ, and
>> gets back the top N results. It then reformats the result data into one
>> long string and returns the key value pair as (query string, result string).
>>
>> @Igor: Thanks for the parameter suggestions. I will check the
>> --num-executors and if there is a way to set the number of cores/executor
>> with my Databricks admin and update here if I find it, but from the
>> Databricks console, it appears that the number of executors per box is 1.
>> This seems normal though, per the diagram on this page:
>>
>> http://spark.apache.org/docs/latest/cluster-overview.html
>>
>> where it seems that there is 1 executor per box, and each executor can
>> spawn multiple threads to take care of multiple tasks (see bullet #1 copied
>> below).
>>
>>> Each application gets its own executor processes, which stay up for the
>>> duration of the whole application and run tasks in multiple threads. This
>>> has the benefit of isolating applications from each other, on both the
>>> scheduling side (each driver schedules its own tasks) and executor side
>>> (tasks from different applications run in different JVMs).
>>
>>
>> Regarding hitting the max number of requests, thanks for the link. I am
>> using the default client. Just peeked at the Solr

Re: Does RDD.cartesian involve shuffling?

2015-08-04 Thread Meihua Wu
Thanks, Richard!

I basically have two RDDs, A and B, and I need to compute a value for
every pair (a, b) for a in A and b in B. My first thought was
cartesian, but it involves an expensive shuffle.

Any alternatives? How about converting B to an array and broadcasting it
to every node (assuming B is relatively small and fits in memory)?



On Tue, Aug 4, 2015 at 8:23 AM, Richard Marscher
 wrote:
> Yes it does, in fact it's probably going to be one of the more expensive
> shuffles you could trigger.
>
> On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu 
> wrote:
>>
>> Does RDD.cartesian involve shuffling?
>>
>> Thanks!
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
> Richard Marscher
> Software Engineer
> Localytics
> Localytics.com | Our Blog | Twitter | Facebook | LinkedIn

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does RDD.cartesian involve shuffling?

2015-08-04 Thread Richard Marscher
That is the only alternative I'm aware of. If either A or B is small
enough to broadcast, then you'd at least be doing the cartesian products all
locally without needing to also transmit and shuffle A, unless Spark
somehow optimizes the cartesian product and only transfers the smaller RDD
across the network in the shuffle, but I don't have reason to believe that's
true.

I'd try the cartesian first if you haven't tried at all, just to make sure
it actually is too slow before getting tricky with the broadcast.
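
A minimal sketch of the broadcast alternative, with placeholder element types
and a placeholder `score` function (it assumes B really is small enough to
collect on the driver):

val rddA = sc.parallelize(1 to 100)   // placeholder for the large RDD A
val rddB = sc.parallelize(1 to 10)    // placeholder for the small RDD B

def score(a: Int, b: Int): Double = a * b.toDouble   // stand-in for the real per-pair computation

val bLocal = sc.broadcast(rddB.collect())
val pairValues = rddA.flatMap { a =>
  bLocal.value.map(b => ((a, b), score(a, b)))
}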

On Tue, Aug 4, 2015 at 12:25 PM, Meihua Wu 
wrote:

> Thanks, Richard!
>
> I basically have two RDD's: A and B; and I need to compute a value for
> every pair of (a, b) for a in A and b in B. My first thought is
> cartesian, but involves expensive shuffle.
>
> Any alternatives? How about I convert B to an array and broadcast it
> to every node (assuming B is relative small to fit)?
>
>
>
> On Tue, Aug 4, 2015 at 8:23 AM, Richard Marscher
>  wrote:
> > Yes it does, in fact it's probably going to be one of the more expensive
> > shuffles you could trigger.
> >
> > On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu  >
> > wrote:
> >>
> >> Does RDD.cartesian involve shuffling?
> >>
> >> Thanks!
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> >
> >
> > --
> > Richard Marscher
> > Software Engineer
> > Localytics
> > Localytics.com | Our Blog | Twitter | Facebook | LinkedIn
>



-- 
*Richard Marscher*
Software Engineer
Localytics



No Twitter Input from Kafka to Spark Streaming

2015-08-04 Thread narendra
My application takes Twitter4j tweets and publishes them to a topic in
Kafka. Spark Streaming subscribes to that topic for processing. In practice,
however, Spark Streaming is not receiving the tweet data from Kafka, so it is
running empty batch jobs without input and I am not able to see any output
from Spark Streaming.

The code of the application is - 

import java.util.HashMap
import java.util.Properties
import twitter4j._
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 

object TwitterPopularTags {
def main(args: Array[String]) {

/** Information necessary for accessing the Twitter API */
val consumerKey= "2AgtQfH8rlyUDyfjwPOCDosEQ"
val consumerSecret=
"vnG8uoaan4gPmoy1rFMbz3i19396jODwGRLRqsHBPTwSlMcUIl"
val accessToken=
"33807905-2vYZMjZyRjFJQrrkPVQwmiQcZCnag6m2wKujpiu4e"
val accessTokenSecret =
"X880Iq3YseBsAs3e8ZoHSOaDnN431dWJ6QpeMJO6VVAzm"
val cb = new ConfigurationBuilder()
cb.setOAuthConsumerKey(consumerKey)
cb.setOAuthConsumerSecret(consumerSecret)
cb.setOAuthAccessToken(accessToken)
cb.setOAuthAccessTokenSecret(accessTokenSecret)
cb.setJSONStoreEnabled(true)
cb.setIncludeEntitiesEnabled(true)
val twitterStream = new
TwitterStreamFactory(cb.build()).getInstance()  

val KafkaTopic = "LiveTweets"
/* kafka producer properties */
val kafkaProducer = {
val props = new Properties()
props.put("metadata.broker.list",
"broker2:9092,localhost:9092")
props.put("serializer.class",
"kafka.serializer.StringEncoder")
props.put("request.required.acks", "1")
val config = new ProducerConfig(props)
new Producer[String, String](config)
 }

/* Invoked when a new tweet comes */
val listener = new StatusListener() { 

   override def onStatus(status: Status): Unit = {
   val msg = new KeyedMessage[String,
String](KafkaTopic,DataObjectFactory.getRawJSON(status))
   kafkaProducer.send(msg)
  }
   override def onException(ex: Exception): Unit = throw ex

  // no-op for the following events
  override def onStallWarning(warning: StallWarning): Unit =
{}
  override def onDeletionNotice(statusDeletionNotice:
StatusDeletionNotice): Unit = {}
  override def onScrubGeo(userId: Long, upToStatusId: Long):
Unit = {}
  override def
onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
}

twitterStream.addListener(listener)
// Create Spark Streaming context
val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark
Streaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

// Define the Kafka parameters, broker list must be specified
val kafkaParams = Map("metadata.broker.list" ->
"broker2:9092,localhost:9092")
val topics = Set(KafkaTopic)

// Create the direct stream with the Kafka parameters and topics
val kafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc,kafkaParams,topics)
val lines = kafkaStream.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

  }
}

Spark Streaming web UI screenshots (attached on Nabble):
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png


Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No Twitter Input from Kafka to Spark Streaming

2015-08-04 Thread Cody Koeninger
Have you tried using the console consumer to see if anything is actually
getting published to that topic?


Re: Spark SQL support for Hive 0.14

2015-08-04 Thread Steve Loughran
Spark 1.3.1 & 1.4 only support Hive 0.13

Spark 1.5 is going to be released against Hive 1.2.1; it'll skip Hive 0.14 
support entirely and go straight to the currently supported Hive release.

See SPARK-8064 for the gory details

> On 3 Aug 2015, at 23:01, Ishwardeep Singh  
> wrote:
> 
> Hi,
> 
> Does spark SQL support Hive 0.14? The documentation refers to Hive 0.13. Is
> there a way to compile spark with Hive 0.14?
> 
> Currently we are using Spark 1.3.1.
> 
> Thanks 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-support-for-Hive-0-14-tp24122.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to load native-hadoop library for your platform

2015-08-04 Thread Steve Loughran
Think it may be needed on Windows, certainly if you start trying to work with 
local files. 


> On 4 Aug 2015, at 00:34, Sean Owen  wrote:
> 
> It won't affect you if you're not actually running Hadoop. But it's
> mainly things like Snappy/LZO compression which are implemented as
> native libraries under the hood.

There's a lot more in those native libs, primarily to bypass bits missing from 
the java APIs (FS permissions) and to add new features (encryption, soon 
erasure coding).

The Hadoop file:// FS uses it on windows, at least for now

> Spark doesn't necessarily use these
> anyway; it's from the Hadoop libs.
> 
> On Tue, Aug 4, 2015 at 8:30 AM, Deepesh Maheshwari
>  wrote:
>> Can you elaborate on the things this native library is covering?
>> One you mentioned is accelerated compression.
>> 
>> It would be very helpful if you could give any useful link to read more
>> about it.
>> 
>> On Tue, Aug 4, 2015 at 12:56 PM, Sean Owen  wrote:
>>> 
>>> You can ignore it entirely. It just means you haven't installed and
>>> configured native libraries for things like accelerated compression,
>>> but it has no negative impact otherwise.
>>> 
>>> On Tue, Aug 4, 2015 at 8:11 AM, Deepesh Maheshwari
>>>  wrote:
 Hi,
 
 When i run the spark locally on windows it gives below hadoop library
 error.
 I am using below spark version.
 
 
org.apache.spark
spark-core_2.10
1.4.1

 
 
 2015-08-04 12:22:23,463  WARN
 (org.apache.hadoop.util.NativeCodeLoader:62) -
 Unable to load native-hadoop library for your platform... using
 builtin-java
 classes where applicable
 
 Tried to find it on internet but not able to find exact root cause.
 Please let me know what is it, why it is giving warning and how can i
 resolve it.
 
 Thanks,
 Deepesh
>> 
>> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Extremely poor predictive performance with RF in mllib

2015-08-04 Thread Patrick Lam
Yes, I rechecked and the label is correct. As you can see in the code
posted, I used the exact same features for all the classifiers, so unless RF
somehow switches the labels, it should be correct.

I have posted a sample dataset and sample code to reproduce what I'm
getting here:

https://github.com/pkphlam/spark_rfpredict

On Tue, Aug 4, 2015 at 6:42 AM, Yanbo Liang  wrote:

> It looks like the predicted result is just the opposite of what is expected,
> so could you check whether the label is right?
> Or could you share some data which can help to reproduce this output?
>
> 2015-08-03 19:36 GMT+08:00 Barak Gitsis :
>
>> hi,
>> I've run into some poor RF behavior, although not as pronounced as yours.
>> It would be great to get more insight into this one.
>>
>> Thanks!
>>
>> On Mon, Aug 3, 2015 at 8:21 AM pkphlam  wrote:
>>
>>> Hi,
>>>
>>> This might be a long shot, but has anybody run into very poor predictive
>>> performance using RandomForest with Mllib? Here is what I'm doing:
>>>
>>> - Spark 1.4.1 with PySpark
>>> - Python 3.4.2
>>> - ~30,000 Tweets of text
>>> - 12289 1s and 15956 0s
>>> - Whitespace tokenization and then hashing trick for feature selection
>>> using
>>> 10,000 features
>>> - Run RF with 100 trees and maxDepth of 4 and then predict using the
>>> features from all the 1s observations.
>>>
>>> So in theory, I should get predictions of close to 12289 1s (especially
>>> if
>>> the model overfits). But I'm getting exactly 0 1s, which sounds
>>> ludicrous to
>>> me and makes me suspect something is wrong with my code or I'm missing
>>> something. I notice similar behavior (although not as extreme) if I play
>>> around with the settings. But I'm getting normal behavior with other
>>> classifiers, so I don't think it's my setup that's the problem.
>>>
>>> For example:
>>>
>>> >>> lrm = LogisticRegressionWithSGD.train(lp, iterations=10)
>>> >>> logit_predict = lrm.predict(predict_feat)
>>> >>> logit_predict.sum()
>>> 9077
>>>
>>> >>> nb = NaiveBayes.train(lp)
>>> >>> nb_predict = nb.predict(predict_feat)
>>> >>> nb_predict.sum()
>>> 10287.0
>>>
>>> >>> rf = RandomForest.trainClassifier(lp, numClasses=2,
>>> >>> categoricalFeaturesInfo={}, numTrees=100, seed=422)
>>> >>> rf_predict = rf.predict(predict_feat)
>>> >>> rf_predict.sum()
>>> 0.0
>>>
>>> This code was all run back to back so I didn't change anything in
>>> between.
>>> Does anybody have a possible explanation for this?
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Extremely-poor-predictive-performance-with-RF-in-mllib-tp24112.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>> --
>> *-Barak*
>>
>
>


-- 
Patrick Lam
Institute for Quantitative Social Science, Harvard University
http://www.patricklam.org


Re: Transform MongoDB Aggregation into Spark Job

2015-08-04 Thread Jörn Franke
Hi,

I think the combination of MongoDB and Spark is a little bit unlucky.

Why don't you simply use MongoDB?

If you want to process a lot of data you should use HDFS or Cassandra as
storage. MongoDB is not suitable for heterogeneous processing of large-scale
data.

Best regards,

Le mar. 4 août 2015 à 11:19, Deepesh Maheshwari <
deepesh.maheshwar...@gmail.com> a écrit :

> Hi,
> I am new to Apache Spark and exploring Spark+Kafka integration to process
> data using Spark, which I did earlier with MongoDB aggregation.
>
> I am not able to figure out to handle my use case.
>
> Mongo Document :
> {
> "_id" : ObjectId("55bfb3285e90ecbfe37b25c3"),
> "url" : "
> http://www.z.com/new_car_search.php?bycity=Mumbai&sortfield=price&sortdirection=desc
> ",
> "ip" : "27.5.107.65",
> "pgDownloadTime" : NumberLong(2526),
> "agentType" : "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5
> Build/LMY48B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.93
> Mobile Safari/537.36",
> "referer" : "z.com",
> "domain" : "z.com",
> "channel" : "z",
> "host" : "z.com",
> "pgAccessTime" : NumberLong("1438626600021"),
> "pgAccessMin" : NumberLong(1438626600),
> "pgAccessHour" : NumberLong(1438626600),
> "p5Min" : NumberLong(1438626600),
> "contentType" : 1,
> "articleId" : "4b1ad5357a6987bbc611ff92dcf9cb50",
> "location" : 1,
> "action" : 1,
> "cat" : "Home",
> "subcat" : [
> ""
> ],
> "tags" : [
> ""
> ],
> "catIds" : [
> "Home"
> ],
> "catIdHash" : NumberLong("7115745069349994427"),
> "isIndia" : 1,
> "geoLocation" : "Mumbai",
> "publishTime" : NumberLong(0),
> "author" : "",
> "pagePosition" : "",
> "group" : 0,
> "ssoId" : null,
> "isAPP" : 0,
> "sessionId" : "17a95722-5a48-459f-afd8-78f7edb84897"
> }
>
> I am putting data in kafka in above json format.
> Now, when i am reading it in  spark and i need group above document using
> two keys and get the total count of that key and put it in mongo along with
> the whole document details.
>
> Mongo Aggregation Job :
>
> {
> "$match": {
> "pgAccessMin": {
> "$gte": 1438679100,
> "$lt": 1438679400
> }
> }
> },
> {
> "$project": {
> "articleId": 1,
> "host": 1,
> "isAPP": 1,
> "cat": 1,
> "subcat": 1,
> "publishTime": 1,
> "channel": 1,
> "author": 1,
> "tags": 1,
> "url": 1,
> "catIds": 1,
> "catIdHash": 1,
> "count": 1,
> "contentType": 1,
> "_id": 0
> }
> },
> {
> "$group": {
> "_id": {
> "host": "$host",
> "isAPP": "$isAPP",
> "articleId": "$articleId"
> },
> "count": {
> "$sum": 1
> },
> "url": {
> "$first": "$url"
> },
> "subcat": {
> "$first": "$subcat"
> },
> "cat": {
> "$first": "$cat"
> },
> "publishTime": {
> "$first": "$publishTime"
> },
> "channel": {
> "$first": "$channel"
> },
> "author": {
> "$first": "$author"
> },
> "tags": {
> "$first": "$tags"
> },
> "catIdHash": {
> "$first": "$catIdHash"
> },
> "catIds": {
> "$first": "$catIds"
> },
> "contentType": {
> "$first": "$contentType"
> }
> }
> }
>
> Please suggest how to write this equivalent job in  spark so that i can
> get the view count along with other fields and save it in mongo.
>
> Regards,
> Deepesh
>
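
For reference, a rough DataFrame equivalent of the aggregation quoted above.
This is only a sketch: it assumes the Kafka JSON payloads were loaded into a
DataFrame named `events` (e.g. with sqlContext.read.json), and the write back
to Mongo is left out.

import org.apache.spark.sql.functions._

val windowed = events.filter(
  col("pgAccessMin") >= 1438679100L && col("pgAccessMin") < 1438679400L)

val grouped = windowed
  .groupBy("host", "isAPP", "articleId")      // the $group _id
  .agg(
    count("articleId").as("count"),           // $sum: 1
    first("url").as("url"),                   // $first for the remaining fields
    first("subcat").as("subcat"),
    first("cat").as("cat"),
    first("publishTime").as("publishTime"),
    first("channel").as("channel"),
    first("author").as("author"),
    first("tags").as("tags"),
    first("catIdHash").as("catIdHash"),
    first("catIds").as("catIds"),
    first("contentType").as("contentType"))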


Re: Repartition question

2015-08-04 Thread Richard Marscher
Hi,

It is possible to control the number of partitions for the RDD without
calling repartition by setting the max split size for the Hadoop input
format used. Tracing through the code, XmlInputFormat extends
FileInputFormat which determines the number of splits (which NewHadoopRdd
uses to determine number of partitions:
https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L95)
with a few configs:
https://github.com/apache/hadoop/blob/branch-2.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L200
.

public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
If you are setting SparkConf fields, prefix the keys with spark.hadoop and
they will end up on the Hadoop conf used for the above values.
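
A sketch of that (the 16 MB figure is only an example; smaller splits mean
more partitions for the same file):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize",
       (16 * 1024 * 1024).toString)
val sc = new SparkContext(conf)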

On Tue, Aug 4, 2015 at 12:31 AM, Naveen Madhire 
wrote:

> Hi All,
>
> I am running the WikiPedia parsing example present in the "Advance
> Analytics with Spark" book.
>
>
> https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112
>
>
> The partitions of the RDD returned by the readFile function (mentioned
> above) is of 32MB size. So if my file size is 100 MB, RDD is getting
> created with 4 partitions with approx 32MB  size.
>
>
> I am running this in a standalone spark cluster mode, every thing is
> working fine only little confused about the nbr of partitions and the size.
>
> I want to increase the nbr of partitions for the RDD to make use of the
> cluster. Is calling repartition() after this the only option or can I pass
> something in the above method to have more partitions of the RDD.
>
> Please let me know.
>
> Thanks.
>



-- 
*Richard Marscher*
Software Engineer
Localytics



Re: How does the # of tasks affect # of threads?

2015-08-04 Thread Elkhan Dadashov
Hi Connor,

Spark creates cached thread pool in Executor

for executing the tasks:

// Start worker thread pool
*private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor
task launch worker")*

and if we look at the org.apache.spark.util.ThreadUtils class, we can see that
the number of threads for the cached thread pool is not limited, so it can grow
up to *Integer.MAX_VALUE*

 /**
   * Wrapper over newCachedThreadPool. Thread names are formatted as
prefix-ID, where ID is a
   * unique, sequentially assigned integer.
   */
  def *newDaemonCachedThreadPool*(prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)

Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }

And in java.util.concurrent.Executors class :

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService *newCachedThreadPool*(ThreadFactory
threadFactory) {
return new ThreadPoolExecutor(0, *Integer.MAX_VALUE*,
  60L, TimeUnit.SECONDS,
  new SynchronousQueue(),
  threadFactory);
}

So if there are lots of tasks to be launched at the same time, the number of
threads can potentially grow up to Integer.MAX_VALUE. But in practice, as soon
as tasks are finished, their threads are returned to the cached pool and reused
for new tasks, instead of a new thread being created for each task.

Now let's see why an OutOfMemoryError occurs when lots of new threads are created.

An OutOfMemoryError usually occurs when your executor/driver process does not
have enough memory to allocate new native threads for executing tasks.

With the help of this command you can see how many threads gets created
while executing your spark job:

*ps -u  -L | wc -l *

(in my case basic KMEANS ML algorithm Spark job creates 400+ threads)

With this command you can see the thread limit set for your machine/OS, which
you can also increase:

*ulimit -u*

or more detailed:

*ulimit -a*


Here is the logic of how memory gets used by each newly created thread in the executor:
"The number of threads that can run within a JVM process is generally
limited by the address space for that process. Each thread requires a
thread stack. The more threads you have, the more process address space you
use. The more address space you use for thread stacks, the less you have
for the Java heap."

You can tune the thread stack size:
-Xss determines the size of the stack, e.g. -Xss1024k. If the stack space is too
small, eventually you will see a java.lang.StackOverflowError.
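
A hedged example of passing a smaller stack to the executor JVMs (512k is
illustrative; too small a value trades the OutOfMemoryError for a
StackOverflowError):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Xss512k")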

--
Elkhan

On Sat, Aug 1, 2015 at 2:38 PM, Connor Zanin  wrote:

> 1. I believe that the default memory (per executor) is 512m (from the
> documentation)
> 2. I have increased the memory used by spark on workers in my launch
> script when submitting the job
>(--executor-memory 124g)
> 3. The job completes successfully, it is the "road bumps" in the middle I
> am concerned with
>
> I would like insight into how Spark handle thread creation
>
> On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman 
> wrote:
>
>> Hello,
>>
>> I am not an expert with Spark, but the error thrown by Spark seems to
>> indicate that there is not enough memory for launching the job. By default,
>> Spark allocates 1GB of memory; maybe you should increase it?
>>
>> Best regards
>>
>> Fabrice
>>
>> Le sam. 1 août 2015 à 22:51, Connor Zanin  a écrit :
>>
>>> Hello,
>>>
>>> I am having an issue when I run a word count job. I have included the
>>> source and log files for reference. The job finishes successfully, but
>>> about halfway through I get a java.lang.OutOfMemoryError (could not create
>>> native thread), and this leads to the loss of the Executor. After some
>>> searching I found out this was a problem with the environment and the limit
>>> by the OS on how many threads I could spawn.
>>>
>>> However, I had thought that Spark only maintained a thread pool equal in
>>> size to the number of cores available across the nodes (by default), and
>>> schedules tasks dynamically as threads become available. The only Spark
>>> parameter I change is the number of partitions in my RDD.
>>>
>>> My question is, how is Spark deciding how many threads to spawn and when?
>>>
>>> --
>>> Regards,
>>>
>>> Connor Zanin
>>> Computer Science
>>> University of De

Re: Spark SQL support for Hive 0.14

2015-08-04 Thread Michael Armbrust
I'll add that while Spark SQL 1.5 compiles against Hive 1.2.1, it has
support for reading from metastores for Hive 0.12 - 1.2.1

On Tue, Aug 4, 2015 at 9:59 AM, Steve Loughran 
wrote:

> Spark 1.3.1 & 1.4 only support Hive 0.13
>
> Spark 1.5 is going to be released against Hive 1.2.1; it'll skip Hive .14
> support entirely and go straight to the currently supported Hive release.
>
> See SPARK-8064 for the gory details
>
> > On 3 Aug 2015, at 23:01, Ishwardeep Singh <
> ishwardeep.si...@impetus.co.in> wrote:
> >
> > Hi,
> >
> > Does spark SQL support Hive 0.14? The documentation refers to Hive 0.13.
> Is
> > there a way to compile spark with Hive 0.14?
> >
> > Currently we are using Spark 1.3.1.
> >
> > Thanks
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-support-for-Hive-0-14-tp24122.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Writing streaming data to cassandra creates duplicates

2015-08-04 Thread gaurav sharma
Ideally the 2 messages read from Kafka must differ on some parameter
at least, or else they are logically the same.

As a solution to your problem, if the message content is the same, you could
create a new UUID field, which might play the role of the partition key while
inserting the 2 messages into Cassandra:

Msg1 - UUID1, GAURAV, 100
Msg2 - UUID2, PRIYA, 200
Msg3 - UUID1, GAURAV, 100

Now when inserting into Cassandra, 3 different rows would be created. Please
note that even though Msg1 and Msg3 have the same content, they are inserted as
2 separate rows in Cassandra, since they differ on UUID, which is the partition
key in my column family.
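
A minimal sketch of that idea, assuming the spark-cassandra-connector, a table
whose partition key is the uuid column, and a placeholder `messages` RDD of
(name, value) pairs read from Kafka:

import com.datastax.spark.connector._
import java.util.UUID

case class TaggedMsg(uuid: String, name: String, value: Int)

val tagged = messages.map { case (name, value) =>
  TaggedMsg(UUID.randomUUID().toString, name, value)
}
tagged.saveToCassandra("mykeyspace", "messages")   // keyspace/table names are placeholders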

Regards,
Gaurav
(please excuse spelling mistakes)
Sent from phone
On Aug 4, 2015 4:54 PM, "Gerard Maas"  wrote:

> (removing dev from the to: as not relevant)
>
> it would be good to see some sample data and the cassandra schema to have
> a more concrete idea of the problem space.
>
> Some thoughts: reduceByKey could still be used to 'pick' one element.
> example of arbitrarily choosing the first one: reduceByKey{case (e1,e2) =>
> e1}
>
> The question to be answered is: what should happen to the multiple values
> that arrive for 1 key?
>
> And why are they creating duplicates in cassandra? if they have the same
> key, they will result in an overwrite (that's not desirable due to
> tombstones anyway)
>
> -kr, Gerard.
>
>
>
> On Tue, Aug 4, 2015 at 1:03 PM, Priya Ch 
> wrote:
>
>>
>>
>>
>> Yes...union would be one solution. I am not doing any aggregation hence
>> reduceByKey would not be useful. If I use groupByKey, messages with same
>> key would be obtained in a partition. But groupByKey is very expensive
>> operation as it involves shuffle operation. My ultimate goal is to write
>> the messages to cassandra. if the messages with same key are handled by
>> different streams...there would be concurrency issues. To resolve this i
>> can union dstreams and apply hash parttioner so that it would bring all the
>> same keys to a single partition or do a groupByKey which does the same.
>>
>> As groupByKey is expensive, is there any work around for this ?
>>
>> On Thu, Jul 30, 2015 at 2:33 PM, Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Just my two cents. I understand your problem is that your problem is
>>> that you have messages with the same key in two different dstreams. What I
>>> would do would be making a union of all the dstreams with
>>> StreamingContext.union or several calls to DStream.union, and then I would
>>> create a pair dstream with the primary key as key, and then I'd use
>>> groupByKey or reduceByKey (or combineByKey etc) to combine the messages
>>> with the same primary key.
>>>
>>> Hope that helps.
>>>
>>> Greetings,
>>>
>>> Juan
>>>
>>>
>>> 2015-07-30 10:50 GMT+02:00 Priya Ch :
>>>
 Hi All,

  Can someone throw insights on this ?

 On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch >>> > wrote:

>
>
> Hi TD,
>
>  Thanks for the info. I have the scenario like this.
>
>  I am reading the data from kafka topic. Let's say kafka has 3
> partitions for the topic. In my streaming application, I would configure 3
> receivers with 1 thread each such that they would receive 3 dstreams (from
> 3 partitions of kafka topic) and also I implement partitioner. Now there 
> is
> a possibility of receiving messages with same primary key twice or more,
> one is at the time message is created and other times if there is an 
> update
> to any fields for same message.
>
> If two messages M1 and M2 with same primary key are read by 2
> receivers then even the partitioner in spark would still end up in 
> parallel
> processing as there are altogether in different dstreams. How do we 
> address
> in this situation ?
>
> Thanks,
> Padma Ch
>
> On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das 
> wrote:
>
>> You have to partition that data on the Spark Streaming by the primary
>> key, and then make sure insert data into Cassandra atomically per key, or
>> per set of keys in the partition. You can use the combination of the 
>> (batch
>> time, and partition Id) of the RDD inside foreachRDD as the unique id for
>> the data you are inserting. This will guard against multiple attempts to
>> run the task that inserts into Cassandra.
>>
>> See
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
>>
>> TD
>>
>> On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch <
>> learnings.chitt...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>>  I have a problem when writing streaming data to cassandra. Or
>>> existing product is on Oracle DB in which while wrtiting data, locks are
>>> maintained such that duplicates in the DB are avoided.
>>>
>>> But as spark has parallel processing architecture, if more than 1
>>> thread is trying to

Spark SQL unable to recognize schema name

2015-08-04 Thread Mohammed Guller
Hi -

I am running the Thrift JDBC/ODBC server (v1.4.1) and encountered a problem 
when querying tables using fully qualified table names (schemaName.tableName). 
The following query works fine from the beeline tool:

SELECT * from test;

However, the following query throws an exception, even though the table “test” 
does exist under the “default” schema:

SELECT * from default.test;

Error: org.apache.spark.sql.AnalysisException: no such table default.test; line 
1 pos 22 (state=,code=0)

Here is the exception trace on the Thrift Server console:

15/08/04 14:27:03 WARN ThriftCLIService: Error executing statement:
org.apache.hive.service.cli.HiveSQLException: 
org.apache.spark.sql.AnalysisException: no such table default.test; line 1 pos 
22
at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
at 
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
at 
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
at sun.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:79)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:37)
at 
org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:64)
at java.security.AccessController.doPrivileged(Native Method)


Is it a bug in 1.4.1 or am I missing some configuration parameter?

Mohammed



Re: Unable to load native-hadoop library for your platform

2015-08-04 Thread Sean Owen
Oh good point, does the Windows integration need native libs for
POSIX-y file system access? I know there are some binaries shipped for
this purpose but wasn't sure if that's part of what's covered in the
native libs message.

On Tue, Aug 4, 2015 at 6:01 PM, Steve Loughran  wrote:
> Think it may be needed on Windows, certainly if you start trying to work with 
> local files.
>
>
>> On 4 Aug 2015, at 00:34, Sean Owen  wrote:
>>
>> It won't affect you if you're not actually running Hadoop. But it's
>> mainly things like Snappy/LZO compression which are implemented as
>> native libraries under the hood.
>
> There's a lot more in those native libs, primarily to bypass bits missing 
> from the java APIs (FS permissions) and to add new features (encryption, soon 
> erasure coding).
>
> The Hadoop file:// FS uses it on windows, at least for now
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: scheduler delay time

2015-08-04 Thread maxdml
You'd need to provide information such as executor configuration (#cores,
memory size). You might have less scheduler delay with smaller but more
numerous executors than with fewer, larger ones.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/scheduler-delay-time-tp6003p24133.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL unable to recognize schema name

2015-08-04 Thread Ted Yu
This should have been fixed by:
[SPARK-7943] [SPARK-8105] [SPARK-8435] [SPARK-8714] [SPARK-8561] Fixes
multi-database support

The fix is in the upcoming 1.5.0

FYI

On Tue, Aug 4, 2015 at 11:45 AM, Mohammed Guller 
wrote:

> Hi -
>
>
>
> I am running the Thrift JDBC/ODBC server (v1.4.1) and encountered a
> problem when querying tables using fully qualified table
> names (schemaName.tableName). The following query works fine from the
> beeline tool:
>
>
>
> SELECT * from test;
>
>
>
> However, the following query throws an exception, even though the table
> “test” does exist under the “default” schema:
>
>
>
> SELECT * from default.test;
>
>
>
> Error: org.apache.spark.sql.AnalysisException: no such table default.test;
> line 1 pos 22 (state=,code=0)
>
>
>
> Here is the exception trace on the Thrift Server console:
>
>
>
> 15/08/04 14:27:03 WARN ThriftCLIService: Error executing statement:
>
> org.apache.hive.service.cli.HiveSQLException:
> org.apache.spark.sql.AnalysisException: no such table default.test; line 1
> pos 22
>
> at
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
>
> at
> org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
>
> at
> org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
>
> at sun.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:79)
>
> at
> org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:37)
>
> at
> org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:64)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
>
>
>
>
> Is it a bug in 1.4.1 or am I missing some configuration parameter?
>
>
>
> Mohammed
>
>
>


Re: Problem submiting an script .py against an standalone cluster.

2015-08-04 Thread Ford Farline
The code is very simple, just a couple of lines. When I launch it, it runs
locally but not on the cluster.

sc = SparkContext("local", "Tech Companies Feedback")

beginning_time = datetime.now()

time.sleep(60)

print datetime.now() - beginning_time

sc.stop()

Thanks for your interest,

Gonzalo



On Fri, Jul 31, 2015 at 4:24 AM, Marcelo Vanzin  wrote:

> Can you share the part of the code in your script where you create the
> SparkContext instance?
>
> On Thu, Jul 30, 2015 at 7:19 PM, fordfarline 
> wrote:
>
>> Hi All,
>>
>> I'm having an issue when launching an app (Python) against a standalone
>> cluster: it runs locally instead, as it doesn't reach the cluster.
>> It's the first time I try the cluster; in local mode it works fine.
>>
>> I did this:
>>
>> -> /home/user/Spark/spark-1.3.0-bin-hadoop2.4/sbin/start-all.sh # Master
>> and
>> worker are up in localhost:8080/4040
>> -> /home/user/Spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --master
>> spark://localhost:7077 Script.py
>>* The script runs OK, but locally :( I can check it at
>> localhost:4040, but I don't see any job in the cluster UI
>>
>> The only warning is:
>> WARN Utils: Your hostname, localhost resolves to a loopback address:
>> 127.0.0.1; using 192.168.1.132 instead (on interface eth0)
>>
>> I set SPARK_LOCAL_IP=127.0.0.1 to solve this; at least the warning
>> disappears,
>> but the script keeps executing locally, not on the cluster.
>>
>> I think it has something to do with my virtual server:
>> -> Host Server: Linux Mint
>> -> The Virtual Server (Workstation 10) where Spark runs is Linux Mint as
>> well.
>>
>> Any idea what I am doing wrong?
>>
>> Thanks in advance for any suggestions; this is driving me mad!!
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-submiting-an-script-py-against-an-standalone-cluster-tp24091.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Marcelo
>
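
For what it's worth, a master set inside the code (the "local" in the SparkContext constructor above) takes precedence over the --master flag passed to spark-submit, which would explain why the job never reaches the cluster. A minimal sketch of the usual pattern, shown here in Scala (in PySpark the equivalent is to pass only the app name to SparkContext and let spark-submit supply the master):

import org.apache.spark.{SparkConf, SparkContext}

// No setMaster() here: the master comes from spark-submit (e.g. --master spark://host:7077),
// so the same code runs locally or on the cluster without changes.
val conf = new SparkConf().setAppName("Tech Companies Feedback")
val sc = new SparkContext(conf)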


Re: How does the # of tasks affect # of threads?

2015-08-04 Thread Connor Zanin
Elkhan,

Thank you for the response. This was a great answer.

On Tue, Aug 4, 2015 at 1:47 PM, Elkhan Dadashov 
wrote:

> Hi Connor,
>
> Spark creates a cached thread pool in Executor for executing the tasks:
>
> // Start worker thread pool
> *private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor
> task launch worker")*
>
> and if we look at the org.apache.spark.util.ThreadUtils class, we can see
> that the number of threads in the cached thread pool is not limited, so it
> can grow up to *Integer.MAX_VALUE*
>
>  /**
>* Wrapper over newCachedThreadPool. Thread names are formatted as
> prefix-ID, where ID is a
>* unique, sequentially assigned integer.
>*/
>   def *newDaemonCachedThreadPool*(prefix: String): ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
>
> Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
>   }
>
> And in java.util.concurrent.Executors class :
>
> /**
>  * Creates a thread pool that creates new threads as needed, but
>  * will reuse previously constructed threads when they are
>  * available, and uses the provided
>  * ThreadFactory to create new threads when needed.
>  * @param threadFactory the factory to use when creating new threads
>  * @return the newly created thread pool
>  * @throws NullPointerException if threadFactory is null
>  */
> public static ExecutorService *newCachedThreadPool*(ThreadFactory
> threadFactory) {
> return new ThreadPoolExecutor(0, *Integer.MAX_VALUE*,
>   60L, TimeUnit.SECONDS,
>   new SynchronousQueue<Runnable>(),
>   threadFactory);
> }
>
> So if there are lots of tasks to be launched at the same time, the number
> of threads can potentially grow up to Integer.MAX_VALUE. But in reality, as
> soon as tasks are finished, their threads are returned to the cached
> pool and reused for new tasks, instead of a new thread being created for
> each task.
>
> Now let's see why an OutOfMemoryError occurs when lots of new threads are created.
>
> An OutOfMemory error usually occurs when your executor/driver process does
> not have enough memory to allocate new native threads for executing
> tasks.
>
> With the help of this command you can see how many threads get created
> while executing your Spark job:
>
> *ps -u  -L | wc -l *
>
> (in my case basic KMEANS ML algorithm Spark job creates 400+ threads)
>
> With this command you can see the thread limit set for your machine/OS,
> which you can also increase:
>
> *ulimit -u*
>
> or more detailed:
>
> *ulimit -a*
>
>
> Here is the logic of how memory gets used by each newly created thread in an executor:
> "The number of threads that can run within a JVM process is generally
> limited by the address space for that process. Each thread requires a
> thread stack. The more threads you have, the more process address space you
> use. The more address space you use for thread stacks, the less you have
> for the Java heap."
>
> You can tune the thread stack size:
> -Xss determines the size of the stack, e.g. -Xss1024k. If the stack space is
> too small, eventually you will see an exception of class
> java.lang.StackOverflowError.
>
> --
> Elkhan
>
> On Sat, Aug 1, 2015 at 2:38 PM, Connor Zanin  wrote:
>
>> 1. I believe that the default memory (per executor) is 512m (from the
>> documentation)
>> 2. I have increased the memory used by spark on workers in my launch
>> script when submitting the job
>>(--executor-memory 124g)
>> 3. The job completes successfully, it is the "road bumps" in the middle I
>> am concerned with
>>
>> I would like insight into how Spark handles thread creation.
>>
>> On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman 
>> wrote:
>>
>>> Hello,
>>>
>>> I am not an expert with Spark, but the error thrown by Spark seems to
>>> indicate that there is not enough memory for launching the job. By default,
>>> Spark allocates 1GB of memory; maybe you should increase it?
>>>
>>> Best regards
>>>
>>> Fabrice
>>>
>>> On Sat, Aug 1, 2015 at 22:51, Connor Zanin  wrote:
>>>
 Hello,

 I am having an issue when I run a word count job. I have included the
 source and log files for reference. The job finishes successfully, but
 about halfway through I get a java.lang.OutOfMemoryError (could not create
 native thread), and this leads to the loss of the Executor. After some
 searching I found out this was a problem with the environment and the limit
 by the OS on how many threads I could spawn.

 However, I had thought that Spark only maintained a thread pool equal
 in size to the number of cores available across the nodes (by
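
Following up on the -Xss suggestion above, a minimal sketch of shrinking the executor thread stacks through Spark's spark.executor.extraJavaOptions setting; the 512k value is only illustrative, and whether it is safe depends on how deep your call stacks go:

import org.apache.spark.SparkConf

// Smaller per-thread stacks leave more process address space for extra threads
// (and for the heap); too small a value shows up as java.lang.StackOverflowError.
val conf = new SparkConf()
  .setAppName("thread-heavy-job")
  .set("spark.executor.extraJavaOptions", "-Xss512k")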

Poor HDFS Data Locality on Spark-EC2

2015-08-04 Thread Jerry Lam
Hi Spark users and developers,

I have been trying to use spark-ec2. After I launched the Spark cluster
(1.4.1) with ephemeral HDFS (using Hadoop 2.4.0), I tried to execute a job
where the data is stored in the ephemeral HDFS. No matter what I
tried to do, there is no data locality at all. For instance, filtering data
and calculating the count of the filtered data will always have locality
level "any". I tweaked the spark.locality.wait.* configurations, but it does
not seem to make a difference. I'm guessing this is because the hostname
cannot be resolved properly. Has anyone experienced this problem before?

Best Regards,

Jerry


Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Jim Green
*Symptom:*
Even sample job fails:
$ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
Pi is roughly 3.140636
ERROR ConnectionManager: Corresponding SendingConnection to
ConnectionManagerId(xxx,) not found
WARN ConnectionManager: All connections not cleaned up

Found https://issues.apache.org/jira/browse/SPARK-3322
But the code changes are not in newer versions of Spark, even though this JIRA
is marked as fixed.
Is this issue really fixed in the latest version? If so, what is the related
JIRA?

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Jim Green
And also https://issues.apache.org/jira/browse/SPARK-3106
This one is still open.

On Tue, Aug 4, 2015 at 6:12 PM, Jim Green  wrote:

> *Symptom:*
> Even sample job fails:
> $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
> Pi is roughly 3.140636
> ERROR ConnectionManager: Corresponding SendingConnection to
> ConnectionManagerId(xxx,) not found
> WARN ConnectionManager: All connections not cleaned up
>
> Found https://issues.apache.org/jira/browse/SPARK-3322
> But the code changes are not in newer versions of Spark, even though this JIRA
> is marked as fixed.
> Is this issue really fixed in the latest version? If so, what is the related
> JIRA?
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>



-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Turn Off Compression for Textfiles

2015-08-04 Thread Brandon White
How do you turn off gz compression when saving as text files? Right now, I am
reading .gz files and it is saving them as .gz. I would love to not
compress them when I save.

1) DStream.saveAsTextFiles() //no compression

2) RDD.saveAsTextFile() //no compression

Any ideas?


Re: Turn Off Compression for Textfiles

2015-08-04 Thread Philip Weaver
The .gz extension indicates that the file is compressed with gzip. Choose a
different extension (e.g. .txt) when you save them.

On Tue, Aug 4, 2015 at 7:00 PM, Brandon White 
wrote:

> How do you turn off gz compression when saving as text files? Right now, I
> am reading .gz files and it is saving them as .gz. I would love to not
> compress them when I save.
>
> 1) DStream.saveAsTextFiles() //no compression
>
> 2) RDD.saveAsTextFile() //no compression
>
> Any ideas?
>
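
For what it's worth, RDD.saveAsTextFile only compresses the output when a codec is passed to it (there is an overload that takes a compression codec class) or when output compression is switched on in the Hadoop configuration the job picks up. A minimal sketch of writing uncompressed output, assuming sc, rdd and dstream are already defined:

// Make sure output compression is not enabled in the Hadoop job configuration.
sc.hadoopConfiguration.set("mapred.output.compress", "false")
sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "false")

rdd.saveAsTextFile("/tmp/plain-output")              // no codec argument => plain text part files
dstream.saveAsTextFiles("/tmp/plain-stream", "txt")  // prefix and suffix only, no codec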


Re: Difference between RandomForestModel and RandomForestClassificationModel

2015-08-04 Thread Yanbo Liang
The old mllib API will use RandomForest.trainClassifier() to train a
RandomForestModel;
the new mllib API (AKA ML) will use RandomForestClassifier.train() to train
a RandomForestClassificationModel.
They will produce the same result for a given dataset.

2015-07-31 1:34 GMT+08:00 Bryan Cutler :

> Hi Praveen,
>
> In MLLib, the major difference is that RandomForestClassificationModel
> makes use of a newer API which utilizes ML pipelines.  I can't say for
> certain if they will produce the same exact result for a given dataset, but
> I believe they should.
>
> Bryan
>
> On Wed, Jul 29, 2015 at 12:14 PM, praveen S  wrote:
>
>> Hi
>> Wanted to know what is the difference between
>> RandomForestModel and RandomForestClassificationModel?
>> in MLlib? Will they yield the same results for a given dataset?
>>
>
>
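
A minimal Scala sketch of the two APIs side by side; labeledPointRDD (an RDD[LabeledPoint]) and trainingDF (a DataFrame with "label" and "features" columns) are assumed to exist, and the parameter values are only illustrative:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.ml.classification.RandomForestClassifier

// Old mllib API: RDD[LabeledPoint] in, RandomForestModel out.
val oldModel = RandomForest.trainClassifier(
  labeledPointRDD, numClasses = 2, categoricalFeaturesInfo = Map[Int, Int](),
  numTrees = 10, featureSubsetStrategy = "auto", impurity = "gini",
  maxDepth = 5, maxBins = 32)

// New ML pipeline API: DataFrame in, RandomForestClassificationModel out.
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(10)
val newModel = rf.fit(trainingDF)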


Combining Spark Files with saveAsTextFile

2015-08-04 Thread Brandon White
What is the best way to make saveAsTextFile save as only a single file?


RE: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Mohammed Guller
One option is to use the coalesce method in the RDD class.

Mohammed

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Tuesday, August 4, 2015 7:23 PM
To: user
Subject: Combining Spark Files with saveAsTextFile


What is the best way to make saveAsTextFile save as only a single file?


RE: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Mohammed Guller
Just to further clarify, you can first call coalesce with argument 1 and then 
call saveAsTextFile. For example,

rdd.coalesce(1).saveAsTextFile(...)



Mohammed

From: Mohammed Guller
Sent: Tuesday, August 4, 2015 9:39 PM
To: 'Brandon White'; user
Subject: RE: Combining Spark Files with saveAsTextFile

One option is to use the coalesce method in the RDD class.

Mohammed

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Tuesday, August 4, 2015 7:23 PM
To: user
Subject: Combining Spark Files with saveAsTextFile


What is the best way to make saveAsTextFile save as only a single file?


control the number of reducers for groupby in data frame

2015-08-04 Thread Fang, Mike
Hi,

Does anyone know how I could control the number of reducers when we do an operation
such as groupBy on a DataFrame?
I can set spark.sql.shuffle.partitions in SQL, but I am not sure how to do it with the
df.groupBy("XX") API.

Thanks,
Mike
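
A minimal sketch: spark.sql.shuffle.partitions is the same setting that controls the number of shuffle partitions (the "reducers") behind DataFrame aggregations such as groupBy, and it can be set programmatically on the SQLContext rather than in SQL:

// Applies to the shuffle triggered by the groupBy aggregation below.
sqlContext.setConf("spark.sql.shuffle.partitions", "16")
val counts = df.groupBy("XX").count()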


Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Aaron Davidson
ConnectionManager has been deprecated and is no longer used by default
(NettyBlockTransferService is the replacement). Hopefully you would no
longer see these messages unless you have explicitly flipped it back on.

On Tue, Aug 4, 2015 at 6:14 PM, Jim Green  wrote:

> And also https://issues.apache.org/jira/browse/SPARK-3106
> This one is still open.
>
> On Tue, Aug 4, 2015 at 6:12 PM, Jim Green  wrote:
>
>> *Symptom:*
>> Even sample job fails:
>> $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
>> Pi is roughly 3.140636
>> ERROR ConnectionManager: Corresponding SendingConnection to
>> ConnectionManagerId(xxx,) not found
>> WARN ConnectionManager: All connections not cleaned up
>>
>> Found https://issues.apache.org/jira/browse/SPARK-3322
>> But the code changes are not in newer versions of Spark, even though this JIRA
>> is marked as fixed.
>> Is this issue really fixed in the latest version? If so, what is the related
>> JIRA?
>>
>> --
>> Thanks,
>> www.openkb.info
>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>
>
>
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>
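
For reference, the switch Aaron mentions is the spark.shuffle.blockTransferService setting; a minimal sketch (in 1.x, "netty" is the default NettyBlockTransferService and "nio" selects the legacy ConnectionManager-based service):

import org.apache.spark.SparkConf

// Leave this at "netty" (the default) unless you have a specific reason
// to fall back to the old NIO ConnectionManager implementation.
val conf = new SparkConf().set("spark.shuffle.blockTransferService", "netty")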


Re: Total delay per batch in a CSV file

2015-08-04 Thread Saisai Shao
Hi,

Lots of streaming internal status is exposed through StreamingListener,
the same information you see in the web UI, so you could write your own
StreamingListener and register it with the StreamingContext to get the internal
information of Spark Streaming and write it to a CSV file.

You could check the source code here (
https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
).

Thanks
Saisai


On Tue, Aug 4, 2015 at 6:58 PM, allonsy  wrote:

> Hi everyone,
>
> I'm working with Spark Streaming, and I need to perform some offline
> performance measures.
>
> What I'd like to have is a CSV file that reports something like this:
>
> *Batch number/timestamp    Input Size    Total Delay*
>
> which is in fact similar to what the UI outputs.
>
> I tried to get some metrics (metrics.properties), but I'm having a hard time
> getting precise information on every single batch, since they only have
> entries concerning the /last/ (completed/received) batch, and the values are
> often different from those appearing in the UI.
>
> Can anybody give me some advice on how to get metrics that are close to
> those of the UI?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Total-delay-per-batch-in-a-CSV-file-tp24129.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
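
A minimal sketch of the listener approach (Scala); the BatchInfo fields used here (batchTime, numRecords, totalDelay) are the ones exposed in recent 1.x releases, and the output path is just an example:

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class CsvBatchListener(path: String) extends StreamingListener {
  private val out = new PrintWriter(new FileWriter(path, true))  // append mode

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    // batch timestamp (ms), number of input records, total delay (ms) if the batch completed
    out.println(s"${info.batchTime.milliseconds},${info.numRecords},${info.totalDelay.getOrElse(-1L)}")
    out.flush()
  }
}

// Register before ssc.start(), where ssc is your StreamingContext.
ssc.addStreamingListener(new CsvBatchListener("/tmp/batch-metrics.csv"))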


Re: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Igor Berman
Using coalesce might be dangerous, since one worker process will need to
handle the whole file, and if the file is huge you'll get an OOM; however, it
depends on the implementation, and I'm not sure how it will be done.
Nevertheless, the coalesce method is worth a try (please post your results).

Another option would be to use FileUtil.copyMerge, which copies each
partition one after another into a destination stream (file); so as soon as
you've written your HDFS output with Spark with multiple partitions in
parallel (as usual), you can then add another step to merge it into any
destination you want.

On 5 August 2015 at 07:43, Mohammed Guller  wrote:

> Just to further clarify, you can first call coalesce with argument 1 and
> then call saveAsTextFile. For example,
>
>
>
> rdd.coalesce(1).saveAsTextFile(...)
>
>
>
>
>
>
>
> Mohammed
>
>
>
> *From:* Mohammed Guller
> *Sent:* Tuesday, August 4, 2015 9:39 PM
> *To:* 'Brandon White'; user
> *Subject:* RE: Combining Spark Files with saveAsTextFile
>
>
>
> One option is to use the coalesce method in the RDD class.
>
>
>
> Mohammed
>
>
>
> *From:* Brandon White [mailto:bwwintheho...@gmail.com
> ]
> *Sent:* Tuesday, August 4, 2015 7:23 PM
> *To:* user
> *Subject:* Combining Spark Files with saveAsTextFile
>
>
>
> What is the best way to make saveAsTextFile save as only a single file?
>