It seems --py-files only takes the first two arguments. Can someone please confirm?

2024-03-05 Thread Pedro, Chuck
Hi all,

I am working in Databricks. When I submit a Spark job with the --py-files
argument, it seems the first two files are read in but the third is ignored.

"--py-files",
"s3://some_path/appl_src.py",
"s3://some_path/main.py",
"s3://a_different_path/common.py",

I can see the first two acknowledged in the Log4j output but not the third.

24/02/28 21:41:00 INFO Utils: Fetching s3://some_path/appl_src.py to ...
24/02/28 21:41:00 INFO Utils: Fetching s3://some_path/main.py to ...

As a result, the job fails because appl_src.py is importing from common.py but 
can't find it.

I posted to both the Databricks community
(https://community.databricks.com/t5/data-engineering/spark-submit-not-reading-one-of-my-py-files-arguments/m-p/62361#M31953)
and Stack Overflow
(https://stackoverflow.com/questions/78077822/databricks-spark-submit-getting-error-with-py-files)
but did not get a response.

I'm aware that we could use a .zip file, so I tried zipping the first two 
arguments but then got a totally different error:

"Exception in thread "main" org.apache.spark.SparkException: Failed to get main 
class in JAR with error 'null'.  Please specify one with --class."

Basically I just want the application code in one s3 path and a "common" 
utilities package in another path. Thanks for your help.
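
For reference, spark-submit's --py-files option expects a single
comma-separated list of .py/.zip/.egg files rather than separate arguments,
with the main application file passed on its own afterwards. An alternative
that sidesteps --py-files entirely is to pull the shared module in at
runtime with SparkContext.addPyFile; a minimal sketch, assuming s3:// is a
Hadoop-accessible filesystem on the cluster (not a confirmed fix):

# Minimal sketch: ship common.py at runtime instead of via --py-files.
# The S3 path below is the one from the post.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Downloads the file to every node and adds it to the Python path,
# so "import common" then works on the driver and inside executors.
spark.sparkContext.addPyFile("s3://a_different_path/common.py")

import common  # imported after addPyFile on purpose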



Kind regards,
Chuck Pedro





Unsubscribe

2022-11-07 Thread Pedro Tuero
Unsubscribe


Re: Java : Testing RDD aggregateByKey

2021-08-23 Thread Pedro Tuero
Same here: repartition(0) throws IllegalArgumentException (which is what I
would have expected from aggregateByKey as well), but aggregateByKey(zeroValue,
0, seqFunc, combFunc) throws no exception and logs no error message. The only
consequence is an empty RDD.
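
The thread's code is Java, but here is a minimal PySpark sketch of the
guard that would have caught this in the test: derive the partition count
from the source RDD and never let it reach zero (the divisor of 8 is a
made-up example):

# Minimal sketch: keep a derived numPartitions from silently reaching zero
# before handing it to aggregateByKey.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)], 4)

num_parts = max(1, pairs.getNumPartitions() // 8)  # would be 0 without max()

agg = pairs.aggregateByKey(
    0,                        # zero value
    lambda acc, v: acc + v,   # seqFunc: merge a value into the accumulator
    lambda a, b: a + b,       # combFunc: merge two accumulators
    num_parts,
)
print(agg.collect())          # e.g. [('b', 3), ('a', 3)] -- not empty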

On Sat, Aug 21, 2021 at 07:45, Jacek Laskowski (ja...@japila.pl) wrote:

> Hi Pedro,
>
> > Anyway, maybe the behavior is weird, I could expect that repartition to
> zero was not allowed or at least warned instead of just discarting all the
> data .
>
> Interesting...
>
> scala> spark.version
> res3: String = 3.1.2
>
> scala> spark.range(5).repartition(0)
> java.lang.IllegalArgumentException: requirement failed: Number of
> partitions (0) must be positive.
>   at scala.Predef$.require(Predef.scala:281)
>   at
> org.apache.spark.sql.catalyst.plans.logical.Repartition.(basicLogicalOperators.scala:1032)
>   at org.apache.spark.sql.Dataset.repartition(Dataset.scala:3016)
>   ... 47 elided
>
> How are the above different from yours?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>
>
> On Thu, Aug 19, 2021 at 5:43 PM Pedro Tuero  wrote:
>
>> Hi, I'm sorry , the problem was really silly: In the test the number of
>> partitions were zero  (it was a division of the original number of
>> partitions of the RDD source and in the test that number was just one) and
>> that's why the test was failing.
>> Anyway, maybe the behavior is weird, I could expect that repartition to
>> zero was not allowed or at least warned instead of just discarting all the
>> data .
>>
>> Thanks for your time!
>> Regards,
>> Pedro
>>
>> El jue, 19 de ago. de 2021 a la(s) 07:42, Jacek Laskowski (
>> ja...@japila.pl) escribió:
>>
>>> Hi Pedro,
>>>
>>> No idea what might be causing it. Do you perhaps have some code to
>>> reproduce it locally?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>>
>>> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero 
>>> wrote:
>>>
>>>>
>>>> Context: spark-core_2.12-3.1.1
>>>> Testing with maven and eclipse.
>>>>
>>>> I'm modifying a project and a test stops working as expected.
>>>> The difference is in the parameters passed to the function
>>>> aggregateByKey of JavaPairRDD.
>>>>
>>>> JavaSparkContext is created this way:
>>>> new JavaSparkContext(new SparkConf()
>>>> .setMaster("local[1]")
>>>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>>>> Then I construct a JavaPairRdd using sparkContext.paralellizePairs and
>>>> call a method which makes an aggregateByKey over the input JavaPairRDD  and
>>>> test that the result is the expected.
>>>>
>>>> When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue,
>>>> combiner, merger);
>>>>  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>>>> combFunc: JFunction2[U, U, U]):
>>>>   JavaPairRDD[K, U] = {
>>>> implicit val ctag: ClassTag[U] = fakeClassTag
>>>> fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>>>   }
>>>> The test works as expected.
>>>> But when I use: JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
>>>> *partitions*,combiner, merger);)
>>>> def aggregateByKey[U](zeroValue: U, *numPartitions: Int,* seqFunc:
>>>> JFunction2[U, V, U],
>>>>   combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>> implicit val ctag: ClassTag[U] = fakeClassTag
>>>> fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions)*(seqFunc,
>>>> combFunc))
>>>>   }
>>>> The result is always empty. It looks like there is a problem with the
>>>> hashPartitioner created at PairRddFunctions :
>>>>  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions:
>>>> Int)(seqOp: (U, V) => U,
>>>>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>> aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp,
>>>> combOp)
>>>>   }
>>>> vs:
>>>>  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>>>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>> aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp,
>>>> combOp)
>>>>   }
>>>> I can't debug it properly with eclipse, and error occurs when threads
>>>> are in spark code (system editor can only open file base resources).
>>>>
>>>> Does anyone know how to resolve this issue?
>>>>
>>>> Thanks in advance,
>>>> Pedro.
>>>>
>>>>
>>>>
>>>>


Re: Java : Testing RDD aggregateByKey

2021-08-19 Thread Pedro Tuero
Hi, I'm sorry, the problem was really silly: in the test the number of
partitions was zero (it was a division of the original number of partitions
of the source RDD, and in the test that number was just one), and that's why
the test was failing.
Anyway, the behavior may be weird; I would have expected repartitioning to
zero to be disallowed, or at least warned about, instead of silently
discarding all the data.

Thanks for your time!
Regards,
Pedro

On Thu, Aug 19, 2021 at 07:42, Jacek Laskowski (ja...@japila.pl) wrote:

> Hi Pedro,
>
> No idea what might be causing it. Do you perhaps have some code to
> reproduce it locally?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>
>
> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero  wrote:
>
>>
>> Context: spark-core_2.12-3.1.1
>> Testing with maven and eclipse.
>>
>> I'm modifying a project and a test stops working as expected.
>> The difference is in the parameters passed to the function aggregateByKey
>> of JavaPairRDD.
>>
>> JavaSparkContext is created this way:
>> new JavaSparkContext(new SparkConf()
>> .setMaster("local[1]")
>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>> Then I construct a JavaPairRdd using sparkContext.paralellizePairs and
>> call a method which makes an aggregateByKey over the input JavaPairRDD  and
>> test that the result is the expected.
>>
>> When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue,
>> combiner, merger);
>>  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>> combFunc: JFunction2[U, U, U]):
>>   JavaPairRDD[K, U] = {
>> implicit val ctag: ClassTag[U] = fakeClassTag
>> fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>   }
>> The test works as expected.
>> But when I use: JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
>> *partitions*,combiner, merger);)
>> def aggregateByKey[U](zeroValue: U, *numPartitions: Int,* seqFunc:
>> JFunction2[U, V, U],
>>   combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>> implicit val ctag: ClassTag[U] = fakeClassTag
>> fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions)*(seqFunc,
>> combFunc))
>>   }
>> The result is always empty. It looks like there is a problem with the
>> hashPartitioner created at PairRddFunctions :
>>  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions:
>> Int)(seqOp: (U, V) => U,
>>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>> aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp,
>> combOp)
>>   }
>> vs:
>>  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>> aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
>>   }
>> I can't debug it properly with eclipse, and error occurs when threads are
>> in spark code (system editor can only open file base resources).
>>
>> Does anyone know how to resolve this issue?
>>
>> Thanks in advance,
>> Pedro.
>>
>>
>>
>>


Java : Testing RDD aggregateByKey

2021-08-17 Thread Pedro Tuero
Context: spark-core_2.12-3.1.1
Testing with maven and eclipse.

I'm modifying a project and a test stops working as expected.
The difference is in the parameters passed to the function aggregateByKey
of JavaPairRDD.

JavaSparkContext is created this way:
new JavaSparkContext(new SparkConf()
.setMaster("local[1]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
Then I construct a JavaPairRDD using sparkContext.parallelizePairs and call
a method which runs an aggregateByKey over the input JavaPairRDD and test
that the result is the expected one.

When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue, combiner,
merger);
 def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
combFunc: JFunction2[U, U, U]):
  JavaPairRDD[K, U] = {
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
  }
The test works as expected.
But when I use: JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
*partitions*,combiner, merger);)
def aggregateByKey[U](zeroValue: U, *numPartitions: Int,* seqFunc:
JFunction2[U, V, U],
  combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions)*(seqFunc,
combFunc))
  }
The result is always empty. It looks like there is a problem with the
hashPartitioner created at PairRddFunctions :
 def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp:
(U, V) => U,
  combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp,
combOp)
  }
vs:
 def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
  combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
  }
I can't debug it properly with Eclipse, and the error occurs while threads
are in Spark code (the system editor can only open file-based resources).

Does anyone know how to resolve this issue?

Thanks in advance,
Pedro.


Coalesce vs reduce operation parameter

2021-03-18 Thread Pedro Tuero
I was reviewing a Spark Java application running on AWS EMR.

The code was like:
RDD.reduceByKey(func).coalesce(number).saveAsTextFile()

That stage took hours to complete.
I changed to:
RDD.reduceByKey(func, number).saveAsTextFile()
And it now takes less than 2 minutes, and the final output is the same.

So, is it a bug or a feature?
Why doesn't Spark treat a coalesce after a reduce like a reduce with the
number of output partitions parameterized?
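
For comparison, a minimal PySpark sketch of the two forms (the reduce
function, data, and partition count are placeholders); the second form sets
the number of output partitions as part of the reduce's shuffle instead of
adding a separate coalesce step:

# Minimal sketch of the two forms discussed above.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([("k%d" % (i % 100), 1) for i in range(10000)], 200)
number = 10

slow = rdd.reduceByKey(lambda a, b: a + b).coalesce(number)       # reduce, then merge
fast = rdd.reduceByKey(lambda a, b: a + b, numPartitions=number)  # reduce into `number` parts

assert sorted(slow.collect()) == sorted(fast.collect())           # same output either way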

Just for understanding,
Thanks,
Pedro.


Submitting extra jars on spark applications on yarn with cluster mode

2020-11-14 Thread Pedro Cardoso
Hello,

I am submitting a spark application on spark yarn using the cluster
execution mode.
The application itself depends on a couple of jars. I can successfully
submit and run the application using spark-submit --jars option as seen
below:

spark-submit \
--name Yarn-App \
--class  \
--properties-file conf/yarn.properties \
--jars lib/,lib/,lib/ \
 > log/yarn-app.txt 2>&1


With the yarn.properties being something like:

# Spark submit config which used in conjunction with yarn cluster mode
of execution to not block spark-submit command
# for application completion.
spark.yarn.submit.waitAppCompletion=false
spark.submit.deployMode=cluster
spark.master=yarn

## General Spark Application properties
spark.driver.cores=2
spark.driver.memory=4G
spark.executor.memory=5G
spark.executor.cores=2
spark.driver.extraJavaOptions=-Xms2G
spark.driver.extraClassPath=::
spark.executor.heartbeatInterval=30s

spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled: True
spark.dynamicAllocation.minExecutors: 1
spark.dynamicAllocation.maxExecutors: 100
spark.dynamicAllocation.initialExecutors: 10
spark.kryo.referenceTracking=false
spark.kryoserializer.buffer.max=1G

spark.ui.showConsoleProgress=true
spark.yarn.am.cores=4
spark.yarn.am.memory=10G
spark.yarn.archive=
spark.yarn.historyServer.address=


However, I would like to have everything specified in the properties file to
simplify the work of my team and not force them to specify the jars every
time.
So my question is: what is the Spark property that replaces the spark-submit
--jars parameter, such that I can specify everything in the properties file?

I've tried creating a tar.gz with the contents of the archive specified in
spark.yarn.archive plus the extra 3 jars that I need, uploading that to HDFS
and changing the archive property, but it did not work.
I got class-not-defined exceptions on classes that come from the 3 extra
jars.

If it helps, the jars are only required by the driver, not the executors;
the executors will simply perform Spark-only operations.
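
For reference, the config counterpart of --jars is spark.jars (a
comma-separated list added to the driver and executor classpaths), which can
sit in the same properties file as the settings above. A minimal sketch with
placeholder paths; note that in cluster mode it has to be set before
submission (properties file or --conf), not from inside an already-running
driver:

# Minimal sketch: spark.jars is the property equivalent of spark-submit --jars.
# In the properties file above it would be a single line, e.g.:
#   spark.jars=lib/first.jar,lib/second.jar,lib/third.jar
# The builder form below only applies when the session is created by the
# driver itself (client mode / local testing); jar paths are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Yarn-App")
    .config("spark.jars", "lib/first.jar,lib/second.jar,lib/third.jar")
    .getOrCreate()
)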

Thank you and have good weekend.

--

*Pedro Cardoso*

*Research Engineer*

pedro.card...@feedzai.com




Distribute entire columns to executors

2020-09-24 Thread Pedro Cardoso
Hello,

Is it possible in Spark to map partitions such that partitions are
column-based and not row-based?
My use-case is to compute temporal series of numerical values.
I.e: Exponential moving averages over the values of a given dataset's
column.

Suppose there is a dataset with roughly 200 columns, a high percentage of
which are numerical (> 60%) and at least one timestamp column, as shown in
the attached file.

I want to shuffle data to executors such that each executor has a smaller
dataset with only 2 columns, [Col0: Timestamp, Col: Numerical type].
Over which I can then sort the dataset by increasing timestamp and then
iterate over the rows with a custom function which receives a tuple:
{timestamp; value}.

Partitioning by column value does not make sense for me, since there is a
temporal lineage of values which I must keep. On the other hand, I would
like to parallelize this workload, as my datasets can be quite big (> 2
billion rows). The only way I see how is to distribute entire columns, so
that each executor holds 2B timestamp + numerical values rather than
2B * the size of an entire row.

Is this possible in Spark? Can someone point me in the right direction? A
code snippet example (not working is fine if the logic is sound) would be
highly appreciated!
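
Not a definitive answer, but one possible sketch in PySpark: unpivot the
numeric columns into a long (series, timestamp, value) format, hash-partition
by series name so every value of a column lands in the same task, sort within
partitions, and stream a running EMA over each series. Column names, the
input path, and alpha are placeholders; the trade-off is that each task then
holds a whole series.

# Sketch only: per-column time series via unpivot + repartition("series").
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://bucket/dataset")      # placeholder input
numeric_cols = ["col1", "col2", "col3"]             # the numeric subset

# Long format: one row per (series, ts, value).
long_df = df.select(
    F.col("timestamp").alias("ts"),
    F.explode(F.array(*[
        F.struct(F.lit(c).alias("series"), F.col(c).cast("double").alias("value"))
        for c in numeric_cols
    ])).alias("p"),
).select("p.series", "ts", "p.value")

def ema_partition(rows, alpha=0.1):
    # Rows arrive sorted by (series, ts); keep one running EMA per series.
    state = {}
    for r in rows:
        prev = state.get(r.series)
        ema = r.value if prev is None else alpha * r.value + (1 - alpha) * prev
        state[r.series] = ema
        yield (r.series, r.ts, ema)

result = (
    long_df.repartition("series")                 # all points of a series together
           .sortWithinPartitions("series", "ts")  # temporal order per series
           .rdd.mapPartitions(ema_partition)
)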

Thank you for your time.
--

*Pedro Cardoso*

*Research Engineer*

pedro.card...@feedzai.com



Re: Spark 2.4 partitions and tasks

2019-02-25 Thread Pedro Tuero
Good question. What I have read is that Spark is not a magician and
can't know how many tasks will be best for your input, so it can fail.
Spark sets the default parallelism to twice the number of cores on the
cluster.
In my jobs, it seemed that using the parallelism inherited from the input
parts worked well sometimes, and it was 100x the default parallelism.
When every job started to use the default parallelism (apparently when
switching from EMR 5.16 to 5.20), I first tried doing some repartitions, but
in some cases it was the same: the repartition job took as long as the job
I wanted to speed up (or failed outright).
Doing the repartition inside an operation on RDD pairs worked much better
(https://stackoverflow.com/questions/43027306/is-there-an-effective-partitioning-method-when-using-reducebykey-in-spark).

It would be nice to have a more comprehensive look at which RDDs need
more or less parallelism.
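
A minimal PySpark sketch of that "repartition inside the pair operation"
idea, assuming the input is the 5580-part sequence-file output mentioned
earlier (path and reduce function are placeholders):

# Sketch: instead of rdd.repartition(n) followed by the wide operation,
# pass numPartitions to the wide operation itself so the shuffle produces
# the desired layout in one pass.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
pairs = sc.sequenceFile("s3://bucket/previous-job-output")  # e.g. 5580 parts

target = pairs.getNumPartitions()   # inherit the input's parallelism explicitly

reduced = pairs.reduceByKey(lambda a, b: a + b, numPartitions=target)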

Regards,
Pedro.

On Sat, Feb 23, 2019 at 21:27, Yeikel (em...@yeikel.com) wrote:

> I am following up on this question because I have a similar issue.
>
> When is that we need to control the parallelism manually? Skewed
> partitions?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Pedro Tuero
* It is not getPartitions() but getNumPartitions().

On Tue, Feb 12, 2019 at 13:08, Pedro Tuero (tuerope...@gmail.com) wrote:

> And this is happening in every job I run. It is not just one case. If I
> add a forced repartitions it works fine, even better than before. But I run
> the same code for different inputs so the number to make repartitions must
> be related to the input.
>
>
> El mar., 12 de feb. de 2019 a la(s) 11:22, Pedro Tuero (
> tuerope...@gmail.com) escribió:
>
>> Hi Jacek.
>> I 'm not using SparkSql, I'm using RDD API directly.
>> I can confirm that the jobs and stages are the same on both executions.
>> In the environment tab of the web UI, when using spark 2.4
>> spark.default.parallelism=128 is shown while in 2.3.1 is not.
>> But in 2.3.1 should be the same, because 128 is the number of cores of
>> cluster * 2 and it didn't change in the latest version.
>>
>> In the example I gave, 5580 is the number of parts left by a previous job
>> in S3, in Hadoop sequence files. So the initial RDD has 5580 partitions.
>> While in 2.3.1, RDDs that are created with transformations from the
>> initial RDD conserve the same number of partitions, in 2.4 the number of
>> partitions reset to default.
>> So RDD1, the product of the first mapToPair, prints 5580 when
>> getPartitions() is called in 2.3.1, while prints 128 in 2.4.
>>
>> Regards,
>> Pedro
>>
>>
>> El mar., 12 de feb. de 2019 a la(s) 09:13, Jacek Laskowski (
>> ja...@japila.pl) escribió:
>>
>>> Hi,
>>>
>>> Can you show the plans with explain(extended=true) for both versions?
>>> That's where I'd start to pinpoint the issue. Perhaps the underlying
>>> execution engine change to affect keyBy? Dunno and guessing...
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> Mastering Spark SQL https://bit.ly/mastering-spark-sql
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:
>>>
>>>> I did a repartition to 1 (hardcoded) before the keyBy and it ends
>>>> in 1.2 minutes.
>>>> The questions remain open, because I don't want to harcode paralellism.
>>>>
>>>> El vie., 8 de feb. de 2019 a la(s) 12:50, Pedro Tuero (
>>>> tuerope...@gmail.com) escribió:
>>>>
>>>>> 128 is the default parallelism defined for the cluster.
>>>>> The question now is why keyBy operation is using default parallelism
>>>>> instead of the number of partition of the RDD created by the previous step
>>>>> (5580).
>>>>> Any clues?
>>>>>
>>>>> El jue., 7 de feb. de 2019 a la(s) 15:30, Pedro Tuero (
>>>>> tuerope...@gmail.com) escribió:
>>>>>
>>>>>> Hi,
>>>>>> I am running a job in spark (using aws emr) and some stages are
>>>>>> taking a lot more using spark  2.4 instead of Spark 2.3.1:
>>>>>>
>>>>>> Spark 2.4:
>>>>>> [image: image.png]
>>>>>>
>>>>>> Spark 2.3.1:
>>>>>> [image: image.png]
>>>>>>
>>>>>> With Spark 2.4, the keyBy operation take more than 10X what it took
>>>>>> with Spark 2.3.1
>>>>>> It seems to be related to the number of tasks / partitions.
>>>>>>
>>>>>> Questions:
>>>>>> - Is it not supposed that the number of task of a job is related to
>>>>>> number of parts of the RDD left by the previous job? Did that change in
>>>>>> version 2.4??
>>>>>> - Which tools/ configuration may I try, to reduce this aberrant
>>>>>> downgrade of performance??
>>>>>>
>>>>>> Thanks.
>>>>>> Pedro.
>>>>>>
>>>>>


Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Pedro Tuero
And this is happening in every job I run; it is not just one case. If I add
a forced repartition it works fine, even better than before. But I run the
same code for different inputs, so the number of partitions to repartition
to must be related to the input.


On Tue, Feb 12, 2019 at 11:22, Pedro Tuero (tuerope...@gmail.com) wrote:

> Hi Jacek.
> I 'm not using SparkSql, I'm using RDD API directly.
> I can confirm that the jobs and stages are the same on both executions.
> In the environment tab of the web UI, when using spark 2.4
> spark.default.parallelism=128 is shown while in 2.3.1 is not.
> But in 2.3.1 should be the same, because 128 is the number of cores of
> cluster * 2 and it didn't change in the latest version.
>
> In the example I gave, 5580 is the number of parts left by a previous job
> in S3, in Hadoop sequence files. So the initial RDD has 5580 partitions.
> While in 2.3.1, RDDs that are created with transformations from the
> initial RDD conserve the same number of partitions, in 2.4 the number of
> partitions reset to default.
> So RDD1, the product of the first mapToPair, prints 5580 when
> getPartitions() is called in 2.3.1, while prints 128 in 2.4.
>
> Regards,
> Pedro
>
>
> El mar., 12 de feb. de 2019 a la(s) 09:13, Jacek Laskowski (
> ja...@japila.pl) escribió:
>
>> Hi,
>>
>> Can you show the plans with explain(extended=true) for both versions?
>> That's where I'd start to pinpoint the issue. Perhaps the underlying
>> execution engine change to affect keyBy? Dunno and guessing...
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> Mastering Spark SQL https://bit.ly/mastering-spark-sql
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:
>>
>>> I did a repartition to 1 (hardcoded) before the keyBy and it ends in
>>> 1.2 minutes.
>>> The questions remain open, because I don't want to harcode paralellism.
>>>
>>> El vie., 8 de feb. de 2019 a la(s) 12:50, Pedro Tuero (
>>> tuerope...@gmail.com) escribió:
>>>
>>>> 128 is the default parallelism defined for the cluster.
>>>> The question now is why keyBy operation is using default parallelism
>>>> instead of the number of partition of the RDD created by the previous step
>>>> (5580).
>>>> Any clues?
>>>>
>>>> El jue., 7 de feb. de 2019 a la(s) 15:30, Pedro Tuero (
>>>> tuerope...@gmail.com) escribió:
>>>>
>>>>> Hi,
>>>>> I am running a job in spark (using aws emr) and some stages are taking
>>>>> a lot more using spark  2.4 instead of Spark 2.3.1:
>>>>>
>>>>> Spark 2.4:
>>>>> [image: image.png]
>>>>>
>>>>> Spark 2.3.1:
>>>>> [image: image.png]
>>>>>
>>>>> With Spark 2.4, the keyBy operation take more than 10X what it took
>>>>> with Spark 2.3.1
>>>>> It seems to be related to the number of tasks / partitions.
>>>>>
>>>>> Questions:
>>>>> - Is it not supposed that the number of task of a job is related to
>>>>> number of parts of the RDD left by the previous job? Did that change in
>>>>> version 2.4??
>>>>> - Which tools/ configuration may I try, to reduce this aberrant
>>>>> downgrade of performance??
>>>>>
>>>>> Thanks.
>>>>> Pedro.
>>>>>
>>>>


Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Pedro Tuero
Hi Jacek.
I'm not using Spark SQL, I'm using the RDD API directly.
I can confirm that the jobs and stages are the same on both executions.
In the environment tab of the web UI, when using Spark 2.4,
spark.default.parallelism=128 is shown, while in 2.3.1 it is not.
But in 2.3.1 it should be the same, because 128 is the number of cores of
the cluster * 2, and that didn't change in the latest version.

In the example I gave, 5580 is the number of parts left by a previous job
in S3, in Hadoop sequence files. So the initial RDD has 5580 partitions.
While in 2.3.1 RDDs created by transformations from the initial RDD keep
the same number of partitions, in 2.4 the number of partitions resets to
the default.
So RDD1, the product of the first mapToPair, prints 5580 when
getPartitions() is called in 2.3.1, while it prints 128 in 2.4.

Regards,
Pedro


On Tue, Feb 12, 2019 at 09:13, Jacek Laskowski (ja...@japila.pl) wrote:

> Hi,
>
> Can you show the plans with explain(extended=true) for both versions?
> That's where I'd start to pinpoint the issue. Perhaps the underlying
> execution engine change to affect keyBy? Dunno and guessing...
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:
>
>> I did a repartition to 1 (hardcoded) before the keyBy and it ends in
>> 1.2 minutes.
>> The questions remain open, because I don't want to harcode paralellism.
>>
>> El vie., 8 de feb. de 2019 a la(s) 12:50, Pedro Tuero (
>> tuerope...@gmail.com) escribió:
>>
>>> 128 is the default parallelism defined for the cluster.
>>> The question now is why keyBy operation is using default parallelism
>>> instead of the number of partition of the RDD created by the previous step
>>> (5580).
>>> Any clues?
>>>
>>> El jue., 7 de feb. de 2019 a la(s) 15:30, Pedro Tuero (
>>> tuerope...@gmail.com) escribió:
>>>
>>>> Hi,
>>>> I am running a job in spark (using aws emr) and some stages are taking
>>>> a lot more using spark  2.4 instead of Spark 2.3.1:
>>>>
>>>> Spark 2.4:
>>>> [image: image.png]
>>>>
>>>> Spark 2.3.1:
>>>> [image: image.png]
>>>>
>>>> With Spark 2.4, the keyBy operation take more than 10X what it took
>>>> with Spark 2.3.1
>>>> It seems to be related to the number of tasks / partitions.
>>>>
>>>> Questions:
>>>> - Is it not supposed that the number of task of a job is related to
>>>> number of parts of the RDD left by the previous job? Did that change in
>>>> version 2.4??
>>>> - Which tools/ configuration may I try, to reduce this aberrant
>>>> downgrade of performance??
>>>>
>>>> Thanks.
>>>> Pedro.
>>>>
>>>


Re: Spark 2.4 partitions and tasks

2019-02-08 Thread Pedro Tuero
I did a repartition to 1 (hardcoded) before the keyBy and it ends in
1.2 minutes.
The questions remain open, because I don't want to hardcode the parallelism.

On Fri, Feb 8, 2019 at 12:50, Pedro Tuero (tuerope...@gmail.com) wrote:

> 128 is the default parallelism defined for the cluster.
> The question now is why keyBy operation is using default parallelism
> instead of the number of partition of the RDD created by the previous step
> (5580).
> Any clues?
>
> El jue., 7 de feb. de 2019 a la(s) 15:30, Pedro Tuero (
> tuerope...@gmail.com) escribió:
>
>> Hi,
>> I am running a job in spark (using aws emr) and some stages are taking a
>> lot more using spark  2.4 instead of Spark 2.3.1:
>>
>> Spark 2.4:
>> [image: image.png]
>>
>> Spark 2.3.1:
>> [image: image.png]
>>
>> With Spark 2.4, the keyBy operation take more than 10X what it took with
>> Spark 2.3.1
>> It seems to be related to the number of tasks / partitions.
>>
>> Questions:
>> - Is it not supposed that the number of task of a job is related to
>> number of parts of the RDD left by the previous job? Did that change in
>> version 2.4??
>> - Which tools/ configuration may I try, to reduce this aberrant downgrade
>> of performance??
>>
>> Thanks.
>> Pedro.
>>
>


Re: Spark 2.4 partitions and tasks

2019-02-08 Thread Pedro Tuero
128 is the default parallelism defined for the cluster.
The question now is why the keyBy operation is using the default parallelism
instead of the number of partitions of the RDD created by the previous step
(5580).
Any clues?

On Thu, Feb 7, 2019 at 15:30, Pedro Tuero (tuerope...@gmail.com) wrote:

> Hi,
> I am running a job in spark (using aws emr) and some stages are taking a
> lot more using spark  2.4 instead of Spark 2.3.1:
>
> Spark 2.4:
> [image: image.png]
>
> Spark 2.3.1:
> [image: image.png]
>
> With Spark 2.4, the keyBy operation take more than 10X what it took with
> Spark 2.3.1
> It seems to be related to the number of tasks / partitions.
>
> Questions:
> - Is it not supposed that the number of task of a job is related to number
> of parts of the RDD left by the previous job? Did that change in version
> 2.4??
> - Which tools/ configuration may I try, to reduce this aberrant downgrade
> of performance??
>
> Thanks.
> Pedro.
>


Re: Aws

2019-02-08 Thread Pedro Tuero
Hi Noritaka,

I start clusters from the Java API.
Clusters running on 5.16 have no manual configuration in the EMR console
Configuration tab, so I assume the value of this property is the default
on 5.16.
I enabled maximize resource allocation because otherwise the number of
cores automatically assigned (without setting spark.executor.cores
manually) was always one per executor.

I already use the same configuration: the same scripts and configuration
files, running the same job with the same input data, only changing the
binaries to my own code, which launches the clusters using the EMR 5.20
release label.

Anyway, setting maximize resource allocation seems to have helped enough
with the core distribution.
Some jobs take even less time than before.
Now I'm stuck analyzing a case where the number of tasks created seems to
be the problem. I have posted another thread about that in this forum
recently.

Regards,
Pedro


On Thu, Feb 7, 2019 at 21:37, Noritaka Sekiyama (moomind...@gmail.com) wrote:

> Hi Pedro,
>
> It seems that you disabled maximize resource allocation in 5.16, but
> enabled in 5.20.
> This config can be different based on how you start EMR cluster (via quick
> wizard, advanced wizard in console, or CLI/API).
> You can see that in EMR console Configuration tab.
>
> Please compare spark properties (especially spark.executor.cores,
> spark.executor.memory, spark.dynamicAllocation.enabled, etc.)  between
> your two Spark cluster with different version of EMR.
> You can see them from Spark web UI’s environment tab or log files.
> Then please try with the same properties against the same dataset with the
> same deployment mode (cluster or client).
>
> Even in EMR, you can configure num of cores and memory of driver/executors
> in config files, arguments in spark-submit, and inside Spark app if you
> need.
>
>
> Warm regards,
> Nori
>
> 2019年2月8日(金) 8:16 Hiroyuki Nagata :
>
>> Hi,
>> thank you Pedro
>>
>> I tested maximizeResourceAllocation option. When it's enabled, it seems
>> Spark utilized their cores fully. However the performance is not so
>> different from default setting.
>>
>> I consider to use s3-distcp for uploading files. And, I think
>> table(dataframe) caching is also effectiveness.
>>
>> Regards,
>> Hiroyuki
>>
>> 2019年2月2日(土) 1:12 Pedro Tuero :
>>
>>> Hi Hiroyuki, thanks for the answer.
>>>
>>> I found a solution for the cores per executor configuration:
>>> I set this configuration to true:
>>>
>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
>>> Probably it was true by default at version 5.16, but I didn't find when
>>> it has changed.
>>> In the same link, it says that dynamic allocation is true by default. I
>>> thought it would do the trick but reading again I think it is related to
>>> the number of executors rather than the number of cores.
>>>
>>> But the jobs are still taking more than before.
>>> Watching application history,  I see these differences:
>>> For the same job, the same kind of instances types, default (aws
>>> managed) configuration for executors, cores, and memory:
>>> Instances:
>>> 6 r5.xlarge :  4 vCpu , 32gb of mem. (So there is 24 cores: 6 instances
>>> * 4 cores).
>>>
>>> With 5.16:
>>> - 24 executors  (4 in each instance, including the one who also had the
>>> driver).
>>> - 4 cores each.
>>> - 2.7  * 2 (Storage + on-heap storage) memory each.
>>> - 1 executor per core, but at the same time  4 cores per executor (?).
>>> - Total Mem in executors per Instance : 21.6 (2.7 * 2 * 4)
>>> - Total Elapsed Time: 6 minutes
>>> With 5.20:
>>> - 5 executors (1 in each instance, 0 in the instance with the driver).
>>> - 4 cores each.
>>> - 11.9  * 2 (Storage + on-heap storage) memory each.
>>> - Total Mem  in executors per Instance : 23.8 (11.9 * 2 * 1)
>>> - Total Elapsed Time: 8 minutes
>>>
>>>
>>> I don't understand the configuration of 5.16, but it works better.
>>> It seems that in 5.20, a full instance is wasted with the driver only,
>>> while it could also contain an executor.
>>>
>>>
>>> Regards,
>>> Pedro.
>>>
>>>
>>>
>>> l jue., 31 de ene. de 2019 20:16, Hiroyuki Nagata 
>>> escribió:
>>>
>>>> Hi, Pedro
>>>>
>>>>
>>>> I also start using AWS EMR, with Spark 2.4.0. I'm seeking methods 

Spark 2.4 partitions and tasks

2019-02-07 Thread Pedro Tuero
Hi,
I am running a job in Spark (using AWS EMR) and some stages are taking a
lot longer using Spark 2.4 than they did with Spark 2.3.1:

Spark 2.4:
[image: image.png]

Spark 2.3.1:
[image: image.png]

With Spark 2.4, the keyBy operation takes more than 10X what it took with
Spark 2.3.1.
It seems to be related to the number of tasks / partitions.

Questions:
- Isn't the number of tasks of a job supposed to be related to the number
of parts of the RDD left by the previous job? Did that change in version
2.4?
- Which tools / configuration may I try to fix this severe performance
degradation?

Thanks.
Pedro.


Re: Aws

2019-02-01 Thread Pedro Tuero
Hi Hiroyuki, thanks for the answer.

I found a solution for the cores per executor configuration:
I set this configuration to true:
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
Probably it was true by default in version 5.16, but I didn't find when it
changed.
In the same link, it says that dynamic allocation is true by default. I
thought it would do the trick but reading again I think it is related to
the number of executors rather than the number of cores.

But the jobs are still taking more than before.
Watching application history,  I see these differences:
For the same job, the same kind of instances types, default (aws managed)
configuration for executors, cores, and memory:
Instances:
6 r5.xlarge: 4 vCPUs, 32 GB of memory each (so there are 24 cores: 6
instances * 4 cores).

With 5.16:
- 24 executors  (4 in each instance, including the one who also had the
driver).
- 4 cores each.
- 2.7  * 2 (Storage + on-heap storage) memory each.
- 1 executor per core, but at the same time  4 cores per executor (?).
- Total Mem in executors per Instance : 21.6 (2.7 * 2 * 4)
- Total Elapsed Time: 6 minutes
With 5.20:
- 5 executors (1 in each instance, 0 in the instance with the driver).
- 4 cores each.
- 11.9  * 2 (Storage + on-heap storage) memory each.
- Total Mem  in executors per Instance : 23.8 (11.9 * 2 * 1)
- Total Elapsed Time: 8 minutes


I don't understand the configuration of 5.16, but it works better.
It seems that in 5.20, a full instance is wasted with the driver only,
while it could also contain an executor.


Regards,
Pedro.



On Thu, Jan 31, 2019 at 20:16, Hiroyuki Nagata wrote:

> Hi, Pedro
>
>
> I also start using AWS EMR, with Spark 2.4.0. I'm seeking methods for
> performance tuning.
>
> Do you configure dynamic allocation ?
>
> FYI:
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> I've not tested it yet. I guess spark-submit needs to specify number of
> executors.
>
> Regards,
> Hiroyuki
>
> 2019年2月1日(金) 5:23、Pedro Tuero さん(tuerope...@gmail.com)のメッセージ:
>
>> Hi guys,
>> I use to run spark jobs in Aws emr.
>> Recently I switch from aws emr label  5.16 to 5.20 (which use Spark
>> 2.4.0).
>> I've noticed that a lot of steps are taking longer than before.
>> I think it is related to the automatic configuration of cores by executor.
>> In version 5.16, some executors toke more cores if the instance allows it.
>> Let say, if an instance had 8 cores and 40gb of ram, and ram configured
>> by executor was 10gb, then aws emr automatically assigned 2 cores by
>> executor.
>> Now in label 5.20, unless I configure the number of cores manually, only
>> one core is assigned per executor.
>>
>> I don't know if it is related to Spark 2.4.0 or if it is something
>> managed by aws...
>> Does anyone know if there is a way to automatically use more cores when
>> it is physically possible?
>>
>> Thanks,
>> Peter.
>>
>


Aws

2019-01-31 Thread Pedro Tuero
Hi guys,
I usually run Spark jobs on AWS EMR.
Recently I switched from EMR release label 5.16 to 5.20 (which uses Spark 2.4.0).
I've noticed that a lot of steps are taking longer than before.
I think it is related to the automatic configuration of cores per executor.
In version 5.16, some executors took more cores if the instance allowed it.
Say an instance had 8 cores and 40 GB of RAM, and the RAM configured per
executor was 10 GB; then AWS EMR automatically assigned 2 cores per executor.
Now in label 5.20, unless I configure the number of cores manually, only
one core is assigned per executor.

I don't know if it is related to Spark 2.4.0 or if it is something managed
by aws...
Does anyone know if there is a way to automatically use more cores when it
is physically possible?
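
A minimal sketch of pinning these manually instead of relying on the EMR
default (values are placeholders sized for the 8-core / 40 GB example above;
on EMR the same keys can also go into the spark-defaults classification or
spark-submit --conf):

# Sketch only: set cores/memory per executor explicitly.
from pyspark import SparkConf, SparkContext

conf = (
    SparkConf()
    .set("spark.executor.cores", "2")     # 2 cores per executor, as 5.16 used to assign
    .set("spark.executor.memory", "10g")
    .set("spark.dynamicAllocation.enabled", "true")
)
sc = SparkContext(conf=conf)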

Thanks,
Peter.


unsubscribe

2018-01-16 Thread Jose Pedro de Santana Neto
unsubscribe


Broadcasted Object is empty in executors.

2017-05-22 Thread Pedro Tuero
Hi,
I'm using spark 2.1.0 in aws emr. Kryo Serializer.

I'm broadcasting a java class :

public class NameMatcher {

private static final Logger LOG =
LoggerFactory.getLogger(NameMatcher.class);
private final Splitter splitter;
private final SetMultimap<String, IdNamed> itemsByWord;
private final Multiset<IdNamed> wordCount;

private NameMatcher(Builder builder) {
splitter = builder.splitter;
itemsByWord = cloneMultiMap(builder.itemsByWord);
wordCount = cloneMultiSet(builder.wordCount);
LOG.info("Matcher itemsByWorld size: {}", itemsByWord.size());
LOG.info("Matcher wordCount size: {}", wordCount.size());
}

private <T> Multiset<T> cloneMultiSet(Multiset<T> multiset) {
Multiset<T> result = HashMultiset.create();
result.addAll(multiset);
return result;
}

private <T, U> SetMultimap<T, U> cloneMultiMap(Multimap<T, U> multimap)
{
SetMultimap<T, U> result = HashMultimap.create();
result.putAll(multimap);
return result;
}

public Set<IdNamed> match(CharSequence text) {
LOG.info("itemsByWorld Keys {}", itemsByWord.keys());
LOG.info("QueryMatching: {}", text);
Multiset<IdNamed> counter = HashMultiset.create();
Set<IdNamed> result = Sets.newHashSet();
for (String word : Sets.newHashSet(splitter.split(text))) {
if (itemsByWord.containsKey(word)) {
for (IdNamed item : itemsByWord.get(word)) {
counter.add(item);
if (wordCount.count(item) == counter.count(item)) {
result.add(item);
}
}
}
}
return result;
}
}

So the logs in the constructor are OK:
LOG.info("Matcher itemsByWorld size: {}", itemsByWord.size());
prints the sizes and they are as expected. But when calling
nameMatcher.getValue().match(...
in an RDD transformation, the log line in the match method:
 LOG.info("itemsByWorld Keys {}", itemsByWord.keys());
prints an empty list.

This works fine running locally on my computer, but fails with no matches
when running on AWS EMR.
I usually broadcast objects and maps with no problems.
Can anyone give me a clue about what's happening here?
Thank you very much,
Pedro.


Kryo Exception: NegativeArraySizeException

2016-11-24 Thread Pedro Tuero
Hi, I'm trying to broadcast a map of 2.6GB but I'm getting a weird Kryo
exception.

I tried to set -XX:hashCode=0 on the executors and the driver, following
this comment:
https://github.com/broadinstitute/gatk/issues/1524#issuecomment-189368808
But it didn't change anything.

Are you aware of this problem?
Is there a workaround?

Thanks for your comments,
Pedro.

Map info:
 INFO 2016-11-24 15:29:34,230 [main] (Logging.scala:54) - Block broadcast_3
stored as values in memory (estimated size 2.6 GB, free 5.7 GB)

Error Trace:
ERROR ApplicationMaster: User class threw exception:
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
...
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
...
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
at
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:86)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
at
org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:646)
at com.personal.sparkJob.main(sparkJob..java:81)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: java.lang.NegativeArraySizeException
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:447)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:245)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:246)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:658)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:623)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
... 22 more
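
One hedged guess from the trace itself, rather than a confirmed fix: the
overflow happens inside Kryo's reference-tracking map (MapReferenceResolver /
IdentityObjectIntMap), which is only exercised when reference tracking is
enabled, so turning it off (and giving Kryo a larger buffer) may be worth a
try. A minimal sketch of the relevant properties, shown via PySpark but they
are plain Spark conf keys:

# Hedged sketch, not a confirmed fix: disabling reference tracking skips the
# failing code path; the larger buffer is for very large serialized records.
from pyspark import SparkConf, SparkContext

conf = (
    SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.referenceTracking", "false")
    .set("spark.kryoserializer.buffer.max", "1g")
)
sc = SparkContext(conf=conf)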


Broadcasting Complex Custom Objects

2016-10-17 Thread Pedro Tuero
Hi guys,

I'm trying to do a job with Spark, using Java.

The thing is, I need to have an index of words of about 3 GB on each
machine, so I'm trying to broadcast custom objects to represent the index
and the interface to it.
I'm using Java standard serialization, so I tried to implement the
Serializable interface in each class involved, but some objects come from
libraries, so I can't go any further.

Is there another way to make it work?
Should I try with Kryo?
Is there a way to work with non-serializable objects?

I use a fat jar, so the code is really available on all workers. I think
there should be a way to use it instead of serializing and deserializing
everything.

Thanks,
Pedro


Re: Guys is this some form of Spam or someone has left his auto-reply loose LOL

2016-07-28 Thread Pedro Rodriguez
Same here, but maybe this is a really urgent matter we need to contact him
about... or just make a filter

On Thu, Jul 28, 2016 at 7:59 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> -- Forwarded message --
> From: Geert Van Landeghem [Napoleon Games NV] <
> g.vanlandeg...@napoleongames.be>
> Date: 28 July 2016 at 14:38
> Subject: Re: Re: Is spark-1.6.1-bin-2.6.0 compatible with
> hive-1.1.0-cdh5.7.1
> To: Mich Talebzadeh <mich.talebza...@gmail.com>
>
>
> Hello,
>
> I am enjoying holidays untill the end of august, for urgent matters
> contact the BI department on 702 or 703 or send an email to
> b...@napoleongames.be.
>
> For really urgent matters contact me on my mobile phone: +32 477 75 95 33.
>
> kind regards
> Geert
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: performance problem when reading lots of small files created by spark streaming.

2016-07-27 Thread Pedro Rodriguez
There are a few blog posts that detail one possible/likely issue for
example:
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219

TLDR: The Hadoop libraries Spark uses assume that their input comes from a
file system (which works for HDFS); however, S3 is a key-value store, not a
file system. Somewhere along the line, this makes things very slow. Below I
describe their approach and a library I am working on to solve this problem.

(Much) Longer Version (with a shiny new library in development):
So far in my reading of the source code, Hadoop attempts to actually read
from S3, which can be expensive, particularly since it does so from a single
driver core (listing files is different from actually reading them; I can
find the source code and link it later if you would like). The concept
explained above is to instead use the AWS SDK to list files, distribute the
file names as a collection with sc.parallelize, then read them in parallel.
I found this worked but was lacking in a few ways, so I started this
project: https://github.com/EntilZha/spark-s3

This takes that idea further by:
1. Rather than sc.parallelize, implement the RDD interface where each
partition is defined by the files it needs to read (haven't gotten to
DataFrames yet)
2. At the driver node, use the AWS SDK to list all the files with their
size (listing is fast), then run the Least Processing Time Algorithm to
sift the files into roughly balanced partitions by size
3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
"folder2").regularRDDOperationsHere or import implicits and do
sc.s3.textFileByPrefix
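
For anyone who just wants the basic pattern without the library, a minimal
PySpark + boto3 sketch of the "list with the AWS SDK, then sc.parallelize the
keys" idea described above (bucket, prefix, and the partitioning factor are
placeholders):

# Sketch: list keys on the driver (cheap), read the objects on the executors.
import boto3
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
bucket, prefix = "my-bucket", "events/2016/07/"

s3 = boto3.client("s3")
keys = []
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
    keys.extend(obj["Key"] for obj in page.get("Contents", []))

def read_object(key):
    # Each task creates its own client; the object body is split into lines.
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return body.decode("utf-8").splitlines()

lines = sc.parallelize(keys, numSlices=max(1, len(keys) // 100)).flatMap(read_object)
print(lines.count())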

At present, I am battle-testing and benchmarking it at my current job and
results are promising, with significant improvements to jobs dealing with
many files, especially many small files, and to jobs whose input is
unbalanced to start with. Jobs perform better because: 1) there isn't a
long stall at the driver while Hadoop decides how to split S3 files, and 2)
the partitions end up nearly perfectly balanced because of the LPT algorithm.

Since I hadn't intended to advertise this quite yet, the documentation is
not super polished, but it exists here:
http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context

I am completing the sonatype process for publishing artifacts on maven
central (this should be done by tomorrow so referencing
"io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
hear if this library solution works, otherwise I hope the blog post above
is illuminating.

Pedro

On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I have a relatively small data set however it is split into many small
> JSON files. Each file is between maybe 4K and 400K
> This is probably a very common issue for anyone using spark streaming. My
> streaming app works fine, how ever my batch application takes several hours
> to run.
>
> All I am doing is calling count(). Currently I am trying to read the files
> from s3. When I look at the app UI it looks like spark is blocked probably
> on IO? Adding additional workers and memory does not improve performance.
>
> I am able to copy the files from s3 to a worker relatively quickly. So I
> do not think s3 read time is the problem.
>
> In the past when I had similar data sets stored on HDFS I was able to use
> coalesce() to reduce the number of partition from 200K to 30. This made a
> big improvement in processing time. How ever when I read from s3 coalesce()
> does not improve performance.
>
> I tried copying the files to a normal file system and then using ‘hadoop
> fs put’ to copy the files to hdfs how ever this takes several hours and is
> no where near completion. It appears hdfs does not deal with small files
> well.
>
> I am considering copying the files from s3 to a normal file system on one
> of my workers and then concatenating the files into a few much large files,
> then using ‘hadoop fs put’ to move them to hdfs. Do you think this would
> improve the spark count() performance issue?
>
> Does anyone know of heuristics for determining the number or size of the
> concatenated files?
>
> Thanks in advance
>
> Andy
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: dynamic coalesce to pick file size

2016-07-26 Thread Pedro Rodriguez
I asked something similar if you search for "Tools for Balancing Partitions
By Size" (I couldn't find link on archives). Unfortunately there doesn't
seem to be something good right now other than knowing your job statistics.
I am planning on implementing the idea I explained in the last paragraph or
so of the last email I sent in this library
https://github.com/EntilZha/spark-s3 although it could be a while to make
my way up to data frames (adds for now).
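
In the meantime, a rough sketch of the "know your statistics" route:
estimate the bytes to be written (here from the input size through the
Hadoop FileSystem API, reached via PySpark's py4j internals) and coalesce to
ceil(bytes / 256 MB). Paths are placeholders and the 1:1 input-to-output
size ratio is an assumption to tune for parquet + compression.

# Sketch only: pick the coalesce count from an estimated output size.
import math
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

in_path, out_path = "hdfs:///data/input", "hdfs:///data/output"

# Total size of the input through the Hadoop FileSystem API (py4j internals).
jpath = sc._jvm.org.apache.hadoop.fs.Path(in_path)
fs = jpath.getFileSystem(sc._jsc.hadoopConfiguration())
total_bytes = fs.getContentSummary(jpath).getLength()

target_parts = max(1, int(math.ceil(total_bytes / float(256 * 1024 * 1024))))

df = spark.read.parquet(in_path)   # stand-in for the SQL query in the post
df.coalesce(target_parts).write.parquet(out_path)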

On Tue, Jul 26, 2016 at 1:02 PM, Maurin Lenglart <mau...@cuberonlabs.com>
wrote:

> Hi,
>
> I am doing a Sql query that return a Dataframe. Then I am writing the
> result of the query using “df.write”, but the result get written in a lot
> of different small files (~100 of 200 ko). So now I am doing a
> “.coalesce(2)” before the write.
>
> But the number “2” that I picked is static, is there have a way of
> dynamically picking the number depending of the file size wanted? (around
> 256mb would be perfect)
>
>
>
> I am running spark 1.6 on CDH using yarn, the files are written in parquet
> format.
>
>
>
> Thanks
>
>
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Pedro Rodriguez
:)

Just realized you didn't get your original question answered though:

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Person(age: Long, name: String)
defined class Person

scala> val df = Seq(Person(24, "pedro"), Person(22, "fritz")).toDF()
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.select("age")
res2: org.apache.spark.sql.DataFrame = [age: bigint]

scala> df.select("age").collect.map(_.getLong(0))
res3: Array[Long] = Array(24, 22)

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> df.collect.flatMap {
 | case Row(age: Long, name: String) => Seq(Tuple1(age))
 | case _ => Seq()
 | }
res7: Array[(Long,)] = Array((24,), (22,))

These docs are helpful
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
(1.6 docs, but should be similar in 2.0)

On Tue, Jul 26, 2016 at 7:08 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> And Pedro has made sense of a world running amok, scared, and drunken
> stupor.
>
> Regards,
> Gourav
>
> On Tue, Jul 26, 2016 at 2:01 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> I am not 100% sure as I haven't tried this out, but there is a huge difference
>> between the two. Both foreach and collect are actions regardless of
>> whether or not the data frame is empty.
>>
>> Doing a collect will bring all the results back to the driver, possibly
>> forcing it to run out of memory. Foreach will apply your function to each
>> element of the DataFrame, but will do so across the cluster. This behavior
>> is useful for when you need to do something custom for each element
>> (perhaps save to a db for which there is no driver or something custom like
>> make an http request per element, careful here though due to overhead cost).
>>
>> In your example, I am going to assume that hrecords is something like a
>> list buffer. The reason it will be empty is that each worker will get
>> sent an empty list (it's captured in the closure for foreach) and append to
>> it. The instance of the list at the driver doesn't know about what happened
>> at the workers so it's empty.
>>
>> I don't know why Chanh's comment applies here since I am guessing the df
>> is not empty.
>>
>> On Tue, Jul 26, 2016 at 1:53 AM, kevin <kiss.kevin...@gmail.com> wrote:
>>
>>> thank you Chanh
>>>
>>> 2016-07-26 15:34 GMT+08:00 Chanh Le <giaosu...@gmail.com>:
>>>
>>>> Hi Ken,
>>>>
>>>> *blacklistDF -> just a DataFrame*
>>>> Spark is lazy: until you call something like *collect, take, write* it
>>>> will execute the whole process *including the map or filter you do before
>>>> you collect*.
>>>> That means until you call collect Spark *does nothing*, so your df would
>>>> not have any data -> can’t call foreach.
>>>> Calling collect executes the process -> gets data -> foreach is ok.
>>>>
>>>>
>>>> On Jul 26, 2016, at 2:30 PM, kevin <kiss.kevin...@gmail.com> wrote:
>>>>
>>>>  blacklistDF.collect()
>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Pedro Rodriguez
I am not 100% sure as I haven't tried this out, but there is a huge difference
between the two. Both foreach and collect are actions regardless of
whether or not the data frame is empty.

Doing a collect will bring all the results back to the driver, possibly
forcing it to run out of memory. Foreach will apply your function to each
element of the DataFrame, but will do so across the cluster. This behavior
is useful for when you need to do something custom for each element
(perhaps save to a db for which there is no driver or something custom like
make an http request per element, careful here though due to overhead cost).

In your example, I am going to assume that hrecords is something like a
list buffer. The reason it will be empty is that each worker will get
sent an empty list (it's captured in the closure for foreach) and append to
it. The instance of the list at the driver doesn't know about what happened
at the workers so it's empty.

I don't know why Chanh's comment applies here since I am guessing the df is
not empty.
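
A toy sketch of the difference, using the Spark 2.0 SparkSession API; the
DataFrame and the driver-side buffer are made up for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("foreach-vs-collect").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("key", "value")

// collect(): every row is shipped back to the driver, so a driver-side buffer
// works here -- but large results can OOM the driver.
val localBuffer = scala.collection.mutable.ListBuffer[String]()
df.collect().foreach(row => localBuffer += row.getString(0))

// foreach(): the closure runs on the executors; a buffer captured in the
// closure is serialized out, mutated remotely, and never shipped back.
// Use an accumulator (or write to an external sink) to get information back.
val rowsSeen = spark.sparkContext.longAccumulator("rows seen")
df.foreach(row => rowsSeen.add(1L))

println(s"driver buffer = ${localBuffer.size}, accumulator = ${rowsSeen.value}")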

On Tue, Jul 26, 2016 at 1:53 AM, kevin <kiss.kevin...@gmail.com> wrote:

> thank you Chanh
>
> 2016-07-26 15:34 GMT+08:00 Chanh Le <giaosu...@gmail.com>:
>
>> Hi Ken,
>>
>> *blacklistDF -> just a DataFrame*
>> Spark is lazy: until you call something like *collect, take, write* it
>> will execute the whole process *including the map or filter you do before
>> you collect*.
>> That means until you call collect Spark *does nothing*, so your df would not
>> have any data -> can’t call foreach.
>> Calling collect executes the process -> gets data -> foreach is ok.
>>
>>
>> On Jul 26, 2016, at 2:30 PM, kevin <kiss.kevin...@gmail.com> wrote:
>>
>>  blacklistDF.collect()
>>
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Pedro Rodriguez
Probably should have been more specific with the code we are using, which
is something like

val df = 
df.write.mode("append or overwrite here").partitionBy("date").saveAsTable("my_table")

Unless there is something like what I described in the native API, I will
probably take the approach of having an S3 API call to wipe out that
partition before the job starts, but it would be nice to not have to
incorporate another step in the job.
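
A minimal sketch of that wipe step using the Hadoop FileSystem API rather than
the S3 API directly; the base path, the date partition layout, and `sc` being
the active SparkContext are assumptions for illustration:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Delete only the partition for the date being processed, leaving other dates intact.
val basePath = "s3a://my-bucket/warehouse/my_table"   // hypothetical table location
val runDate = "2016-07-25"                            // hypothetical run date
val partitionPath = new Path(s"$basePath/date=$runDate")

val fs = FileSystem.get(new URI(basePath), sc.hadoopConfiguration)
if (fs.exists(partitionPath)) {
  fs.delete(partitionPath, true)                      // recursive delete of this partition only
}

// Then append just this date's data so re-runs stay idempotent.
df.write.mode("append").partitionBy("date").saveAsTable("my_table")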

Pedro

On Mon, Jul 25, 2016 at 5:23 PM, RK Aduri <rkad...@collectivei.com> wrote:

> You can have a temporary file to capture the data that you would like to
> overwrite, and swap that with the existing partition whose data you want to
> wipe away. Swapping can be done by a simple rename of the partition;
> then just repair the table to pick up the new partition.
>
> Am not sure if that addresses your scenario.
>
> On Jul 25, 2016, at 4:18 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
> What would be the best way to accomplish the following behavior:
>
> 1. There is a table which is partitioned by date
> 2. Spark job runs on a particular date, we would like it to wipe out all
> data for that date. This is to make the job idempotent and lets us rerun a
> job if it failed without fear of duplicated data
> 3. Preserve data for all other dates
>
> I am guessing that overwrite would not work here, or if it does it's not
> guaranteed to stay that way, but am not sure. If that's the case, is there a
> good/robust way to get this behavior?
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>
>




-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Pedro Rodriguez
What would be the best way to accomplish the following behavior:

1. There is a table which is partitioned by date
2. Spark job runs on a particular date; we would like it to wipe out all
data for that date. This is to make the job idempotent and lets us rerun a
job that failed without fear of duplicated data
3. Preserve data for all other dates

I am guessing that overwrite would not work here, or if it does it's not
guaranteed to stay that way, but I am not sure. If that's the case, is there a
good/robust way to get this behavior?

-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark 2.0

2016-07-25 Thread Pedro Rodriguez
Spark 2.0 vote for RC5 passed last Friday night so it will probably be
released early this week if I had to guess.

On Mon, Jul 25, 2016 at 12:23 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> All,
>
> I had three questions:
>
> (1) Is there a timeline for stable Spark 2.0 release?  I know the
> 'preview' build is out there, but was curious what the timeline was for
> full release. Jira seems to indicate that there should be a release 7/27.
>
> (2)  For 'continuous' datasets there has been a lot of discussion. One
> item that came up in tickets was the idea that 'count()' and other
> functions do not apply to continuous datasets:
> https://github.com/apache/spark/pull/12080.  In this case what is the
> intended procedure to calculate a streaming statistic based on an interval
> (e.g. count the number of records in a 2 minute window every 2 minutes)?
>
> (3) In previous releases (1.6.1) the call to DStream / RDD repartition w/
> a number of partitions set to zero silently deletes data.  I have looked in
> Jira for a similar issue, but I do not see one.  I would like to address
> this (and would likely be willing to go fix it myself).  Should I just
> create a ticket?
>
> Thank you,
>
> Bryan Jeffrey
>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to generate a sequential key in rdd across executors

2016-07-24 Thread Pedro Rodriguez
If you can use a dataframe then you could use rank + a window function, at the
expense of an extra sort. Do you have an example of zipWithIndex not
working? That seems surprising.
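
A minimal sketch of both options, assuming the data has some column you can
order by (the column names here are hypothetical); note the un-partitioned
window forces a global sort through a single task, which is the extra cost
mentioned above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Window/rank approach: a dense, sequential key ordered by "eventTime".
val w = Window.orderBy("eventTime")
val withSeqKey = df.withColumn("seq_id", row_number().over(w))

// zipWithIndex on the underlying RDD also gives a globally sequential index
// (Spark adds the per-partition offsets for you), at the cost of leaving the
// DataFrame API.
val indexed = df.rdd.zipWithIndex()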
On Jul 23, 2016 10:24 PM, "Andrew Ehrlich"  wrote:

> It’s hard to do in a distributed system. Maybe try generating a meaningful
> key using a timestamp + hashed unique key fields in the record?
>
> > On Jul 23, 2016, at 7:53 PM, yeshwanth kumar 
> wrote:
> >
> > Hi,
> >
> > i am doing bulk load to hbase using spark,
> > in which i need to generate a sequential key for each record,
> > the key should be sequential across all the executors.
> >
> > i tried zipWithIndex; it didn't work because zipWithIndex gives an index
> > per executor, not across all executors.
> >
> > looking for some suggestions.
> >
> >
> > Thanks,
> > -Yeshwanth
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Choosing RDD/DataFrame/DataSet and Cluster Tuning

2016-07-23 Thread Pedro Rodriguez
Hi Jestin,

Spark is smart about how it does joins. In this case, if df2 is sufficiently 
small it will do a broadcast join. Basically, rather than shuffle df1/df2 for a 
join, it broadcasts df2 to all workers and joins locally. Looks like you may 
already have known that though based on using the 
spark.sql.autoBroadcastJoinThreshold.

It's hard to say why your job is slow without knowing more. For example, it 
could be a CPU-intensive calculation, or maybe you have an imbalance over keys 
which would cause a straggler. Hard to know without seeing some of the 
metrics from the Spark UI.

1. If you aren’t tied down by legacy code, Spark 2.0 has a nicer Dataset API 
and more improvements so I don’t see why not. Spark 2.0 RC5 vote passed last 
night so the official release will probably go out early next week
2. RDDs will make it worse. In the case of reduceByKey/groupByKey this is 
specific to RDDs; the DataFrame API doesn’t mirror that. You hear that because 
reduceByKey will run the reduce locally at each node for each key, then reduce all 
those results to get the final result. groupByKey will shuffle all keys across 
the network, which is wasteful if you are just doing a reduce right after. 
DataFrames have lots of optimizations as well
3. Shouldn’t need to explicitly call broadcast, though you can (see the sketch 
after this list)
4. Driver memory is important if your node needs to collect results back to it 
for some reason. One good example is in mllib/ml its common to collect 
parameters back to the driver to update a global model. For some algorithms 
(like LDA), the model can be quite large so it requires high driver memory.
5. Hard to know without more metrics from your job. That being said, your 
number of executor instances vs number of cores seems a bit high. I would try 5 
instances of 15 cores each or 10 of 7 cores each. You can also kick up the 
memory to use more of your cluster’s memory. Lastly, if you are running on EC2 
make sure to configure spark.local.dir to write to something that is not an EBS 
volume, preferably an attached SSD to something like an r3 machine.
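
A minimal sketch of the explicit broadcast hint, with a hypothetical join key,
in case you want to force it regardless of the threshold:

import org.apache.spark.sql.functions.broadcast

// Force df2 (the small side) to be broadcast to every executor and joined
// locally, avoiding a shuffle of the large df1.
val joined = df1
  .groupBy("key").count()
  .join(broadcast(df2), Seq("key"))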

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:31:21 AM, Jestin Ma (jestinwith.a...@gmail.com) wrote:

Hello,
Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one 
DataFrame and join with another, df2.

The first, df1, is very large (many gigabytes) compared to df2 (250 Mb).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM 
each.
It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and 
join, which seems very slow to me.

Currently I have set the following in my spark-defaults.conf:

spark.executor.instances                             24
spark.executor.memory                               10g
spark.executor.cores                                    3
spark.driver.memory                                     5g
spark.sql.autoBroadcastJoinThreshold        200Mb


I have a couple of questions regarding tuning for performance as a beginner.
Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even 
DataFrames) be better?
What if I used RDDs instead? I know that reduceByKey is better than groupByKey, 
and DataFrames don't have that method. 
I think I can do a broadcast join and have set a threshold. Do I need to set it 
above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
What's the point of driver memory?
Can anyone point out something wrong with my tuning numbers, or any additional 
parameters worth checking out?

Thank you a lot!
Sincerely,
Jestin

Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Pedro Rodriguez
Have you changed spark-env.sh or spark-defaults.conf from the default? It looks 
like spark is trying to address local workers based on a network address (eg 
192.168……) instead of on localhost (localhost, 127.0.0.1, 0.0.0.0,…). 
Additionally, that network address doesn’t resolve correctly. You might also 
check /etc/hosts to make sure that you don’t have anything weird going on.

One last thing to check: are you running Spark within a VM and/or 
Docker? If networking isn’t set up correctly on those you may also run into 
trouble.

What would be helpful is to know everything about your setup that might affect 
networking.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote:

Hi pedro,

Apologies for not adding this earlier. 

This is running on a local cluster set up as follows.
JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");

Any suggestions based on this ? 

The ports are not blocked by firewall. 

Regards,



On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Make sure that you don’t have ports firewalled. You don’t really give much 
information to work from, but it looks like the master can’t access the worker 
nodes for some reason. If you give more information on the cluster, networking, 
etc, it would help.

For example, on AWS you can create a security group which allows all traffic 
to/from itself to itself. If you are using something like ufw on ubuntu then 
you probably need to know the ip addresses of the worker nodes beforehand.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:

Please suggest if I am doing something wrong or an alternative way of doing 
this. 

I have an RDD with two values as follows 
JavaPairRDD<String, Long> rdd

When I execute   rdd..collectAsMap()
it always fails with IO exceptions.   


16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 1 outstanding blocks 
java.io.IOException: Failed to connect to /192.168.1.3:58179
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further 
information: /192.168.1.3:58179
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 
outstanding blocks after 5000 ms






Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Pedro Rodriguez
Make sure that you don’t have ports firewalled. You don’t really give much 
information to work from, but it looks like the master can’t access the worker 
nodes for some reason. If you give more information on the cluster, networking, 
etc, it would help.

For example, on AWS you can create a security group which allows all traffic 
to/from itself to itself. If you are using something like ufw on ubuntu then 
you probably need to know the ip addresses of the worker nodes beforehand.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:

Please suggest if I am doing something wrong or an alternative way of doing 
this. 

I have an RDD with two values as follows 
JavaPairRDD<String, Long> rdd

When I execute   rdd..collectAsMap()
it always fails with IO exceptions.   


16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 1 outstanding blocks 
java.io.IOException: Failed to connect to /192.168.1.3:58179
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further 
information: /192.168.1.3:58179
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 
outstanding blocks after 5000 ms





Re: Dataset , RDD zipWithIndex -- How to use as a map .

2016-07-22 Thread Pedro Rodriguez
You could either do monotonically_increasing_id or use a window function
and rank. The first is a simple Spark SQL function; Databricks has a
pretty helpful post on how to use window functions (in this case the whole
data set is the window).

On Fri, Jul 22, 2016 at 12:20 PM, Marco Mistroni <mmistr...@gmail.com>
wrote:

> Hi
> So you have a data frame, then use zipWithIndex and create a tuple.
> I'm not sure if the df API has something useful for zipWithIndex.
> But you can
> - get a data frame
> - convert it to an rdd (there's a .rdd)
> - do a zipWithIndex
>
> That will give you an rdd with 3 fields...
> I don't think you can update df columns
> Hth
> On 22 Jul 2016 5:19 pm, "VG" <vlin...@gmail.com> wrote:
>
> >
>
> > Hi All,
> >
> > Any suggestions for this
> >
> > Regards,
> > VG
> >
> > On Fri, Jul 22, 2016 at 6:40 PM, VG <vlin...@gmail.com> wrote:
>
> >>
>
> >> Hi All,
> >>
> >> I am really confused how to proceed further. Please help.
> >>
> >> I have a dataset created as follows:
> >> Dataset b = sqlContext.sql("SELECT bid, name FROM business");
> >>
> >> Now I need to map each name with a unique index and I did the following
> >> JavaPairRDD<Row, Long> indexedBId = business.javaRDD()
> >>
>  .zipWithIndex();
> >>
> >> In later part of the code I need to change a datastructure and update
> name with index value generated above .
> >> I am unable to figure out how to do a look up here..
> >>
> >> Please suggest /.
> >>
> >> If there is a better way to do this please suggest that.
> >>
> >> Regards
> >> VG
> >>
> >
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to get the number of partitions for a SparkDataFrame in Spark 2.0-preview?

2016-07-22 Thread Pedro Rodriguez
I haven't used SparkR/R before, only Scala/Python APIs so I don't know for
sure.

I am guessing if things are in a DataFrame they were read either from some
disk source (S3/HDFS/file/etc) or they were created from parallelize. If
you are using the first, Spark will for the most part choose a reasonable
number of partitions while for parallelize I think it depends on what your
min parallelism is set to.

In my brief google it looks like dapply is an analogue of mapPartitions.
Usually the reason to use this is if your map operation has some expensive
initialization function. For example, you need to open a connection to a
database so its better to re-use that connection for one partition's
elements than create it for each element.

What are you trying to accomplish with dapply?

On Fri, Jul 22, 2016 at 8:05 PM, Neil Chang <iam...@gmail.com> wrote:

> Thanks Pedro,
>   so to use sparkR dapply on SparkDataFrame, don't we need partition the
> DataFrame first? the example in doc doesn't seem to do this.
> Without knowing how it partitioned, how can one write the function to
> process each partition?
>
> Neil
>
> On Fri, Jul 22, 2016 at 5:56 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> This should work and I don't think triggers any actions:
>>
>> df.rdd.partitions.length
>>
>> On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang <iam...@gmail.com> wrote:
>>
>>> Seems no function does this in Spark 2.0 preview?
>>>
>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: spark and plot data

2016-07-22 Thread Pedro Rodriguez
As of the most recent 0.6.0 release it's partially alleviated, but still not
great (compared to something like Jupyter).

They can be "downloaded" but it's only really meaningful for importing them
back to Zeppelin. It would be great if they could be exported as HTML or
PDF, but at present they can't be. I know they have some sort of git
support, but it was never clear to me how it was supposed to be used since
the docs are sparse on that. So far what works best for us is S3 storage,
but you don't get the benefit of Github using that (history + commits etc).

There are a couple other notebooks floating around, Apache Toree seems the
most promising for portability since its based on jupyter
https://github.com/apache/incubator-toree

On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> The biggest stumbling block to using Zeppelin has been that we cannot
> download the notebooks, cannot export them and certainly cannot sync them
> back to Github, without mind numbing and sometimes irritating hacks. Have
> those issues been resolved?
>
>
> Regards,
> Gourav
>
>
> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> Zeppelin works great. The other thing that we have done in notebooks
>> (like Zeppelin or Databricks) which support multiple types of spark session
>> is register Spark SQL temp tables in our scala code then escape hatch to
>> python for plotting with seaborn/matplotlib when the built in plots are
>> insufficient.
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> <https://www.linkedin.com/in/pedrorodriguezscience>
>>
>> On July 22, 2016 at 3:04:48 AM, Marco Colombo (
>> ing.marco.colo...@gmail.com) wrote:
>>
>> Take a look at zeppelin
>>
>> http://zeppelin.apache.org
>>
>> Il giovedì 21 luglio 2016, Andy Davidson <a...@santacruzintegration.com>
>> ha scritto:
>>
>>> Hi Pseudo
>>>
>>> Plotting, graphing, data visualization, report generation are common
>>> needs in scientific and enterprise computing.
>>>
>>> Can you tell me more about your use case? What is it about the current
>>> process / workflow do you think could be improved by pushing plotting (I
>>> assume you mean plotting and graphing) into spark.
>>>
>>>
>>> In my personal work all the graphing is done in the driver on summary
>>> stats calculated using spark. So for me using standard python libs has not
>>> been a problem.
>>>
>>> Andy
>>>
>>> From: pseudo oduesp <pseudo20...@gmail.com>
>>> Date: Thursday, July 21, 2016 at 8:30 AM
>>> To: "user @spark" <user@spark.apache.org>
>>> Subject: spark and plot data
>>>
>>> Hi,
>>> I know Spark is an engine to compute large data sets, but I work
>>> with pyspark and it is a very wonderful machine.
>>>
>>> My question: we don't have tools for plotting data; each time we have to
>>> switch and go back to python for using plot.
>>> But when you have a large result (scatter plot or roc curve) you can't use
>>> collect to take the data.
>>>
>>> Does someone have a proposal for plotting?
>>>
>>> thanks
>>>
>>>
>>
>> --
>> Ing. Marco Colombo
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to search on a Dataset / RDD <Row, Long >

2016-07-22 Thread Pedro Rodriguez
You might look at monotonically_increasing_id() here
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions
instead of converting it to an RDD, since you pay a performance penalty for
that.

If you want to change the name you can do something like this (in scala
since I am not familiar with the java API, but it should be similar in java)

val df = sqlContext.sql("select bid, name from business")
  .withColumn("id", monotonically_increasing_id())
// some steps later on
df.withColumn("name", $"id")

I am not 100% sure what you mean by updating the data structure, I am guessing
you mean replace the name column with the id column? Note, on the second
line the withColumn call uses $"id" which in scala converts to a Column. In
java maybe it's something like new Column("id"), not sure.

Pedro

On Fri, Jul 22, 2016 at 12:21 PM, VG <vlin...@gmail.com> wrote:

> Any suggestions here  please
>
> I basically need an ability to look up *name -> index* and *index -> name*
> in the code
>
> -VG
>
> On Fri, Jul 22, 2016 at 6:40 PM, VG <vlin...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am really confused how to proceed further. Please help.
>>
>> I have a dataset created as follows:
>> Dataset b = sqlContext.sql("SELECT bid, name FROM business");
>>
>> Now I need to map each name with a unique index and I did the following
>> JavaPairRDD<Row, Long> indexedBId = business.javaRDD()
>>
>>  .zipWithIndex();
>>
>> In later part of the code I need to change a datastructure and update
>> name with index value generated above .
>> I am unable to figure out how to do a look up here..
>>
>> Please suggest /.
>>
>> If there is a better way to do this please suggest that.
>>
>> Regards
>> VG
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to get the number of partitions for a SparkDataFrame in Spark 2.0-preview?

2016-07-22 Thread Pedro Rodriguez
This should work and I don't think triggers any actions:

df.rdd.partitions.length

On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang <iam...@gmail.com> wrote:

> Seems no function does this in Spark 2.0 preview?
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: spark and plot data

2016-07-22 Thread Pedro Rodriguez
Zeppelin works great. The other thing that we have done in notebooks (like 
Zeppelin or Databricks) which support multiple types of spark session is to 
register Spark SQL temp tables in our scala code, then escape hatch to python 
for plotting with seaborn/matplotlib when the built-in plots are insufficient.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 22, 2016 at 3:04:48 AM, Marco Colombo (ing.marco.colo...@gmail.com) 
wrote:

Take a look at zeppelin

http://zeppelin.apache.org

Il giovedì 21 luglio 2016, Andy Davidson <a...@santacruzintegration.com> ha 
scritto:
Hi Pseudo

Plotting, graphing, data visualization, report generation are common needs in 
scientific and enterprise computing.

Can you tell me more about your use case? What is it about the current process 
/ workflow that you think could be improved by pushing plotting (I assume you 
mean plotting and graphing) into spark?


In my personal work all the graphing is done in the driver on summary stats 
calculated using spark. So for me using standard python libs has not been a 
problem.

Andy

From: pseudo oduesp <pseudo20...@gmail.com>
Date: Thursday, July 21, 2016 at 8:30 AM
To: "user @spark" <user@spark.apache.org>
Subject: spark and plot data

Hi,
I know Spark is an engine to compute large data sets, but I work with 
pyspark and it is a very wonderful machine.

My question: we don't have tools for plotting data; each time we have to switch 
and go back to python for using plot.
But when you have a large result (scatter plot or roc curve) you can't use collect 
to take the data.

Does someone have a proposal for plotting?

thanks 


--
Ing. Marco Colombo


Re: How can we control CPU and Memory per Spark job operation..

2016-07-22 Thread Pedro Rodriguez
Sorry, wasn’t very clear (looks like Pavan’s response was dropped from list for 
some reason as well).

I am assuming that:
1) the first map is CPU bound
2) the second map is heavily memory bound

To be specific, let's say you are using 4 m3.2xlarge instances which have 8 CPUs 
and 30GB of ram each for a total of 32 cores and 120GB of ram. Since the NLP 
model can’t be distributed that means every worker/core must use 4GB of RAM. If 
the cluster is fully utilized that means that just for the NLP model you are 
consuming 32 * 4GB = 128GB of ram. The cluster at this point is out of memory 
just for the NLP model, not considering the data set itself. My suggestion would 
be to see if r3.8xlarge instances will work (or even X1s if you have access) since 
the cpu/memory ratio is better. Here is the “hack” I proposed in more detail 
(basically n partitions < total cores):

1) have the first map have a regular number of partitions, suppose 32 * 4 = 128 
which is a reasonable starting place
2) repartition immediately after that map to 16 partitions. At this point, 
spark is not guaranteed to distribute your work evenly across the 4 nodes, but 
it probably will. The net result is that half the CPU cores are idle, but the 
NLP model is at worst using 16 * 4GB = 64GB of RAM. To be sure, this is a hack 
since evenly distributing work across the nodes is not guaranteed. 

If you wanted to do this as not a hack, you could perform the map, checkpoint 
your work, end the job, then submit a new job where the cpu/memory ratio is 
more favorable which reads from the prior job’s output. I am guessing this 
heavily depends on how expensive reloading the data set from disk/network is. 
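
A sketch of the hack in code, with hypothetical stage functions standing in for
the real work:

// CPU-bound stage: plenty of partitions so all 32 cores stay busy.
val parsed = rawRdd
  .repartition(128)
  .map(cpuIntensiveParse)        // hypothetical function

// Memory-bound NLP stage: far fewer partitions, so at most 16 tasks (and
// therefore at most 16 copies of the 4GB model) are live at any one time.
val annotated = parsed
  .repartition(16)
  .map(memoryHungryNlpAnnotate)  // hypothetical function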

Hopefully one of these helps,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 17, 2016 at 6:16:41 AM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,

How would that help?! Why would you do that?

Jacek


On 17 Jul 2016 7:19 a.m., "Pedro Rodriguez" <ski.rodrig...@gmail.com> wrote:
You could call map on an RDD which has “many” partitions, then call 
repartition/coalesce to drastically reduce the number of partitions so that 
your second map job has less things running.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 16, 2016 at 4:46:04 PM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,

My understanding is that these two map functions will end up as a job
with one stage (as if you wrote the two maps as a single map) so you
really need as much vcores and memory as possible for map1 and map2. I
initially thought about dynamic allocation of executors that may or
may not help you with the case, but since there's just one stage I
don't think you can do much.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta <pacha...@sysomos.com> wrote:
> Hi All,
>
> Here is my use case:
>
> I have a pipeline job consisting of 2 map functions:
>
> CPU intensive map operation that does not require a lot of memory.
> Memory intensive map operation that requires upto 4 GB of memory. And this
> 4GB memory cannot be distributed since it is an NLP model.
>
> Ideally what I like to do is to use 20 nodes with 4 cores each and minimal
> memory for first map operation and then use only 3 nodes with minimal CPU
> but each having 4GB of memory for 2nd operation.
>
> While it is possible to control this parallelism for each map operation in
> spark. I am not sure how to control the resources for each operation.
> Obviously I don’t want to start off the job with 20 nodes with 4 cores and
> 4GB memory since I cannot afford that much memory.
>
> We use Yarn with Spark. Any suggestions ?
>
> Thanks and regards,
> Pavan
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How can we control CPU and Memory per Spark job operation..

2016-07-16 Thread Pedro Rodriguez
You could call map on an RDD which has “many” partitions, then call 
repartition/coalesce to drastically reduce the number of partitions so that 
your second map job has fewer things running.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 16, 2016 at 4:46:04 PM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,  

My understanding is that these two map functions will end up as a job  
with one stage (as if you wrote the two maps as a single map) so you  
really need as much vcores and memory as possible for map1 and map2. I  
initially thought about dynamic allocation of executors that may or  
may not help you with the case, but since there's just one stage I  
don't think you can do much.  

Pozdrawiam,  
Jacek Laskowski  
  
https://medium.com/@jaceklaskowski/  
Mastering Apache Spark http://bit.ly/mastering-apache-spark  
Follow me at https://twitter.com/jaceklaskowski  


On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta <pacha...@sysomos.com> wrote:  
> Hi All,  
>  
> Here is my use case:  
>  
> I have a pipeline job consisting of 2 map functions:  
>  
> CPU intensive map operation that does not require a lot of memory.  
> Memory intensive map operation that requires upto 4 GB of memory. And this  
> 4GB memory cannot be distributed since it is an NLP model.  
>  
> Ideally what I like to do is to use 20 nodes with 4 cores each and minimal  
> memory for first map operation and then use only 3 nodes with minimal CPU  
> but each having 4GB of memory for 2nd operation.  
>  
> While it is possible to control this parallelism for each map operation in  
> spark. I am not sure how to control the resources for each operation.  
> Obviously I don’t want to start off the job with 20 nodes with 4 cores and  
> 4GB memory since I cannot afford that much memory.  
>  
> We use Yarn with Spark. Any suggestions ?  
>  
> Thanks and regards,  
> Pavan  
>  
>  

-  
To unsubscribe e-mail: user-unsubscr...@spark.apache.org  



Re: Saving data frames on Spark Master/Driver

2016-07-14 Thread Pedro Rodriguez
Out of curiosity, is there a way to pull all the data back to the driver to 
save without collect()? That is, stream the data in chunks back to the driver 
so that the maximum memory used is comparable to a single node’s data, but all the 
data is saved on one node.
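
Not something covered in this thread, but a minimal sketch of one way to do
that chunked pull: toLocalIterator fetches one partition at a time, so the
driver only ever holds a single partition's rows (the local output path is
hypothetical, and joineddataframe is the frame from the message below):

import java.io.PrintWriter

// Stream the joined result to a local file on the driver, one partition at a time.
val writer = new PrintWriter("/tmp/joined.csv")          // hypothetical path on the driver
joineddataframe.rdd.toLocalIterator.foreach { row =>
  writer.println(row.mkString(","))
}
writer.close()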

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 14, 2016 at 6:02:12 PM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,  

Please re-consider your wish since it is going to move all the  
distributed dataset to the single machine of the driver and may lead  
to OOME. It's more pro to save your result to HDFS or S3 or any other  
distributed filesystem (that is accessible by the driver and  
executors).  

If you insist...  

Use collect() after select() and work with Array[T].  

Pozdrawiam,  
Jacek Laskowski  
  
https://medium.com/@jaceklaskowski/  
Mastering Apache Spark http://bit.ly/mastering-apache-spark  
Follow me at https://twitter.com/jaceklaskowski  


On Fri, Jul 15, 2016 at 12:15 AM, vr.n. nachiappan  
<nachiappan_...@yahoo.com.invalid> wrote:  
> Hello,  
>  
> I am using data frames to join two cassandra tables.  
>  
> Currently when i invoke save on data frames as shown below it is saving the  
> join results on executor nodes.  
>  
> joineddataframe.select(,   
> ...).format("com.databricks.spark.csv").option("header",  
> "true").save()  
>  
> I would like to persist the results of the join on Spark Master/Driver node.  
> Is it possible to save the results on Spark Master/Driver and how to do it.  
>  
> I appreciate your help.  
>  
> Nachi  
>  

-  
To unsubscribe e-mail: user-unsubscr...@spark.apache.org  



Re: Call http request from within Spark

2016-07-14 Thread Pedro Rodriguez
Hi Amit,

Have you tried running a subset of the IDs locally on a single thread? It
would be useful to benchmark your getProfile function for a subset of the
data, then estimate how long the full data set would take, then divide by the
number of spark executor cores. This should at least serve as a sanity
check. If things are much slower than expected, is it possible that the
service has a rate limit per IP address that you are hitting?

If requests is more efficient at batching requests together (I don't know
much about its internal implementation and connection pools) you could do
that with mapPartitions. This is useful when the initialization time of the
function in the map call is expensive (eg uses a connection pool for a db
or web) as it allows you to initialize that resource once per partition
then reuse it for all the elements in the partition.
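
A Scala sketch of the mapPartitions pattern (the shape is the same in PySpark);
custIdRdd, HttpClientLike, and fetchProfile are hypothetical stand-ins for your
RDD and whatever HTTP client you use:

val profiles = custIdRdd.mapPartitions { ids =>
  val client = new HttpClientLike()                             // expensive setup, once per partition
  val results = ids.map(id => fetchProfile(client, id)).toList  // materialize before closing
  client.close()
  results.iterator
}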

Pedro

On Thu, Jul 14, 2016 at 8:52 AM, Amit Dutta <amitkrdu...@outlook.com> wrote:

> Hi All,
>
>
> I have a requirement to call a rest service url for 300k customer ids.
>
> Things I have tried so far is
>
>
> custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User
> Hashed LCID List.csv') #getting all the customer ids and building adds
>
> profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))
>
> profile_rdd.count()
>
>
> #getprofile is the method to do the http call
>
> def getProfile(cust_id):
>
> api_key = 'txt'
>
> api_secret = 'yuyuy'
>
> profile_uri = 'https://profile.localytics.com/x1/customers/{}'
>
> customer_id = cust_id
>
>
> if customer_id is not None:
>
> data = requests.get(profile_uri.format(customer_id),
> auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
>
> # print json.dumps(data.json(), indent=4)
>
> return data
>
>
> when I print the json dump of the data I see it returning results from the
> rest call. But the count never stops.
>
>
> Is there an efficient way of dealing with this? Some posts say we have to
> define a batch size etc. but I don't know how.
>
>
> Appreciate your help
>
>
> Regards,
>
> Amit
>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Tools for Balancing Partitions by Size

2016-07-13 Thread Pedro Rodriguez
Hi Gourav,

In our case, we process raw logs into parquet tables that downstream
applications can use for other jobs. The desired outcome is that we only
need to worry about unbalanced input data at the preprocess step so that
downstream jobs can assume balanced input data.

In our specific case, this works because although the raw log rows are of
variable size, the rows in the Spark SQL table are of fixed size by parsing
primitives or chopping arrays. Due to this, in our use case it makes sense
to think in terms of balanced file size because it directly correlates to
having a balanced number of rows/partition and thus balanced partitions.

Given this setting, are there any specific issues you foresee? I agree that
file size isn't a general solution, but in the setting I don't see a reason
it should not work.

Our overall goal is to avoid two problems when we write data to S3:
- Large number of small files (Kbs) since this makes S3 listing take a long
time
- Small number of large files (GBs) since this makes reads not as efficient

Thus far, we have done this on a per-application basis with repartition and
a manually tuned number of partitions, but this is inconvenient. We are
interested in seeing if there is a way to automatically infer the number of
partitions we should use so that our files in S3 have a particular average
size (without incurring too high an overhead cost).

The solution that seems most promising right now is:

   - Define a custom write function which does two steps:
   - Write one partition to S3 and get files size and number of records
   - Use that to determine the number of partitions to repartition to, then
   write everything to S3

What seems unclear is how to compute the parent RDD (suppose it's an RDD
with wide dependencies like a join), get one partition for step (1), then
not recompute anything to do step (2) without an explicit cache. This would
make it so the additional overhead on the job is on writing one partition
to S3, which seems like an acceptable level of overhead. Perhaps this could
be accomplished by saying: RDD A computes the size of one partition, RDD B
holds all partitions except for the one from A, the parents of A and B are
the original parent RDD, RDD C has parents A and B and has the overall
write balanced function.
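
A rough sketch of the two-step write described above, using a small sample
instead of a single partition for simplicity; the paths, the sample fraction,
and the 256 MB target are all assumptions, and the cache is what keeps step 2
from recomputing the upstream work:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val cached = df.cache()

// Step 1: write a small sample and measure its on-disk size.
val probePath = "s3a://my-bucket/tmp/size-probe"            // hypothetical scratch location
val fraction = 0.01
cached.sample(false, fraction).write.parquet(probePath)

val fs = FileSystem.get(new URI(probePath), sc.hadoopConfiguration)
val probeBytes = fs.getContentSummary(new Path(probePath)).getLength
val estimatedTotalBytes = (probeBytes / fraction).toLong

// Step 2: repartition toward ~256 MB files and write everything.
val targetBytes = 256L * 1024 * 1024
val numPartitions = math.max(1, (estimatedTotalBytes / targetBytes).toInt)
cached.repartition(numPartitions).write.parquet("s3a://my-bucket/output")   // hypothetical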

Thanks,
Pedro

On Wed, Jul 13, 2016 at 9:10 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> Using file size is a very bad way of managing data provided you think that
> volume, variety and veracity do not hold true. Actually it's a very bad
> way of thinking and designing data solutions; you are bound to hit
> bottlenecks, optimization issues, and manual interventions.
>
> I have found thinking about data in logical partitions helps overcome most
> of the design problems that is mentioned above.
>
> You can either use repartition with shuffling or coalesce with shuffle
> turned off to manage loads.
>
> If you are using HIVE just let me know.
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> The primary goal for balancing partitions would be for the write to S3.
>> We would like to prevent unbalanced partitions (can do with repartition),
>> but also avoid partitions that are too small or too large.
>>
>> So for that case, getting the cache size would work Maropu if its roughly
>> accurate, but for data ingest we aren’t caching, just writing straight
>> through to S3.
>>
>> The idea for writing to disk and checking for the size is interesting
>> Hatim. For certain jobs, it seems very doable to write a small percentage
>> of the data to S3, check the file size through the AWS API, and use that to
>> estimate the total size. Thanks for the idea.
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> <https://www.linkedin.com/in/pedrorodriguezscience>
>>
>> On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote:
>>
>> Hi,
>>
>> Since the final size depends on data types and compression. I've had to
>> first get a rough estimate of data, written to disk, then compute the
>> number of partitions.
>>
>> partitions = int(ceil(size_data * conversion_ratio / block_size))
>>
>> In my case block size 256mb, source txt & dest is snappy parquet,
>> compression_ratio .6
>>
>> df.repartition(partitions).write.parquet(output)
>>
>> Which yields files in the range of 230mb.
>>
>> Another way was to count and come up with an empirical formula.
>>
>

Re: Tools for Balancing Partitions by Size

2016-07-12 Thread Pedro Rodriguez
The primary goal for balancing partitions would be for the write to S3. We 
would like to prevent unbalanced partitions (can do with repartition), but also 
avoid partitions that are too small or too large.

So for that case, getting the cache size would work, Maropu, if it's roughly 
accurate, but for data ingest we aren’t caching, just writing straight through 
to S3.

The idea for writing to disk and checking for the size is interesting Hatim. 
For certain jobs, it seems very doable to write a small percentage of the data 
to S3, check the file size through the AWS API, and use that to estimate the 
total size. Thanks for the idea.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote:

Hi,

Since the final size depends on data types and compression. I've had to first 
get a rough estimate of data, written to disk, then compute the number of 
partitions.

partitions = int(ceil(size_data * conversion_ratio / block_size))

In my case block size 256mb, source txt & dest is snappy parquet, 
compression_ratio .6

df.repartition(partitions).write.parquet(output)

Which yields files in the range of 230mb.

Another way was to count and come up with an empirical formula.

Cheers,
Hatim


On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro <linguin@gmail.com> wrote:

Hi,

There is no simple way to access the size in a driver side.
Since the partitions of primitive typed data (e.g., int) are compressed by 
`DataFrame#cache`,
the actual size is possibly a little bit different from processing partitions 
size.

// maropu

On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Hi,

Are there any tools for partitioning RDD/DataFrames by size at runtime? The 
idea would be to specify that I would like for each partition to be roughly X 
number of megabytes then write that through to S3. I haven't found anything off 
the shelf, and looking through stack overflow posts doesn't seem to yield 
anything concrete.

Is there a way to programmatically get the size or a size estimate for an 
RDD/DataFrame at runtime (eg size of one record would be sufficient)? I gave 
SizeEstimator a try, but it seems like the results varied quite a bit (tried on 
whole RDD and a sample). It would also be useful to get programmatic access to 
the size of the RDD in memory if it is cached.

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience




--
---
Takeshi Yamamuro


Tools for Balancing Partitions by Size

2016-07-12 Thread Pedro Rodriguez
Hi,

Are there any tools for partitioning RDD/DataFrames by size at runtime? The
idea would be to specify that I would like for each partition to be roughly
X number of megabytes then write that through to S3. I haven't found
anything off the shelf, and looking through stack overflow posts doesn't
seem to yield anything concrete.

Is there a way to programmatically get the size or a size estimate for an
RDD/DataFrame at runtime (eg size of one record would be sufficient)? I
gave SizeEstimator a try, but it seems like the results varied quite a bit
(tried on whole RDD and a sample). It would also be useful to get
programmatic access to the size of the RDD in memory if it is cached.

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Spark SQL: Merge Arrays/Sets

2016-07-11 Thread Pedro Rodriguez
Is it possible with Spark SQL to merge columns whose types are Arrays or
Sets?

My use case would be something like this:

DF types
id: String
words: Array[String]

I would want to do something like

df.groupBy('id).agg(merge_arrays('words)) -> list of all words
df.groupBy('id).agg(merge_sets('words)) -> list of distinct words
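
There are no merge_arrays/merge_sets functions that I know of; a sketch of one
way to get this behavior with built-ins, by exploding and re-aggregating with
collect_list/collect_set:

import org.apache.spark.sql.functions.{collect_list, collect_set, explode}

// Flatten each words array into one row per word, then re-aggregate per id.
// Note: ids whose words array is empty disappear after the explode.
val exploded = df.select($"id", explode($"words").as("word"))

val allWords      = exploded.groupBy("id").agg(collect_list("word").as("words"))
val distinctWords = exploded.groupBy("id").agg(collect_set("word").as("words"))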

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: question about UDAF

2016-07-11 Thread Pedro Rodriguez
I am not sure I understand your code entirely, but I worked with UDAFs
Friday and over the weekend (
https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a).

I think what is going on is that your "update" function is not defined
correctly. Update should take a possibly initialized or in-progress buffer
and integrate the new results in. Right now, you ignore the input row. What is
probably occurring is that update is just setting the
buffer equal to the buffer itself, which is still the initialization value "".

Merge is responsible for taking two buffers and merging them together. In
this case, the buffers are "" since initialize makes it "" and update keeps
it "" so the result is just "". I am not sure it matters, but you probably
also want to do buffer.getString(0).

Pedro

On Mon, Jul 11, 2016 at 3:04 AM, <luohui20...@sina.com> wrote:

> hello guys:
>  I have a DF and a UDAF. this DF has 2 columns, lp_location_id , id,
> both are of Int type. I want to group by id and aggregate all value of id
> into 1 string. So I used a UDAF to do this transformation: multi Int values
> to 1 String. However my UDAF returns empty values as the accessory attached.
>  Here is my code for my main class:
> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> val hiveTable = hc.sql("select lp_location_id,id from
> house_id_pv_location_top50")
>
> val jsonArray = new JsonArray
> val result =
> hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).collect.foreach(println)
>
> --
>  Here is my code of my UDAF:
>
> class JsonArray extends UserDefinedAggregateFunction {
>   def inputSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField("id", IntegerType) :: Nil)
>
>   def bufferSchema: StructType = StructType(
> StructField("id", StringType) :: Nil)
>
>   def dataType: DataType = StringType
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = ""
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> buffer(0) = buffer.getAs[Int](0)
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> val s1 = buffer1.getAs[Int](0).toString()
> val s2 = buffer2.getAs[Int](0).toString()
> buffer1(0) = s1.concat(s2)
>   }
>
>   def evaluate(buffer: Row): Any = {
> buffer(0)
>   }
> }
>
>
> I don't quite understand why I get an empty result from my UDAF. I guess there
> may be 2 reasons:
> 1. wrong initialization with "" in the initialize method
> 2. the buffer didn't get written successfully.
>
> Can anyone share an idea about this? Thank you.
>
>
>
>
> 
>
> ThanksBest regards!
> San.Luo
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: DataFrame Min By Column

2016-07-09 Thread Pedro Rodriguez
Thanks Michael,

That seems like the analog to sorting tuples. I am curious, is there a 
significant performance penalty to the UDAF versus that? Its certainly nicer 
and more compact code at least.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 2:19:11 PM, Michael Armbrust (mich...@databricks.com) wrote:

You can do what's called an argmax/argmin, where you take the min/max of a 
couple of columns that have been grouped together as a struct. We sort in 
column order, so you can put the timestamp first.

Here is an example.
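
Sketching the idea (column names user/time/site are illustrative): wrap the
ordering column first inside a struct, take min of the struct, then pull the
fields back out.

import org.apache.spark.sql.functions.{min, struct}

// assumes the $ column syntax is in scope (spark-shell or import sqlContext.implicits._)
// One row per user: the event with the smallest time.
df.groupBy($"user")
  .agg(min(struct($"time", $"site")).as("first"))
  .select($"user", $"first.time", $"first.site")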

On Sat, Jul 9, 2016 at 6:10 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> wrote:
I implemented a more generic version which I posted here: 
https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a

I think I could generalize this by pattern matching on DataType to use 
different getLong/getDouble/etc functions (not trying to use getAs[] because 
getting T from an Array[T] seems hard).

Is there a way to go further and make the arguments unnecessary or inferable at 
runtime, particularly for the valueType since it doesn’t matter what it is? 
DataType is abstract so I can’t instantiate it, is there a way to define the 
method so that it pulls from the user input at runtime?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com) wrote:

Hi Xinh,

A co-worker also found that solution, but I thought it was possibly 
overkill/brittle, so I looked into UDAFs (user defined aggregate functions). I 
don’t have code, but Databricks has a post that has an example: 
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
From that, I was able to write a MinLongByTimestamp function, but was having a 
hard time writing a generic aggregate on any column ordered by an orderable column.

Anyone know how you might go about using generics in a UDAF, or something that 
would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:

Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window 
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Is there a way to on a GroupedData (from groupBy in DataFrame) to have an 
aggregate that returns column A based on a min of column B? For example, I have 
a list of sites visited by a given user and I would like to find the event with 
the minimum time (first event)

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience





Re: problem making Zeppelin 0.6 work with Spark 1.6.1, throwing jackson.databind.JsonMappingException exception

2016-07-09 Thread Pedro Rodriguez
It would be helpful if you included the relevant configuration files from each, or 
noted if you are using the defaults, particularly any changes to class paths.

I worked through upgrading to Zeppelin 0.6.0 at work and at home without any issue, 
so it is hard to say more without having more details.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 8:25:30 AM, Mich Talebzadeh (mich.talebza...@gmail.com) 
wrote:

Hi,

I just installed the latest Zeppelin 0.6 as follows:

Source: zeppelin-0.6.0-bin-all

With Spark 1.6.1


Now I am getting this issue with Jackson. I did some searching that suggested 
this is caused by the classpath providing a different version of Jackson than 
the one Spark is expecting. However, no luck yet. With Spark 1.5.2 and the 
previous version of Zeppelin, namely 0.5.6-incubating, it used to work 
without problems.

Any ideas will be appreciated


com.fasterxml.jackson.databind.JsonMappingException:
Could not find creator property with name 'id' (in class
org.apache.spark.rdd.RDDOperationScope)

at [Source:
{"id":"14","name":"ExecutedCommand"}; line: 1, column:
1]

at
com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)

at
com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)

at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)

at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)

at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)

at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)

at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)

at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)

at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)

at
com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)

at
com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)

at
com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3666)

at
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3558)

at
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2578)

at
org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:85)

at
org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)

at
org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)

at
scala.Option.map(Option.scala:145)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:136)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at
org.apache.spark.SparkContext.withScope(SparkContext.scala:714)

at
org.apache.spark.SparkContext.parallelize(SparkContext.scala:728)

at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)

at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)

at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)

at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)

at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)

at
org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145)

at
org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)

at
org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)

at
org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)

at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)

at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)

at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)

at
$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)

at
$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)

at
$iwC$$iwC$$iwC.<init>(<console>:47)

at
$iwC$$iwC.<init>(<console>:49)

at
$iwC.<init>(<console>:51)

at
<init>(<console>:53)

at
.<init>(<console>:57)

Dr Mich Talebzadeh
 
LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 
http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.
 

Re: DataFrame Min By Column

2016-07-09 Thread Pedro Rodriguez
I implemented a more generic version which I posted here: 
https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a

I think I could generalize this by pattern matching on DataType to use 
different getLong/getDouble/etc functions (not trying to use getAs[] because 
getting T from an Array[T] seems hard).

Is there a way to go further and make the arguments unnecessary or inferable at 
runtime, particularly for the valueType since it doesn’t matter what it is? 
DataType is abstract so I can’t instantiate it, is there a way to define the 
method so that it pulls from the user input at runtime?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com) wrote:

Hi Xinh,

A co-worker also found that solution, but I thought it was possibly 
overkill/brittle, so I looked into UDAFs (user defined aggregate functions). I 
don’t have code, but Databricks has a post that has an example: 
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
From that, I was able to write a MinLongByTimestamp function, but was having a 
hard time writing a generic aggregate on any column ordered by an orderable column.

Anyone know how you might go about using generics in a UDAF, or something that 
would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:

Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window 
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Is there a way to on a GroupedData (from groupBy in DataFrame) to have an 
aggregate that returns column A based on a min of column B? For example, I have 
a list of sites visited by a given user and I would like to find the event with 
the minimum time (first event)

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience




Re: DataFrame Min By Column

2016-07-09 Thread Pedro Rodriguez
Hi Xinh,

A co-worker also found that solution, but I thought it was possibly 
overkill/brittle, so I looked into UDAFs (user defined aggregate functions). I 
don’t have code, but Databricks has a post that has an example: 
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
From that, I was able to write a MinLongByTimestamp function, but was having a 
hard time writing a generic aggregate on any column ordered by an orderable column.

Anyone know how you might go about using generics in a UDAF, or something that 
would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:

Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window 
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Is there a way to on a GroupedData (from groupBy in DataFrame) to have an 
aggregate that returns column A based on a min of column B? For example, I have 
a list of sites visited by a given user and I would like to find the event with 
the minimum time (first event)

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience




DataFrame Min By Column

2016-07-08 Thread Pedro Rodriguez
Is there a way to on a GroupedData (from groupBy in DataFrame) to have an
aggregate that returns column A based on a min of column B? For example, I
have a list of sites visited by a given user and I would like to find the
event with the minimum time (first event)

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Custom RDD: Report Size of Partition in Bytes to Spark

2016-07-04 Thread Pedro Rodriguez
Just realized I had been replying back to only Takeshi.

Thanks for the tip, it got me on the right track. I'm running into an issue with 
private [spark] methods though. It looks like the input metrics start out as 
None and are not initialized (verified by throwing a new Exception on the pattern 
match cases when it is None and when it's not). It looks like NewHadoopRDD calls 
getInputMetricsForReadMethod, which sets _inputMetrics if it is None, but 
unfortunately it is private [spark]. Is there a way for external RDDs to access 
this method or somehow initialize _inputMetrics in 1.6.x (it looks like 2.0 makes 
more of this API public)?

Using reflection I was able to implement it mimicking the NewHadoopRDD code, 
but if possible would like to avoid using reflection. Below is the source code 
for the method that works.

RDD code: 
https://github.com/EntilZha/spark-s3/blob/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/S3RDD.scala#L100-L105
Reflection code: 
https://github.com/EntilZha/spark-s3/blob/9e632f2a71fba2858df748ed43f0dbb5dae52a83/src/main/scala/io/entilzha/spark/s3/PrivateMethodUtil.scala

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 3, 2016 at 10:31:30 PM, Takeshi Yamamuro (linguin@gmail.com) wrote:

How about using `SparkListener`?
You can collect IO statistics thru TaskMetrics#inputMetrics by yourself.
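
A minimal sketch of that approach (assuming the Spark 1.6 listener API, where
inputMetrics is an Option; the field names differ a bit in 2.0):

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sums bytes read as reported by each finished task's input metrics.
class BytesReadListener extends SparkListener {
  val bytesRead = new AtomicLong(0L)
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    for {
      metrics <- Option(taskEnd.taskMetrics)   // can be null for failed tasks
      input   <- metrics.inputMetrics
    } bytesRead.addAndGet(input.bytesRead)
  }
}

// val listener = new BytesReadListener
// sc.addSparkListener(listener)
// ... run the job, then read listener.bytesRead.get ...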

// maropu

On Mon, Jul 4, 2016 at 11:46 AM, Pedro Rodriguez <ski.rodrig...@gmail.com> 
wrote:
Hi All,

I noticed on some Spark jobs it shows you input/output read size. I am 
implementing a custom RDD which reads files and would like to report these 
metrics to Spark since they are available to me.

I looked through the RDD source code and a couple different implementations and 
the best I could find were some Hadoop metrics. Is there a way to simply report 
the number of bytes a partition read so Spark can put it on the UI?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn



--
---
Takeshi Yamamuro


Custom RDD: Report Size of Partition in Bytes to Spark

2016-07-03 Thread Pedro Rodriguez
Hi All,

I noticed on some Spark jobs it shows you input/output read size. I am 
implementing a custom RDD which reads files and would like to report these 
metrics to Spark since they are available to me.

I looked through the RDD source code and a couple different implementations and 
the best I could find were some Hadoop metrics. Is there a way to simply report 
the number of bytes a partition read so Spark can put it on the UI?

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
That was indeed the case, using UTF8Deserializer makes everything work
correctly.

Thanks for the tips!

On Thu, Jun 30, 2016 at 3:32 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Quick update, I was able to get most of the plumbing to work thanks to the
> code Holden posted and browsing more source code.
>
> I am running into this error which makes me think that maybe I shouldn't
> be leaving the default python RDD serializer/pickler in place and do
> something else
> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182:
> _pickle.UnpicklingError: A load persistent id instruction was encountered,
> but no persistent_load function was specified.
>
>
> On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> Thanks Jeff and Holden,
>>
>> A little more context here probably helps. I am working on implementing
>> the idea from this article to make reads from S3 faster:
>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>> (although my name is Pedro, I am not the author of the article). The reason
>> for wrapping SparkContext is so that the code change is from sc.textFile to
>> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
>> can open source our library, but depends on the company). Overall, it's a very
>> light wrapper and perhaps calling it a context is not quite the right name
>> because of that.
>>
>> At the end of the day I make a sc.parallelize call and return an
>> RDD[String] as described in that blog post. I found a post from Py4J
>> mailing list that reminded me that the JVM gateway needs the jars in
>> spark.driver/executor.extraClassPath in addition to the spark.jars option.
>> With that, I can see the classes now. Looks like I need to do as you
>> suggest and wrap it using Java in order to go the last mile to calling the
>> method/constructor. I don't know yet how to get the RDD back to pyspark
>> though so any pointers on that would be great.
>>
>> Thanks for the tip on code Holden, I will take a look to see if that can
>> give me some insight on how to write the Python code part.
>>
>> Thanks!
>> Pedro
>>
>> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> So I'm a little biased - I think the best bridge between the two is using
>>> DataFrames. I've got some examples in my talk and on the high performance
>>> spark GitHub
>>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
>>> calls some custom scala code.
>>>
>>> Using a custom context is a bit tricky though because of how the
>>> launching is done, as Jeff Zhang points out you would need to wrap it in a
>>> JavaSparkContext and then you could override the _initialize_context
>>> function in context.py
>>>
>>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> Hi Pedro,
>>>>
>>>> Your use case is interesting.  I think launching java gateway is the
>>>> same as native SparkContext, the only difference is on creating your custom
>>>> SparkContext instead of native SparkContext. You might also need to wrap it
>>>> using java.
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>>>
>>>>
>>>>
>>>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <
>>>> ski.rodrig...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have written a Scala package which essentially wraps the
>>>>> SparkContext around a custom class that adds some functionality specific 
>>>>> to
>>>>> our internal use case. I am trying to figure out the best way to call this
>>>>> from PySpark.
>>>>>
>>>>> I would like to do this similarly to how Spark itself calls the JVM
>>>>> SparkContext as in:
>>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>>>
>>>>> My goal would be something like this:
>>>>>
>>>>> Scala Code (this is done):
>>>>> >>> import com.company.mylibrary.CustomContext
>>>>> >>> val myContext = CustomContext(sc)
>>>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>>>

Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Quick update, I was able to get most of the plumbing to work thanks to the
code Holden posted and browsing more source code.

I am running into this error which makes me think that maybe I shouldn't be
leaving the default python RDD serializer/pickler in place and do something
else https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182:
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.


On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Thanks Jeff and Holden,
>
> A little more context here probably helps. I am working on implementing
> the idea from this article to make reads from S3 faster:
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
> (although my name is Pedro, I am not the author of the article). The reason
> for wrapping SparkContext is so that the code change is from sc.textFile to
> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
> can open source our library, but depends on the company). Overall, it's a very
> light wrapper and perhaps calling it a context is not quite the right name
> because of that.
>
> At the end of the day I make a sc.parallelize call and return an
> RDD[String] as described in that blog post. I found a post from Py4J
> mailing list that reminded me that the JVM gateway needs the jars in
> spark.driver/executor.extraClassPath in addition to the spark.jars option.
> With that, I can see the classes now. Looks like I need to do as you
> suggest and wrap it using Java in order to go the last mile to calling the
> method/constructor. I don't know yet how to get the RDD back to pyspark
> though so any pointers on that would be great.
>
> Thanks for the tip on code Holden, I will take a look to see if that can
> give me some insight on how to write the Python code part.
>
> Thanks!
> Pedro
>
> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> So I'm a little biased - I think the best bridge between the two is using
>> DataFrames. I've got some examples in my talk and on the high performance
>> spark GitHub
>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
>> calls some custom scala code.
>>
>> Using a custom context is a bit tricky though because of how the
>> launching is done, as Jeff Zhang points out you would need to wrap it in a
>> JavaSparkContext and then you could override the _initialize_context
>> function in context.py
>>
>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Hi Pedro,
>>>
>>> Your use case is interesting.  I think launching java gateway is the
>>> same as native SparkContext, the only difference is on creating your custom
>>> SparkContext instead of native SparkContext. You might also need to wrap it
>>> using java.
>>>
>>>
>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>>
>>>
>>>
>>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have written a Scala package which essentially wraps the SparkContext
>>>> around a custom class that adds some functionality specific to our internal
>>>> use case. I am trying to figure out the best way to call this from PySpark.
>>>>
>>>> I would like to do this similarly to how Spark itself calls the JVM
>>>> SparkContext as in:
>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>>
>>>> My goal would be something like this:
>>>>
>>>> Scala Code (this is done):
>>>> >>> import com.company.mylibrary.CustomContext
>>>> >>> val myContext = CustomContext(sc)
>>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>>
>>>> Python Code (I want to be able to do this):
>>>> >>> from company.mylibrary import CustomContext
>>>> >>> myContext = CustomContext(sc)
>>>> >>> rdd = myContext.customTextFile("path")
>>>>
>>>> At the end of each code, I should be working with an ordinary
>>>> RDD[String].
>>>>
>>>> I am trying to access my Scala class through sc._jvm as below, but not
>>>> having any luck so far.
>>>>
>>>> My attempts

Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Thanks Jeff and Holden,

A little more context here probably helps. I am working on implementing the
idea from this article to make reads from S3 faster:
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
(although my name is Pedro, I am not the author of the article). The reason
for wrapping SparkContext is so that the code change is from sc.textFile to
sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
can open source our library, but depends on the company). Overall, it's a very
light wrapper and perhaps calling it a context is not quite the right name
because of that.

At the end of the day I make a sc.parallelize call and return an
RDD[String] as described in that blog post. I found a post from Py4J
mailing list that reminded me that the JVM gateway needs the jars in
spark.driver/executor.extraClassPath in addition to the spark.jars option.
With that, I can see the classes now. Looks like I need to do as you
suggest and wrap it using Java in order to go the last mile to calling the
method/constructor. I don't know yet how to get the RDD back to pyspark
though so any pointers on that would be great.

Thanks for the tip on code Holden, I will take a look to see if that can
give me some insight on how to write the Python code part.

Thanks!
Pedro

On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> So I'm a little biased - I think the best bridge between the two is using
> DataFrames. I've got some examples in my talk and on the high performance
> spark GitHub
> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
> calls some custom scala code.
>
> Using a custom context is a bit tricky though because of how the launching
> is done, as Jeff Zhang points out you would need to wrap it in a
> JavaSparkContext and then you could override the _initialize_context
> function in context.py
>
> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Hi Pedro,
>>
>> Your use case is interesting.  I think launching java gateway is the same
>> as native SparkContext, the only difference is on creating your custom
>> SparkContext instead of native SparkContext. You might also need to wrap it
>> using java.
>>
>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>
>>
>>
>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>> I have written a Scala package which essentially wraps the SparkContext
>>> around a custom class that adds some functionality specific to our internal
>>> use case. I am trying to figure out the best way to call this from PySpark.
>>>
>>> I would like to do this similarly to how Spark itself calls the JVM
>>> SparkContext as in:
>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>
>>> My goal would be something like this:
>>>
>>> Scala Code (this is done):
>>> >>> import com.company.mylibrary.CustomContext
>>> >>> val myContext = CustomContext(sc)
>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>
>>> Python Code (I want to be able to do this):
>>> >>> from company.mylibrary import CustomContext
>>> >>> myContext = CustomContext(sc)
>>> >>> rdd = myContext.customTextFile("path")
>>>
>>> At the end of each code, I should be working with an ordinary
>>> RDD[String].
>>>
>>> I am trying to access my Scala class through sc._jvm as below, but not
>>> having any luck so far.
>>>
>>> My attempts:
>>> >>> a = sc._jvm.com.company.mylibrary.CustomContext
>>> >>> dir(a)
>>> ['']
>>>
>>> Example of what I want::
>>> >>> a = sc._jvm.PythonRDD
>>> >>> dir(a)
>>> ['anonfun$6', 'anonfun$8', 'collectAndServe',
>>> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
>>> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
>>> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
>>> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
>>> 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
>>> 'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
>>> 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
>>> 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
>>> 'writeIteratorToStream', 'writeUTF']
>>

Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Hi All,

I have written a Scala package which essentially wraps the SparkContext
around a custom class that adds some functionality specific to our internal
use case. I am trying to figure out the best way to call this from PySpark.

I would like to do this similarly to how Spark itself calls the JVM
SparkContext as in:
https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py

My goal would be something like this:

Scala Code (this is done):
>>> import com.company.mylibrary.CustomContext
>>> val myContext = CustomContext(sc)
>>> val rdd: RDD[String] = myContext.customTextFile("path")

Python Code (I want to be able to do this):
>>> from company.mylibrary import CustomContext
>>> myContext = CustomContext(sc)
>>> rdd = myContext.customTextFile("path")

At the end of each code, I should be working with an ordinary RDD[String].

I am trying to access my Scala class through sc._jvm as below, but not
having any luck so far.

My attempts:
>>> a = sc._jvm.com.company.mylibrary.CustomContext
>>> dir(a)
['']

Example of what I want::
>>> a = sc._jvm.PythonRDD
>>> dir(a)
['anonfun$6', 'anonfun$8', 'collectAndServe',
'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
'writeIteratorToStream', 'writeUTF']

The next thing I would run into is converting the JVM RDD[String] back to a
Python RDD, what is the easiest way to do this?

Overall, is this a good approach to calling the same API in Scala and
Python?

-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Dataset Select Function after Aggregate Error

2016-06-18 Thread Pedro Rodriguez
I am curious if there is a way to call this so that it becomes a compile
error rather than runtime error:

// Note mispelled count and name
ds.groupBy($"name").count.select('nam, $"coun").show

More specifically, what are the best type safety guarantees that Datasets
provide? It seems like with Dataframes there is still the unsafety of
specifying column names by string/symbol and expecting the type to be
correct and exist, but if you do something like this then downstream code
is safer:

// This is Array[(String, Long)] instead of Array[sql.Row]
ds.groupBy($"name").count.select('name.as[String], 'count.as
[Long]).collect()

Does that seem like a correct understanding of Datasets?
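
For comparison, staying entirely in the typed API seems to move the check to
compile time (a sketch against the 2.0-preview API, assuming the spark-shell
implicits are in scope; untested):

case class Person(name: String, age: Int)
val people = Seq(Person("Andy", 32), Person("Pedro", 24), Person("Andy", 2)).toDS()

// groupByKey keeps the key type, so counts is Dataset[(String, Long)] and a
// misspelled field like _.nam is a compile error instead of a runtime one.
val counts = people.groupByKey(_.name).count()
val names = counts.map(_._1).collect()   // Array[String]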

On Sat, Jun 18, 2016 at 6:39 AM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Looks like it was my own fault. I had spark 2.0 cloned/built, but had the
> spark shell in my path so somehow 1.6.1 was being used instead of 2.0.
> Thanks
>
> On Sat, Jun 18, 2016 at 1:16 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> which version you use?
>> I passed in 2.0-preview as follows;
>> ---
>>
>> Spark context available as 'sc' (master = local[*], app id =
>> local-1466234043659).
>>
>> Spark session available as 'spark'.
>>
>> Welcome to
>>
>>     __
>>
>>  / __/__  ___ _/ /__
>>
>> _\ \/ _ \/ _ `/ __/  '_/
>>
>>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0-preview
>>
>>   /_/
>>
>>
>>
>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.8.0_31)
>>
>> Type in expressions to have them evaluated.
>>
>> Type :help for more information.
>>
>>
>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>
>> hive.metastore.schema.verification is not enabled so recording the schema
>> version 1.2.0
>>
>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>
>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>>
>> +---+-+
>>
>> | _1|count|
>>
>> +---+-+
>>
>> |  1|1|
>>
>> |  2|1|
>>
>> +---+-+
>>
>>
>>
>> On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet
>>> Takeshi. It unfortunately doesn't compile.
>>>
>>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>>
>>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>>> :28: error: type mismatch;
>>>  found   : org.apache.spark.sql.ColumnName
>>>  required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row,
>>> Long),?]
>>>   ds.groupBy($"_1").count.select($"_1", $"count").show
>>>  ^
>>>
>>> I also gave a try to Xinh's suggestion using the code snippet below
>>> (partially from spark docs)
>>> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2),
>>> Person("Pedro", 24), Person("Bob", 42)).toDS()
>>> scala> ds.groupBy(_.name).count.select($"name".as[String]).show
>>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given
>>> input columns: [];
>>> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show
>>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given
>>> input columns: [];
>>> scala> ds.groupBy($"name").count.select($"_1".as[String]).show
>>> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input
>>> columns: [];
>>>
>>> Looks like there are empty columns for some reason, the code below works
>>> fine for the simple aggregate
>>> scala> ds.groupBy(_.name).count.show
>>>
>>> Would be great to see an idiomatic example of using aggregates like
>>> these mixed with spark.sql.functions.
>>>
>>> Pedro
>>>
>>> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
>>>> Thanks Xinh and Takeshi,
>>>>
>>>> I am trying to avoid map since my impression is that this uses a Scala
>>>> clos

Re: Dataset Select Function after Aggregate Error

2016-06-18 Thread Pedro Rodriguez
Looks like it was my own fault. I had spark 2.0 cloned/built, but had the
spark shell in my path so somehow 1.6.1 was being used instead of 2.0.
Thanks

On Sat, Jun 18, 2016 at 1:16 AM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> which version you use?
> I passed in 2.0-preview as follows;
> ---
>
> Spark context available as 'sc' (master = local[*], app id =
> local-1466234043659).
>
> Spark session available as 'spark'.
>
> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0-preview
>
>   /_/
>
>
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_31)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
>
> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>
> hive.metastore.schema.verification is not enabled so recording the schema
> version 1.2.0
>
> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>
> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>
> +---+-+
>
> | _1|count|
>
> +---+-+
>
> |  1|1|
>
> |  2|1|
>
> +---+-+
>
>
>
> On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet
>> Takeshi. It unfortunately doesn't compile.
>>
>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>
>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>> :28: error: type mismatch;
>>  found   : org.apache.spark.sql.ColumnName
>>  required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row,
>> Long),?]
>>   ds.groupBy($"_1").count.select($"_1", $"count").show
>>  ^
>>
>> I also gave a try to Xinh's suggestion using the code snippet below
>> (partially from spark docs)
>> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2),
>> Person("Pedro", 24), Person("Bob", 42)).toDS()
>> scala> ds.groupBy(_.name).count.select($"name".as[String]).show
>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input
>> columns: [];
>> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show
>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input
>> columns: [];
>> scala> ds.groupBy($"name").count.select($"_1".as[String]).show
>> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input
>> columns: [];
>>
>> Looks like there are empty columns for some reason, the code below works
>> fine for the simple aggregate
>> scala> ds.groupBy(_.name).count.show
>>
>> Would be great to see an idiomatic example of using aggregates like these
>> mixed with spark.sql.functions.
>>
>> Pedro
>>
>> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> Thanks Xinh and Takeshi,
>>>
>>> I am trying to avoid map since my impression is that this uses a Scala
>>> closure so is not optimized as well as doing column-wise operations is.
>>>
>>> Looks like the $ notation is the way to go, thanks for the help. Is
>>> there an explanation of how this works? I imagine it is a method/function
>>> with its name defined as $ in Scala?
>>>
>>> Lastly, are there prelim Spark 2.0 docs? If there isn't a good
>>> description/guide of using this syntax I would be willing to contribute
>>> some documentation.
>>>
>>> Pedro
>>>
>>> On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> In 2.0, you can say;
>>>> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>>> ds.groupBy($"_1").count.select($"_1", $"count").show
>>>>
>>>>
>>>> // maropu
>>>>
>>>>
>>>> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Pedro,
>>>>>
>>>>> In 1.6.1, you can do:

Re: Skew data

2016-06-17 Thread Pedro Rodriguez
I am going to take a guess that this means that your partitions within an
RDD are not balanced (one or more partitions are much larger than the
rest). This would mean a single core would need to do much more work than
the rest leading to poor performance. In general, the way to fix this is to
spread data across partitions evenly. In most cases calling repartition is
enough to solve the problem. If you have a special case you might need to
create your own custom partitioner.
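
For the common case of a skewed join key, one technique that often helps is key
salting: spread the hot keys over several artificial sub-keys so no single
partition gets all of them. A rough sketch below (bigDf, smallDf, the column
name "key", and the salt factor of 10 are all illustrative; assumes the $
column syntax from spark-shell or import sqlContext.implicits._):

import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, rand}

val saltFactor = 10

// Skewed side: append a random salt 0..saltFactor-1 to the join key.
val saltedBig = bigDf.withColumn("salted_key",
  concat($"key", lit("_"), floor(rand() * saltFactor).cast("string")))

// Other side: replicate each row once per salt value so every salted key can match.
val saltedSmall = smallDf
  .select($"*", explode(array((0 until saltFactor).map(lit): _*)).as("salt"))
  .withColumn("salted_key", concat($"key", lit("_"), $"salt".cast("string")))

val joined = saltedBig.join(saltedSmall, "salted_key")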

Pedro

On Thu, Jun 16, 2016 at 6:55 PM, Selvam Raman <sel...@gmail.com> wrote:

> Hi,
>
> What is skew data.
>
> I read that if the data was skewed while joining it would take long time
> to finish the job.(99 percent finished in seconds where 1 percent of task
> taking minutes to hour).
>
> How to handle skewed data in spark.
>
> Thanks,
> Selvam R
> +91-97877-87724
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Dataset Select Function after Aggregate Error

2016-06-17 Thread Pedro Rodriguez
Thanks Xinh and Takeshi,

I am trying to avoid map since my impression is that this uses a Scala
closure so is not optimized as well as doing column-wise operations is.

Looks like the $ notation is the way to go, thanks for the help. Is there
an explanation of how this works? I imagine it is a method/function with
its name defined as $ in Scala?

Lastly, are there prelim Spark 2.0 docs? If there isn't a good
description/guide of using this syntax I would be willing to contribute
some documentation.

Pedro

On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> In 2.0, you can say;
> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
> ds.groupBy($"_1").count.select($"_1", $"count").show
>
>
> // maropu
>
>
> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
>
>> Hi Pedro,
>>
>> In 1.6.1, you can do:
>> >> ds.groupBy(_.uid).count().map(_._1)
>> or
>> >> ds.groupBy(_.uid).count().select($"value".as[String])
>>
>> It doesn't have the exact same syntax as for DataFrame.
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
>>
>> It might be different in 2.0.
>>
>> Xinh
>>
>> On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>> I am working on using Datasets in 1.6.1 and eventually 2.0 when its
>>> released.
>>>
>>> I am running the aggregate code below where I have a dataset where the
>>> row has a field uid:
>>>
>>> ds.groupBy(_.uid).count()
>>> // res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string,
>>> _2: bigint]
>>>
>>> This works as expected, however, attempts to run select statements after
>>> fails:
>>> ds.groupBy(_.uid).count().select(_._1)
>>> // error: missing parameter type for expanded function ((x$2) => x$2._1)
>>> ds.groupBy(_.uid).count().select(_._1)
>>>
>>> I have tried several variants, but nothing seems to work. Below is the
>>> equivalent Dataframe code which works as expected:
>>> df.groupBy("uid").count().select("uid")
>>>
>>> Thanks!
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>>> Github: github.com/EntilZha | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Dataset Select Function after Aggregate Error

2016-06-17 Thread Pedro Rodriguez
Hi All,

I am working on using Datasets in 1.6.1 and eventually 2.0 when its
released.

I am running the aggregate code below where I have a dataset where the row
has a field uid:

ds.groupBy(_.uid).count()
// res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string, _2:
bigint]

This works as expected, however, attempts to run select statements after
fails:
ds.groupBy(_.uid).count().select(_._1)
// error: missing parameter type for expanded function ((x$2) => x$2._1)
ds.groupBy(_.uid).count().select(_._1)

I have tried several variants, but nothing seems to work. Below is the
equivalent Dataframe code which works as expected:
df.groupBy("uid").count().select("uid")

Thanks!
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Understanding Spark Rebalancing

2016-01-14 Thread Pedro Rodriguez
Hi All,

I am running a Spark program where one of my parts is using Spark as a
scheduler rather than a data management framework. That is, my job can be
described as RDD[String] where the string describes an operation to perform
which may be cheap or expensive (process an object which may have a small
or large number of records associated with it).

Leaving things to default, I have bad job balancing. I am wondering which
approach I should take:
1. Write a partitioner and use partitionBy to balance partitions ahead of time
by the number of records each string needs (sketched below)
2. repartition to have many small partitions (I have ~1700 strings acting
as jobs to run, so maybe 1-5 per partition). My question here is, does
Spark re-schedule/steal jobs if there are executors/worker processes that
aren't doing any work?

The second one would be easier and since I am not shuffling much data
around would work just fine for me, but I can't seem to find out for sure
if Spark does job re-scheduling/stealing.
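
To make option 1 concrete, this is roughly what I have in mind (a sketch only;
the cost map would come from whatever per-key estimate I can compute up front,
e.g. record counts, and the partition count of 64 is arbitrary):

import org.apache.spark.Partitioner

// Greedy bin packing: each key is assigned up front to the partition that
// currently has the smallest total estimated cost.
class CostBalancedPartitioner(costs: Map[String, Long], parts: Int) extends Partitioner {
  override def numPartitions: Int = parts

  private val assignment: Map[String, Int] = {
    val load = Array.fill(parts)(0L)
    costs.toSeq.sortBy(-_._2).map { case (key, cost) =>
      val p = load.indexOf(load.min)   // lightest partition so far
      load(p) += cost
      key -> p
    }.toMap
  }

  override def getPartition(key: Any): Int =
    assignment.getOrElse(key.toString, ((key.hashCode % parts) + parts) % parts)
}

// val balanced = jobs.keyBy(identity)
//   .partitionBy(new CostBalancedPartitioner(costs, 64))
//   .values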

Thanks
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to speed up MLlib LDA?

2015-09-22 Thread Pedro Rodriguez
I helped some with the LDA and worked quite a bit on a Gibbs version. I
don't know if the Gibbs version might help, but since it is not (yet) in
MLlib, Intel Analytics kindly created a spark package with their adapted
version plus a couple other LDA algorithms:
http://spark-packages.org/package/intel-analytics/TopicModeling
https://github.com/intel-analytics/TopicModeling

It might be worth trying out. Do you know what LDA algorithm VW uses?

Pedro


On Tue, Sep 22, 2015 at 1:54 AM, Marko Asplund <marko.aspl...@gmail.com>
wrote:

> Hi,
>
> I did some profiling for my LDA prototype code that requests topic
> distributions from a model.
> According to Java Mission Control more than 80 % of execution time during
> sample interval is spent in the following methods:
>
> org.apache.commons.math3.util.FastMath.log(double); count: 337; 47.07%
> org.apache.commons.math3.special.Gamma.digamma(double); count: 164; 22.91%
> org.apache.commons.math3.util.FastMath.log(double, double[]); count: 50;
> 6.98%
> java.lang.Double.valueOf(double); count: 31; 4.33%
>
> Is there any way of using the API more optimally?
> Are there any opportunities for optimising the "topicDistributions" code
> path in MLlib?
>
> My code looks like this:
>
> // executed once
> val model = LocalLDAModel.load(ctx, ModelFileName)
>
> // executed four times
> val samples = Transformers.toSparseVectors(vocabularySize,
> ctx.parallelize(Seq(input))) // fast
> model.topicDistributions(samples.zipWithIndex.map(_.swap)) // <== this
> seems to take about 4 seconds to execute
>
>
> marko
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Pedro Rodriguez
Worth noting that Spark 1.5 is extending that list of Spark SQL functions
quite a bit. Not sure where in the docs they would be yet, but the JIRA is
here: https://issues.apache.org/jira/browse/SPARK-8159

On Thu, Aug 6, 2015 at 7:27 PM, Netwaver wanglong_...@163.com wrote:

 Thanks for your kindly help






 At 2015-08-06 19:28:10, Todd Nist tsind...@gmail.com wrote:

 They are covered here in the docs:


 http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$


 On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote:

 Hi All,
  I am using Spark 1.4.1, and I want to know how can I find the
 complete function list supported in Spark SQL, currently I only know
 'sum','count','min','max'. Thanks a lot.








-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark Interview Questions

2015-07-29 Thread Pedro Rodriguez
You might look at the edx course on Apache Spark or ML with Spark. There
are probably some homework problems or quiz questions that might be
relevant. I haven't looked at the course myself, but that's where I would go
first.

https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x
https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark SQL Table Caching

2015-07-22 Thread Pedro Rodriguez
I would be interested in the answer to this question, plus the relationship
between those and registerTempTable()

Pedro

On Tue, Jul 21, 2015 at 1:59 PM, Brandon White bwwintheho...@gmail.com
wrote:

 A few questions about caching a table in Spark SQL.

 1) Is there any difference between caching the dataframe and the table?

 df.cache() vs sqlContext.cacheTable(tableName)

 2) Do you need to warm up the cache before seeing the performance
 benefits? Is the cache LRU? Do you need to run some queries on the table
 before it is cached in memory?

 3) Is caching the table much faster than .saveAsTable? I am only seeing a
 10 %- 20% performance increase.




-- 
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek http://SnowGeek.org
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703


Python DataFrames, length of array

2015-07-15 Thread pedro
Based on the list of functions here:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

there doesn't seem to be a way to get the length of an array in a dataframe
without defining a UDF.

What I would be looking for is something like this (except length_udf would
be pyspark.sql.functions.length or something similar):

length_udf = UserDefinedFunction(len, IntegerType())
test_schema = StructType([
StructField('arr', ArrayType(IntegerType())),
StructField('letter', StringType())
])
test_df = sql.createDataFrame(sc.parallelize([
[[1, 2, 3], 'a'],
[[4, 5, 6, 7, 8], 'b']
]), test_schema)
test_df.select(length_udf(test_df.arr)).collect()

Output:
[Row(PythonUDF#len(arr)=3), Row(PythonUDF#len(arr)=5)]

Is there currently a way to accomplish this? If this doesn't exist and seems
useful, I would be happy to contribute a PR with the function.

Pedro Rodriguez



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-DataFrames-length-of-array-tp23868.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Python DataFrames: length of ArrayType

2015-07-15 Thread pedro
Resubmitting after fixing subscription to mailing list.

Based on the list of functions here:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

there doesn't seem to be a way to get the length of an array in a dataframe
without defining a UDF. 

What I would be looking for is something like this (except length_udf would
be pyspark.sql.functions.length or something similar):

length_udf = UserDefinedFunction(len, IntegerType()) 
test_schema = StructType([ 
StructField('arr', ArrayType(IntegerType())), 
StructField('letter', StringType()) 
]) 
test_df = sql.createDataFrame(sc.parallelize([ 
[[1, 2, 3], 'a'], 
[[4, 5, 6, 7, 8], 'b'] 
]), test_schema) 
test_df.select(length_udf(test_df.arr)).collect() 

Output: 
[Row(PythonUDF#len(arr)=3), Row(PythonUDF#len(arr)=5)] 

Is there currently a way to accomplish this? If this doesn't exist and seems
useful, I would be happy to contribute a PR with the function. 

Pedro Rodriguez



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-DataFrames-length-of-ArrayType-tp23869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Misaligned Rows with UDF

2015-07-14 Thread pedro
Hi,

I am working at finding the root cause of a bug where rows in dataframes
seem to have misaligned data. My dataframes have two types of columns:
columns from data and columns from UDFs. I seem to be having trouble where
for a given row, the row data doesn't match the data used to compute the
UDF.

In my case, I am calculating click through rates, so the columns of interest
are items sent to users and items clicked by users. Below is an example of
the problem that I am encountering.

Column Schema: clicked_items (Array of Long), sent_items (Array of Long),
ctr (Double), str_items
Data: [[1, 2], [1, 2, 3], 1, [5] | [5]]
ctr_udf = UserDefinedFunction(lambda x, y: 1.0 * len(x) / len(y),
DoubleType())
str_udf = UserDefinedFunction(lambda x, y: "{0} | {1}".format(str(x),
str(y)), StringType())

My dataframe looks something like this:

df.select(df.clicked_items, df.sent_items, ctr_udf(df.clicked_items,
df.sent_items), str_udf(df.clicked_items, df.sent_items))

As you can see above, the row data doesn't match what was input to the UDF.

To eliminate the possibility that there was an issue with UDFs, I tested the
code below and it works fine:
__
test_data_schema = StructType([
    StructField('numerator', ArrayType(IntegerType())),
    StructField('denominator', ArrayType(IntegerType())),
    StructField('label', StringType())
])
test_data = sc.parallelize([
    (range(3), range(4), 'a'),
    ([], range(5), 'b'),
    ([], range(3), 'c'),
    (range(4), range(5), 'd'),
    (range(3), range(4), 'e'),
    (range(1), range(3), 'f')
], 3)
td_df = sql.createDataFrame(test_data, test_data_schema)

def compute_func(num, den, label):
    return 1.0 * len(num) / len(den)

compute_udf = UserDefinedFunction(compute_func, DoubleType())
td_df.select(td_df.numerator, td_df.denominator, td_df.label,
             compute_udf(td_df.numerator, td_df.denominator, td_df.label)).take(10)
___

Since the above works, I thought that this might be an issue with the data,
not the dataframe/udf. To test this, I wrote the data to hdfs using parquet,
then reread the data in. This fixed the issue, so I think it points to an
issue with lineage/data pipeline/hdfs. Any thoughts on this would be great.
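
In case the workaround is useful to anyone else, the Parquet round trip
looks roughly like this (a minimal sketch assuming Spark 1.4+; the HDFS
path is just a placeholder):

checkpoint_path = 'hdfs:///tmp/ctr_checkpoint'   # placeholder path
df.write.parquet(checkpoint_path)                # materialize the rows
df = sql.read.parquet(checkpoint_path)           # re-read; lineage now starts from the files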

My data roughly follows this flow
1. Read raw logs from HDFS
2. Transform raw logs into dataframe, register as temp table
3. Group data by unique identifier, reflatten the data with the identifier
as a column and additional computed values, register as temp table
4. Perform a broadcast join into a column of the table
5. Regroup the table by the prior unique identifier plus a new one, then
flatten once more (deduping records using the data from the broadcast join).
6. Do UDF calculation.

My next step will be to try extending the example I wrote above to mimic the
data flow that I have, in hopes of reproducing the bug outside our codebase.
Any intuition on next steps would be great.

Thanks,
Pedro Rodriguez
Trulia
CU Boulder PhD Student





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Misaligned-Rows-with-UDF-tp23837.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Check for null in PySpark DataFrame

2015-07-02 Thread Pedro Rodriguez
Thanks for the tip. Any idea why the intuitive answer doesn't work (!= None)?
I inspected the Row columns and they do indeed have a None value. I suspect
that somehow Python's None is translated to something in the JVM that doesn't
compare equal to null?

I might check out the source code for a better idea as well

Pedro
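
If it helps, a minimal sketch (assuming 1.4-era PySpark) of what I believe
is going on: comparing a column to None builds a SQL comparison against a
null literal, and under SQL's three-valued logic that comparison evaluates
to null rather than true for every row, so the filter keeps nothing. The
isNull()/isNotNull() column methods avoid the comparison entirely:

df = sqlContext.createDataFrame(
    sc.parallelize([(1, None), (2, 'a'), (3, 'b'), (4, None)]))

df.where('_2 is not null').count()     # 2  (SQL string predicate)
df.where(df._2.isNotNull()).count()    # 2  (Column method)
df.where(df._2 != None).count()        # 0: "_2 <> NULL" never evaluates to true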

On Wed, Jul 1, 2015 at 12:18 PM, Michael Armbrust mich...@databricks.com
wrote:

 There is an isNotNull function on any column.

 df._1.isNotNull()

 or

 from pyspark.sql.functions import *
  col('myColumn').isNotNull()

 On Wed, Jul 1, 2015 at 3:07 AM, Olivier Girardot ssab...@gmail.com
 wrote:

 I must admit I've been using the same back to SQL strategy for now :p
 So I'd be glad to have insights into that too.

 Le mar. 30 juin 2015 à 23:28, pedro ski.rodrig...@gmail.com a écrit :

 I am trying to find what is the correct way to programmatically check for
 null values for rows in a dataframe. For example, below is the code using
 pyspark and sql:

 df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, 'a'), (3,
 'b'), (4, None)]))
 df.where('_2 is not null').count()

 However, this won't work
 df.where(df._2 != None).count()

 It seems there is no native Python way with DataFrames to do this, but I
 find that difficult to believe and more likely that I am missing the
 right
 way to do this.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek http://SnowGeek.org
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703


Check for null in PySpark DataFrame

2015-06-30 Thread pedro
I am trying to find what is the correct way to programmatically check for
null values for rows in a dataframe. For example, below is the code using
pyspark and sql:

df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, 'a'), (3,
'b'), (4, None)]))
df.where('_2 is not null').count()

However, this won't work
df.where(df._2 != None).count()

It seems there is no native Python way with DataFrames to do this, but I
find that difficult to believe and more likely that I am missing the right
way to do this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Gradient Boosting Decision Trees

2014-07-16 Thread Pedro Silva
Hi there,

I am looking for a GBM MLlib implementation. Does anyone know if there is a
plan to roll it out soon?

Thanks!
Pedro


Re: Gradient Boosting Decision Trees

2014-07-16 Thread Pedro Silva
Hi Ameet, that's great news!

Thanks,
Pedro


On Wed, Jul 16, 2014 at 9:33 AM, Ameet Talwalkar atalwal...@gmail.com
wrote:

 Hi Pedro,

 Yes, although they will probably not be included in the next release
 (since the code freeze is ~2 weeks away), GBM (and other ensembles of
 decision trees) are currently under active development.  We're hoping
 they'll make it into the subsequent release.

 -Ameet


 On Wed, Jul 16, 2014 at 9:08 AM, Pedro Silva jpedrosi...@gmail.com
 wrote:

 Hi there,

 I am looking for a GBM MLlib implementation. Does anyone know if there is
 a plan to roll it out soon?

 Thanks!
 Pedro





Variables outside of mapPartitions scope

2014-05-16 Thread pedro
I am working on some code which uses mapPartitions. It's working great,
except when I attempt to use a variable within the function passed to
mapPartitions which references something outside of its scope (for example,
a variable declared immediately before the mapPartitions call). When this
happens, I get a task not serializable error. I wanted to reference a
variable which had been broadcast and was ready to use within that closure.

Seeing that, I attempted another solution: storing the broadcast variable
within an object (a singleton). It serialized fine, but when I ran it on a
cluster, any reference to it got a null pointer exception. My presumption is
that the workers were not getting their objects updated for some reason,
despite setting it as a broadcast variable. My guess is that the workers get
the serialized function, but Spark doesn't know to serialize the object,
including the things it references, so the copied reference becomes invalid.

What would be a good way to solve my problem? Is there a way to reference a
broadcast variable by name rather than through a variable?
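
For what it's worth, the pattern I have seen suggested is to keep the
broadcast handle in scope and call .value inside the function passed to
mapPartitions, so the closure only captures the small, serializable handle.
A PySpark-flavored sketch with made-up names (in Scala the same idea
applies: copy whatever the closure needs into local vals right before the
call):

lookup = sc.broadcast({'a': 1, 'b': 2})          # hypothetical lookup table

def add_counts(partition):
    table = lookup.value                         # dereferenced on the worker
    for record in partition:
        yield (record, table.get(record, 0))

result = sc.parallelize(['a', 'b', 'c']).mapPartitions(add_counts).collect()
# [('a', 1), ('b', 2), ('c', 0)]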



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Variables-outside-of-mapPartitions-scope-tp5517.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Task not serializable?

2014-05-15 Thread pedro
I'm still fairly new to this, but I found problems using classes in maps if
they used instance variables in part of the map function. It seems like for
maps and such to work correctly, the code needs to be purely functional.
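
Concretely, the shape that caused trouble for me looks roughly like the
first method below (a small PySpark-flavored sketch with made-up names):
referencing an instance field inside the function drags the whole object
into the closure, while copying the field to a local variable first does not.

class Scaler(object):
    def __init__(self, factor):
        self.factor = factor

    def scale_bad(self, rdd):
        # the lambda references self, so the entire Scaler instance
        # has to be shipped with the task
        return rdd.map(lambda x: x * self.factor)

    def scale_good(self, rdd):
        factor = self.factor          # copy the field to a local variable first
        return rdd.map(lambda x: x * factor)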



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Task-not-serializable-tp3507p5506.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Variables outside of mapPartitions scope

2014-05-12 Thread pedro
Right now I am not using any class variables (references to this). All my
variables are created within the scope of the method I am running.

I did more debugging and found this strange behavior.
variables here
for loop
    mapPartitions call
        use variables here
    end mapPartitions
endfor

This will result in a serialization error, but this won't:

variables here
for loop
    create new references to variables here
    mapPartitions call
        use new reference variables here
    end mapPartitions
endfor
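
In code, the second (working) shape looks roughly like this (a
PySpark-flavored sketch with made-up names, purely to make the structure
above concrete):

rdd = sc.parallelize(range(10))                      # hypothetical input
results = []
for threshold in [2, 5, 8]:
    local_threshold = threshold                      # "create new references to variables here"
    filtered = rdd.mapPartitions(
        lambda part: (x for x in part if x >= local_threshold))
    results.append(filtered.count())                 # -> 8, then 5, then 2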



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Variables-outside-of-mapPartitions-scope-tp5517p5528.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Initial job has not accepted any resources

2014-05-04 Thread pedro
I have been working on a Spark program and completed it, but I have spent
the past few hours trying to run it on EC2 without any luck. I am hoping I
can comprehensively describe my problem and what I have done, but I am
pretty stuck.

My code uses the following lines to configure the SparkContext, which are
taken from the standalone app example found here:
https://spark.apache.org/docs/0.9.0/quick-start.html
And combined with ampcamps code found here:
http://spark-summit.org/2013/exercises/machine-learning-with-spark.html
To give the following code:
http://pastebin.com/zDYkk1T8

I launch the Spark cluster with:
./spark-ec2 -k plda -i ~/plda.pem -s 1 --instance-type=t1.micro
--region=us-west-2 start lda
Once logged in, I launch my job with sbt from my project's directory:
$ /root/bin/sbt run

This results in the following log, indicating the problem in my subject
line:
http://pastebin.com/DiQCj6jQ

Following this, I got advice to set my conf/spark-env.sh so it exports
MASTER and SPARK_MASTER_IP:

"There's an inconsistency in the way the master addresses itself. The Spark
master uses the internal (ip-*.internal) address, but the driver is trying
to connect using the external (ec2-*.compute-1.amazonaws.com) address. The
solution is to set the Spark master URL to the external address in the
spark-env.sh file.

Your conf/spark-env.sh is probably empty. It should set MASTER and
SPARK_MASTER_IP to the external URL, as the EC2 launch script does:
https://github.com/.../templ.../root/spark/conf/spark-env.sh"

My spark-env.sh looks like this:
#!/usr/bin/env bash

export MASTER=`cat /root/spark-ec2/cluster-url`
export SPARK_MASTER_IP=ec2-54-186-178-145.us-west-2.compute.amazonaws.com
export SPARK_WORKER_MEM=128m

At this point, I did a test
1. Remove my spark-env.sh variables
2. Run spark-shell
3. Run: sc.parallelize(1 to 1000).count()
4. This works as expected
5. Reset my spark-env.sh variables
6. Run prior spark-shell and commands
7. I get the same error as reported above.

Hence, something is wrong with how I am setting my master/slave
configuration. Any help would be greatly appreciated.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp5322.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Initial job has not accepted any resources

2014-05-04 Thread pedro
Hi Jeremy,

I am running the most recent release, 0.9. I just fixed the problem, and it
was indeed a matter of setting the variables correctly during deployment.

Once I had the cluster I wanted running, I began to suspect that the master
was not responding. So I killed a worker, then recreated it, and found it
could not connect to the master. So I killed the master and remade it, then
remade the worker. Odd things happened, but it seemed like I was on the
right track.

So I stopped all, then restarted all, then tried again and it began to work. So 
now, after probably around 6 hours of debugging, I have EC2 working (yay).

The next thing we are trying to debug is that when we build our project on
EC2, it can't find breeze, and when we try to include our code as a Spark
example (under the correct directory), breeze also can't be found. The first
shows up as a runtime error, a NoClassDefFoundError for
breeze/linalg/DenseVector, which makes me think that breeze isn't on the
cluster and also isn't being built with our project (and sbt assembly is
acting strange). The second is a compile-time error, which is odd because
other examples in the same directory use breeze and I don't find anything
that specifies their dependencies.

Thanks
-- 
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
BSU Cryosphere Science Research
SnowGeek Founder

snowgeek.org
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703

On May 4, 2014 at 6:51:56 PM, Jeremy Freeman [via Apache Spark User List] 
(ml-node+s1001560n5335...@n3.nabble.com) wrote:

Hey Pedro,

From which version of Spark were you running the spark-ec2.py script? You 
might have run into the problem described here 
(http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-error-td5323.html),
 which Patrick just fixed up to ensure backwards compatibility.

With the bug, it would successfully complete deployment but prevent the correct 
setting of various variables, so may have caused the errors you were seeing, 
though I'm not positive.

I'd definitely try re-running the spark-ec2 script now.

-- Jeremy

If you reply to this email, your message will be added to the discussion below:
http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp5322p5335.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp5322p5336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: ClassNotFoundException

2014-05-04 Thread pedro
I just ran into the same problem. I will respond if I find out how to fix it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-tp5182p5342.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Initial job has not accepted any resources

2014-05-04 Thread pedro
Since it appears breeze is going to be included by default in Spark 1.0,
and I ran into the issue here:
http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-td5182.html
and it seems like the issues I had were recently introduced, I am cloning
Spark and checking out the 1.0 branch. Maybe this makes my problem(s) worse,
but I am going to give it a try. I'm rapidly running out of time to get our
code fully working on EC2.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp5322p5344.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.