How to use groupByKey() in spark structured streaming without aggregates

2020-10-27 Thread act_coder
Is there a way we can use the groupByKey() function in Spark Structured
Streaming without aggregates?

I have a scenario like below, where we would like to group the items based
on a key without applying any aggregates.

Sample incoming data:



I would like to apply groupByKey on the field "device_id", so that I get an
output like the one below.



I have also tried using collect_list() in the aggregate expression of
groupByKey, but that takes more time to process the datasets.

Also, since we are aggregating, we can only use either 'Complete' or
'Update' output mode, while 'Append' mode looks more suitable for our use
case.
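For reference, the collect_list attempt looks roughly like this (a sketch only;
I'm using the built-in "rate" source and made-up device_id/item columns in place
of our real schema):

import org.apache.spark.sql.functions.collect_list

// toy streaming input standing in for our real source
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .selectExpr("CAST(value % 3 AS STRING) AS device_id", "CAST(value AS STRING) AS item")

val grouped = events
  .groupBy("device_id")
  .agg(collect_list("item").alias("items"))

// being a streaming aggregation, this can only run in Complete or Update mode
grouped.writeStream
  .outputMode("update")
  .format("console")
  .start()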

I have also looked at the groupByKey(Num_Partitions) and reduceByKey()
functions available in the Direct DStream API, which give results in the form
of (String, Iterable[String, Int]) without doing any aggregates.

Is there something available similar to that in structured streaming ?






How to use Dataset foreachPartition and groupByKey together

2018-11-01 Thread Kuttaiah Robin
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have a Spark streaming application which listens to a Kafka topic, does some
transformation and writes to an Oracle database using a JDBC client.


Step 1.
Read events from Kafka as shown below;
--
   Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
  .option("kafka.bootstrap.servers", strKafkaAddress)
  .option("assign", strSubscription)
  .option("maxOffsetsPerTrigger", "10")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false)
  .load()
  .filter(strFilter)
  .select(functions.from_json(functions.col("value").cast("string"),
      oSchema).alias("events"))
  .select("events.*");

I do groupByKey and then, for each group, use the set of events obtained for
that group to create a JDBC connection/PreparedStatement, insert, and then
close the connection. I am using Oracle JDBC along with
flatMapGroupsWithState.


Step 2.
groupByKey and flatMapGroupsWithState
-
Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
    .groupByKey(
        new MapFunction<Row, String>() {
          @Override public String call(Row event) {
            return event.getAs(m_InsightRawEvent.getPrimaryKey());
          }
        }, Encoders.STRING())
    .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class),
        Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());


This has a drawback: it creates a connection and inserts into the DB for each
group.

I need to do it per partition, so that only one connection and one batch
insert are needed for all the new events read from that partition.

Can somebody point me on how I can achieve this?

Basically I am looking at the below (a rough sketch follows this list);
1. Read from Kafka as said above.
2. kafkaEvents.foreachPartition - create one connection here.
3. groupByKey and flatMapGroupsWithState
thanks
Robin Kuttaiah


Re: Replacing groupByKey() with reduceByKey()

2018-08-08 Thread Biplob Biswas
Hi Santhosh,

My name is not Bipin, it's Biplob, as is clear from my signature.

Regarding your question, I have no clue what your map operation is doing on
the grouped data, so I can only suggest you do:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
(x[0],x)).reduceByKey(build_edges, 25)

Note that, based on the return type, you would have to modify your build_edges
function (the reduce function must merge two values into one of the same type).

Thanks & Regards
Biplob Biswas


On Mon, Aug 6, 2018 at 6:28 PM Bathi CCDB  wrote:

> Hey Bipin,
> Thanks for the reply, I am actually aggregating after the groupByKey() 
> operation,
> I have posted the wrong code snippet in my first email. Here is what I am
> doing
>
> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
> (x[0],x)).groupByKey(25).map(build_edges)
>
> Can we replace reduceByKey() in this context ?
>
> Santhosh
>
>
> On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas 
> wrote:
>
>> Hi Santhosh,
>>
>> If you are not performing any aggregation, then I don't think you can
>> replace your groupbykey with a reducebykey, and as I see you are only
>> grouping and taking 2 values of the result, thus I believe you can't just
>> replace your groupbykey with that.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB  wrote:
>>
>>> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark
>>> and python newbie and I am having a hard time figuring out the lambda
>>> function for the reduceByKey() operation.
>>>
>>> Here is the code
>>>
>>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
>>> (x[0],x)).groupByKey(25).take(2)
>>>
>>> Here is the return value
>>>
>>> >>> dd
>>> [(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]
>>>
>>> and Here are the iterable contents dd[0][1]
>>>
>>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', 
>>> value=u'e7dc1f2a')Row(key=u'KEY_1', 
>>> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', 
>>> value=u'fb0bc953')...Row(key=u'KEY_1', 
>>> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', 
>>> value=u'd39714d3')Row(key=u'KEY_1', 
>>> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>>>
>>> My question is how do replace with reduceByKey() and get the same
>>> output as above?
>>>
>>> Santhosh
>>>
>>
>


Re: Replacing groupByKey() with reduceByKey()

2018-08-06 Thread Bathi CCDB
Hey Bipin,
Thanks for the reply, I am actually aggregating after the groupByKey()
operation,
I have posted the wrong code snippet in my first email. Here is what I am
doing

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
(x[0],x)).groupByKey(25).map(build_edges)

Can we replace reduceByKey() in this context ?

Santhosh


On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas 
wrote:

> Hi Santhosh,
>
> If you are not performing any aggregation, then I don't think you can
> replace your groupbykey with a reducebykey, and as I see you are only
> grouping and taking 2 values of the result, thus I believe you can't just
> replace your groupbykey with that.
>
> Thanks & Regards
> Biplob Biswas
>
>
> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB  wrote:
>
>> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark
>> and python newbie and I am having a hard time figuring out the lambda
>> function for the reduceByKey() operation.
>>
>> Here is the code
>>
>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
>> (x[0],x)).groupByKey(25).take(2)
>>
>> Here is the return value
>>
>> >>> dd
>> [(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]
>>
>> and Here are the iterable contents dd[0][1]
>>
>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', 
>> value=u'e7dc1f2a')Row(key=u'KEY_1', 
>> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', 
>> value=u'fb0bc953')...Row(key=u'KEY_1', 
>> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', 
>> value=u'd39714d3')Row(key=u'KEY_1', 
>> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>>
>> My question is how do replace with reduceByKey() and get the same output
>> as above?
>>
>> Santhosh
>>
>


Re: Replacing groupByKey() with reduceByKey()

2018-08-06 Thread Biplob Biswas
Hi Santhosh,

If you are not performing any aggregation, then I don't think you can
replace your groupByKey with a reduceByKey. As I see it, you are only
grouping and taking 2 values of the result, so I believe you can't just
swap in reduceByKey there.

Thanks & Regards
Biplob Biswas


On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB  wrote:

> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark
> and python newbie and I am having a hard time figuring out the lambda
> function for the reduceByKey() operation.
>
> Here is the code
>
> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
> (x[0],x)).groupByKey(25).take(2)
>
> Here is the return value
>
> >>> dd
> [(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]
>
> and Here are the iterable contents dd[0][1]
>
> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', 
> value=u'e7dc1f2a')Row(key=u'KEY_1', 
> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', 
> value=u'fb0bc953')...Row(key=u'KEY_1', 
> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', 
> value=u'd39714d3')Row(key=u'KEY_1', 
> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>
> My question is how do replace with reduceByKey() and get the same output
> as above?
>
> Santhosh
>


Replacing groupByKey() with reduceByKey()

2018-08-03 Thread Bathi CCDB
I am trying to replace groupByKey() with reduceByKey(). I am a PySpark and
Python newbie and I am having a hard time figuring out the lambda function
for the reduceByKey() operation.

Here is the code

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
(x[0],x)).groupByKey(25).take(2)

Here is the return value

>>> dd
[(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]

and Here are the iterable contents dd[0][1]

Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79',
value=u'e7dc1f2a')Row(key=u'KEY_1',
hash_fn=u'f8891048a9ef8331227b4af080ecd28a',
value=u'fb0bc953')...Row(key=u'KEY_1',
hash_fn=u'1b9d2bb2db28603ff21052efcd13f242',
value=u'd39714d3')Row(key=u'KEY_1',
hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')

My question is how do replace with reduceByKey() and get the same output as
above?

Santhosh


why groupByKey still shuffle if SQL does "Distribute By" on same columns ?

2018-01-30 Thread Dibyendu Bhattacharya
 Hi,

I am trying something like this..

val sesDS:  Dataset[XXX] = hiveContext.sql(select).as[XXX]

The select statement is something like this : "select * from sometable 
DISTRIBUTE by col1, col2, col3"

Then comes groupByKey...

val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

As my select already distributes the data based on the same columns that I
use in groupByKey, why does groupByKey still do the shuffle? Is this an issue
or am I missing something?
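To make the question concrete, this is roughly how I am comparing the two (a
sketch; XXX is my case class with fields col1, col2 and col3):

val sesDS = hiveContext.sql(
  "select * from sometable distribute by col1, col2, col3").as[XXX]

// With the untyped API on the same columns I would expect the existing
// hashpartitioning to be reused, i.e. no extra Exchange in the plan:
sesDS.groupBy("col1", "col2", "col3").count().explain()

// With the typed API the key comes from an arbitrary function, and the plan
// shows an extra Exchange on the derived key, which is the shuffle I am asking about:
sesDS.groupByKey(x => (x.col1, x.col2, x.col3)).count().explain()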

Regards,
Dibyendu


Re: groupByKey vs mapPartitions for efficient grouping within a Partition

2017-01-16 Thread Andy Dang
groupByKey() is a wide dependency and will cause a full shuffle. It's
advised against using this transformation unless your keys are balanced
(well-distributed) and you need a full shuffle.

Otherwise, what you want is aggregateByKey() or reduceByKey() (depending on
the output). These transformations are backed by combineByKey(), which can
perform map-side aggregation.
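A small illustration of the difference (toy pairs, not the poster's data):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// reduceByKey / aggregateByKey combine values inside each map task first,
// so only partially aggregated records are shuffled:
val counts = pairs.reduceByKey(_ + _)

// aggregateByKey is the one to use when the result type differs from the
// value type (here: a Set per key), still with map-side combine:
val distinctVals = pairs.aggregateByKey(Set.empty[Int])(_ + _, _ ++ _)

// groupByKey ships every individual value across the network before grouping:
val groups = pairs.groupByKey()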

---
Regards,
Andy

On Mon, Jan 16, 2017 at 2:21 PM, Patrick <titlibat...@gmail.com> wrote:

> Hi,
>
> Does groupByKey has intelligence associated with it, such that if all the
> keys resides in the same partition, it should not do the shuffle?
>
> Or user should write mapPartitions( scala groupBy code).
>
> Which would be more efficient and what are the memory considerations?
>
>
> Thanks
>
>
>
>


groupByKey vs mapPartitions for efficient grouping within a Partition

2017-01-16 Thread Patrick
Hi,

Does groupByKey have intelligence associated with it, such that if all the
keys reside in the same partition, it does not do the shuffle?

Or should the user write mapPartitions (with Scala groupBy code)?

Which would be more efficient, and what are the memory considerations?


Thanks


groupByKey vs reduceByKey

2016-12-09 Thread Appu K
Hi,

Read somewhere that

groupByKey() in RDD disables map-side aggregation as the aggregation
function (appending to a list) does not save any space.


However, from my understanding, using something like reduceByKey (or
combineByKey plus a combiner function) we could reduce the data shuffled
around.

I am wondering why map-side aggregation is disabled for groupByKey() and why
it wouldn't save space at the executor where data is received after the
shuffle.
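For concreteness, here is my own toy sketch of the two paths as I understand them:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))

// reduceByKey: the map-side combine collapses each key to one value per map
// task, so the shuffle carries less data.
val summed = pairs.reduceByKey(_ + _)

// groupByKey is essentially combineByKey with list/buffer combiners (and
// map-side combine turned off): combining before the shuffle would just build
// the same lists a bit earlier, so roughly the same bytes still get shuffled.
val grouped = pairs.combineByKey(
  (v: Int) => List(v),                           // createCombiner
  (buf: List[Int], v: Int) => v :: buf,          // mergeValue
  (b1: List[Int], b2: List[Int]) => b1 ::: b2)   // mergeCombiners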


cheers
Appu


groupbykey data access size vs Reducer number

2016-11-28 Thread memoryzpp
Hi all,

How does shuffle in Spark 1.6.2 work? I am using groupByKey(int: partitionSize).
groupByKey, a shuffle operation, has a mapper side (M mappers) and a reducer
side (R reducers).

Here R = partitionSize, and each mapper will produce a local file output and
store it in spark.local.dir. Let's assume the total shuffle data size is D;
then each reducer will shuffle-read D/R data.

My question is: when changing R (for example, decreasing R), each reducer
will read in more data per partition (P = D/R increases as R decreases).
Since the data for each reducer comes from every mapper output, does that
mean that, on average, each reducer reads P/M = D/(R*M) data from each
mapper? However, what I observe is not consistent with this model. I use the
iostat tool to examine the I/O request size, and found no difference in I/O
request size when decreasing R. Does anyone know the details of shuffle? Many
thanks!

R = 6000
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28140/iostat_m14_reduceBy2_core6_readSize.png>
 

R = 3000
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28140/iostat_m14_reduceBy4_core6_readSize.png>
 

As seen from the two figures comparing the iostat plots, the average IO
request sizes for the two reducer counts are the same, 250 sectors (250 * 512
B/sector = 128 KB).









task not serializable in case of groupByKey() + mapGroups + map?

2016-10-31 Thread Yang
with the following simple code


val a = sc.createDataFrame(sc.parallelize(Seq((1,2), (3,4)))).as[(Int,Int)]
val grouped = a.groupByKey({x:(Int,Int) => x._1})
val mappedGroups = grouped.mapGroups((k,x) => {(k,1)})
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx => {
  val simpley = yyy.value

  1
})



I'm seeing error:
org.apache.spark.SparkException: Task not serializable
  at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2053)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.map(RDD.scala:365)
  ... 56 elided
Caused by: java.io.NotSerializableException:
org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class:
org.apache.spark.sql.execution.QueryExecution, value: == Parsed Logical
Plan ==
'AppendColumns , unresolveddeserializer(newInstance(class
scala.Tuple2)), [input[0, int, true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Analyzed Logical Plan ==
_1: int, _2: int, value: int
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Optimized Logical Plan ==
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Physical Plan ==
AppendColumns , newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- Scan ExistingRDD[_1#201,_2#202])
- field (class: org.apache.spark.sql.KeyValueGroupedDataset, name:
queryExecution, type: class org.apache.spark.sql.execution.QueryExecution)
- object (class org.apache.spark.sql.KeyValueGroupedDataset,
org.apache.spark.sql.KeyValueGroupedDataset@71148f10)
- field (class: $iw, name: grouped, type: class
org.apache.spark.sql.KeyValueGroupedDataset)
- object (class $iw, $iw@7b1c13e4)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3e9a0c21)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@218cc682)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2ecedd08)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@79efd402)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d81976c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2d5d6e2a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@74dc6a7a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e220d85)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1c790a4f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1d954b06)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1343c904)
- field (class: $line115.$read, name: $iw, type: class $iw)
- object (class $line115.$read, $line115.$read@42497908)
- field (class: $iw, name: $line115$read, type: class
$line115.$read)
- object (class $iw, $iw@af36da5)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@2fd5b99a)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, )
  at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 65 more


Predict a single vector with the new spark.ml API to avoid groupByKey() after a flatMap()?

2016-10-20 Thread jglov
Is there a way to predict a single vector with the new spark.ml API? In my
case it's because I want to do this within a map() to avoid calling
groupByKey() after a flatMap():

Current code (pyspark):

# Given 'model', 'rdd', and a function 'split_element' that splits an element
# of the RDD into a list of elements (and assuming each element has both a
# value and a key so that groupByKey will work to merge them later)

split_rdd = rdd.flatMap(split_element)
split_results = model.transform(split_rdd.toDF()).rdd
return split_results.groupByKey()

Desired code:

split_rdd = rdd.map(split_element)
split_results = split_rdd.map(lambda elem_list: [model.transformOne(elem)
for elem in elem_list])
return split_results






Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Arun Luthra
Thanks, pair_rdd.rdd.groupByKey() did the trick.

On Wed, Aug 10, 2016 at 8:24 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> So it looks like (despite the name) pair_rdd is actually a Dataset - my
> guess is you might have a map on a dataset up above which used to return an
> RDD but now returns another dataset or an unexpected implicit conversion.
> Just add rdd() before the groupByKey call to push it into an RDD. That
> being said - groupByKey generally is an anti-pattern so please be careful
> with it.
>
> On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> Here is the offending line:
>>
>> val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
>> Iterable[MyData]) => {
>> ...
>>
>>
>> [error]  .scala:249: overloaded method value groupByKey with
>> alternatives:
>> [error]   [K](func: 
>> org.apache.spark.api.java.function.MapFunction[(aaa.MyKey,
>> aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[K
>> ])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)]
>> 
>> [error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
>> org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
>> aaa.MyData)]
>> [error]  cannot be applied to ()
>> [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
>> MyKey, hd_iter: Iterable[MyData]) => {
>> [error] ^
>> [warn] .scala:249: non-variable type argument aaa.MyData in
>> type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
>> is unchecked since it is eliminated by erasure
>> [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
>> MyKey, hd_iter: Iterable[MyData]) => {
>> [warn]
>>   ^
>> [warn] one warning found
>>
>>
>> I can't see any obvious API change... what is the problem?
>>
>> Thanks,
>> Arun
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Holden Karau
So it looks like (despite the name) pair_rdd is actually a Dataset - my
guess is you might have a map on a dataset up above which used to return an
RDD but now returns another dataset or an unexpected implicit conversion.
Just add rdd() before the groupByKey call to push it into an RDD. That
being said - groupByKey generally is an anti-pattern so please be careful
with it.
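A minimal sketch of the two options (made-up types, assuming spark.implicits._
is in scope):

case class MyKey(id: String)
case class MyData(n: Int)

val ds = Seq((MyKey("a"), MyData(1)), (MyKey("a"), MyData(2))).toDS()

// Option 1: push it back to an RDD first and keep the 1.6-style no-arg call:
val viaRdd = ds.rdd.groupByKey()

// Option 2: stay on the Dataset API, where groupByKey now needs a key function:
val viaDataset = ds.groupByKey(_._1)   // KeyValueGroupedDataset[MyKey, (MyKey, MyData)]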

On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Here is the offending line:
>
> val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
> Iterable[MyData]) => {
> ...
>
>
> [error]  .scala:249: overloaded method value groupByKey with
> alternatives:
> [error]   [K](func: org.apache.spark.api.java.function.MapFunction[(aaa.MyKey,
> aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[
> K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)]
> 
> [error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
> org.apache.spark.sql.Encoder[K])org.apache.spark.sql.
> KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)]
> [error]  cannot be applied to ()
> [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
> MyKey, hd_iter: Iterable[MyData]) => {
> [error] ^
> [warn] .scala:249: non-variable type argument aaa.MyData in
> type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
> is unchecked since it is eliminated by erasure
> [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
> hd_iter: Iterable[MyData]) => {
> [warn]
> ^
> [warn] one warning found
>
>
> I can't see any obvious API change... what is the problem?
>
> Thanks,
> Arun
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Arun Luthra
Here is the offending line:

val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
Iterable[MyData]) => {
...


[error]  .scala:249: overloaded method value groupByKey with
alternatives:
[error]   [K](func:
org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, aaa.MyData),K],
encoder:
org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
aaa.MyData)] 
[error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
aaa.MyData)]
[error]  cannot be applied to ()
[error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
hd_iter: Iterable[MyData]) => {
[error] ^
[warn] .scala:249: non-variable type argument aaa.MyData in
type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
is unchecked since it is eliminated by erasure
[warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
hd_iter: Iterable[MyData]) => {
[warn]
^
[warn] one warning found


I can't see any obvious API change... what is the problem?

Thanks,
Arun


Re: groupByKey returns an emptyRDD

2016-06-06 Thread Ted Yu
Can you give us a bit more information ?

how you packaged the code into jar
command you used for execution
version of Spark
related log snippet

Thanks

On Mon, Jun 6, 2016 at 10:43 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm wrapped the following code into a jar:
>
> val test = sc.parallelize(Seq(("daniel", "a"), ("daniel", "b"), ("test", 
> "1)")))
>
> val agg = test.groupByKey()
> agg.collect.foreach(r=>{println(r._1)})
>
>
> The result of groupByKey is an empty RDD, when I'm trying the same code using 
> the spark-shell it's running as expected.
>
>
> Any ideas?
>
>
> Thank you,
>
> Daniel
>
>


groupByKey returns an emptyRDD

2016-06-06 Thread Daniel Haviv
Hi,
I've wrapped the following code into a jar:

val test = sc.parallelize(Seq(("daniel", "a"), ("daniel", "b"), ("test", "1)")))

val agg = test.groupByKey()
agg.collect.foreach(r=>{println(r._1)})


The result of groupByKey is an empty RDD; when I try the same code using
the spark-shell it runs as expected.


Any ideas?


Thank you,

Daniel


Re: Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?

2016-05-03 Thread Kevin Mellott
If you put this into a dataframe then you may be able to use one hot
encoding and treat these as categorical features. I believe that the ml
pipeline components use project tungsten so the performance will be very
fast. After you process the result on the dataframe you would then need to
assemble your desired format.
On May 3, 2016 4:29 PM, "Bibudh Lahiri" <bibudhlah...@gmail.com> wrote:

> Hi,
>   I have multiple procedure codes that a patient has undergone, in an RDD
> with a different row for each combination of patient and procedure. I am
> trying to covert this data to the LibSVM format, so that the resultant
> looks as follows:
>
>   "0 1:1 2:0 3:1 29:1 30:1 32:1 110:1"
>
>   where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given
> patient has undergone. Note that Spark needs these codes to be one-based
> and in ascending order, so I am using a combination of groupByKey() and
> mapValues() to do this conversion as follows:
>
> procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures)
>
> where combine_procedures() is defined as:
>
> def combine_procedures(l_procs):
>   ascii_procs = map(lambda x: int(custom_encode(x)), l_procs)
>   return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)])
>
>   Note that this reduction is neither commutative nor associative, since
> combining "29:1 30:1" and "32:1 110:1" to "32:1 110:1 29:1 30:1" is not
> going to work.
>   Can someone suggest some faster alternative to the combination
> of groupByKey() and mapValues() for this?
>
> Thanks
>Bibudh
>
>
> --
> Bibudh Lahiri
> Senior Data Scientist, Impetus Technolgoies
> 720 University Avenue, Suite 130
> Los Gatos, CA 95129
> http://knowthynumbers.blogspot.com/
>
>


Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?

2016-05-03 Thread Bibudh Lahiri
Hi,
  I have multiple procedure codes that a patient has undergone, in an RDD
with a different row for each combination of patient and procedure. I am
trying to convert this data to the LibSVM format, so that the result
looks as follows:

  "0 1:1 2:0 3:1 29:1 30:1 32:1 110:1"

  where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given
patient has undergone. Note that Spark needs these codes to be one-based
and in ascending order, so I am using a combination of groupByKey() and
mapValues() to do this conversion as follows:

procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures)

where combine_procedures() is defined as:

def combine_procedures(l_procs):
  ascii_procs = map(lambda x: int(custom_encode(x)), l_procs)
  return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)])

  Note that this reduction is neither commutative nor associative, since
combining "29:1 30:1" and "32:1 110:1" to "32:1 110:1 29:1 30:1" is not
going to work.
  Can someone suggest some faster alternative to the combination
of groupByKey() and mapValues() for this?

Thanks
   Bibudh


-- 
Bibudh Lahiri
Senior Data Scientist, Impetus Technolgoies
720 University Avenue, Suite 130
Los Gatos, CA 95129
http://knowthynumbers.blogspot.com/


Re: How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala

2016-01-20 Thread Neha Mehta
Hi Vishal,

Thanks for the solution. I was able to get it working for my scenario.
Regarding the Task not serializable error, I still get it when I declare a
function outside the main method. However, if I declare it inside the main
"val func = {}", it is working fine for me.

In case you have any insight to share on the same, then please do share it.

Thanks for the help.

Regards,
Neha

On Wed, Jan 20, 2016 at 11:39 AM, Vishal Maru <vzm...@gmail.com> wrote:

> It seems Spark is not able to serialize your function code to worker nodes.
>
> I have tried to put a solution in simple set of commands. Maybe you can
> combine last four line into function.
>
>
> val arr = Array((1,"A","<20","0"), (1,"A",">20 & <40","1"), (1,"B",">20 &
> <40","0"), (1,"C",">20 & <40","0"), (1,"C",">20 & <40","0"),
> (2,"A","<20","0"), (3,"B",">20 & <40","1"), (3,"B",">40","2"))
>
> val rdd = sc.parallelize(arr)
>
> val prdd = rdd.map(a => (a._1,a))
> val totals = prdd.groupByKey.map(a => (a._1, a._2.size))
>
> var n1 = rdd.map(a => ((a._1, a._2), 1) )
> var n2 = n1.reduceByKey(_+_).map(a => (a._1._1, (a._1._2, a._2)))
> var n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble
> / a._2._2)))
> var n4 = n3.map(a => (a._1, a._2._1 + ":" +
> a._2._2.toString)).reduceByKey((a, b) => a + "|" + b)
>
> n4.collect.foreach(println)
>
>
>
>
> On Mon, Jan 18, 2016 at 6:47 AM, Neha Mehta <nehamehta...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a scenario wherein my dataset has around 30 columns. It is
>> basically user activity information. I need to group the information by
>> each user and then for each column/activity parameter I need to find the
>> percentage affinity for each value in that column for that user. Below is
>> the sample input and output.
>>
>> UserId C1 C2 C3
>> 1 A <20 0
>> 1 A >20 & <40 1
>> 1 B >20 & <40 0
>> 1 C >20 & <40 0
>> 1 C >20 & <40 0
>> 2 A <20 0
>> 3 B >20 & <40 1
>> 3 B >40 2
>>
>>
>>
>>
>>
>>
>>
>>
>> Output
>>
>>
>> 1 A:0.4|B:0.2|C:0.4 <20:02|>20 & <40:0.8 0:0.8|1:0.2
>> 2 A:1 <20:1 0:01
>> 3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5
>>
>> Presently this is how I am calculating these values:
>> Group by UserId and C1 and compute values for C1 for all the users, then
>> do a group by by Userid and C2 and find the fractions for C2 for each user
>> and so on. This approach is quite slow.  Also the number of records for
>> each user will be at max 30. So I would like to take a second approach
>> wherein I do a groupByKey and pass the entire list of records for each key
>> to a function which computes all the percentages for each column for each
>> user at once. Below are the steps I am trying to follow:
>>
>> 1. Dataframe1 => group by UserId , find the counts of records for each
>> user. Join the results back to the input so that counts are available with
>> each record
>> 2. Dataframe1.map(s=>s(1),s).groupByKey().map(s=>myUserAggregator(s._2))
>>
>> def myUserAggregator(rows: Iterable[Row]):
>> scala.collection.mutable.Map[Int,String] = {
>> val returnValue = scala.collection.mutable.Map[Int,String]()
>> if (rows != null) {
>>   val activityMap = scala.collection.mutable.Map[Int,
>> scala.collection.mutable.Map[String,
>> Int]]().withDefaultValue(scala.collection.mutable.Map[String,
>> Int]().withDefaultValue(0))
>>
>>   val rowIt = rows.iterator
>>   var sentCount = 1
>>   for (row <- rowIt) {
>> sentCount = row(29).toString().toInt
>> for (i <- 0 until row.length) {
>>   var m = activityMap(i)
>>   if (activityMap(i) == null) {
>> m = collection.mutable.Map[String,
>> Int]().withDefaultValue(0)
>>   }
>>   m(row(i).toString()) += 1
>>   activityMap.update(i, m)
>> }
>>   }
>>   var activityPPRow: Row = Row()
>>   for((k,v) <- activityMap) {
>>   var rowVal:String = ""
>>   for((a,b) <- v) {
>> rowVal += rowVal + a + ":" + b

Re: How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala

2016-01-19 Thread Vishal Maru
It seems Spark is not able to serialize your function code to worker nodes.

I have tried to put a solution together as a simple set of commands. Maybe
you can combine the last four lines into a function.


val arr = Array((1,"A","<20","0"), (1,"A",">20 & <40","1"), (1,"B",">20 & <40","0"),
  (1,"C",">20 & <40","0"), (1,"C",">20 & <40","0"), (2,"A","<20","0"),
  (3,"B",">20 & <40","1"), (3,"B",">40","2"))

val rdd = sc.parallelize(arr)

// total number of records per user
val prdd = rdd.map(a => (a._1, a))
val totals = prdd.groupByKey.map(a => (a._1, a._2.size))

// count of each (user, C1-value) pair
var n1 = rdd.map(a => ((a._1, a._2), 1))
var n2 = n1.reduceByKey(_+_).map(a => (a._1._1, (a._1._2, a._2)))
// fraction = count / total for that user
var n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble / a._2._2)))
// concatenate "value:fraction" strings per user
var n4 = n3.map(a => (a._1, a._2._1 + ":" + a._2._2.toString)).reduceByKey((a, b) => a + "|" + b)

n4.collect.foreach(println)




On Mon, Jan 18, 2016 at 6:47 AM, Neha Mehta <nehamehta...@gmail.com> wrote:

> Hi,
>
> I have a scenario wherein my dataset has around 30 columns. It is
> basically user activity information. I need to group the information by
> each user and then for each column/activity parameter I need to find the
> percentage affinity for each value in that column for that user. Below is
> the sample input and output.
>
> UserId C1 C2 C3
> 1 A <20 0
> 1 A >20 & <40 1
> 1 B >20 & <40 0
> 1 C >20 & <40 0
> 1 C >20 & <40 0
> 2 A <20 0
> 3 B >20 & <40 1
> 3 B >40 2
>
>
>
>
>
>
>
>
> Output
>
>
> 1 A:0.4|B:0.2|C:0.4 <20:02|>20 & <40:0.8 0:0.8|1:0.2
> 2 A:1 <20:1 0:01
> 3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5
>
> Presently this is how I am calculating these values:
> Group by UserId and C1 and compute values for C1 for all the users, then
> do a group by by Userid and C2 and find the fractions for C2 for each user
> and so on. This approach is quite slow.  Also the number of records for
> each user will be at max 30. So I would like to take a second approach
> wherein I do a groupByKey and pass the entire list of records for each key
> to a function which computes all the percentages for each column for each
> user at once. Below are the steps I am trying to follow:
>
> 1. Dataframe1 => group by UserId , find the counts of records for each
> user. Join the results back to the input so that counts are available with
> each record
> 2. Dataframe1.map(s=>s(1),s).groupByKey().map(s=>myUserAggregator(s._2))
>
> def myUserAggregator(rows: Iterable[Row]):
> scala.collection.mutable.Map[Int,String] = {
> val returnValue = scala.collection.mutable.Map[Int,String]()
> if (rows != null) {
>   val activityMap = scala.collection.mutable.Map[Int,
> scala.collection.mutable.Map[String,
> Int]]().withDefaultValue(scala.collection.mutable.Map[String,
> Int]().withDefaultValue(0))
>
>   val rowIt = rows.iterator
>   var sentCount = 1
>   for (row <- rowIt) {
> sentCount = row(29).toString().toInt
> for (i <- 0 until row.length) {
>   var m = activityMap(i)
>   if (activityMap(i) == null) {
> m = collection.mutable.Map[String,
> Int]().withDefaultValue(0)
>   }
>   m(row(i).toString()) += 1
>   activityMap.update(i, m)
> }
>   }
>   var activityPPRow: Row = Row()
>   for((k,v) <- activityMap) {
>   var rowVal:String = ""
>   for((a,b) <- v) {
> rowVal += rowVal + a + ":" + b/sentCount + "|"
>   }
>   returnValue.update(k, rowVal)
> //  activityPPRow.apply(k) = rowVal
>   }
>
> }
> return returnValue
>   }
>
> When I run step 2 I get the following error. I am new to Scala and Spark
> and am unable to figure out how to pass the Iterable[Row] to a function and
> get back the results.
>
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
> at org.apache.sp

How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala

2016-01-18 Thread Neha Mehta
Hi,

I have a scenario wherein my dataset has around 30 columns. It is basically
user activity information. I need to group the information by each user and
then for each column/activity parameter I need to find the percentage
affinity for each value in that column for that user. Below is the sample
input and output.

UserId C1 C2 C3
1 A <20 0
1 A >20 & <40 1
1 B >20 & <40 0
1 C >20 & <40 0
1 C >20 & <40 0
2 A <20 0
3 B >20 & <40 1
3 B >40 2








Output


1 A:0.4|B:0.2|C:0.4 <20:02|>20 & <40:0.8 0:0.8|1:0.2
2 A:1 <20:1 0:01
3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5

Presently this is how I am calculating these values:
Group by UserId and C1 and compute values for C1 for all the users, then do
a group by by Userid and C2 and find the fractions for C2 for each user and
so on. This approach is quite slow.  Also the number of records for each
user will be at max 30. So I would like to take a second approach wherein I
do a groupByKey and pass the entire list of records for each key to a
function which computes all the percentages for each column for each user
at once. Below are the steps I am trying to follow:

1. Dataframe1 => group by UserId , find the counts of records for each
user. Join the results back to the input so that counts are available with
each record
2. Dataframe1.map(s=>s(1),s).groupByKey().map(s=>myUserAggregator(s._2))

def myUserAggregator(rows: Iterable[Row]):
scala.collection.mutable.Map[Int,String] = {
val returnValue = scala.collection.mutable.Map[Int,String]()
if (rows != null) {
  val activityMap = scala.collection.mutable.Map[Int,
scala.collection.mutable.Map[String,
Int]]().withDefaultValue(scala.collection.mutable.Map[String,
Int]().withDefaultValue(0))

  val rowIt = rows.iterator
  var sentCount = 1
  for (row <- rowIt) {
sentCount = row(29).toString().toInt
for (i <- 0 until row.length) {
  var m = activityMap(i)
  if (activityMap(i) == null) {
m = collection.mutable.Map[String,
Int]().withDefaultValue(0)
  }
  m(row(i).toString()) += 1
  activityMap.update(i, m)
}
  }
  var activityPPRow: Row = Row()
  for((k,v) <- activityMap) {
  var rowVal:String = ""
  for((a,b) <- v) {
rowVal += rowVal + a + ":" + b/sentCount + "|"
  }
  returnValue.update(k, rowVal)
//  activityPPRow.apply(k) = rowVal
  }

}
return returnValue
  }

When I run step 2 I get the following error. I am new to Scala and Spark
and am unable to figure out how to pass the Iterable[Row] to a function and
get back the results.

org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.map(RDD.scala:317)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:97)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:102)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:104)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:106)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:108)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:110)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:112)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:114)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:116)
..


Thanks for the help.

Regards,
Neha Mehta



Spark Streaming: routing by key without groupByKey

2016-01-15 Thread Lin Zhao
I have a requirement to route a paired DStream through a series of map and flatMap
operations such that entries with the same key go to the same thread within the same
batch. The closest I can come up with is groupByKey().flatMap(_._2), but this kills
throughput by 50%.

When I think about it, groupByKey is more than I need. With groupByKey the same
thread sees all events with key Alice at a time, and only Alice. For my requirement,
if there are Bob and Charlie in between it's still OK. This seems to be a common
routing requirement and shouldn't cause a 50% performance hit. Is there a way to
construct the stream like this that I'm not aware of?

I have read 
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
 but reduceByKey isn't the solution since we are not doing aggregation. Our 
stream is a chain of map and flatMap[withState]


Re: groupByKey does not work?

2016-01-05 Thread Sean Owen
I suspect this is another instance of case classes not working as
expected between the driver and executor when used with spark-shell.
Search JIRA for some back story.
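One workaround, as a sketch, is to keep the shuffle key a plain String inside
spark-shell and only build case classes afterwards (field layout as in the code
quoted below):

val myrdd = sc.textFile("/user/al733a/mydata.txt").map { line =>
  val Array(kstr, vstr) = line.split("\\|", -1)
  val v = vstr.split(",")
  // the whole comma-separated key stays a String, so equality/hashing is safe
  (kstr, (v(0).toLong, v(1).toLong, v(2).toDouble))
}

myrdd.groupByKey()
  .map { case (key, vals) => (key, vals.size) }
  .collect()
  .foreach(println)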

On Tue, Jan 5, 2016 at 12:42 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
> Spark 1.5.0
>
> data:
>
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>
> spark-shell:
>
> spark-shell \
> --num-executors 2 \
> --driver-memory 1g \
> --executor-memory 10g \
> --executor-cores 8 \
> --master yarn-client
>
>
> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
> f4:Char, f5:Char, f6:String)
> case class Myvalue(count1:Long, count2:Long, num:Double)
>
> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
> val spl = line.split("\\|", -1)
> val k = spl(0).split(",")
> val v = spl(1).split(",")
> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar,
> k(5)(0).toChar, k(6)(0).toChar, k(7)),
>  Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
> )
> }}
>
> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1)
> }.collect().foreach(println)
>
> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>
>
>
> You can see that each key is repeated 2 times but each key should only
> appear once.
>
> Arun
>
> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> Can you give a bit more information ?
>>
>> Release of Spark you're using
>> Minimal dataset that shows the problem
>>
>> Cheers
>>
>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>
>>> I tried groupByKey and noticed that it did not group all values into the
>>> same group.
>>>
>>> In my test dataset (a Pair rdd) I have 16 records, where there are only 4
>>> distinct keys, so I expected there to be 4 records in the groupByKey object,
>>> but instead there were 8. Each of the 4 distinct keys appear 2 times.
>>>
>>> Is this the expected behavior? I need to be able to get ALL values
>>> associated with each key grouped into a SINGLE record. Is it possible?
>>>
>>> Arun
>>>
>>> p.s. reducebykey will not be sufficient for me
>>
>>
>




Re: groupByKey does not work?

2016-01-04 Thread Ted Yu
Can you give a bit more information ?

Release of Spark you're using
Minimal dataset that shows the problem

Cheers

On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> I tried groupByKey and noticed that it did not group all values into the
> same group.
>
> In my test dataset (a Pair rdd) I have 16 records, where there are only 4
> distinct keys, so I expected there to be 4 records in the groupByKey
> object, but instead there were 8. Each of the 4 distinct keys appear 2
> times.
>
> Is this the expected behavior? I need to be able to get ALL values
> associated with each key grouped into a SINGLE record. Is it possible?
>
> Arun
>
> p.s. reducebykey will not be sufficient for me
>


Re: groupByKey does not work?

2016-01-04 Thread Daniel Imberman
Could you please post the associated code and output?

On Mon, Jan 4, 2016 at 3:55 PM Arun Luthra <arun.lut...@gmail.com> wrote:

> I tried groupByKey and noticed that it did not group all values into the
> same group.
>
> In my test dataset (a Pair rdd) I have 16 records, where there are only 4
> distinct keys, so I expected there to be 4 records in the groupByKey
> object, but instead there were 8. Each of the 4 distinct keys appear 2
> times.
>
> Is this the expected behavior? I need to be able to get ALL values
> associated with each key grouped into a SINGLE record. Is it possible?
>
> Arun
>
> p.s. reducebykey will not be sufficient for me
>


groupByKey does not work?

2016-01-04 Thread Arun Luthra
I tried groupByKey and noticed that it did not group all values into the
same group.

In my test dataset (a Pair rdd) I have 16 records, where there are only 4
distinct keys, so I expected there to be 4 records in the groupByKey
object, but instead there were 8. Each of the 4 distinct keys appear 2
times.

Is this the expected behavior? I need to be able to get ALL values
associated with each key grouped into a SINGLE record. Is it possible?

Arun

p.s. reducebykey will not be sufficient for me


Re: groupByKey does not work?

2016-01-04 Thread Arun Luthra
Spark 1.5.0

data:

p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0
p1,lo1,8,0,4,0,5,20150901|5,1,1.0
p1,lo2,8,0,4,0,5,20150901|5,1,1.0
p1,lo3,8,0,4,0,5,20150901|5,1,1.0
p1,lo4,8,0,4,0,5,20150901|5,1,1.0

spark-shell:

spark-shell \
--num-executors 2 \
--driver-memory 1g \
--executor-memory 10g \
--executor-cores 8 \
--master yarn-client


case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
f4:Char, f5:Char, f6:String)
case class Myvalue(count1:Long, count2:Long, num:Double)

val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
val spl = line.split("\\|", -1)
val k = spl(0).split(",")
val v = spl(1).split(",")
(Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar,
k(5)(0).toChar, k(6)(0).toChar, k(7)),
 Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
)
}}

myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1)
}.collect().foreach(println)

(Mykey(p1,lo1,8,0,4,0,5,20150901),1)

(Mykey(p1,lo1,8,0,4,0,5,20150901),1)
(Mykey(p1,lo3,8,0,4,0,5,20150901),1)
(Mykey(p1,lo3,8,0,4,0,5,20150901),1)
(Mykey(p1,lo4,8,0,4,0,5,20150901),1)
(Mykey(p1,lo4,8,0,4,0,5,20150901),1)
(Mykey(p1,lo2,8,0,4,0,5,20150901),1)
(Mykey(p1,lo2,8,0,4,0,5,20150901),1)



You can see that each key is repeated 2 times but each key should only
appear once.

Arun

On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you give a bit more information ?
>
> Release of Spark you're using
> Minimal dataset that shows the problem
>
> Cheers
>
> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> I tried groupByKey and noticed that it did not group all values into the
>> same group.
>>
>> In my test dataset (a Pair rdd) I have 16 records, where there are only 4
>> distinct keys, so I expected there to be 4 records in the groupByKey
>> object, but instead there were 8. Each of the 4 distinct keys appear 2
>> times.
>>
>> Is this the expected behavior? I need to be able to get ALL values
>> associated with each key grouped into a SINGLE record. Is it possible?
>>
>> Arun
>>
>> p.s. reducebykey will not be sufficient for me
>>
>
>


Re: groupByKey does not work?

2016-01-04 Thread Daniel Imberman
Could you try simplifying the key and seeing if that makes any difference?
Make it just a string or an int so we can count out any issues in object
equality.

On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote:

> Spark 1.5.0
>
> data:
>
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>
> spark-shell:
>
> spark-shell \
> --num-executors 2 \
> --driver-memory 1g \
> --executor-memory 10g \
> --executor-cores 8 \
> --master yarn-client
>
>
> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
> f4:Char, f5:Char, f6:String)
> case class Myvalue(count1:Long, count2:Long, num:Double)
>
> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
> val spl = line.split("\\|", -1)
> val k = spl(0).split(",")
> val v = spl(1).split(",")
> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar,
> k(5)(0).toChar, k(6)(0).toChar, k(7)),
>  Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
> )
> }}
>
> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1)
> }.collect().foreach(println)
>
> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
>
> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>
>
>
> You can see that each key is repeated 2 times but each key should only
> appear once.
>
> Arun
>
> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you give a bit more information ?
>>
>> Release of Spark you're using
>> Minimal dataset that shows the problem
>>
>> Cheers
>>
>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com>
>> wrote:
>>
>>> I tried groupByKey and noticed that it did not group all values into the
>>> same group.
>>>
>>> In my test dataset (a Pair rdd) I have 16 records, where there are only
>>> 4 distinct keys, so I expected there to be 4 records in the groupByKey
>>> object, but instead there were 8. Each of the 4 distinct keys appear 2
>>> times.
>>>
>>> Is this the expected behavior? I need to be able to get ALL values
>>> associated with each key grouped into a SINGLE record. Is it possible?
>>>
>>> Arun
>>>
>>> p.s. reducebykey will not be sufficient for me
>>>
>>
>>
>


Re: groupByKey does not work?

2016-01-04 Thread Arun Luthra
If I simplify the key to a String column with values lo1, lo2, lo3, lo4, it
works correctly.

On Mon, Jan 4, 2016 at 4:49 PM, Daniel Imberman <daniel.imber...@gmail.com>
wrote:

> Could you try simplifying the key and seeing if that makes any difference?
> Make it just a string or an int so we can count out any issues in object
> equality.
>
> On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> Spark 1.5.0
>>
>> data:
>>
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>>
>> spark-shell:
>>
>> spark-shell \
>> --num-executors 2 \
>> --driver-memory 1g \
>> --executor-memory 10g \
>> --executor-cores 8 \
>> --master yarn-client
>>
>>
>> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
>> f4:Char, f5:Char, f6:String)
>> case class Myvalue(count1:Long, count2:Long, num:Double)
>>
>> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
>> val spl = line.split("\\|", -1)
>> val k = spl(0).split(",")
>> val v = spl(1).split(",")
>> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar,
>> k(5)(0).toChar, k(6)(0).toChar, k(7)),
>>  Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
>> )
>> }}
>>
>> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1)
>> }.collect().foreach(println)
>>
>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
>>
>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>>
>>
>>
>> You can see that each key is repeated 2 times but each key should only
>> appear once.
>>
>> Arun
>>
>> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Can you give a bit more information ?
>>>
>>> Release of Spark you're using
>>> Minimal dataset that shows the problem
>>>
>>> Cheers
>>>
>>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com>
>>> wrote:
>>>
>>>> I tried groupByKey and noticed that it did not group all values into
>>>> the same group.
>>>>
>>>> In my test dataset (a Pair rdd) I have 16 records, where there are only
>>>> 4 distinct keys, so I expected there to be 4 records in the groupByKey
>>>> object, but instead there were 8. Each of the 4 distinct keys appear 2
>>>> times.
>>>>
>>>> Is this the expected behavior? I need to be able to get ALL values
>>>> associated with each key grouped into a SINGLE record. Is it possible?
>>>>
>>>> Arun
>>>>
>>>> p.s. reducebykey will not be sufficient for me
>>>>
>>>
>>>
>>


Re: groupByKey does not work?

2016-01-04 Thread Daniel Imberman
That's interesting.

I would try

case class Mykey(uname:String)
case class Mykey(uname:String, c1:Char)
case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
f4:Char, f5:Char, f6:String)

In that order. It seems like there is some issue with equality between keys.
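
A workaround consistent with that diagnosis - a sketch only, assuming the equality
problem really does sit in the shell-defined case class - is to group on a plain
tuple key (tuples compare structurally) and rebuild Mykey only after the shuffle:

// Sketch: key by a plain tuple instead of the shell-defined case class.
val tupleKeyed = sc.textFile("/user/al733a/mydata.txt").map { line =>
  val spl = line.split("\\|", -1)
  val k = spl(0).split(",")
  val v = spl(1).split(",")
  ((k(0), k(1), k(2), k(3), k(4), k(5), k(6), k(7)),
   Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble))
}

tupleKeyed.groupByKey().map { case (k, vals) =>
  // Rebuild the case-class key after grouping, if it is still needed downstream.
  (Mykey(k._1, k._2, k._3(0), k._4(0), k._5(0), k._6(0), k._7(0), k._8), vals.size)
}.collect().foreach(println)

If the case-class key has to stay, another option sometimes suggested is to define
Mykey in a compiled jar instead of in the shell and re-test.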

On Mon, Jan 4, 2016 at 5:05 PM Arun Luthra <arun.lut...@gmail.com> wrote:

> If I simplify the key to String column with values lo1, lo2, lo3, lo4, it
> works correctly.
>
> On Mon, Jan 4, 2016 at 4:49 PM, Daniel Imberman <daniel.imber...@gmail.com
> > wrote:
>
>> Could you try simplifying the key and seeing if that makes any
>> difference? Make it just a string or an int so we can count out any issues
>> in object equality.
>>
>> On Mon, Jan 4, 2016 at 4:42 PM Arun Luthra <arun.lut...@gmail.com> wrote:
>>
>>> Spark 1.5.0
>>>
>>> data:
>>>
>>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo1,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo2,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo3,8,0,4,0,5,20150901|5,1,1.0
>>> p1,lo4,8,0,4,0,5,20150901|5,1,1.0
>>>
>>> spark-shell:
>>>
>>> spark-shell \
>>> --num-executors 2 \
>>> --driver-memory 1g \
>>> --executor-memory 10g \
>>> --executor-cores 8 \
>>> --master yarn-client
>>>
>>>
>>> case class Mykey(uname:String, lo:String, f1:Char, f2:Char, f3:Char,
>>> f4:Char, f5:Char, f6:String)
>>> case class Myvalue(count1:Long, count2:Long, num:Double)
>>>
>>> val myrdd = sc.textFile("/user/al733a/mydata.txt").map { case line => {
>>> val spl = line.split("\\|", -1)
>>> val k = spl(0).split(",")
>>> val v = spl(1).split(",")
>>> (Mykey(k(0), k(1), k(2)(0).toChar, k(3)(0).toChar, k(4)(0).toChar,
>>> k(5)(0).toChar, k(6)(0).toChar, k(7)),
>>>  Myvalue(v(0).toLong, v(1).toLong, v(2).toDouble)
>>> )
>>> }}
>>>
>>> myrdd.groupByKey().map { case (mykey, val_iterable) => (mykey, 1)
>>> }.collect().foreach(println)
>>>
>>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
>>>
>>> (Mykey(p1,lo1,8,0,4,0,5,20150901),1)
>>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
>>> (Mykey(p1,lo3,8,0,4,0,5,20150901),1)
>>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
>>> (Mykey(p1,lo4,8,0,4,0,5,20150901),1)
>>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>>> (Mykey(p1,lo2,8,0,4,0,5,20150901),1)
>>>
>>>
>>>
>>> You can see that each key is repeated 2 times but each key should only
>>> appear once.
>>>
>>> Arun
>>>
>>> On Mon, Jan 4, 2016 at 4:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Can you give a bit more information ?
>>>>
>>>> Release of Spark you're using
>>>> Minimal dataset that shows the problem
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Jan 4, 2016 at 3:55 PM, Arun Luthra <arun.lut...@gmail.com>
>>>> wrote:
>>>>
>>>>> I tried groupByKey and noticed that it did not group all values into
>>>>> the same group.
>>>>>
>>>>> In my test dataset (a Pair rdd) I have 16 records, where there are
>>>>> only 4 distinct keys, so I expected there to be 4 records in the 
>>>>> groupByKey
>>>>> object, but instead there were 8. Each of the 4 distinct keys appear 2
>>>>> times.
>>>>>
>>>>> Is this the expected behavior? I need to be able to get ALL values
>>>>> associated with each key grouped into a SINGLE record. Is it possible?
>>>>>
>>>>> Arun
>>>>>
>>>>> p.s. reducebykey will not be sufficient for me
>>>>>
>>>>
>>>>
>>>
>


groupByKey()

2015-12-08 Thread Yasemin Kaya
Hi,

Sorry for the long input, but this is my situation.

I have two lists and I want to groupByKey them, but some values of the lists disappear.
I can't understand this.

(8867989628612931721,[1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

(8867989628612931721,[1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,* 1*, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

result of groupbykey
(8867989628612931721,[[1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0

Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread Jeff Zhang
Stacktrace would be helpful if you can provide that.



On Mon, Oct 19, 2015 at 1:42 PM, fahad shah <sfaha...@gmail.com> wrote:

>  Hi
>
> I am trying to do pair rdd's, group by the key assign id based on key.
> I am using Pyspark with spark 1.3, for some reason, I am getting this
> error that I am unable to figure out - any help much appreciated.
>
> Things I tried (but to no effect),
>
> 1. make sure I am not doing any conversions on the strings
> 2. make sure that the fields used in the key are all there  and not
> empty string (or else I toss the row out)
>
> My code is along following lines (split is using stringio to parse
> csv, header removes the header row and parse_train is putting the 54
> fields into named tuple after whitespace/quote removal):
>
> #Error for string argument is thrown on the BB.take(1) where the
> groupbykey is evaluated
>
> A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> A.count()
>
> B = A.map(lambda k:
>
> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>  k.srch_children_count,k.srch_room_count),
> (k[0:54])))
> BB = B.groupByKey()
> BB.take(1)
>
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread fahad shah
ortStage(DAGScheduler.scala:1192)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

On Sun, Oct 18, 2015 at 11:17 PM, Jeff Zhang <zjf...@gmail.com> wrote:
> Stacktrace would be helpful if you can provide that.
>
>
>
> On Mon, Oct 19, 2015 at 1:42 PM, fahad shah <sfaha...@gmail.com> wrote:
>>
>>  Hi
>>
>> I am trying to do pair rdd's, group by the key assign id based on key.
>> I am using Pyspark with spark 1.3, for some reason, I am getting this
>> error that I am unable to figure out - any help much appreciated.
>>
>> Things I tried (but to no effect),
>>
>> 1. make sure I am not doing any conversions on the strings
>> 2. make sure that the fields used in the key are all there  and not
>> empty string (or else I toss the row out)
>>
>> My code is along following lines (split is using stringio to parse
>> csv, header removes the header row and parse_train is putting the 54
>> fields into named tuple after whitespace/quote removal):
>>
>> #Error for string argument is thrown on the BB.take(1) where the
>> groupbykey is evaluated
>>
>> A = sc.textFile("train.csv").filter(lambda x:not
>> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
>> None)
>>
>> A.count()
>>
>> B = A.map(lambda k:
>>
>> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>>  k.srch_children_count,k.srch_room_count),
>> (k[0:54])))
>> BB = B.groupByKey()
>> BB.take(1)
>>
>>
>> best fahad
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread fahad shah
Thanks Davies, sure, I can share the code/data in pm - best fahad

On Mon, Oct 19, 2015 at 10:52 AM, Davies Liu <dav...@databricks.com> wrote:
> Could you simplify the code a little bit so we can reproduce the failure?
> (may also have some sample dataset if it depends on them)
>
> On Sun, Oct 18, 2015 at 10:42 PM, fahad shah <sfaha...@gmail.com> wrote:
>>  Hi
>>
>> I am trying to do pair rdd's, group by the key assign id based on key.
>> I am using Pyspark with spark 1.3, for some reason, I am getting this
>> error that I am unable to figure out - any help much appreciated.
>>
>> Things I tried (but to no effect),
>>
>> 1. make sure I am not doing any conversions on the strings
>> 2. make sure that the fields used in the key are all there  and not
>> empty string (or else I toss the row out)
>>
>> My code is along following lines (split is using stringio to parse
>> csv, header removes the header row and parse_train is putting the 54
>> fields into named tuple after whitespace/quote removal):
>>
>> #Error for string argument is thrown on the BB.take(1) where the
>> groupbykey is evaluated
>>
>> A = sc.textFile("train.csv").filter(lambda x:not
>> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
>> None)
>>
>> A.count()
>>
>> B = A.map(lambda k:
>> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>>  k.srch_children_count,k.srch_room_count), 
>> (k[0:54])))
>> BB = B.groupByKey()
>> BB.take(1)
>>
>>
>> best fahad
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread Davies Liu
Could you simplify the code a little bit so we can reproduce the failure?
(may also have some sample dataset if it depends on them)

On Sun, Oct 18, 2015 at 10:42 PM, fahad shah <sfaha...@gmail.com> wrote:
>  Hi
>
> I am trying to do pair rdd's, group by the key assign id based on key.
> I am using Pyspark with spark 1.3, for some reason, I am getting this
> error that I am unable to figure out - any help much appreciated.
>
> Things I tried (but to no effect),
>
> 1. make sure I am not doing any conversions on the strings
> 2. make sure that the fields used in the key are all there  and not
> empty string (or else I toss the row out)
>
> My code is along following lines (split is using stringio to parse
> csv, header removes the header row and parse_train is putting the 54
> fields into named tuple after whitespace/quote removal):
>
> #Error for string argument is thrown on the BB.take(1) where the
> groupbykey is evaluated
>
> A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> A.count()
>
> B = A.map(lambda k:
> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>  k.srch_children_count,k.srch_room_count), (k[0:54])))
> BB = B.groupByKey()
> BB.take(1)
>
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-18 Thread fahad shah
 Hi

I am trying to build pair RDDs, group by the key, and assign an id based on the key.
I am using PySpark with Spark 1.3, and for some reason I am getting this
error that I am unable to figure out - any help much appreciated.

Things I tried (but to no effect),

1. make sure I am not doing any conversions on the strings
2. make sure that the fields used in the key are all there  and not
empty string (or else I toss the row out)

My code is along following lines (split is using stringio to parse
csv, header removes the header row and parse_train is putting the 54
fields into named tuple after whitespace/quote removal):

#Error for string argument is thrown on the BB.take(1) where the
groupbykey is evaluated

A = sc.textFile("train.csv").filter(lambda x:not
isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
None)

A.count()

B = A.map(lambda k:
((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
 k.srch_children_count,k.srch_room_count), (k[0:54])))
BB = B.groupByKey()
BB.take(1)


best fahad
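
One way to sidestep groupByKey here, assuming the goal really is just one id per
distinct key, is to number the distinct keys with zipWithIndex and join the ids
back. A rough sketch against the B defined above (names are illustrative):

# Sketch: assign a stable integer id to each distinct key without groupByKey.
key_ids = B.keys().distinct().zipWithIndex()   # (key_tuple, id)
rows_with_id = B.join(key_ids)                 # (key_tuple, (row, id))
rows_with_id.take(1)

This keeps the per-key data spread across partitions instead of pulling each
group into one place.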

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Different partition number of GroupByKey leads different result

2015-10-09 Thread Devin Huang
Hi everyone,

 I have run into a problem these days, and I don't know whether it is a bug in
Spark. When I use GroupByKey on our sequenceFile data, I find that different
partition numbers lead to different results, and the same goes for ReduceByKey. I think the
problem happens at the shuffle stage. I read the source code, but still
can't find the answer.


this is the main code:

val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
classOf[UserWritable], classOf[TagsWritable])
val combinedRdd = rdd.map(s => (s._1.getuserid(),
s._2)).groupByKey(num).filter(_._1 == uid)

num is the number of partition and uid is a filter id for result
comparision.
TagsWritable implements WritableComparable and Serializable.

I used GroupByKey on text file, the result was right. 

Thanks,
Devin Huang




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Devin Huang
Forgive me for not understanding what you mean. The sequence file key is 
UserWritable, and the value is TagsWritable. Both of them implement 
WritableComparable and Serializable and override clone().
The String key is extracted from UserWritable through a map transformation.

Have you read the Spark source code? Which step could cause the data to 
end up under the wrong key?

> 在 2015年10月9日,17:37,Sean Owen <so...@cloudera.com> 写道:
> 
> Another guess, since you say the key is String (offline): you are not
> cloning the value of TagsWritable. Hadoop reuses the object under the
> hood, and so is changing your object value. You can't save references
> to the object you get from reading a SequenceFile.
> 
> On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote:
>> First guess: your key class does not implement hashCode/equals
>> 
>> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote:
>>> Hi everyone,
>>> 
>>> I got a trouble these days,and I don't know whether it is a bug of
>>> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
>>> partition number lead different result, so as ReduceByKey. I think the
>>> problem happens on the shuffle stage.I read the source code,  but still
>>> can't find the answer.
>>> 
>>> 
>>> this is the main code:
>>> 
>>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>>> classOf[UserWritable], classOf[TagsWritable])
>>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>>> s._2)).groupByKey(num).filter(_._1 == uid)
>>> 
>>> num is the number of partition and uid is a filter id for result
>>> comparision.
>>> TagsWritable implements WritableComparable and Serializable.
>>> 
>>> I used GroupByKey on text file, the result was right.
>>> 
>>> Thanks,
>>> Devin Huang
>>> 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Sean Owen
If you are not copying or cloning the value (TagsWritable) object,
then that is likely the problem. The value is not immutable and is
changed by the InputFormat code reading the file, because it is
reused.
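
A sketch of what that copying could look like, assuming the UserWritable/TagsWritable
types from this thread and using Hadoop's WritableUtils (not a verified fix for this
exact job):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.WritableUtils

val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
  classOf[UserWritable], classOf[TagsWritable])

// Copy the reused Writable before the shuffle so every record keeps its own value.
// A fresh Configuration is created per partition because Configuration is not serializable.
val combinedRdd = rdd
  .mapPartitions { iter =>
    val conf = new Configuration()
    iter.map { case (user, tags) => (user.getuserid(), WritableUtils.clone(tags, conf)) }
  }
  .groupByKey(num)
  .filter(_._1 == uid)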

On Fri, Oct 9, 2015 at 11:04 AM, Devin Huang <hos...@163.com> wrote:
> Forgive me for not understanding what you mean.The sequence file key is 
> UserWritable,and Value is TagsWritable.Both of them implement 
> WritableComparable and Serializable and rewrite the clone().
> The key of string is collected from UserWritable through a map transformation.
>
> Have you ever read the spark source code?Which step can lead to data 
> dislocation?
>
>> 在 2015年10月9日,17:37,Sean Owen <so...@cloudera.com> 写道:
>>
>> Another guess, since you say the key is String (offline): you are not
>> cloning the value of TagsWritable. Hadoop reuses the object under the
>> hood, and so is changing your object value. You can't save references
>> to the object you get from reading a SequenceFile.
>>
>> On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote:
>>> First guess: your key class does not implement hashCode/equals
>>>
>>> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote:
>>>> Hi everyone,
>>>>
>>>> I got a trouble these days,and I don't know whether it is a bug of
>>>> spark.When I use  GroupByKey for our sequenceFile Data,I find that 
>>>> different
>>>> partition number lead different result, so as ReduceByKey. I think the
>>>> problem happens on the shuffle stage.I read the source code,  but still
>>>> can't find the answer.
>>>>
>>>>
>>>> this is the main code:
>>>>
>>>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>>>> classOf[UserWritable], classOf[TagsWritable])
>>>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>>>> s._2)).groupByKey(num).filter(_._1 == uid)
>>>>
>>>> num is the number of partition and uid is a filter id for result
>>>> comparision.
>>>> TagsWritable implements WritableComparable and Serializable.
>>>>
>>>> I used GroupByKey on text file, the result was right.
>>>>
>>>> Thanks,
>>>> Devin Huang
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context: 
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Devin Huang
Let me add.

The problem is that GroupByKey cannot divide our sequence data into groups
correctly, and produces wrong key/value pairs. The shuffle stage might not be
executed correctly, and I don't know what causes this.


The type of the key is String, and the type of the value is TagsWritable.

Take one user's data as an example.

When the partition number is 300, the value for this user is
270102,1.00;130098967f,1.00;270027,1.00;270001,1.00.
When the partition number is 100, the value for this user is
282133,1.00;150098921f,1.00;

I guess the wrong value is another user's value. The data may be getting mismatched
at the shuffle stage.







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989p24990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Sean Owen
Another guess, since you say the key is String (offline): you are not
cloning the value of TagsWritable. Hadoop reuses the object under the
hood, and so is changing your object value. You can't save references
to the object you get from reading a SequenceFile.

On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote:
> First guess: your key class does not implement hashCode/equals
>
> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote:
>> Hi everyone,
>>
>>  I got a trouble these days,and I don't know whether it is a bug of
>> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
>> partition number lead different result, so as ReduceByKey. I think the
>> problem happens on the shuffle stage.I read the source code,  but still
>> can't find the answer.
>>
>>
>> this is the main code:
>>
>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>> classOf[UserWritable], classOf[TagsWritable])
>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>> s._2)).groupByKey(num).filter(_._1 == uid)
>>
>> num is the number of partition and uid is a filter id for result
>> comparision.
>> TagsWritable implements WritableComparable and Serializable.
>>
>> I used GroupByKey on text file, the result was right.
>>
>> Thanks,
>> Devin Huang
>>
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Sean Owen
First guess: your key class does not implement hashCode/equals

On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <hos...@163.com> wrote:
> Hi everyone,
>
>  I got a trouble these days,and I don't know whether it is a bug of
> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
> partition number lead different result, so as ReduceByKey. I think the
> problem happens on the shuffle stage.I read the source code,  but still
> can't find the answer.
>
>
> this is the main code:
>
> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
> classOf[UserWritable], classOf[TagsWritable])
> val combinedRdd = rdd.map(s => (s._1.getuserid(),
> s._2)).groupByKey(num).filter(_._1 == uid)
>
> num is the number of partition and uid is a filter id for result
> comparision.
> TagsWritable implements WritableComparable and Serializable.
>
> I used GroupByKey on text file, the result was right.
>
> Thanks,
> Devin Huang
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to handle OOMError from groupByKey

2015-09-28 Thread Alexis Gillain
"Note: As currently implemented, groupByKey must be able to hold all the
key-value pairs for any key in memory. If a key has too many values, it can
result in an [[OutOfMemoryError]]."

Obviously one of your keys has too many values. You can try to increase
spark.shuffle.memoryFraction.

Are you sure you can't:
partition your data by user/time-interval => process with mapPartitions =>
partition by user => process with mapPartitions?
Not efficient, but if your operation decreases the amount of data per user it
may work.
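
A sketch of that shape using repartitionAndSortWithinPartitions, so each user's
states arrive contiguous and time-ordered inside a partition without materializing
whole groups. The interval-building logic below is illustrative only, not the
original DFUtil logic, and it assumes an 'on' followed by the next 'off' forms
one interval:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Composite key: partition by user only, but sort by (user, timestamp).
case class UserTime(user: String, ts: Long)

class UserPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case UserTime(user, _) => math.abs(user.hashCode % numPartitions)
  }
}

implicit val userTimeOrdering: Ordering[UserTime] = Ordering.by(k => (k.user, k.ts))

def toIntervals(states: RDD[(String, Long, Boolean)], partitions: Int): RDD[(String, (Long, Long))] = {
  states
    .map { case (user, ts, on) => (UserTime(user, ts), on) }
    .repartitionAndSortWithinPartitions(new UserPartitioner(partitions))
    .mapPartitions { iter =>
      // Each user's records are contiguous and time-ordered here, so intervals
      // can be emitted with O(1) state instead of one big in-memory Iterable.
      var open: Option[(String, Long)] = None
      iter.flatMap { case (UserTime(user, ts), on) =>
        open match {
          case Some((u, start)) if u == user && !on =>
            open = None; Some((user, (start, ts)))   // close the interval at this 'off'
          case Some((u, _)) if u != user =>
            open = if (on) Some((user, ts)) else None; None
          case None if on =>
            open = Some((user, ts)); None
          case _ => None                             // already on, or an 'off' with nothing open
        }
      }
    }
}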


2015-09-29 0:17 GMT+08:00 Fabien Martin <fabien.marti...@gmail.com>:

> You can try to reduce the number of containers in order to increase their
> memory.
>
> 2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:
>
>> You can try to increase the number of partitions to get ride of the OOM
>> errors. Also try to use reduceByKey instead of groupByKey.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
>> wrote:
>>
>>> Hi everyone,
>>> I have an RDD of the format (user: String, timestamp: Long, state:
>>> Boolean).  My task invovles converting the states, where on/off is
>>> represented as true/false, into intervals of 'on' of the format (beginTs:
>>> Long, endTs: Long).  So this task requires me, per user, to line up all of
>>> the on/off states so that I can compute when it is on, since the
>>> calculation is neither associative nor commutative.
>>>
>>> So there are 2 main operations that I'm trying to accomplish together:
>>> 1. group by each user
>>> 2. sort by time -- keep all of the states in sorted order by time
>>>
>>> The main code inside the method that does grouping by user and sorting
>>> by time looks sort of looks like this:
>>>
>>>
>>> // RDD starts off in format (user, ts, state) of type RDD[(String, Long,
>>> Boolean)]
>>> val grouped = keyedStatesRDD.groupByKey
>>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of
>>> type RDD[(String, Iterable(Long, Boolean))]
>>> // take the sequence of (ts, state) per user, sort, get intervals
>>> val groupedIntervals = grouped.mapValues(
>>>   states => {
>>> val sortedStates = states.toSeq.sortBy(_._1)
>>> val intervals = DFUtil.statesToIntervals(sortedStates)
>>> val intervalsList = bucketDurations.map{case(k,v) =>
>>> (k,v)}(collection.breakOut).sortBy(_._1)
>>> intervalsList
>>>   }
>>> )
>>> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
>>> endTime)) of type RDD[(String, IndexedSeq(Long, Long))]
>>>
>>>
>>> When I run my Spark job with 1 day's worth of data, the job completes
>>> successfully.  When I run with 1 month's or 1 year's worth of data, that
>>> method is where my Spark job consistently crashes with get
>>> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>>>
>>> My suspicion is that the groupByKey is the problem (it's pulling all of
>>> the matching data values into a single executor's heap as a plain Scala
>>> Iterable).  But alternatives of doing sortByKey on the RDD first before
>>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
>>> quite apply in my scenario because my operation is not associative (can't
>>> combine per-partition results) and I still need to group by users before
>>> doing a foldLeft.
>>>
>>> I've definitely thought about the issue before and come across users
>>> with issues that are similar but not exactly the same:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>>
>>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>>
>>> And this Jira seems relevant too:
>>> https://issues.apache.org/jira/browse/SPARK-3655
>>>
>>> The amount of memory that I'm using is 2g per executor, and I can't go
>>> higher than that because each executor gets a YARN container from nodes
>>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>>
>>> So I'd like to know if there's an easy solution to executing my logic on
>>> my full dataset in Spark.
>>>
>>> Thanks!
>>>
>>> -- Elango
>>>
>>
>>
>


-- 
Alexis GILLAIN


Re: how to handle OOMError from groupByKey

2015-09-28 Thread Fabien Martin
You can try to reduce the number of containers in order to increase their
memory.

2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:

> You can try to increase the number of partitions to get ride of the OOM
> errors. Also try to use reduceByKey instead of groupByKey.
>
> Thanks
> Best Regards
>
> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
> wrote:
>
>> Hi everyone,
>> I have an RDD of the format (user: String, timestamp: Long, state:
>> Boolean).  My task invovles converting the states, where on/off is
>> represented as true/false, into intervals of 'on' of the format (beginTs:
>> Long, endTs: Long).  So this task requires me, per user, to line up all of
>> the on/off states so that I can compute when it is on, since the
>> calculation is neither associative nor commutative.
>>
>> So there are 2 main operations that I'm trying to accomplish together:
>> 1. group by each user
>> 2. sort by time -- keep all of the states in sorted order by time
>>
>> The main code inside the method that does grouping by user and sorting by
>> time looks sort of looks like this:
>>
>>
>> // RDD starts off in format (user, ts, state) of type RDD[(String, Long,
>> Boolean)]
>> val grouped = keyedStatesRDD.groupByKey
>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of
>> type RDD[(String, Iterable(Long, Boolean))]
>> // take the sequence of (ts, state) per user, sort, get intervals
>> val groupedIntervals = grouped.mapValues(
>>   states => {
>> val sortedStates = states.toSeq.sortBy(_._1)
>> val intervals = DFUtil.statesToIntervals(sortedStates)
>> val intervalsList = bucketDurations.map{case(k,v) =>
>> (k,v)}(collection.breakOut).sortBy(_._1)
>> intervalsList
>>   }
>> )
>> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
>> endTime)) of type RDD[(String, IndexedSeq(Long, Long))]
>>
>>
>> When I run my Spark job with 1 day's worth of data, the job completes
>> successfully.  When I run with 1 month's or 1 year's worth of data, that
>> method is where my Spark job consistently crashes with get
>> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>>
>> My suspicion is that the groupByKey is the problem (it's pulling all of
>> the matching data values into a single executor's heap as a plain Scala
>> Iterable).  But alternatives of doing sortByKey on the RDD first before
>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
>> quite apply in my scenario because my operation is not associative (can't
>> combine per-partition results) and I still need to group by users before
>> doing a foldLeft.
>>
>> I've definitely thought about the issue before and come across users with
>> issues that are similar but not exactly the same:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>
>> And this Jira seems relevant too:
>> https://issues.apache.org/jira/browse/SPARK-3655
>>
>> The amount of memory that I'm using is 2g per executor, and I can't go
>> higher than that because each executor gets a YARN container from nodes
>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>
>> So I'd like to know if there's an easy solution to executing my logic on
>> my full dataset in Spark.
>>
>> Thanks!
>>
>> -- Elango
>>
>
>


Re: how to handle OOMError from groupByKey

2015-09-28 Thread Akhil Das
You can try to increase the number of partitions to get rid of the OOM
errors. Also try to use reduceByKey instead of groupByKey.

Thanks
Best Regards

On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
wrote:

> Hi everyone,
> I have an RDD of the format (user: String, timestamp: Long, state:
> Boolean).  My task invovles converting the states, where on/off is
> represented as true/false, into intervals of 'on' of the format (beginTs:
> Long, endTs: Long).  So this task requires me, per user, to line up all of
> the on/off states so that I can compute when it is on, since the
> calculation is neither associative nor commutative.
>
> So there are 2 main operations that I'm trying to accomplish together:
> 1. group by each user
> 2. sort by time -- keep all of the states in sorted order by time
>
> The main code inside the method that does grouping by user and sorting by
> time looks sort of looks like this:
>
>
> // RDD starts off in format (user, ts, state) of type RDD[(String, Long,
> Boolean)]
> val grouped = keyedStatesRDD.groupByKey
> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of type
> RDD[(String, Iterable(Long, Boolean))]
> // take the sequence of (ts, state) per user, sort, get intervals
> val groupedIntervals = grouped.mapValues(
>   states => {
> val sortedStates = states.toSeq.sortBy(_._1)
> val intervals = DFUtil.statesToIntervals(sortedStates)
> val intervalsList = bucketDurations.map{case(k,v) =>
> (k,v)}(collection.breakOut).sortBy(_._1)
> intervalsList
>   }
> )
> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
> endTime)) of type RDD[(String, IndexedSeq(Long, Long))]
>
>
> When I run my Spark job with 1 day's worth of data, the job completes
> successfully.  When I run with 1 month's or 1 year's worth of data, that
> method is where my Spark job consistently crashes with get
> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>
> My suspicion is that the groupByKey is the problem (it's pulling all of
> the matching data values into a single executor's heap as a plain Scala
> Iterable).  But alternatives of doing sortByKey on the RDD first before
> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
> quite apply in my scenario because my operation is not associative (can't
> combine per-partition results) and I still need to group by users before
> doing a foldLeft.
>
> I've definitely thought about the issue before and come across users with
> issues that are similar but not exactly the same:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>
> And this Jira seems relevant too:
> https://issues.apache.org/jira/browse/SPARK-3655
>
> The amount of memory that I'm using is 2g per executor, and I can't go
> higher than that because each executor gets a YARN container from nodes
> with 16 GB of RAM and 5 YARN containers allowed per node.
>
> So I'd like to know if there's an easy solution to executing my logic on
> my full dataset in Spark.
>
> Thanks!
>
> -- Elango
>


how to handle OOMError from groupByKey

2015-09-25 Thread Elango Cheran
Hi everyone,
I have an RDD of the format (user: String, timestamp: Long, state:
Boolean).  My task involves converting the states, where on/off is
represented as true/false, into intervals of 'on' of the format (beginTs:
Long, endTs: Long).  So this task requires me, per user, to line up all of
the on/off states so that I can compute when it is on, since the
calculation is neither associative nor commutative.

So there are 2 main operations that I'm trying to accomplish together:
1. group by each user
2. sort by time -- keep all of the states in sorted order by time

The main code inside the method that does grouping by user and sorting by
time looks sort of like this:


// RDD starts off in format (user, ts, state) of type RDD[(String, Long,
Boolean)]
val grouped = keyedStatesRDD.groupByKey
// after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of type
RDD[(String, Iterable(Long, Boolean))]
// take the sequence of (ts, state) per user, sort, get intervals
val groupedIntervals = grouped.mapValues(
  states => {
val sortedStates = states.toSeq.sortBy(_._1)
val intervals = DFUtil.statesToIntervals(sortedStates)
val intervalsList = bucketDurations.map{case(k,v) =>
(k,v)}(collection.breakOut).sortBy(_._1)
intervalsList
  }
)
// after .mapValues, new format for RDD is (user, seq-of-(startTime,
endTime)) of type RDD[(String, IndexedSeq(Long, Long))]


When I run my Spark job with 1 day's worth of data, the job completes
successfully.  When I run with 1 month's or 1 year's worth of data, that
method is where my Spark job consistently crashes with get
OutOfMemoryErrors.  I need to run on the full year's worth of data.

My suspicion is that the groupByKey is the problem (it's pulling all of the
matching data values into a single executor's heap as a plain Scala
Iterable).  But alternatives of doing sortByKey on the RDD first before
grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
quite apply in my scenario because my operation is not associative (can't
combine per-partition results) and I still need to group by users before
doing a foldLeft.

I've definitely thought about the issue before and come across users with
issues that are similar but not exactly the same:
http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html

And this Jira seems relevant too:
https://issues.apache.org/jira/browse/SPARK-3655

The amount of memory that I'm using is 2g per executor, and I can't go
higher than that because each executor gets a YARN container from nodes
with 16 GB of RAM and 5 YARN containers allowed per node.

So I'd like to know if there's an easy solution to executing my logic on my
full dataset in Spark.

Thanks!

-- Elango


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-24 Thread satish chandra j
HI All,

Please find below the fix for users who are following the mail chain of this
issue, with the non-working and working snippets:

*reduceByKey: non-working snippet*

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

val conf = new SparkConf()
val sc = new SparkContext(conf)

val dataRDD = sc.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4)))
dataRDD.reduceByKey(_ + _).collect

Result: Array() is empty

*reduceByKey: working snippet*

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)

val dataRDD = sc.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4)))
dataRDD.reduceByKey(_ + _).collect

Result: Array((0,3),(1,5),(2,4))

Regards,
Satish Chandra
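
For anyone else hitting this: another thing worth checking, a sketch rather than
the confirmed root cause, is that spark-shell (and dse spark) already provides a
SparkContext as sc, so a script run with -i can use it directly instead of
constructing a second context:

// script.scala, run with: dse spark --master local -i script.scala
// The shell already provides `sc`, so the script does not build its own SparkContext.
val dataRDD = sc.makeRDD(Seq((0, 1), (0, 2), (1, 2), (1, 3), (2, 4)))
println(dataRDD.reduceByKey(_ + _).collect().mkString(", "))   // (0,3), (1,5), (2,4)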


On Sat, Aug 22, 2015 at 11:27 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 Currently using DSE 4.7 and Spark 1.2.2 version

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote:

 What version of Spark you are using, or comes with DSE 4.7?

 We just cannot reproduce it in Spark.

 yzhang@localhost$ more test.spark
 val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs.reduceByKey((x,y) = x + y).collect
 yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.1
   /_/

 Using Scala version 2.10.4
 Spark context available as sc.
 SQL context available as sqlContext.
 Loading test.spark...
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at
 makeRDD at console:21
 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether
 UseCompressedOops is set; assuming yes
 res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Yong


 --
 Date: Fri, 21 Aug 2015 19:24:09 +0530
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: jsatishchan...@gmail.com
 To: abhis...@tetrationanalytics.com
 CC: user@spark.apache.org


 HI Abhishek,

 I have even tried that but rdd2 is empty

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) = x+y)
  RDD.take(3)

 Maybe try:

  rdd2 = RDD.reduceByKey((x,y) = x+y)
  rdd2.take(3)

 -Abhishek-

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
  RDD.reduceByKey((x,y) = x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
  res:Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 






Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-22 Thread satish chandra j
HI All,
Currently using DSE 4.7 and Spark 1.2.2 version

Regards,
Satish

On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote:

 What version of Spark you are using, or comes with DSE 4.7?

 We just cannot reproduce it in Spark.

 yzhang@localhost$ more test.spark
 val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs.reduceByKey((x,y) = x + y).collect
 yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.1
   /_/

 Using Scala version 2.10.4
 Spark context available as sc.
 SQL context available as sqlContext.
 Loading test.spark...
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at
 makeRDD at console:21
 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether
 UseCompressedOops is set; assuming yes
 res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Yong


 --
 Date: Fri, 21 Aug 2015 19:24:09 +0530
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: jsatishchan...@gmail.com
 To: abhis...@tetrationanalytics.com
 CC: user@spark.apache.org


 HI Abhishek,

 I have even tried that but rdd2 is empty

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) = x+y)
  RDD.take(3)

 Maybe try:

  rdd2 = RDD.reduceByKey((x,y) = x+y)
  rdd2.take(3)

 -Abhishek-

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
  RDD.reduceByKey((x,y) = x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
  res:Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 





RE: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread java8964
I believe spark-shell -i scriptFile is there. We also use it, at least in 
Spark 1.3.1.
dse spark just wraps the spark-shell command; underneath it is just invoking 
spark-shell.
I don't know too much about the original problem though.
Yong
Date: Fri, 21 Aug 2015 18:19:49 +0800
Subject: Re: Transformation not happening for reduceByKey or GroupByKey
From: zjf...@gmail.com
To: jsatishchan...@gmail.com
CC: robin.e...@xense.co.uk; user@spark.apache.org

Hi Satish,
I don't see where Spark supports -i, so I suspect it is provided by DSE. In that 
case, it might be a bug in DSE.


On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com 
wrote:
HI Robin,
Yes, it is DSE but the issue is related to Spark only
Regards,
Satish Chandra
On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:
Not sure, never used dse - it’s part of DataStax Enterprise right?
On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote:
HI Robin,
Yes, the piece of code below works fine in the Spark shell, but when the same code
is placed in a script file and executed with -i <file name>, it creates an empty RDD

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28

scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command:
dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not
have the required output
Regards,
Satish Chandra
On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:
This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28

scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote:
HI All,
I have data in RDD as mentioned below:

RDD : Array[(Int, Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))

I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on
values for each key

Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()

Command as mentioned:

dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty

Regards,
Satish









-- 
Best Regards

Jeff Zhang
  

Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI All,
Any inputs for the actual problem statement

Regards,
Satish


On Fri, Aug 21, 2015 at 5:57 PM, Jeff Zhang zjf...@gmail.com wrote:

 Yong, Thanks for your reply.

 I tried spark-shell -i script-file, and it works fine for me. Not sure what is
 different with
 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile

 On Fri, Aug 21, 2015 at 7:01 PM, java8964 java8...@hotmail.com wrote:

 I believe spark-shell -i scriptFile is there. We also use it, at least
 in Spark 1.3.1.

 dse spark will just wrap spark-shell command, underline it is just
 invoking spark-shell.

 I don't know too much about the original problem though.

 Yong

 --
 Date: Fri, 21 Aug 2015 18:19:49 +0800
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: zjf...@gmail.com
 To: jsatishchan...@gmail.com
 CC: robin.e...@xense.co.uk; user@spark.apache.org


 Hi Satish,

 I don't see where spark support -i, so suspect it is provided by DSE.
 In that case, it might be bug of DSE.



 On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j 
 jsatishchan...@gmail.com wrote:

 HI Robin,
 Yes, it is DSE but issue is related to Spark only

 Regards,
 Satish Chandra

 On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk
 wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, below mentioned piece or code works fine in Spark Shell but the same
 when place in Script File and executed with -i file name it creating an
 empty RDD

 scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at console:28


 scala pairs.reduceByKey((x,y) = x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at console:28


 scala pairs.reduceByKey((x,y) = x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish








 --
 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Abhishek R. Singh
You had:

 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

Maybe try:

 rdd2 = RDD.reduceByKey((x,y) => x+y)
 rdd2.take(3)

-Abhishek-
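
A minimal runnable sketch of the pattern being suggested here (transformations
are lazy and return new RDDs, so the reduced result has to be captured in its
own val before calling an action on it); this assumes a plain spark-shell session:

  val pairs = sc.makeRDD(Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))
  val summed = pairs.reduceByKey(_ + _)  // new RDD; `pairs` itself is unchanged
  summed.take(3)                         // Array((0,3), (1,50), (2,40)), as in the spark-shell runs above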

On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote:

 HI All,
 I have data in RDD as mentioned below:
 
 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
 Values for each key
 
 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)
 
 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at 
 console:73
 res:Array[(Int,Int)] = Array()
 
 Command as mentioned
 
 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
 Please let me know what is missing in my code, as my resultant Array is empty
 
 
 
 Regards,
 Satish
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Abhishek,

I have even tried that but rdd2 is empty

Regards,
Satish

On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)

 Maybe try:

  rdd2 = RDD.reduceByKey((x,y) => x+y)
  rdd2.take(3)

 -Abhishek-

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
  res:Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 




RE: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread java8964
What version of Spark you are using, or comes with DSE 4.7?
We just cannot reproduce it in Spark.
yzhang@localhost$ more test.spark
val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs.reduceByKey((x,y) => x + y).collect

yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

Using Scala version 2.10.4
Spark context available as sc.
SQL context available as sqlContext.
Loading test.spark...
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:21
15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))
Yong

Date: Fri, 21 Aug 2015 19:24:09 +0530
Subject: Re: Transformation not happening for reduceByKey or GroupByKey
From: jsatishchan...@gmail.com
To: abhis...@tetrationanalytics.com
CC: user@spark.apache.org

HI Abhishek,
I have even tried that but rdd2 is empty
Regards,Satish
On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:
You had:



 RDD.reduceByKey((x,y) => x+y)

 RDD.take(3)



Maybe try:



 rdd2 = RDD.reduceByKey((x,y) => x+y)

 rdd2.take(3)



-Abhishek-



On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote:



 HI All,

 I have data in RDD as mentioned below:



 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))





 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
 Values for each key



 Code:

 RDD.reduceByKey((x,y) => x+y)

 RDD.take(3)



 Result in console:

 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at 
 console:73

 res:Array[(Int,Int)] = Array()



 Command as mentioned



 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile





 Please let me know what is missing in my code, as my resultant Array is empty







 Regards,

 Satish






  

Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
Yes, DSE 4.7

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, below mentioned piece or code works fine in Spark Shell but the same
 when place in Script File and executed with -i file name it creating an
 empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish







Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Robin,
Yes, it is DSE but issue is related to Spark only

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, below mentioned piece or code works fine in Spark Shell but the same
 when place in Script File and executed with -i file name it creating an
 empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish







Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Jeff Zhang
Hi Satish,

I don't see where spark support -i, so suspect it is provided by DSE. In
that case, it might be bug of DSE.



On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI Robin,
 Yes, it is DSE but issue is related to Spark only

 Regards,
 Satish Chandra

 On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk
 wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, below mentioned piece or code works fine in Spark Shell but the same
 when place in Script File and executed with -i file name it creating an
 empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish








-- 
Best Regards

Jeff Zhang


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Robin,
Yes, below mentioned piece or code works fine in Spark Shell but the same
when place in Script File and executed with -i file name it creating an
empty RDD

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
makeRDD at <console>:28


scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command:

dse spark --master local --jars postgresql-9.4-1201.jar -i
 ScriptFile

I understand, I am missing something here due to which my final RDD does
not have as required output

Regards,
Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
 Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at
 console:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish





Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
HI All,
I have data in RDD as mentioned below:

RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
Values for each key

Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at
console:73
res:Array[(Int,Int)] = Array()

Command as mentioned

dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


Please let me know what is missing in my code, as my resultant Array is
empty



Regards,
Satish


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
HI All,
Could anybody let me know what is that i missing here, it should work as
its a basic transformation

Please let me know if any additional information required

Regards,
Satish

On Thu, Aug 20, 2015 at 3:35 PM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
 Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at
 console:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish




Want to avoid groupByKey as its running for ever

2015-06-30 Thread ๏̯͡๏
I have a RDD of type (String,
 
Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]

Here String is Key and a list of tuples for that key. I got above RDD after
doing a groupByKey. I later want to compute total number of values for a
given key and total number of unique values for the same given key and
hence i do this

val totalViCount = details.size.toLong
val uniqueViCount =
details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong

How do i do this using reduceByKey.

*Total Code:*

  val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] =
    detailInputsToGroup.map {
      case (detailInput, dataRecord) =>
        val key: StringBuilder = new StringBuilder
        dimensions.foreach { dimension =>
          key ++= {
            Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
          }
        }
        (key.toString, (detailInput, dataRecord))
    }.groupByKey

  groupedDetail.map {
    case (key, values) => {
      val valueList = values.toList

      // Compute dimensions // You can skip this
      val (detailInput, dataRecord) = valueList.head
      val schema = SchemaUtil.outputSchema(_detail)
      val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema))
      DataUtil.populateDimensions(schema, dimensions.toArray,
        detailInput, dataRecord, detailOutput)

      val metricsData = metricProviders.flatMap {
        case (className, instance) =>
          val data = instance.getMetrics(valueList)
          ReflectionUtil.getData(data, _metricProviderMemberNames(className))
      }
      metricsData.map { case (k, v) => detailOutput.put(k, v) }
      val wrap = new AvroKey[DetailOutputRecord](detailOutput)
      (wrap, NullWritable.get)
    }
  }


//getMetrics:
  def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
    val totalViCount = details.size.toLong
    val uniqueViCount =
      details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
    new ViewItemCountMetric(totalViCount, uniqueViCount)
  }


I understand that totalViCount can be implemented using reduceByKey. How
can i implement total unique count as i need to have the full list to know
the unique values.

-- 
Deepak
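
A hedged sketch of one way to get both numbers per key without groupByKey, using
aggregateByKey to carry a running total plus a set of distinct ids; the
(String, Long) pairs and sample values below are hypothetical stand-ins for the
real key / itemId extraction:

  import org.apache.spark.rdd.RDD

  val pairs: RDD[(String, Long)] =
    sc.parallelize(Seq(("k1", 1L), ("k1", 1L), ("k1", 2L), ("k2", 3L)))

  // per key: (total number of values, number of distinct itemIds)
  val counts = pairs.aggregateByKey((0L, Set.empty[Long]))(
      (acc, id) => (acc._1 + 1, acc._2 + id),     // fold values within a partition
      (a, b)    => (a._1 + b._1, a._2 ++ b._2))   // merge partial results across partitions
    .mapValues(acc => (acc._1, acc._2.size.toLong))

Note the per-key Set still grows with the number of distinct values, so for very
high cardinalities the probabilistic approach suggested in the next reply is the
safer option.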


Re: Want to avoid groupByKey as its running for ever

2015-06-30 Thread Daniel Siegmann
If the number of items is very large, have you considered using
probabilistic counting? The HyperLogLogPlus
https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java
class from stream-lib https://github.com/addthis/stream-lib might be
suitable.
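
A hedged sketch of how that might look per key with aggregateByKey; the precision
parameter 12 and the (String, Long) input pairs are illustrative assumptions, and
this presumes the sketch object serializes cleanly with your configured serializer:

  import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

  val pairs = sc.parallelize(Seq(("k1", 1L), ("k1", 1L), ("k1", 2L), ("k2", 3L)))

  val approxDistinct = pairs.aggregateByKey(new HyperLogLogPlus(12))(
      (hll, id) => { hll.offer(id); hll },   // add each value to the key's sketch
      (h1, h2)  => { h1.addAll(h2); h1 })    // merge sketches across partitions
    .mapValues(_.cardinality())              // estimated distinct count per key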

On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have a RDD of type (String,
  
 Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
 com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]

 Here String is Key and a list of tuples for that key. I got above RDD
 after doing a groupByKey. I later want to compute total number of values
 for a given key and total number of unique values for the same given key
 and hence i do this

 val totalViCount = details.size.toLong
 val uniqueViCount =
 details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong

 How do i do this using reduceByKey.

 *Total Code:*

   val groupedDetail: RDD[(String, Iterable[(DetailInputRecord,
 DataRecord)])] = detailInputsToGroup.map {
 case (detailInput, dataRecord) =
   val key: StringBuilder = new StringBuilder
   dimensions.foreach {
 dimension =
   key ++= {

 Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse()).toString
   }
   }
   (key.toString, (detailInput, dataRecord))
   }.groupByKey

   groupedDetail.map {
 case (key, values) = {
   val valueList = values.toList

   //Compute dimensions // You can skup this
   val (detailInput, dataRecord) = valueList.head
   val schema = SchemaUtil.outputSchema(_detail)
   val detailOutput = new DetailOutputRecord(detail, new
 SessionRecord(schema))
   DataUtil.populateDimensions(schema, dimensions.toArray,
 detailInput, dataRecord, detailOutput)


   val metricsData = metricProviders.flatMap {
 case (className, instance) =
   val data = instance.getMetrics(valueList)
   ReflectionUtil.getData(data,
 _metricProviderMemberNames(className))
   }
   metricsData.map { case (k, v) = detailOutput.put(k, v) }
   val wrap = new AvroKey[DetailOutputRecord](detailOutput)
   (wrap, NullWritable.get)
 }
   }


 //getMetrics:
   def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
 val totalViCount = details.size.toLong
 val uniqueViCount =
 details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong
 new ViewItemCountMetric(totalViCount, uniqueViCount)
   }


 I understand that totalViCount can be implemented using reduceByKey. How
 can i implement total unique count as i need to have the full list to know
 the unique values.

 --
 Deepak




Re: Want to avoid groupByKey as its running for ever

2015-06-30 Thread ๏̯͡๏
I modified to



  detailInputsToGroup.map {
    case (detailInput, dataRecord) =>
      val key: StringBuilder = new StringBuilder
      dimensions.foreach { dimension =>
        key ++= {
          Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
        }
      }
      (key.toString, (detailInput, dataRecord))
  }.reduceByKey {
    case (v1, v2) => {
      val v1Detail = v1._1
      val v2Detail = v2._1
      val v1Data = v1._2
      val v2Data = v2._2

*     val totalViCount =
        Option(v1Data.get("totalViCount").asInstanceOf[Int]).getOrElse(0)*
*     v1Data.getRecord.put("totalViCount", totalViCount + 1)*
      (v1)
    }
  }.map {
    case (k, v) => {
      val schema = SchemaUtil.outputSchema(_detail)
      val detailOutputRecord = new DetailOutputRecord(detail, new SessionRecord(schema))

      //Compute dimensions
      DataUtil.populateDimensions(schema, dimensions.toArray, v._1,
        v._2, detailOutputRecord)

      //Construct Output
      val wrap = new AvroKey[DetailOutputRecord](detailOutputRecord)
      (wrap, NullWritable.get)
    }
  }


How do i compute unique count ?
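
A hedged sketch of one way to fold the unique count into the same pass, assuming
the itemId can be extracted up front; the (String, Long) pairs below are
hypothetical stand-ins for the real records. Each record is mapped to a
(1L, Set(itemId)) value so that reduceByKey can add the counters and union the sets:

  val keyed = sc.parallelize(Seq(("k1", 1L), ("k1", 1L), ("k1", 2L), ("k2", 3L)))
    .map { case (k, itemId) => (k, (1L, Set(itemId))) }

  val viCounts = keyed
    .reduceByKey((a, b) => (a._1 + b._1, a._2 ++ b._2))
    .mapValues(v => (v._1, v._2.size.toLong))   // (totalViCount, uniqueViCount)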

On Tue, Jun 30, 2015 at 12:04 PM, Daniel Siegmann 
daniel.siegm...@teamaol.com wrote:

 If the number of items is very large, have you considered using
 probabilistic counting? The HyperLogLogPlus
 https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java
 class from stream-lib https://github.com/addthis/stream-lib might be
 suitable.

 On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have a RDD of type (String,
  
 Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
 com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]

 Here String is Key and a list of tuples for that key. I got above RDD
 after doing a groupByKey. I later want to compute total number of values
 for a given key and total number of unique values for the same given key
 and hence i do this

 val totalViCount = details.size.toLong
 val uniqueViCount =
 details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong

 How do i do this using reduceByKey.

 *Total Code:*

   val groupedDetail: RDD[(String, Iterable[(DetailInputRecord,
 DataRecord)])] = detailInputsToGroup.map {
 case (detailInput, dataRecord) =
   val key: StringBuilder = new StringBuilder
   dimensions.foreach {
 dimension =
   key ++= {

 Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse()).toString
   }
   }
   (key.toString, (detailInput, dataRecord))
   }.groupByKey

   groupedDetail.map {
 case (key, values) = {
   val valueList = values.toList

   //Compute dimensions // You can skup this
   val (detailInput, dataRecord) = valueList.head
   val schema = SchemaUtil.outputSchema(_detail)
   val detailOutput = new DetailOutputRecord(detail, new
 SessionRecord(schema))
   DataUtil.populateDimensions(schema, dimensions.toArray,
 detailInput, dataRecord, detailOutput)


   val metricsData = metricProviders.flatMap {
 case (className, instance) =
   val data = instance.getMetrics(valueList)
   ReflectionUtil.getData(data,
 _metricProviderMemberNames(className))
   }
   metricsData.map { case (k, v) = detailOutput.put(k, v) }
   val wrap = new AvroKey[DetailOutputRecord](detailOutput)
   (wrap, NullWritable.get)
 }
   }


 //getMetrics:
   def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
 val totalViCount = details.size.toLong
 val uniqueViCount =
 details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong
 new ViewItemCountMetric(totalViCount, uniqueViCount)
   }


 I understand that totalViCount can be implemented using reduceByKey. How
 can i implement total unique count as i need to have the full list to know
 the unique values.

 --
 Deepak





-- 
Deepak


Re: workaround for groupByKey

2015-06-23 Thread Silvio Fiorito
It all depends on what it is you need to do with the pages. If you’re just 
going to be collecting them then it’s really not much different than a 
groupByKey. If instead you’re looking to derive some other value from the 
series of pages then you could potentially partition by user id and run a 
mapPartitions or one of the other combineByKey APIs?
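
For the collect-the-pages case, a hedged sketch of the combineByKey shape being
referred to; the (userId, url) sample data is made up for illustration:

  import scala.collection.mutable.ArrayBuffer

  val visits = sc.parallelize(Seq(("u1", "/a"), ("u1", "/b"), ("u2", "/a")))

  val pagesPerUser = visits.combineByKey(
    (url: String) => ArrayBuffer(url),                       // create a combiner from the first value
    (buf: ArrayBuffer[String], url: String) => buf += url,   // merge a value into an existing combiner
    (b1: ArrayBuffer[String], b2: ArrayBuffer[String]) => b1 ++= b2) // merge combiners across partitions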


From: Jianguo Li
Date: Tuesday, June 23, 2015 at 9:46 AM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: workaround for groupByKey

Thanks. Yes, unfortunately, they all need to be grouped. I guess I can 
partition the record by user id. However, I have millions of users, do you 
think partition by user id will help?

Jianguo

On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:
You’re right of course, I’m sorry. I was typing before thinking about what you 
actually asked!

On a second thought, what is the ultimate outcome for what you want the 
sequence of pages for? Do they need to actually all be grouped? Could you 
instead partition by user id then use a mapPartitions perhaps?

From: Jianguo Li
Date: Monday, June 22, 2015 at 6:21 PM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: workaround for groupByKey

Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. 
I read in the Learning Spark book:

We can disable map-side aggregation in combineByKey() if we know that our data 
won’t benefit from it. For example, groupByKey() disables map-side aggregation 
as the aggregation function (appending to a list) does not save any space. If 
we want to disable map-side combines, we need to specify the partitioner; for 
now you can just use the partitioner on the source RDD by passing rdd.partitioner

It seems that when the map-side aggregation function is to append something to 
a list (as opposed to summing over all the numbers), then this map-side 
aggregation does not offer any benefit since appending to a list does not save 
any space. Is my understanding correct?
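
As an illustration of that point, a hedged sketch of the equivalent explicit
combineByKey call (sample data and partition count are made up); this is
essentially the choice groupByKey makes internally by turning map-side
combining off:

  import org.apache.spark.HashPartitioner
  import scala.collection.mutable.ArrayBuffer

  val visits = sc.parallelize(Seq(("u1", "/a"), ("u1", "/b"), ("u2", "/a")))

  val grouped = visits.combineByKey(
    (v: String) => ArrayBuffer(v),
    (buf: ArrayBuffer[String], v: String) => buf += v,
    (b1: ArrayBuffer[String], b2: ArrayBuffer[String]) => b1 ++= b2,
    new HashPartitioner(4),
    mapSideCombine = false)   // appending to a list map-side saves no space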

Thanks,

Jianguo

On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:
You can use aggregateByKey as one option:

val input: RDD[(Int, String)] = ...

val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a += b, (a, b) => a ++ b)

From: Jianguo Li
Date: Monday, June 22, 2015 at 5:12 PM
To: user@spark.apache.org
Subject: workaround for groupByKey

Hi,

I am processing an RDD of key-value pairs. The key is an user_id, and the value 
is an website url the user has ever visited.

Since I need to know all the urls each user has visited, I am  tempted to call 
the groupByKey on this RDD. However, since there could be millions of users and 
urls, the shuffling caused by groupByKey proves to be a major bottleneck to get 
the job done. Is there any workaround? I want to end up with an RDD of 
key-value pairs, where the key is an user_id, the value is a list of all the 
urls visited by the user.

Thanks,

Jianguo




Re: workaround for groupByKey

2015-06-23 Thread Jianguo Li
Thanks. Yes, unfortunately, they all need to be grouped. I guess I can
partition the record by user id. However, I have millions of users, do you
think partition by user id will help?

Jianguo

On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   You’re right of course, I’m sorry. I was typing before thinking about
 what you actually asked!

  On a second thought, what is the ultimate outcome for what you want the
 sequence of pages for? Do they need to actually all be grouped? Could you
 instead partition by user id then use a mapPartitions perhaps?

   From: Jianguo Li
 Date: Monday, June 22, 2015 at 6:21 PM
 To: Silvio Fiorito
 Cc: user@spark.apache.org
 Subject: Re: workaround for groupByKey

   Thanks for your suggestion. I guess aggregateByKey is similar to
 combineByKey. I read in the Learning Sparking

  *We can disable map-side aggregation in combineByKey() if we know that
 our data won’t benefit from it. For example, groupByKey() disables map-side
 aggregation as the aggregation function (appending to a list) does not save
 any space. If we want to disable map-side combines, we need to specify the
 partitioner; for now you can just use the partitioner on the source RDD by
 passingrdd.partitioner*

  It seems that when the map-side aggregation function is to append
 something to a list (as opposed to summing over all the numbers), then this
 map-side aggregation does not offer any benefit since appending to a list
 does not save any space. Is my understanding correct?

  Thanks,

  Jianguo

 On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

  You can use aggregateByKey as one option:

  val input: RDD[(Int, String)] = ...

  val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a += b, (a, b) => a ++ b)

   From: Jianguo Li
 Date: Monday, June 22, 2015 at 5:12 PM
 To: user@spark.apache.org
 Subject: workaround for groupByKey

   Hi,

  I am processing an RDD of key-value pairs. The key is an user_id, and
 the value is an website url the user has ever visited.

  Since I need to know all the urls each user has visited, I am  tempted
 to call the groupByKey on this RDD. However, since there could be millions
 of users and urls, the shuffling caused by groupByKey proves to be a major
 bottleneck to get the job done. Is there any workaround? I want to end up
 with an RDD of key-value pairs, where the key is an user_id, the value is a
 list of all the urls visited by the user.

  Thanks,

  Jianguo





workaround for groupByKey

2015-06-22 Thread Jianguo Li
Hi,

I am processing an RDD of key-value pairs. The key is a user_id, and the
value is a website URL the user has ever visited.

Since I need to know all the URLs each user has visited, I am tempted to
call groupByKey on this RDD. However, since there could be millions of
users and URLs, the shuffling caused by groupByKey proves to be a major
bottleneck to getting the job done. Is there any workaround? I want to end up
with an RDD of key-value pairs, where the key is a user_id and the value is a
list of all the URLs visited by the user.

Thanks,

Jianguo


Re: workaround for groupByKey

2015-06-22 Thread ๏̯͡๏
There is reduceByKey that works on K,V. You need to accumulate partial
results and proceed. does your computation allow that ?



On Mon, Jun 22, 2015 at 2:12 PM, Jianguo Li flyingfromch...@gmail.com
wrote:

 Hi,

 I am processing an RDD of key-value pairs. The key is an user_id, and the
 value is an website url the user has ever visited.

 Since I need to know all the urls each user has visited, I am  tempted to
 call the groupByKey on this RDD. However, since there could be millions of
 users and urls, the shuffling caused by groupByKey proves to be a major
 bottleneck to get the job done. Is there any workaround? I want to end up
 with an RDD of key-value pairs, where the key is an user_id, the value is a
 list of all the urls visited by the user.

 Thanks,

 Jianguo




-- 
Deepak


Re: workaround for groupByKey

2015-06-22 Thread ๏̯͡๏
Silvio,

Suppose my RDD is (K-1, v1,v2,v3,v4).
If i want to do simple addition i can use reduceByKey or aggregateByKey.
What if my processing needs to check all the items in the value list each
time, Above two operations do not get all the values, they just get two
pairs (v1, v2) , you do some processing and store it back into v1.

How do i use the combiner facility present with reduceByKey 
aggregateByKey.

-deepak

On Mon, Jun 22, 2015 at 2:43 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

  You can use aggregateByKey as one option:

  val input: RDD[(Int, String)] = ...

  val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a += b, (a, b) => a ++ b)

   From: Jianguo Li
 Date: Monday, June 22, 2015 at 5:12 PM
 To: user@spark.apache.org
 Subject: workaround for groupByKey

   Hi,

  I am processing an RDD of key-value pairs. The key is an user_id, and
 the value is an website url the user has ever visited.

  Since I need to know all the urls each user has visited, I am  tempted
 to call the groupByKey on this RDD. However, since there could be millions
 of users and urls, the shuffling caused by groupByKey proves to be a major
 bottleneck to get the job done. Is there any workaround? I want to end up
 with an RDD of key-value pairs, where the key is an user_id, the value is a
 list of all the urls visited by the user.

  Thanks,

  Jianguo




-- 
Deepak


Re: workaround for groupByKey

2015-06-22 Thread Silvio Fiorito
You’re right of course, I’m sorry. I was typing before thinking about what you 
actually asked!

On a second thought, what is the ultimate outcome for what you want the 
sequence of pages for? Do they need to actually all be grouped? Could you 
instead partition by user id then use a mapPartitions perhaps?
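
A hedged sketch of that idea, with hypothetical (userId, url) pairs and an
arbitrary partition count; the per-partition map is just one way to do the
local grouping:

  import org.apache.spark.HashPartitioner

  val visits = sc.parallelize(Seq(("u1", "/a"), ("u1", "/b"), ("u2", "/a")))

  val urlsPerUser = visits
    .partitionBy(new HashPartitioner(200))   // all records for a user land in one partition
    .mapPartitions { it =>
      val byUser = scala.collection.mutable.Map.empty[String, List[String]]
      it.foreach { case (user, url) => byUser(user) = url :: byUser.getOrElse(user, Nil) }
      byUser.iterator                        // (userId, urls) built locally, partition by partition
    }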

From: Jianguo Li
Date: Monday, June 22, 2015 at 6:21 PM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: workaround for groupByKey

Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. 
I read in the Learning Sparking

We can disable map-side aggregation in combineByKey() if we know that our data 
won’t benefit from it. For example, groupByKey() disables map-side aggregation 
as the aggregation function (appending to a list) does not save any space. If 
we want to disable map-side combines, we need to specify the partitioner; for 
now you can just use the partitioner on the source RDD by passingrdd.partitioner

It seems that when the map-side aggregation function is to append something to 
a list (as opposed to summing over all the numbers), then this map-side 
aggregation does not offer any benefit since appending to a list does not save 
any space. Is my understanding correct?

Thanks,

Jianguo

On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:
You can use aggregateByKey as one option:

val input: RDD[(Int, String)] = ...

val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a += b, (a, b) => a ++ b)

From: Jianguo Li
Date: Monday, June 22, 2015 at 5:12 PM
To: user@spark.apache.org
Subject: workaround for groupByKey

Hi,

I am processing an RDD of key-value pairs. The key is an user_id, and the value 
is an website url the user has ever visited.

Since I need to know all the urls each user has visited, I am  tempted to call 
the groupByKey on this RDD. However, since there could be millions of users and 
urls, the shuffling caused by groupByKey proves to be a major bottleneck to get 
the job done. Is there any workaround? I want to end up with an RDD of 
key-value pairs, where the key is an user_id, the value is a list of all the 
urls visited by the user.

Thanks,

Jianguo



Re: Creating RDD from Iterable from groupByKey results

2015-06-16 Thread nir
I updated code sample so people can understand better what are my inputs and
outputs.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDD-from-Iterable-from-groupByKey-results-tp23328p23341.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Creating RDD from Iterable from groupByKey results

2015-06-15 Thread Nirav Patel
I am trying to create a new RDD based on a given PairRDD. I have a PairRDD with
few keys, but each key has large (about 100k) values. I want to somehow
repartition, make each `Iterable<v>` into RDD[v] so that I can further
apply map, reduce, sortBy etc effectively on those values. I am sensing
flatMapValues is my friend but want to check with other Spark users. This is
for a real-time Spark app. I have already tried collect() and computing all
measures in-memory of the app server but am trying to improve upon it.
This is what I try (pseudo):

class ComputeMetrices {
  transient JavaSparkContext sparkContext;

  public Map<String, V> computeMetrices(JavaPairRdd javaPairRdd) {

    javaPairRdd.groupByKey(10).mapValues(itr => {
      sparkContext.parallelize(list(itr)) // null pointer; probably at sparkContext
    })
  }
}

I want to create an RDD out of that Iterable from the groupByKey result so that I
can use further Spark transformations.

Thanks
Nir



Spark groupByKey, does it always create at least 1 partition per key?

2015-05-18 Thread tomboyle
I am currently using spark streaming. During my batch processing I must
groupByKey. Afterwards I call foreachRDD & foreachPartition & write to an
external datastore.

My only concern with this is if it's future proof? I know groupByKey by
default uses the hashPartitioner. I have printed out the internals of
partitions and loaded large text files into memory and ran groupByKey just
to make sure.

I have two questions.
#1 First, will my implementation ever break in the future? Will partitions &
groupByKey work differently?
#2 Is it possible for a (key,values) to exist on more than 1 partition after
using groupByKey.

Notes: I'm aware groupByKey is not very efficient. However I am not working
with large amounts of data & can process batches very quickly. Below I could
have used aggregateByKey because I printed the sum, however my real
implementation is much different and I do need each value for each key I can
not reduce the data.
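
For reference, a tiny sketch of that aggregateByKey variant for the simple
per-key sum case (made-up (key, 1) pairs):

  val counts = sc.parallelize(Seq(("aa", 1), ("aa", 1), ("bb", 1)))
  val sums = counts.aggregateByKey(0)(_ + _, _ + _)   // same result as reduceByKey(_ + _) here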

1 Million line test log file
Partition HashCode: 965943941 Key:lol Size:2346
Partition HashCode: 1605678983 Key:ee Size:4692
Partition HashCode: 1605678983 Key:aa Size:32844
Partition HashCode: 1605678983 Key:gg Size:4692
Partition HashCode: 1605678983 Key:dd Size:11730
Partition HashCode: 1605678983 Key:hh Size:4692
Partition HashCode: 1605678983 Key:kk Size:2346
Partition HashCode: 1605678983 Key:tt Size:4692
Partition HashCode: 1605678983 Key:ff Size:2346
Partition HashCode: 1605678983 Key:bb Size:18768
Partition HashCode: 1605678983 Key:cc Size:14076




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-groupByKey-does-it-always-create-at-least-1-partition-per-key-tp22938.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using groupByKey with Spark SQL

2015-05-15 Thread Michael Armbrust
Perhaps you are looking for GROUP BY and collect_set, which would allow you
to stay in SQL.  I'll add that in Spark 1.4 you can get access to items of
a row by name.
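
A hedged sketch of that suggestion, assuming the events DataFrame (here called
eventsDf) has id and url columns and that a HiveContext is available
(collect_set was exposed as a Hive UDAF at the time):

  eventsDf.registerTempTable("events")
  val sessions = hiveContext.sql(
    "SELECT id, collect_set(url) AS urls FROM events GROUP BY id")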

On Fri, May 15, 2015 at 10:48 AM, Edward Sargisson ejsa...@gmail.com
wrote:

 Hi all,
 This might be a question to be answered or feedback for a possibly new
 feature depending:

 We have source data which is events about the state changes of an entity
 (identified by an ID) represented as nested JSON.
 We wanted to sessionize this data so that we had a collection of all the
 events for a given ID as we have to do more processing based on what we
 find.

 We tried doing this using Spark SQL and then converting to a JavaPairRDD
 using DataFrame.javaRdd.groupByKey.

 The schema inference worked great but what was frustrating was that the
 result of groupByKey is String, IterableRow. Rows only have get(int)
 methods and don't take notice of the schema stuff so they ended up being
 something we didn't want to work with.

 We are currently solving this problem by ignoring Spark SQL and
 deserializing the event JSON into a POJO for further processing.

 Are there better approaches to this?
 Perhaps Spark should have a DataFrame.groupByKey that returns Rows that
 can be used with the schema stuff?

 Thanks!
 Edward



Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Hao Ren
Should I repost this to dev list ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Corey Nolet
If you return an iterable, you are not tying the API to a compactbuffer.
Someday, the data could be fetched lazily and he API would not have to
change.
On Apr 23, 2015 6:59 PM, Dean Wampler deanwamp...@gmail.com wrote:

 I wasn't involved in this decision (I just make the fries), but
 CompactBuffer is designed for relatively small data sets that at least fit
 in memory. It's more or less an Array. In principle, returning an iterator
 could hide the actual data structure that might be needed to hold a much
 bigger data set, if necessary.

 HOWEVER, it actually returns a CompactBuffer.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to dev list ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Koert Kuipers
because CompactBuffer is considered an implementation detail. It is also
not public for the same reason.

On Thu, Apr 23, 2015 at 6:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to dev list ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Dean Wampler
I wasn't involved in this decision (I just make the fries), but
CompactBuffer is designed for relatively small data sets that at least fit
in memory. It's more or less an Array. In principle, returning an iterator
could hide the actual data structure that might be needed to hold a much
bigger data set, if necessary.

HOWEVER, it actually returns a CompactBuffer.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to dev list ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-22 Thread Hao Ren
Hi,

Just a quick question,

Regarding the source code of groupByKey:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L453

In the end, it cast CompactBuffer to Iterable. But why ? Any advantage?

Thank you.

Hao.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GroupByKey causing problem

2015-02-26 Thread Imran Rashid
Hi Tushar,

The most scalable option is probably for you to consider doing some
approximation.  Eg., sample the first to come up with the bucket
boundaries.  Then you can assign data points to buckets without needing to
do a full groupByKey.  You could even have more passes which corrects any
errors in your approximation (eg., see how sortByKey() works, and how it
samples the underlying RDD when constructing the RangePartitioner).  Though
its more passes through the data, it will probably be much faster since you
avoid the expensive groupByKey()

Imran
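
A hedged sketch of the sampling idea for a single numeric column; the data,
sample fraction and bin count are all made up, and a real version would also
need the follow-up correction passes mentioned above:

  // (rowIndex, value) pairs for one column
  val values = sc.parallelize((1L to 1000L).map(i => (i, (i % 97).toDouble)))

  // estimate bucket boundaries from a sample instead of grouping the whole column
  val sample = values.map(_._2).sample(false, 0.1).collect().sorted
  val numBins = 20
  val boundaries = (1 until numBins).map(i => sample((i * sample.length) / numBins))

  // assign each value to a bucket locally; no shuffle of the full column needed
  val binned = values.mapValues(v => boundaries.count(_ <= v))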

On Thu, Feb 26, 2015 at 3:38 AM, Tushar Sharma tushars...@gmail.com wrote:

 Hi,

 I am trying to apply binning to a large CSV dataset. Here are the steps I
 am taking:

 1. Emit each value of CSV as (ColIndex,(RowIndex,value))
 2. Then I groupByKey (here ColumnIndex) and get all values of a particular
 index to one node, as I have to work on the collection of all values
 3. I apply my binning algorithm which is as follows:
 a. Sort the values
 b. Iterate through values and see if it is different than the previous
 one
 if no then add it to the same bin
 if yes then check the size of that bin, if it is greater than a
 particular size (say 5% of wholedataset) then change the bin
 number, else keep the same bin
 c. repeat for each column

 Due to this algorithm I can't calculate it partition wise and merge for
 final result. But even for groupByKey I expect it should work , maybe
 slowly, but it should finish. I increased the partition to reduce the
 output of each groupByKey so that it helps in successful completion of the
 process. But even with that it is stuck at the same stage. The log for
 executor says:

 ExternalMapAppendOnly(splilling to disk) (Trying ...)

 The code works for small CSV files but can't complete for big files.

 val inputfile = hdfs://hm41:9000/user/file1
 val table = sc.textFile(inputfile,1000)

 val withoutHeader: RDD[String] = dropHeader(table)

 val kvPairs = withoutHeader.flatMap(retAtrTuple)

 //val filter_na = kvPairs.map{case (x,y) = (x,if(y == NA)  else y)}

 val isNum = kvPairs.map{case (x,y) = (x,isNumeric(y))}.reduceByKey(__)

 val numeric_indexes = isNum.filter{case (x,y) = y}.sortByKey().map{case
 (x,y) = x}.collect()
 //val isNum_Arr = isNum.sortByKey().collect()

 val kvidx = withoutHeader.zipWithIndex
 //val t = kvidx.map{case (a,b) = retAtrTuple(a).map(x =(x,b)) }


 val t = kvidx.flatMap{case (a,b) = retAtrTuple(a).map(x =(x,b)) }
 val t2 = t.filter{case (a,b) = numeric_indexes contains a._1 }

 //val t2 = t.filter{case (a,b) = a._1 ==0 }
 val t3 = t2.map{case ((a,b),c) = (a,(c,b.toDouble))}
 //val t4 = t3.sortBy(_._2._1)
 val t4 = t3.groupByKey.map{case (a,b) =
 (a,classing_summary(b.toArray.sortBy(_._2)))}

 def dropHeader(data: RDD[String]): RDD[String] = {
 data.mapPartitionsWithIndex((idx, lines) = {
   if (idx == 0) {
 lines.drop(1)
   }
   lines
 })
   }


   def retAtrTuple(x: String) = {
 val newX = x.split(',')
 for (h - 0 until newX.length)
   yield (h, newX(h))
   }

 def isNumeric(s: String): Boolean = {
 (allCatch opt s.toDouble).isDefined
   }

 def classing_summary(arr: Array[(Long, Double)]) = {
   var idx = 0L
   var value = 0.0
   var prevValue = Double.MinValue
   var counter = 1
   var classSize = 0.0
   var size = arr.length

   val output = for(i - 0 until arr.length) yield {
   idx = arr(i)._1;
   value = arr(i)._2;
   if(value==prevValue){
 classSize+=1.0/size;
 //println(both values same)
 //println(idx,value,classSize,counter,classSize);
 prevValue = value;
 (idx,value,counter,classSize);
   }
   else if(classSize(0.05)){
 classSize+=1.0/size;
 //println(both values not same, adding to present bucket)
 //println(idx,value,classSize,counter,classSize);
 prevValue = value;
 (idx,value,counter,classSize);
   }
   else {
 classSize = 1.0/size;
 counter +=1;
 //println(both values not same, adding to different bucket)
 //println(idx,value,classSize,counter,classSize);
 prevValue = value;
 (idx,value,counter,classSize);
   }
   }
   output.toArray
 }

 Thanks in advance,

 Tushar Sharma



GroupByKey causing problem

2015-02-26 Thread Tushar Sharma
Hi,

I am trying to apply binning to a large CSV dataset. Here are the steps I
am taking:

1. Emit each value of CSV as (ColIndex,(RowIndex,value))
2. Then I groupByKey (here ColumnIndex) and get all values of a particular
index to one node, as I have to work on the collection of all values
3. I apply my binning algorithm which is as follows:
a. Sort the values
b. Iterate through values and see if it is different than the previous
one
if no then add it to the same bin
if yes then check the size of that bin, if it is greater than a
particular size (say 5% of wholedataset) then change the bin
number, else keep the same bin
c. repeat for each column

Due to this algorithm I can't calculate it partition wise and merge for
final result. But even for groupByKey I expect it should work , maybe
slowly, but it should finish. I increased the partition to reduce the
output of each groupByKey so that it helps in successful completion of the
process. But even with that it is stuck at the same stage. The log for
executor says:

ExternalMapAppendOnly (spilling to disk) (Trying ...)

The code works for small CSV files but can't complete for big files.

val inputfile = "hdfs://hm41:9000/user/file1"
val table = sc.textFile(inputfile,1000)

val withoutHeader: RDD[String] = dropHeader(table)

val kvPairs = withoutHeader.flatMap(retAtrTuple)

//val filter_na = kvPairs.map{case (x,y) => (x, if(y == "NA") "" else y)}

val isNum = kvPairs.map{case (x,y) => (x, isNumeric(y))}.reduceByKey(_ && _)

val numeric_indexes = isNum.filter{case (x,y) => y}.sortByKey().map{case
(x,y) => x}.collect()
//val isNum_Arr = isNum.sortByKey().collect()

val kvidx = withoutHeader.zipWithIndex
//val t = kvidx.map{case (a,b) => retAtrTuple(a).map(x => (x,b)) }


val t = kvidx.flatMap{case (a,b) => retAtrTuple(a).map(x => (x,b)) }
val t2 = t.filter{case (a,b) => numeric_indexes contains a._1 }

//val t2 = t.filter{case (a,b) => a._1 == 0 }
val t3 = t2.map{case ((a,b),c) => (a,(c,b.toDouble))}
//val t4 = t3.sortBy(_._2._1)
val t4 = t3.groupByKey.map{case (a,b) =>
  (a, classing_summary(b.toArray.sortBy(_._2)))}

def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}

def retAtrTuple(x: String) = {
  val newX = x.split(',')
  for (h <- 0 until newX.length)
    yield (h, newX(h))
}

def isNumeric(s: String): Boolean = {
  (allCatch opt s.toDouble).isDefined
}

def classing_summary(arr: Array[(Long, Double)]) = {
  var idx = 0L
  var value = 0.0
  var prevValue = Double.MinValue
  var counter = 1
  var classSize = 0.0
  var size = arr.length

  val output = for(i <- 0 until arr.length) yield {
    idx = arr(i)._1;
    value = arr(i)._2;
    if(value == prevValue){
      classSize += 1.0/size;
      //println("both values same")
      //println(idx,value,classSize,counter,classSize);
      prevValue = value;
      (idx,value,counter,classSize);
    }
    else if(classSize < 0.05){
      classSize += 1.0/size;
      //println("both values not same, adding to present bucket")
      //println(idx,value,classSize,counter,classSize);
      prevValue = value;
      (idx,value,counter,classSize);
    }
    else {
      classSize = 1.0/size;
      counter += 1;
      //println("both values not same, adding to different bucket")
      //println(idx,value,classSize,counter,classSize);
      prevValue = value;
      (idx,value,counter,classSize);
    }
  }
  output.toArray
}

Thanks in advance,

Tushar Sharma


A spark join and groupbykey that is making my containers on EC2 go over their memory limits

2015-02-11 Thread Sina Samangooei
Hello,

I have many questions about joins, but arguably just one.

specifically about memory and containers that are overstepping their limits, as 
per errors dotted around all over the place, but something like: 
http://mail-archives.apache.org/mod_mbox/spark-issues/201405.mbox/%3CJIRA.12716648.1401112206043.18368.1401118322196@arcas%3E
 
http://mail-archives.apache.org/mod_mbox/spark-issues/201405.mbox/%3CJIRA.12716648.1401112206043.18368.1401118322196@arcas%3E

I have a job (mostly like this link: http://hastebin.com/quwamoreko.scala 
http://hastebin.com/quwamoreko.scala, but with a write-to-files-based-on-keys 
thing at the end) that is doing a join between a medium sized (like, 150,000 
entries, classRDD in the link) RDD to a larger (108 million entry, objRDD in 
the link) RDD… 

the keys and values for each entry are quite small. In the linked join most 
objects will have 10 or so classes and most classes 100k associated objects. 
Though a few (10 or so?) classes will have millions of objects and some objects 
hundreds of classes.

The issue i'm having is that (on an m2.xlarge ec2 instance) my container is 
overstepping the memory limits and being shut down

This confuses me and makes me question my fundamental understanding of joins.

I thought joins were a reduce operation that happened on disk. Further, my 
joins don’t seem to hold very much in memory, indeed at any given point a pair 
of strings and another string is all i seem to hold.

The container limit is 7Gb according to the error in my container logs and has 
been apparently reasonable for jobs i’ve run in the past.
But again, I don’t see where in my program i am actually keeping anything in 
memory at all.
And yet sure enough, after about 30 minutes of running, over a time period of 
like 2 or so minutes, one of the containers grows from about 800mb to 7Gb and 
is promptly killed. 

So, my questions, what could be going on here and how can i fix it? Is this 
just some fundamental feature of my data or is there anything else i can do? 

Further rider questions: Is there some logger settings I can use for the logs 
to tell me exactly where in my job has been reached? i.e. which RDD is being 
constructed or which join is being performed? The RDD numbers and stages aren’t 
all that helpful and though i know the spark UI exists some logs i can refer 
back to when my cluster has long died would be great.

Cheers
- Sina


Re: groupByKey is not working

2015-01-30 Thread Stephen Boesch
Amit - IJ will not find it until you add the import as Sean mentioned.  It
includes implicits that intellij will not know about otherwise.

2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com:

 I am sorry Sean.

 I am developing code in intelliJ Idea. so with the above dependencies I am
 not able to find *groupByKey* when I am searching by ctrl+space


 On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote:

 When you post a question anywhere, and say it's not working, you
 *really* need to say what that means.


 On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com
 wrote:
  hi all,
 
  my sbt file is like this:
 
  name := Spark
 
  version := 1.0
 
  scalaVersion := 2.10.4
 
  libraryDependencies += org.apache.spark %% spark-core % 1.1.0
 
  libraryDependencies += net.sf.opencsv % opencsv % 2.3
 
 
  code:
 
  object SparkJob
  {
 
def pLines(lines:Iterator[String])={
  val parser=new CSVParser()
  lines.map(l={val vs=parser.parseLine(l)
(vs(0),vs(1).toInt)})
}
 
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName(Spark
 Job).setMaster(local)
  val sc = new SparkContext(conf)
  val data = sc.textFile(/home/amit/testData.csv).cache()
  val result = data.mapPartitions(pLines).groupByKey
  //val list = result.filter(x= {(x._1).contains(24050881)})
 
}
 
  }
 
 
  Here groupByKey is not working . But same thing is working from
 spark-shell.
 
  Please help me
 
 
  Thanks
 
  Amit





Re: groupByKey is not working

2015-01-30 Thread Amit Behera
Hi Charles,

I forgot to mention. But I imported the following

import au.com.bytecode.opencsv.CSVParser

import org.apache.spark._

On Sat, Jan 31, 2015 at 2:09 AM, Charles Feduke charles.fed...@gmail.com
wrote:

 Define not working. Not compiling? If so you need:

 import org.apache.spark.SparkContext._


 On Fri Jan 30 2015 at 3:21:45 PM Amit Behera amit.bd...@gmail.com wrote:

 hi all,

 my sbt file is like this:

 name := Spark

 version := 1.0

 scalaVersion := 2.10.4

 libraryDependencies += org.apache.spark %% spark-core % 1.1.0

 libraryDependencies += net.sf.opencsv % opencsv % 2.3


 *code:*

 object SparkJob
 {

   def pLines(lines:Iterator[String])={
 val parser=new CSVParser()
 lines.map(l={val vs=parser.parseLine(l)
   (vs(0),vs(1).toInt)})
   }

   def main(args: Array[String]) {
 val conf = new SparkConf().setAppName(Spark Job).setMaster(local)
 val sc = new SparkContext(conf)
 val data = sc.textFile(/home/amit/testData.csv).cache()
 val result = data.mapPartitions(pLines).groupByKey
 //val list = result.filter(x= {(x._1).contains(24050881)})

   }

 }


 Here groupByKey is not working . But same thing is working from 
 *spark-shell.*

 Please help me


 Thanks

 Amit




Re: groupByKey is not working

2015-01-30 Thread Amit Behera
Thank you very much Charles, I got it  :)



On Sat, Jan 31, 2015 at 2:20 AM, Charles Feduke charles.fed...@gmail.com
wrote:

 You'll still need to:

 import org.apache.spark.SparkContext._

 Importing org.apache.spark._ does _not_ recurse into sub-objects or
 sub-packages, it only brings in whatever is at the level of the package or
 object imported.

 SparkContext._ has some implicits, one of them for adding groupByKey to an
 RDD[_] IIRC.


 On Fri Jan 30 2015 at 3:48:22 PM Stephen Boesch java...@gmail.com wrote:

 Amit - IJ will not find it until you add the import as Sean mentioned.
 It includes implicits that intellij will not know about otherwise.

 2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com:

 I am sorry Sean.

 I am developing code in intelliJ Idea. so with the above dependencies I
 am not able to find *groupByKey* when I am searching by ctrl+space


 On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote:

 When you post a question anywhere, and say it's not working, you
 *really* need to say what that means.


 On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com
 wrote:
  hi all,
 
  my sbt file is like this:
 
  name := Spark
 
  version := 1.0
 
  scalaVersion := 2.10.4
 
  libraryDependencies += org.apache.spark %% spark-core % 1.1.0
 
  libraryDependencies += net.sf.opencsv % opencsv % 2.3
 
 
  code:
 
  object SparkJob
  {
 
def pLines(lines:Iterator[String])={
  val parser=new CSVParser()
  lines.map(l={val vs=parser.parseLine(l)
(vs(0),vs(1).toInt)})
}
 
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName(Spark
 Job).setMaster(local)
  val sc = new SparkContext(conf)
  val data = sc.textFile(/home/amit/testData.csv).cache()
  val result = data.mapPartitions(pLines).groupByKey
  //val list = result.filter(x= {(x._1).contains(24050881)})
 
}
 
  }
 
 
  Here groupByKey is not working . But same thing is working from
 spark-shell.
 
  Please help me
 
 
  Thanks
 
  Amit





Re: groupByKey is not working

2015-01-30 Thread Amit Behera
I am sorry Sean.

I am developing code in IntelliJ IDEA, so with the above dependencies I am
not able to find *groupByKey* when I am searching by Ctrl+Space.


On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote:

 When you post a question anywhere, and say it's not working, you
 *really* need to say what that means.

 On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com wrote:
  hi all,
 
  my sbt file is like this:
 
  name := Spark
 
  version := 1.0
 
  scalaVersion := 2.10.4
 
  libraryDependencies += org.apache.spark %% spark-core % 1.1.0
 
  libraryDependencies += net.sf.opencsv % opencsv % 2.3
 
 
  code:
 
  object SparkJob
  {
 
def pLines(lines:Iterator[String])={
  val parser=new CSVParser()
  lines.map(l={val vs=parser.parseLine(l)
(vs(0),vs(1).toInt)})
}
 
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName(Spark Job).setMaster(local)
  val sc = new SparkContext(conf)
  val data = sc.textFile(/home/amit/testData.csv).cache()
  val result = data.mapPartitions(pLines).groupByKey
  //val list = result.filter(x= {(x._1).contains(24050881)})
 
}
 
  }
 
 
  Here groupByKey is not working . But same thing is working from
 spark-shell.
 
  Please help me
 
 
  Thanks
 
  Amit



Re: groupByKey is not working

2015-01-30 Thread Charles Feduke
You'll still need to:

import org.apache.spark.SparkContext._

Importing org.apache.spark._ does _not_ recurse into sub-objects or
sub-packages, it only brings in whatever is at the level of the package or
object imported.

SparkContext._ has some implicits, one of them for adding groupByKey to an
RDD[_] IIRC.

On Fri Jan 30 2015 at 3:48:22 PM Stephen Boesch java...@gmail.com wrote:

 Amit - IJ will not find it until you add the import as Sean mentioned.  It
 includes implicits that intellij will not know about otherwise.

 2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com:

 I am sorry Sean.

 I am developing code in intelliJ Idea. so with the above dependencies I
 am not able to find *groupByKey* when I am searching by ctrl+space


 On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote:

 When you post a question anywhere, and say it's not working, you
 *really* need to say what that means.


 On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com
 wrote:
  hi all,
 
  my sbt file is like this:
 
  name := Spark
 
  version := 1.0
 
  scalaVersion := 2.10.4
 
  libraryDependencies += org.apache.spark %% spark-core % 1.1.0
 
  libraryDependencies += net.sf.opencsv % opencsv % 2.3
 
 
  code:
 
  object SparkJob
  {
 
def pLines(lines:Iterator[String])={
  val parser=new CSVParser()
  lines.map(l={val vs=parser.parseLine(l)
(vs(0),vs(1).toInt)})
}
 
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName(Spark
 Job).setMaster(local)
  val sc = new SparkContext(conf)
  val data = sc.textFile(/home/amit/testData.csv).cache()
  val result = data.mapPartitions(pLines).groupByKey
  //val list = result.filter(x= {(x._1).contains(24050881)})
 
}
 
  }
 
 
  Here groupByKey is not working . But same thing is working from
 spark-shell.
 
  Please help me
 
 
  Thanks
 
  Amit
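
For completeness, a sketch of the program from this thread with the import
Charles points to added (and with the string literals, which the list archive
stripped of their quotes, restored); the final take(5) is an addition just so
the job materialises something. With org.apache.spark.SparkContext._ in scope,
the implicit conversion to PairRDDFunctions is what makes groupByKey resolve in
IntelliJ the same way it does in spark-shell.

import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings groupByKey and friends into scope

object SparkJob {

  def pLines(lines: Iterator[String]) = {
    val parser = new CSVParser()
    lines.map { l =>
      val vs = parser.parseLine(l)
      (vs(0), vs(1).toInt)
    }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Job").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("/home/amit/testData.csv").cache()
    val result = data.mapPartitions(pLines).groupByKey
    result.take(5).foreach(println)  // force evaluation of the grouped RDD
    sc.stop()
  }
}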





Re: groupByKey is not working

2015-01-30 Thread Charles Feduke
Define not working. Not compiling? If so you need:

import org.apache.spark.SparkContext._


On Fri Jan 30 2015 at 3:21:45 PM Amit Behera amit.bd...@gmail.com wrote:

 hi all,

 my sbt file is like this:

 name := Spark

 version := 1.0

 scalaVersion := 2.10.4

 libraryDependencies += org.apache.spark %% spark-core % 1.1.0

 libraryDependencies += net.sf.opencsv % opencsv % 2.3


 *code:*

 object SparkJob
 {

   def pLines(lines:Iterator[String])={
 val parser=new CSVParser()
 lines.map(l={val vs=parser.parseLine(l)
   (vs(0),vs(1).toInt)})
   }

   def main(args: Array[String]) {
 val conf = new SparkConf().setAppName(Spark Job).setMaster(local)
 val sc = new SparkContext(conf)
 val data = sc.textFile(/home/amit/testData.csv).cache()
 val result = data.mapPartitions(pLines).groupByKey
 //val list = result.filter(x= {(x._1).contains(24050881)})

   }

 }


 Here groupByKey is not working . But same thing is working from *spark-shell.*

 Please help me


 Thanks

 Amit




groupByKey is not working

2015-01-30 Thread Amit Behera
hi all,

my sbt file is like this:

name := "Spark"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"


*code:*

object SparkJob
{

  def pLines(lines: Iterator[String]) = {
    val parser = new CSVParser()
    lines.map(l => {val vs = parser.parseLine(l)
      (vs(0), vs(1).toInt)})
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Job").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("/home/amit/testData.csv").cache()
    val result = data.mapPartitions(pLines).groupByKey
    //val list = result.filter(x => {(x._1).contains("24050881")})

  }

}


Here groupByKey is not working. But the same thing is working from *spark-shell.*

Please help me


Thanks

Amit


Re: groupByKey is not working

2015-01-30 Thread Arush Kharbanda
Hi Amit,

What error does it throw?

Thanks
Arush

On Sat, Jan 31, 2015 at 1:50 AM, Amit Behera amit.bd...@gmail.com wrote:

 hi all,

 my sbt file is like this:

 name := Spark

 version := 1.0

 scalaVersion := 2.10.4

 libraryDependencies += org.apache.spark %% spark-core % 1.1.0

 libraryDependencies += net.sf.opencsv % opencsv % 2.3


 *code:*

 object SparkJob
 {

   def pLines(lines:Iterator[String])={
 val parser=new CSVParser()
 lines.map(l={val vs=parser.parseLine(l)
   (vs(0),vs(1).toInt)})
   }

   def main(args: Array[String]) {
 val conf = new SparkConf().setAppName(Spark Job).setMaster(local)
 val sc = new SparkContext(conf)
 val data = sc.textFile(/home/amit/testData.csv).cache()
 val result = data.mapPartitions(pLines).groupByKey
 //val list = result.filter(x= {(x._1).contains(24050881)})

   }

 }


 Here groupByKey is not working . But same thing is working from *spark-shell.*

 Please help me


 Thanks

 Amit




-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


PySpark Loading Json Following by groupByKey seems broken in spark 1.1.1

2014-12-06 Thread Brad Willard
When I run a groupByKey it seems to create a single task after the
groupByKey that never stops executing. I'm loading a smallish JSON dataset of
about 4 million records. This is the code I'm running.

rdd = sql_context.jsonFile(uri)
rdd = rdd.cache()

grouped = rdd.map(lambda row: (row.id, row)).groupByKey(160)

grouped.take(1)

The groupByKey stage takes a few minutes with 160 tasks, which is expected.
However, it then creates a single-task runJob at PythonRDD.scala:300 that
never ends. I gave up after 30 minutes.

http://apache-spark-user-list.1001560.n3.nabble.com/file/n20559/Screen_Shot_2014-12-05_at_6.png
 







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Loading-Json-Following-by-groupByKey-seems-broken-in-spark-1-1-1-tp20559.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Alternatives to groupByKey

2014-12-03 Thread Nathan Kronenfeld
I think it would depend on the type and amount of information you're
collecting.

If you're just trying to collect small numbers for each window, and don't
have an overwhelming number of windows, you might consider using
accumulators.  Just make one per value per time window, and for each data
point, add it to the accumulators for the time windows in which it
belongs.  We've found this approach a lot faster than anything involving a
shuffle.  This should work fine for stuff like max(), min(), and mean().

If you're collecting enough data that accumulators are impractical, I think
I would try multiple passes.  Cache your data, and for each pass, filter to
that window, and perform all your operations on the filtered RDD.  Because
of the caching, it won't be significantly slower than processing it all at
once - in fact, it will probably be a lot faster, because the shuffles are
shuffling less information.  This is similar to what you're suggesting
about partitioning your rdd, but probably simpler and easier.

That being said, your restriction 3 seems to be in contradiction to the
rest of your request - if your aggregation needs to be able to look at all
the data at once, then that seems contradictory to viewing the data through
an RDD.  Could you explain a bit more what you mean by that?

-Nathan


On Wed, Dec 3, 2014 at 4:26 PM, ameyc ambr...@gmail.com wrote:

 Hi,

 So my Spark app needs to run a sliding window through a time series dataset
 (I'm not using Spark streaming). And then run different types on
 aggregations on per window basis. Right now I'm using a groupByKey() which
 gives me Iterables for each window. There are a few concerns I have with
 this approach:

 1. groupByKey() could potentially fail for a key not fitting in the memory.
 2. I'd like to run aggregations like max(), mean() on each of the groups,
 it'd be nice to have the RDD functionality at this point instead of the
 iterables.
 3. I can't use reduceByKey() or aggregateByKey(), as some of my aggregations
 need to have a view of the entire window.

 Only other way I could think of is partitioning my RDDs into multiple RDDs
 with each RDD representing a window. Is this a sensible approach? Or is
 there any other way of going about this?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-tp20293.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com
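
A rough sketch of the second approach above (cache the data once, then filter
to each window and aggregate with ordinary RDD operations); the record layout,
window sizes, and file paths here are invented for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object WindowedPasses {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("windowed-passes"))

    // Hypothetical records: (timestamp in millis, measurement).
    val events: RDD[(Long, Double)] = sc.textFile("events.csv").map { line =>
      val Array(ts, v) = line.split(","); (ts.toLong, v.toDouble)
    }.cache()  // cache once, reuse for every windowed pass

    val windowMs = 60 * 1000L   // made-up window length
    val stepMs   = 30 * 1000L   // made-up slide interval
    val endMs    = 10 * 60 * 1000L

    for (winStart <- 0L until endMs by stepMs) {
      val window = events.filter { case (ts, _) =>
        ts >= winStart && ts < winStart + windowMs
      }
      // The full RDD API is available per window instead of an Iterable,
      // so max/min/mean come straight from DoubleRDDFunctions.
      val stats = window.map(_._2).stats()
      println(s"window starting at $winStart ms: $stats")
    }

    sc.stop()
  }
}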


Re: Alternatives to groupByKey

2014-12-03 Thread Koert Kuipers
do these requirements boils down to a need for foldLeftByKey with sorting
of the values?

https://issues.apache.org/jira/browse/SPARK-3655


On Wed, Dec 3, 2014 at 6:34 PM, Xuefeng Wu ben...@gmail.com wrote:

  I have a similar requirement: take top N by key. Right now I use
  groupByKey, but one key would group more than half the data in some datasets.

 Yours, Xuefeng Wu 吴雪峰 敬上

 On 2014年12月4日, at 上午7:26, Nathan Kronenfeld nkronenf...@oculusinfo.com
 wrote:

 I think it would depend on the type and amount of information you're
 collecting.

 If you're just trying to collect small numbers for each window, and don't
 have an overwhelming number of windows, you might consider using
 accumulators.  Just make one per value per time window, and for each data
 point, add it to the accumulators for the time windows in which it
 belongs.  We've found this approach a lot faster than anything involving a
 shuffle.  This should work fine for stuff like max(), min(), and mean()

 If you're collecting enough data that accumulators are impractical, I
 think I would try multiple passes.  Cache your data, and for each pass,
 filter to that window, and perform all your operations on the filtered
 RDD.  Because of the caching, it won't be significantly slower than
 processing it all at once - in fact, it will probably be a lot faster,
 because the shuffles are shuffling less information.  This is similar to
 what you're suggesting about partitioning your rdd, but probably simpler
 and easier.

 That being said, your restriction 3 seems to be in contradiction to the
 rest of your request - if your aggregation needs to be able to look at all
 the data at once, then that seems contradictory to viewing the data through
 an RDD.  Could you explain a bit more what you mean by that?

 -Nathan


 On Wed, Dec 3, 2014 at 4:26 PM, ameyc ambr...@gmail.com wrote:

 Hi,

 So my Spark app needs to run a sliding window through a time series
 dataset
 (I'm not using Spark streaming). And then run different types on
 aggregations on per window basis. Right now I'm using a groupByKey() which
 gives me Iterables for each window. There are a few concerns I have with
 this approach:

 1. groupByKey() could potentially fail for a key not fitting in the
 memory.
 2. I'd like to run aggregations like max(), mean() on each of the groups,
 it'd be nice to have the RDD functionality at this point instead of the
 iterables.
  3. I can't use reduceByKey() or aggregateByKey(), as some of my aggregations
  need to have a view of the entire window.

 Only other way I could think of is partitioning my RDDs into multiple RDDs
 with each RDD representing a window. Is this a sensible approach? Or is
 there any other way of going about this?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-tp20293.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com




Re: Alternatives to groupByKey

2014-12-03 Thread Xuefeng Wu
Looks good.

My concern about foldLeftByKey is that it looks like it breaks consistency with 
foldLeft on RDD and aggregateByKey on PairRDD.


Yours, Xuefeng Wu 吴雪峰 敬上

 On 2014年12月4日, at 上午7:47, Koert Kuipers ko...@tresata.com wrote:
 
 foldLeftByKey

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
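
For the top-N-by-key requirement mentioned above, a sketch of how to avoid
groupByKey today (without waiting on a foldLeftByKey) by keeping a small
bounded buffer per key with aggregateByKey; N, the input layout, and the paths
are invented:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object TopNByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("top-n-by-key"))
    val n = 10  // hypothetical N

    // Hypothetical (key, score) input.
    val pairs = sc.textFile("scores.tsv").map { line =>
      val Array(k, v) = line.split("\t"); (k, v.toDouble)
    }

    // Each partition keeps at most n values per key before the shuffle, so no
    // single key ever materialises all of its values the way groupByKey would.
    val topN = pairs.aggregateByKey(List.empty[Double])(
      (acc, v) => (v :: acc).sorted(Ordering[Double].reverse).take(n),
      (a, b)   => (a ++ b).sorted(Ordering[Double].reverse).take(n)
    )

    topN.saveAsTextFile("top-n-output")
    sc.stop()
  }
}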



Re: How to make operation like cogrop() , groupbykey() on pair RDD = [ [ ], [ ] , [ ] ]

2014-10-16 Thread Gen
Hi,

You just need to add list() in the sorted function. 
For example: 
map((lambda (x,y): (x, (list(y[0]), list(y[1])))),
    sorted(list(rdd1.cogroup(rdd2).collect())))


I think you just forgot the list...

PS: your post has NOT been accepted by the mailing list yet.

Best 
Gen


pm wrote
 Hi ,
 
 Thanks for reply ,
 
 
 now after doing cogroup mentioned in below,
 
 merge_rdd = map((lambda (x,y): (x, (list(y[0]), list(y[1])))),
 sorted((rdd1.cogroup(rdd2)).collect()))
 
 map((lambda (x,y): (x, (list(y[0]), list(y[1])))),
 sorted((merge_rdd.cogroup(rdd3)).collect()))
 
 
 I'm getting output like:
 
 
 [((u'abc', u'0010'),
   ([(<pyspark.resultiterable.ResultIterable at 0x4b1b4d0>,
      <pyspark.resultiterable.ResultIterable at 0x4b1b550>)],
    [[(u'address', u'2017 CAN'),
      (u'address_city', u'VESTAVIA '),
     ]])),
  ((u'abc', u'0020'),
   ([(<pyspark.resultiterable.ResultIterable at 0x4b1bd50>,
      <pyspark.resultiterable.ResultIterable at 0x4b1bf10>)],
    [[(u'address', u'2017 CAN'),
      (u'address_city', u'VESTAV'),
     ]]))]
 
 How do I show the value for an object like <pyspark.resultiterable.ResultIterable
 at 0x4b1b4d0>?

 I want to show the data for <pyspark.resultiterable.ResultIterable at
 0x4b1bd50>.
 
 
 Could you please tell me how to show the data for those objects? I'm using
 Python.
 
 
 
 Thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-operation-like-cogrop-groupbykey-on-pair-RDD-tp16487p16598.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to make operation like cogrop() , groupbykey() on pair RDD = [ [ ], [ ] , [ ] ]

2014-10-15 Thread Gen
What results do you want?

If your pair is like (a, b), where a is the key and b is the value, you
can try 
rdd1 = rdd1.flatMap(lambda l: l)
and then use cogroup.

Best
Gen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-operation-like-cogrop-groupbykey-on-pair-RDD-tp16487p16489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark groupByKey partition out of memory

2014-09-07 Thread julyfire
When a MappedRDD is handled by the groupByKey transformation, tuples with the
same key that are distributed across different worker nodes will be collected
onto one worker node, say, 
(K, V1), (K, V2), ..., (K, Vn) -> (K, Seq(V1, V2, ..., Vn)). 

I want to know whether the value /Seq(V1, V2, ..., Vn)/ of a tuple in the
grouped RDD can reside on different nodes or has to be on one node, if I
set the number of partitions when using groupByKey. If the value /Seq(V1,
V2, ..., Vn)/ can only reside in the memory of just one machine, an
out-of-memory risk exists in case the size of the /Seq(V1, V2, ..., Vn)/ is
larger than the JVM memory limit of that machine. If this case happens, how
should we deal with it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-groupByKey-partition-out-of-memory-tp13669.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
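
The grouped values for a single key do end up in one partition on one executor,
which is exactly the risk described above. The usual workaround, when the
per-key result can be built incrementally, is to avoid materialising the Seq at
all: reduceByKey (or aggregateByKey/combineByKey) combines values on the map
side before the shuffle, so no single node ever holds the whole value list. A
minimal sketch with an invented (key, count) input:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object ReduceInsteadOfGroup {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reduce-instead-of-group"))

    // Hypothetical (key, count) pairs.
    val pairs = sc.textFile("counts.tsv").map { line =>
      val Array(k, v) = line.split("\t"); (k, v.toLong)
    }

    // groupByKey would ship every value for a key to a single task:
    //   val totals = pairs.groupByKey().mapValues(_.sum)
    // reduceByKey folds values together inside each map task first, so the
    // shuffle only carries one partial sum per key per partition.
    val totals = pairs.reduceByKey(_ + _)

    totals.saveAsTextFile("totals")
    sc.stop()
  }
}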


