Re: Is RankingMetrics' NDCG implementation correct?

2016-09-19 Thread Jong Wook Kim
Thanks for the clarification and the relevant links. I overlooked the
comments explicitly saying that the relevance is binary.

I understand that the label is not a relevance score, but I have been (and I
think many people are) using the label as the relevance in the implicit-feedback
context, where an exact user-provided label is not defined anyway. I think
that's why RiVal <https://github.com/recommenders/rival> uses the term
"preference" for both the label for MAE and the relevance for NDCG.

At the same time, I see why Spark decided to assume that the relevance is
binary, in part to fit the signature of RankingMetrics's constructor. I think
it would be nice if the upcoming DataFrame-based RankingEvaluator could
optionally be given a "relevance column" holding non-binary relevance values,
defaulting to either 1.0 or the label column otherwise.

My extended version of RankingMetrics is here:
https://github.com/jongwook/spark-ranking-metrics . It has a test case
checking that the numbers are the same as RiVal's.

Jong Wook



On 19 September 2016 at 03:13, Sean Owen  wrote:

> Yes, relevance is always 1. The label is not a relevance score so
> don't think it's valid to use it as such.
>
> On Mon, Sep 19, 2016 at 4:42 AM, Jong Wook Kim  wrote:
> > Hi,
> >
> > I'm trying to evaluate a recommendation model, and found that Spark and
> > Rival give different results, and it seems that Rival's one is what Kaggle
> > defines: https://gist.github.com/jongwook/5d4e78290eaef22cb69abbf68b52e597
> >
> > Am I using RankingMetrics in a wrong way, or is Spark's implementation
> > incorrect?
> >
> > To my knowledge, NDCG should be dependent on the relevance (or preference)
> > values, but Spark's implementation seems not; it uses 1.0 where it should be
> > 2^(relevance) - 1, probably assuming that relevance is all 1.0? I also tried
> > tweaking, but its method to obtain the ideal DCG also seems wrong.
> >
> > Any feedback from MLlib developers would be appreciated. I made a
> > modified/extended version of RankingMetrics that produces the identical
> > numbers to Kaggle and Rival's results, and I'm wondering if it is something
> > appropriate to be added back to MLlib.
> >
> > Jong Wook
>


Is RankingMetrics' NDCG implementation correct?

2016-09-18 Thread Jong Wook Kim
Hi,

I'm trying to evaluate a recommendation model, and found that Spark and
Rival <https://github.com/recommenders/rival> give different results, and it
seems that Rival's one is what Kaggle defines:
https://gist.github.com/jongwook/5d4e78290eaef22cb69abbf68b52e597

Am I using RankingMetrics in the wrong way, or is Spark's implementation
incorrect?

To my knowledge, NDCG should depend on the relevance (or preference) values, but
Spark's implementation does not seem to; it uses 1.0 where it should use
2^(relevance) - 1, probably assuming that all relevances are 1.0. I also tried
tweaking it, but its method of obtaining the ideal DCG also seems wrong.
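
To make the difference concrete, here is a minimal, self-contained sketch of the
graded formulation I have in mind (plain Scala, not Spark's API): DCG uses
(2^relevance - 1) / log2(rank + 1), the ideal DCG sorts the ground-truth
relevances in descending order, and with all relevances equal to 1.0 it reduces
to the binary form that RankingMetrics currently computes.

    object GradedNdcg {
      // DCG over the relevances of the recommended items, in ranked order
      def dcg(rels: Seq[Double]): Double =
        rels.zipWithIndex.map { case (rel, i) =>
          (math.pow(2, rel) - 1) / (math.log(i + 2) / math.log(2)) // log2(rank + 1), rank = i + 1
        }.sum

      // NDCG@k: gain of each predicted item (0.0 if it is not in the ground truth),
      // normalized by the DCG of the ideal ordering of the ground-truth relevances
      def ndcgAt(k: Int, predicted: Seq[String], relevance: Map[String, Double]): Double = {
        val gains    = predicted.take(k).map(id => relevance.getOrElse(id, 0.0))
        val ideal    = relevance.values.toSeq.sortBy(-_).take(k)
        val idealDcg = dcg(ideal)
        if (idealDcg == 0.0) 0.0 else dcg(gains) / idealDcg
      }

      def main(args: Array[String]): Unit = {
        val relevance = Map("a" -> 3.0, "b" -> 2.0, "c" -> 1.0)
        println(ndcgAt(3, Seq("c", "a", "d"), relevance)) // with graded relevance, the order of a and c matters
      }
    }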

Any feedback from MLlib developers would be appreciated. I made a
modified/extended version of RankingMetrics that produces numbers identical to
Kaggle's and Rival's results, and I'm wondering whether it would be something
appropriate to add back to MLlib.

Jong Wook


Re: AVRO vs Parquet

2016-03-03 Thread Jong Wook Kim
How about ORC? I have experimented briefly with Parquet and ORC, and I
liked the fact that ORC has its schema within the file, which makes it
handy to work with other tools.
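
For what it's worth, both formats are equally easy to use from Spark SQL. A rough
sketch from spark-shell (Spark 1.5/1.6-era API; the paths and column names are
placeholders, and the .orc() reader/writer comes with the Hive module):

    import org.apache.spark.sql.hive.HiveContext
    val sqlContext = new HiveContext(sc)

    val df = sqlContext.read.json("/data/events.json")

    df.write.parquet("/data/events_parquet")  // schema is stored in the Parquet footer
    df.write.orc("/data/events_orc")          // schema is likewise embedded in the ORC file

    // both formats support column pruning, so selecting a few columns of a
    // wide table only reads those column chunks / stripes
    sqlContext.read.parquet("/data/events_parquet").select("id", "ts").show()
    sqlContext.read.orc("/data/events_orc").select("id", "ts").show()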

Jong Wook

On 3 March 2016 at 23:29, Don Drake  wrote:

> My tests show Parquet has better performance than Avro in just about every
> test.  It really shines when you are querying a subset of columns in a wide
> table.
>
> -Don
>
> On Wed, Mar 2, 2016 at 3:49 PM, Timothy Spann 
> wrote:
>
>> Which format is the best format for SparkSQL adhoc queries and general
>> data storage?
>>
>> There are lots of specialized cases, but generally accessing some but not
>> all the available columns with a reasonable subset of the data.
>>
>> I am leaning towards Parquet as it has great support in Spark.
>>
>> I also have to consider any file on HDFS may be accessed from other tools
>> like Hive, Impala, HAWQ.
>>
>> Suggestions?
>> —
>> airis.DATA
>> Timothy Spann, Senior Solutions Architect
>> C: 609-250-5894
>> http://airisdata.com/
>> http://meetup.com/nj-datascience
>>
>>
>>
>
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake 
> 800-733-2143
>


Spark-shell connecting to Mesos stuck at sched.cpp

2015-11-15 Thread Jong Wook Kim
I'm having a problem connecting my Spark app to a Mesos cluster; any help on
the question below would be appreciated.

http://stackoverflow.com/questions/33727154/spark-shell-connecting-to-mesos-stuck-at-sched-cpp

Thanks,
Jong Wook


Spark YARN Shuffle service wire compatibility

2015-10-22 Thread Jong Wook Kim
Hi, I’d like to know if there is a guarantee that the Spark YARN shuffle service
is wire-compatible across 1.x versions.

I could run a Spark 1.5 job against YARN NodeManagers running the 1.4 shuffle
service, but that might have been just a coincidence.

Now we’re upgrading CDH from 5.3 to 5.4, whose NodeManagers already have the 1.3
shuffle service in their classpath, and we’re concerned that it might not always
be compatible with 1.5 jobs that expect the shuffle service.
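
For context, the setup in question looks roughly like this: the NodeManager side
registers Spark's shuffle service as a YARN auxiliary service, and the Spark side
just enables it (a sketch; property names as documented for running Spark 1.x on
YARN):

    <!-- yarn-site.xml on each NodeManager -->
    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>

    # spark-defaults.conf on the submitting side
    spark.shuffle.service.enabled    true
    spark.dynamicAllocation.enabled  true

The question, then, is whether the shuffle service jar deployed on the
NodeManagers this way can serve executors from a different 1.x release.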


Jong Wook



Re: How to maintain multiple JavaRDD created within another method like javaStreamRDD.forEachRDD

2015-07-14 Thread Jong Wook Kim
Your question is not very clear, but from what I understand, you want to end up
with a stream of MyTable objects holding the records parsed from your Kafka topics.

What you need is a JavaDStream<MyTable>, and you can use transform() to make one.

It accepts a function that takes an RDD and returns an RDD, as opposed to
foreachRDD, whose function returns Void, as in your code.
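
In Scala terms the transform-based approach looks roughly like this (the Java
version is the same idea via JavaDStream.transform; Record and MyTable below are
simplified stand-ins for the classes in your code, and `lines` is the DStream of
strings read from Kafka):

    import org.apache.spark.streaming.dstream.DStream

    // simplified stand-ins for the classes in the question
    case class Record(fields: Array[String])
    case class MyTable(records: Seq[Record])

    // transform: RDD[String] => RDD[MyTable], evaluated once per batch
    val tables: DStream[MyTable] = lines.transform { rdd =>
      rdd.mapPartitions { it =>
        val records = it.map(line => Record(line.split(","))).toList
        Iterator(MyTable(records))   // one MyTable per partition
      }
    }

    // foreachRDD is then only used for output actions on the already-built stream
    tables.foreachRDD { rdd => println(s"tables in this batch: ${rdd.count()}") }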

PS. I wouldn't name a JavaDStream "javaDStreamRdd"; first of all it is not an
RDD, and the name should say something about what the stream contains.


Jong Wook.


> On Jul 15, 2015, at 00:41, unk1102  wrote:
> 
> I use Spark Streaming where messages read from Kafka topics are stored into
> JavaDStream<String>; this rdd contains actual data. Now after going through
> documentation and other help I have found we traverse JavaDStream using
> foreachRDD
> 
> javaDStreamRdd.foreachRDD(new Function<JavaRDD<String>, Void>() {
>     public Void call(JavaRDD<String> rdd) {
>         // now I want to call mapPartitions on the above rdd and generate a new
>         // JavaRDD<MyTable>
>         JavaRDD<MyTable> rdd_records = rdd.mapPartitions(
>             new FlatMapFunction<Iterator<String>, MyTable>() {
>                 public Iterable<MyTable> call(Iterator<String> stringIterator)
>                         throws Exception {
>                     // create List<Record>; execute the following in a while loop
>                     String[] fields = line.split(",");
>                     Record record = create Record from above fields
>                     MyTable table = new MyTable();
>                     return table.append(record);
>                 }
>             });
>         return null;
>     }
> });
> 
> Now my question is how does the above code work. I want to create a JavaRDD<MyTable>
> for each RDD of the JavaDStream<String>. How do I make sure the above code will work
> fine with all data and the JavaRDD<MyTable> will contain all the data and won't lose
> any previous data because of the local JavaRDD<MyTable>.
> 
> It is like calling a lambda function within a lambda function. How do I make
> sure the local variable JavaRDD<MyTable> will end up containing all the RDDs?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-maintain-multiple-JavaRDD-created-within-another-method-like-javaStreamRDD-forEachRDD-tp23832.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: ProcessBuilder in SparkLauncher is memory inefficient for launching new process

2015-07-14 Thread Jong Wook Kim
The article you've linked is specific to an embedded system; the JVM built for
that architecture (which the author didn't mention) might not be as stable and
well-supported as HotSpot.

ProcessBuilder is a stable Java API and, despite somewhat limited functionality,
it is the standard way to launch a subprocess from a JVM.

You also have a misconception about forking and memory. Forking a process does
not double the memory consumption; any modern Unix (except that poor embedded
one) uses a copy-on-write scheme for the forked process's virtual memory, so no
additional physical memory is consumed.



Jong Wook.


> On Jul 15, 2015, at 01:39, Elkhan Dadashov  wrote:
> 
> Hi all,
> 
> If you want to launch Spark job from Java in programmatic way, then you need 
> to Use SparkLauncher.
> 
> SparkLauncher uses ProcessBuilder for creating the new process - Java seems to
> handle process creation in an inefficient way.
> 
> "
> When you execute a process, you must first fork() and then exec(). Forking 
> creates a child process by duplicating the current process. Then, you call 
> exec() to change the “process image” to a new “process image”, essentially 
> executing different code within the child process.
> ...
> When we want to fork a new process, we have to copy the ENTIRE Java JVM… What 
> we really are doing is requesting the same amount of memory the JVM been 
> allocated.
> "
> Source: http://bryanmarty.com/2012/01/14/forking-jvm/ 
> 
> This link  shows different 
> solutions for launching new processes in Java.
> 
> If our main program JVM already uses big amount of memory (let's say 6GB), 
> then for creating new process while using SparkLauncher, we need 12 GB 
> (virtual) memory available, even though we will not use it.
> 
> It will be very helpful if someone could share his/her experience for handling 
> this memory inefficiency in creating new processes in Java.
> 



Re: spark on yarn

2015-07-14 Thread Jong Wook Kim
It's probably because your YARN cluster has only 40 vCores available.

Go to your ResourceManager and check whether "VCores Total" and "Memory Total"
exceed what you have set (40 cores and 5120 MB).

If that looks fine, go to the "Scheduler" page, find the queue on which your
jobs run, and check the resources allocated to that queue.
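
If the cluster-wide limit turns out to be the problem, these are the usual knobs
that control it (a sketch of yarn-site.xml for Hadoop 2.x; the values are only
examples):

    <property>
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>8</value>   <!-- vCores each NodeManager offers to YARN -->
    </property>
    <property>
      <name>yarn.scheduler.maximum-allocation-vcores</name>
      <value>8</value>   <!-- largest single container request the RM will grant -->
    </property>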

Hope this helps.

Jong Wook


> On Jul 15, 2015, at 01:57, Shushant Arora  wrote:
> 
> I am running spark application on yarn managed cluster.
> 
> When I specify --executor-cores > 4 it fails to start the application.
> I am starting the app as
> 
> spark-submit --class classname --num-executors 10 --executor-cores 5 --master 
> masteradd jarname
> 
> Exception in thread "main" org.apache.spark.SparkException: Yarn application 
> has already ended! It might have been killed or unable to launch application 
> master.
> 
> When I give --executor-cores as 4 , it works fine.
> 
> My cluster has 10 nodes.
> Why am I not able to specify more than 4 concurrent tasks? Is there any max
> limit on the YARN side or the Spark side which I can override to make use of more tasks?





Re: About extra memory on yarn mode

2015-07-14 Thread Jong Wook Kim
executor.memory only sets the maximum heap size of the executor; the JVM also
needs non-heap memory to store class metadata, interned strings and other native
overheads coming from networking libraries, off-heap storage levels, etc. These
are (of course) legitimate uses of resources, and you'll have to plan your
cluster's resources accordingly. If 6g is a hard limit for your cluster, try
reducing executor.memory to 5g and setting executor.memoryOverhead to 1g. If disk
spill is working correctly, it won't hurt performance much.
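
Concretely, that suggestion amounts to something like the following (a sketch;
the class and jar are placeholders, and spark.yarn.executor.memoryOverhead is
given in megabytes in 1.x):

    spark-submit \
      --master yarn-cluster \
      --num-executors 10 \
      --executor-memory 5g \
      --conf spark.yarn.executor.memoryOverhead=1024 \
      --class com.example.MyApp myapp.jar

That way each executor container asks YARN for 5g of heap plus 1g of overhead,
6g in total.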

Jong Wook


> On Jul 14, 2015, at 21:44, Sea <261810...@qq.com> wrote:
> 
> Hi all:
> I have a question about why Spark on YARN needs extra memory.
> I ask for 10 executors with executor memory 6g, and I find that it allocates 
> 1g more per executor, 7g in total per executor.
> I tried to set spark.yarn.executor.memoryOverhead, but it did not help.
> 1g per executor is too much; who knows how to adjust its size?



Re: RECEIVED SIGNAL 15: SIGTERM

2015-07-12 Thread Jong Wook Kim
Based on my experience, a YARN container can get a SIGTERM when

- it produces too many logs and uses up the hard drive
- it uses more off-heap memory than what is given by the
spark.yarn.executor.memoryOverhead configuration. That might be due to too many
classes loaded (less than MaxPermGen but more than memoryOverhead), or some
other off-heap memory allocated by networking libraries, etc.
- it opens too many file descriptors, which you can check on the executor
node's /proc/<pid>/fd/

Does any of these apply to your situation?

Jong Wook

> On Jul 7, 2015, at 19:16, Kostas Kougios  
> wrote:
> 
> I am still receiving these weird sigterms on the executors. The driver claims
> it lost the executor, the executor receives a SIGTERM (from whom???)
> 
> It doesn't seem a memory related issue though increasing memory takes the
> job a bit further or completes it. But why? there is no memory pressure on
> neither driver nor executor. And nothing in the logs indicating so.
> 
> driver:
> 
> 15/07/07 10:47:04 INFO scheduler.TaskSetManager: Starting task 14762.0 in
> stage 0.0 (TID 14762, cruncher03.stratified, PROCESS_LOCAL, 13069 bytes)
> 15/07/07 10:47:04 INFO scheduler.TaskSetManager: Finished task 14517.0 in
> stage 0.0 (TID 14517) in 15950 ms on cruncher03.stratified (14507/42240)
> 15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated
> or disconnected! Shutting down. cruncher05.stratified:32976
> 15/07/07 10:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 1 on
> cruncher05.stratified: remote Rpc client disassociated
> 15/07/07 10:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 1
> from TaskSet 0.0
> 15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated
> or disconnected! Shutting down. cruncher05.stratified:32976
> 15/07/07 10:47:04 WARN remote.ReliableDeliverySupervisor: Association with
> remote system [akka.tcp://sparkExecutor@cruncher05.stratified:32976] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 
> 15/07/07 10:47:04 WARN scheduler.TaskSetManager: Lost task 14591.0 in stage
> 0.0 (TID 14591, cruncher05.stratified): ExecutorLostFailure (executor 1
> lost)
> 
> gc log for driver, it doesnt look like it run outofmem:
> 
> 2015-07-07T10:45:19.887+0100: [GC (Allocation Failure) 
> 1764131K->1391211K(3393024K), 0.0102839 secs]
> 2015-07-07T10:46:00.934+0100: [GC (Allocation Failure) 
> 1764971K->1391867K(3405312K), 0.0099062 secs]
> 2015-07-07T10:46:45.252+0100: [GC (Allocation Failure) 
> 1782011K->1392596K(3401216K), 0.0167572 secs]
> 
> executor:
> 
> 15/07/07 10:47:03 INFO executor.Executor: Running task 14750.0 in stage 0.0
> (TID 14750)
> 15/07/07 10:47:03 INFO spark.CacheManager: Partition rdd_493_14750 not
> found, computing it
> 15/07/07 10:47:03 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
> SIGNAL 15: SIGTERM
> 15/07/07 10:47:03 INFO storage.DiskBlockManager: Shutdown hook called
> 
> executor gc log (no outofmem as it seems):
> 2015-07-07T10:47:02.332+0100: [GC (GCLocker Initiated GC) 
> 24696750K->23712939K(33523712K), 0.0416640 secs]
> 2015-07-07T10:47:02.598+0100: [GC (GCLocker Initiated GC) 
> 24700520K->23722043K(33523712K), 0.0391156 secs]
> 2015-07-07T10:47:02.862+0100: [GC (Allocation Failure) 
> 24709182K->23726510K(33518592K), 0.0390784 secs]
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/RECEIVED-SIGNAL-15-SIGTERM-tp23668.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: Streaming checkpoints and logic change

2015-07-08 Thread Jong Wook Kim
Hi TD, you answered a different question. As the subject says, mine was
specifically about checkpointing. I'll elaborate.

The checkpoint, which is a serialized DStream DAG, contains all the metadata and
*logic*, like the function passed to e.g. DStream.transform().

Such a function is serialized as an anonymous inner class at the JVM level, and
it will not tolerate the slightest logic change, because the class signature
changes and the checkpoint, which contains the serialized form of the previous
version, can no longer be deserialized.

Logic changes are extremely common in stream processing. Say I have a log
transformer which extracts certain fields of logs from a Kafka stream, and I
want to add another field to extract. This involves a change to the DStream
logic and thus cannot be done using checkpoints; I can't even achieve an
at-least-once guarantee.

My current workaround is to read the current offsets by casting each RDD to
HasOffsetRanges
<https://github.com/apache/spark/blob/v1.4.1-rc3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala#L36-L39>,
save them to ZooKeeper, and pass the fromOffsets parameter read from ZooKeeper
when creating a directStream. I've settled on this approach for now, but I want
to know how the makers of Spark Streaming think about this drawback of
checkpointing.
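
Roughly, the workaround looks like this (a sketch against the 1.x Kafka 0.8
direct API; readOffsetsFromZk and saveOffsetsToZk stand in for my ZooKeeper
persistence code, and ssc is the StreamingContext):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    // stand-ins for the ZooKeeper persistence logic (not shown)
    def readOffsetsFromZk(): Map[TopicAndPartition, Long] = ???
    def saveOffsetsToZk(ranges: Array[OffsetRange]): Unit = ???

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, readOffsetsFromZk(),
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // the cast mentioned above
      // process the batch here, then record how far this batch reached
      saveOffsetsToZk(ranges)
    }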

If anyone has had a similar experience, suggestions would be appreciated.

Jong Wook



On 9 July 2015 at 02:13, Tathagata Das  wrote:

> You can use DStream.transform for some stuff. Transform takes a RDD => RDD
> function that allow arbitrary RDD operations to be done on RDDs of a
> DStream. This function gets evaluated on the driver on every batch
> interval. If you are smart about writing the function, it can do different
> stuff at different intervals. For example, you can always use a
> continuously updated set of filters
>
> dstream.transform { rdd =>
>val broadcastedFilters = Filters.getLatest()
>val newRDD  = rdd.filter { x => broadcastedFilters.get.filter(x) }
>newRDD
> }
>
>
> The function Filters.getLatest() will return the latest set of filters
> that is broadcasted out, and as the transform function is processed in
> every batch interval, it will always use the latest filters.
>
> HTH.
>
> TD
>
> On Wed, Jul 8, 2015 at 10:02 AM, Jong Wook Kim  wrote:
>
>> I just asked this question at the streaming webinar that just ended, but
>> the speakers didn't answered so throwing here:
>>
>> AFAIK checkpoints are the only recommended method for running Spark
>> streaming without data loss. But it involves serializing the entire dstream
>> graph, which prohibits any logic changes. How should I update / fix logic
>> of a running streaming app without any data loss?
>>
>> Jong Wook
>>
>
>


Streaming checkpoints and logic change

2015-07-08 Thread Jong Wook Kim
I just asked this question at the streaming webinar that just ended, but the
speakers didn't answer it, so I'm throwing it here:

AFAIK checkpoints are the only recommended method for running Spark Streaming
without data loss. But they involve serializing the entire DStream graph, which
prohibits any logic changes. How should I update or fix the logic of a running
streaming app without any data loss?

Jong Wook


Re: saveAsTextFile of RDD[Array[Any]]

2015-02-09 Thread Jong Wook Kim
If you have an `RDD[Array[Any]]` you can do

rdd.map(_.mkString("\t"))

(or with some other delimiter) to make it an `RDD[String]`, and then call
`saveAsTextFile`.
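
A slightly fuller sketch from spark-shell (the output path is a placeholder):

    import org.apache.spark.rdd.RDD

    val rdd: RDD[Array[Any]] = sc.parallelize(Seq(
      Array[Any](1, "a", 2.5),
      Array[Any](2, "b", 3.5)))

    // one tab-separated line per array; mkString calls toString on each element
    rdd.map(_.mkString("\t")).saveAsTextFile("/tmp/array-any-out")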




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-of-RDD-Array-Any-tp21548p21554.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Custom streaming receiver slow on YARN

2015-02-09 Thread Jong Wook Kim
Replying to my own thread: I realized that this only happens when the
replication level is 1.

Regardless of whether I set memory-only, disk, or deserialized storage levels, I
had to make the replication level >= 2 for the streaming to work properly on YARN.

I still don't get why, because intuitively less replication should imply faster
computation, and when testing on a Cloudera VM everything worked fine on YARN.

If I am missing something important, please let me know. I am going to settle on
the '..._2' variants for now.
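
For reference, the change amounts to passing a replicated storage level when
creating the receiver stream, roughly like this (MyReceiver is a placeholder for
the custom receiver, which takes the storage level in its constructor):

    import org.apache.spark.storage.StorageLevel

    // replication = 1 (e.g. MEMORY_AND_DISK_SER) stalled on YARN in my case;
    // the _2 variant keeps a second replica on another executor
    val stream = ssc.receiverStream(new MyReceiver(StorageLevel.MEMORY_AND_DISK_SER_2))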


Jong Wook



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-streaming-receiver-slow-on-YARN-tp21544p21553.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Custom streaming receiver slow on YARN

2015-02-07 Thread Jong Wook Kim
Hello, I have an issue where my streaming receiver is laggy on YARN.

Can anyone reply to my question on StackOverflow?:

http://stackoverflow.com/questions/28370362/spark-streaming-receiver-particularly-slow-on-yarn

Thanks
Jong Wook



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-streaming-receiver-slow-on-YARN-tp21544.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
