Does reduceByKey only work properly for numeric keys?

2015-04-18 Thread SecondDatke
I'm trying to solve a word-count-like problem; the difference is that I need the count of a specific word within a specific timespan in a social message stream.
My data is in the format (time, message), and I transformed it (flatMap etc.) into a series of (time, word_id) pairs, where the time is represented with the Python datetime.datetime class. I then transformed that into ((time, word_id), 1) and used reduceByKey to get the result.
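For reference, a minimal PySpark sketch of the pipeline described above; the messages RDD, the extract_words helper (assumed to yield word ids), and the hourly truncation are assumptions made for illustration, not the poster's actual code:

def to_hour(t):
    # Truncate a datetime to the hour so each message falls into an hourly bucket.
    return t.replace(minute=0, second=0, microsecond=0)

counts = (messages  # assumed RDD of (time, message) pairs
          .flatMap(lambda tm: [(to_hour(tm[0]), w) for w in extract_words(tm[1])])
          .map(lambda tw: ((tw[0], tw[1]), 1))
          .reduceByKey(lambda a, b: a + b))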
But the dataset returned is a little weird, just like the following:
format: ((timespan with datetime.datetime, wordid), freq)

((datetime.datetime(2009, 10, 6, 2, 0), 0), 8)
((datetime.datetime(2009, 10, 6, 3, 0), 0), 3)
((datetime.datetime(2009, 10, 6, 3, 0), 0), 14)
As you can see, there are DUPLICATED keys, but as a result of reduceByKey, all keys SHOULD BE UNIQUE.
I tried converting the key to a string (like '2006-12-02 21:00:00-000') and running reduceByKey again, but the problem remains. It seems the only way left is to convert the date to a numeric timestamp, and that does work.
Is this the expected behavior of reduceByKey (and all other transformations that work with keys)?
Currently I'm still working on it.

RE: Does reduceByKey only work properly for numeric keys?

2015-04-18 Thread SecondDatke
Python tuples compare lexicographically (that is, two tuples are equal if and only if every element is the same and in the same position). Although I'm not very clear on what's going on inside Spark, I have tried comparing the duplicate keys in the result with the == operator and hashing them in Python: they're the same.
And as I said, I also tried using a string as the key, which also failed.
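For illustration, a quick plain-Python check of the property described above: within a single interpreter, two such tuple keys compare equal and hash identically.

import datetime

k1 = (datetime.datetime(2009, 10, 6, 3, 0), 0)
k2 = (datetime.datetime(2009, 10, 6, 3, 0), 0)

# Tuples compare element-wise, and datetime.datetime defines __eq__ and
# __hash__, so both checks pass in one interpreter.
assert k1 == k2
assert hash(k1) == hash(k2)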

 From: so...@cloudera.com
 Date: Sat, 18 Apr 2015 16:30:59 +0100
 Subject: Re: Does reduceByKey only work properly for numeric keys?
 To: lovejay-lovemu...@outlook.com
 CC: user@spark.apache.org
 
 Do these datetime objects implement the notion of equality you'd
 expect? (This may be a dumb question; I'm thinking of the equivalent
 of equals() / hashCode() from the Java world.)
 

RE: Does reduceByKey only work properly for numeric keys?

2015-04-18 Thread SecondDatke
I don't think my experiment is surprising; it's my fault:
To move away from my specific case, I wrote a test program that generates data randomly and casts the keys to strings:
import random
import operator

COUNT = 2
COUNT_PARTITIONS = 36
LEN = 233

rdd = sc.parallelize(((str(random.randint(1, LEN)), 1) for i in xrange(COUNT)),
                     COUNT_PARTITIONS)
reduced = rdd.reduceByKey(operator.add).sortByKey()
print(reduced.count(), LEN)  # the result is valid if count == LEN
More about my environment: I'm running Spark on a small Mesos cluster, always using the pyspark shell with Python 2.7.9 and IPython 3.0.0. The operating system is Arch Linux.
Also, one node runs 32-bit (x86) Arch Linux, while the others run x86_64.
The problem arises whenever the x86 node and the x86_64 nodes work together. Nothing goes wrong if the cluster contains only the x86 node, or only x86_64 nodes. And currently only reduceByKey with int32 keys gives sensible results.
Maybe I should update my system.
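One plausible mechanism, sketched below as an assumption rather than a confirmed diagnosis: PySpark's shuffle assigns a key to a partition based on the Python hash of the key, and CPython's hash() for strings and longs is platform-dependent, so a 32-bit worker and a 64-bit worker can route the same key to different partitions, leaving duplicates after reduceByKey. The partition_for helper is hypothetical and only mimics the hash-modulo step.

import sys

def partition_for(key, num_partitions):
    # Hypothetical stand-in for a hash partitioner: hash(key) % num_partitions.
    return hash(key) % num_partitions

# hash() of a str or long is computed with the platform's word size, so the
# value (and therefore the partition) can differ between 32-bit and 64-bit
# Python builds; small ints hash to themselves on both builds, which would
# explain why int32 keys still reduce correctly.
print(sys.maxsize, partition_for('233', 36))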
Date: Sat, 18 Apr 2015 08:28:50 -0700
Subject: Re: Does reduceByKey only work properly for numeric keys?
From: yuzhih...@gmail.com
To: lovejay-lovemu...@outlook.com
CC: user@spark.apache.org

Can you show us the function you passed to reduceByKey()?
What release of Spark are you using?
Cheers

RE: Does reduceByKey only work properly for numeric keys?

2015-04-18 Thread SecondDatke
Well, I'm changing the strategy.
(Assuming we have N(words) < 10 and an interval between two time points of >= 1 hour.)
* Use only the timestamp as the key: no duplicate keys, but it can only reduce to something like the total number of words in a timespan.
* Key = tuple(timestamp, word_id): duplication.
* Key = timestamp*1000+word_id, just like hashing it: duplication. But this already exceeds 2^31, so it is a long in Python (IIRC the max value of the Python long type is not limited, thus it is not represented the same way as a common int internally), not an int.
* Key = timestamp/3600*1000+word_id, which keeps the value from being converted to long: there seems to be no duplication (a sketch of this variant follows below).
(All conclusions above come from simple sampling of the returned dataset and are not well validated.)
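For reference, a minimal PySpark sketch of that last variant; the pairs RDD and the hourly bucketing are assumptions, and it presumes word_id < 1000 so the packed key stays unique:

import calendar

def make_key(t, word_id):
    # Convert the datetime to a Unix timestamp, bucket by hour, and pack the
    # word id into the low three decimal digits; for 2009-era timestamps the
    # result stays well below 2**31, so it remains a plain int.
    hours = calendar.timegm(t.timetuple()) // 3600
    return hours * 1000 + word_id

counts = (pairs  # assumed RDD of (time, word_id)
          .map(lambda tw: (make_key(tw[0], tw[1]), 1))
          .reduceByKey(lambda a, b: a + b))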


How to submit a job as a different user?

2015-04-09 Thread SecondDatke
Well, maybe a Linux configuration problem...
I have a cluster that is about to be exposed to the public, and I want everyone who uses my cluster to have a user account without sudo permissions etc. (e.g. 'guest') and to be able to submit tasks to Spark, which runs on Mesos under a different, private user ('sparkuser', for example).
But now, say I launch the Mesos slave on Node 1 as 'sparkuser' and on Node 2 as 'guest', and submit a job as 'guest'; then Node 1 fails, saying:
Failed to change user to 'guest': Failed to set gid: Operation not permitted.
Any solution? Or does this just not make sense?
Thanks.   

How to work with sparse data in Python?

2015-04-06 Thread SecondDatke
I'm trying to apply Spark to an NLP problem that I'm working on. I have nearly 4 million tweets, and I have converted them into word vectors. They are pretty sparse because each message has only dozens of words, but the vocabulary has tens of thousands of words.
These vectors have to be loaded each time my program handles the data. I stack them into a 50k (vocabulary size) x 4M (message count) sparse matrix with scipy.sparse and persist it on disk for two reasons: 1) it only costs 400 MB of disk space, and 2) loading and parsing it is really fast (I convert it to a csr_matrix and index each row for the messages).
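For illustration, a small scipy sketch of building and persisting such a matrix; the file name, the toy triples, and the rows-are-messages layout are assumptions:

import numpy as np
from scipy.sparse import csr_matrix

# Assumed layout: one row per message, one column per vocabulary word;
# rows/cols/vals would come from (message_index, word_id, count) triples.
rows = np.array([0, 0, 1, 2])
cols = np.array([3, 7, 3, 1])
vals = np.array([1, 2, 1, 3])
matrix = csr_matrix((vals, (rows, cols)), shape=(3, 10))

# Persist the raw CSR arrays with numpy; loading reverses this via
# csr_matrix((data, indices, indptr), shape=shape).
np.savez('tweets_csr.npz', data=matrix.data, indices=matrix.indices,
         indptr=matrix.indptr, shape=matrix.shape)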
This works well on my local machine, with plain Python and scipy/numpy. However, it seems Spark does not support scipy.sparse directly. Again, I used a csr_matrix, and I can extract a specific row and convert it to a numpy array efficiently. But when I parallelize it, Spark errors with: sparse matrix length is ambiguous; use getnnz() or shape[0].
csr_matrix does not support len(), so Spark cannot partition it.
Now I use this matrix as a broadcast variable (it's relatively small in memory) and parallelize xrange(0, matrix.shape[0]) to index the matrix inside the map function.
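A minimal sketch of that workaround, with variable names assumed (matrix is the csr_matrix, sc the SparkContext):

bc_matrix = sc.broadcast(matrix)

def row_as_array(i):
    # Pull one message's row out of the broadcast matrix and densify it.
    return bc_matrix.value.getrow(i).toarray().ravel()

dense_rows = sc.parallelize(xrange(matrix.shape[0])).map(row_as_array)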
Is there a better solution?
Thanks.