How is the order ensured in the jdbc relation provider when inserting data from multiple executors

2016-11-21 Thread Niranda Perera
Hi,

Say I have a table with 1 column and 1000 rows. I want to save the result
in an RDBMS table using the JDBC relation provider. So I run the following
query:

"insert into table table2 select value, count(*) from table1 group by value
order by value"

While debugging, I found that the resultant DF from "select value, count(*)
from table1 group by value order by value" would have around 200+ partitions,
and say I have 4 executors attached to my driver. So I would have 200+
write tasks assigned to 4 executors. I want to understand how these
executors are able to write the data to the underlying RDBMS table of
table2 without messing up the order.

I checked the JDBC insertable relation, and in JdbcUtils [1] it does the
following:

df.foreachPartition { iterator =>
  savePartition(getConnection, table, iterator, rddSchema, nullTypes,
    batchSize, dialect)
}

So, my understanding is that all of my 4 executors will run the
savePartition function (or closure) in parallel, without knowing which one
should write its data before the other!

In the savePartition method, the comment says:
"Saves a partition of a DataFrame to the JDBC database. This is done in
a single database transaction in order to avoid repeatedly inserting
data as much as possible."
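
For context, this is roughly what each write task ends up doing per
partition (my own sketch, not the actual JdbcUtils code; getConnection and
the column types are assumptions):

df.foreachPartition { rows =>
  val conn = getConnection()            // assumed helper returning a java.sql.Connection
  conn.setAutoCommit(false)             // one transaction per partition
  val stmt = conn.prepareStatement("INSERT INTO table2 VALUES (?, ?)")
  rows.foreach { row =>
    stmt.setString(1, row.getString(0)) // the 'value' column (assumed type)
    stmt.setLong(2, row.getLong(1))     // the count(*) column
    stmt.addBatch()
  }
  stmt.executeBatch()
  conn.commit()
  conn.close()
}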

I want to understand how these parallel executors save the partitions
without harming the order of the results. Is it by locking the database
resource from each executor (i.e. ex0 would first obtain a lock on the
table and write partition 0, while ex1 ... ex3 wait till the lock is
released)?

In my experience, there is no harm done to the order of the results at the
end of the day!

Would like to hear from you guys! :-)

[1]
https://github.com/apache/spark/blob/v1.6.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L277

-- 
Niranda Perera
@n1r44 <https://twitter.com/N1R44>
+94 71 554 8430
https://www.linkedin.com/in/niranda
https://pythagoreanscript.wordpress.com/


SQL Syntax for pivots

2016-11-16 Thread Niranda Perera
Hi all,

I see that the pivot functionality is being added to Spark DFs from 1.6
onward.

I am interested to know whether there is a Spark SQL syntax available for
pivoting. Example: slide 11 of [1]

pandas (Python) - pivot_table(df, values='D', index=['A', 'B'],
columns=['C'], aggfunc=np.sum)

reshape2 (R) - dcast(df, A + B ~ C, sum)

Oracle 11g - SELECT * FROM df PIVOT (sum(D) FOR C IN ('small', 'large')) p
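
For comparison, the DataFrame API in Spark 1.6 exposes this roughly as
follows (a sketch, assuming a DataFrame df with columns A, B, C and D):

val pivoted = df.groupBy("A", "B")
  .pivot("C")        // or .pivot("C", Seq("small", "large")) to fix the output columns
  .sum("D")

What I am after is an equivalent syntax at the SQL level.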


Best

[1]
http://www.slideshare.net/SparkSummit/pivoting-data-with-sparksql-by-andrew-ray

-- 
Niranda Perera
@n1r44 <https://twitter.com/N1R44>
+94 71 554 8430
https://www.linkedin.com/in/niranda
https://pythagoreanscript.wordpress.com/


Executors go OOM when using JDBC relation provider

2016-08-16 Thread Niranda Perera
Hi,

I have been using a standalone Spark cluster (v1.4.x) with the following
configuration: 2 nodes, each running a worker with 1 core and 4g of memory.
So I had 2 executors for my app, with 2 cores and 8g of memory in total.

I have a table in a MySQL database which has around 10 million rows. It has
around 10 columns with integer, string and date types (say table1 with
columns c1 to c10).

I ran the following queries:

   1. select count(*) from table1 - completes within seconds
   2. select c1, count(*) from table1 group by c1 - completes within seconds,
   but takes longer than the 1st query
   3. select c1, c2, count(*) from table1 group by c1, c2 - same behavior as Q2
   4. select c1, c2, c3, c4, count(*) from table1 group by c1, c2, c3, c4 -
   took a few minutes to finish
   5. select c1, c2, c3, c4, count(*) from table1 group by c1, c2, c3, c4, c5 -
   Executor goes OOM within a few minutes! (this has one more column in the
   group by statement)

It seems like the more group-by columns are added, the more the time grows,
exponentially! Is this the expected behavior?
I was monitoring the MySQL process list and observed that the data was
transmitted to the executors within a few seconds without an issue.
NOTE: I am not using any partition columns here. So, AFAIU, there is
essentially only a single partition for the JDBC RDD.
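
For what it's worth, a partitioned read would look roughly like the sketch
below (c1 would have to be a numeric column, and the URL, credentials,
bounds and partition count are all placeholders):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")        // placeholder credentials
props.setProperty("password", "dbpass")

val df = sqlContext.read.jdbc(
  "jdbc:mysql://localhost:3306/mydb",      // placeholder URL
  "table1",
  "c1",                                    // partition column (must be numeric)
  0L,                                      // lowerBound (assumption)
  10000000L,                               // upperBound (assumption)
  8,                                       // numPartitions (assumption)
  props)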

I ran the same query (query 5) in the MySQL console and I was able to get a
result within 3 minutes! So I'm wondering what the issue could have been
here. This OOM exception is actually a blocker!

Is there any other tuning I should do? It certainly worries me to see that
MySQL gave a significantly faster result than Spark here!

Look forward to hearing from you!

Best

-- 
Niranda Perera
@n1r44 <https://twitter.com/N1R44>
+94 71 554 8430
https://www.linkedin.com/in/niranda
https://pythagoreanscript.wordpress.com/


Why is the spark-yarn module excluded from the spark parent pom?

2016-07-12 Thread Niranda Perera
Hi guys,

I could not find the spark-yarn module in the spark parent pom. Is there
any particular reason why it has been excluded?

Best
-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Re: Latest spark release in the 1.4 branch

2016-07-07 Thread Niranda Perera
Hi Mark,

I agree. :-) We already have a product released with Spark 1.4.1 with some
custom extensions and now we are doing a patch release. We will update
Spark to the latest 2.x version in the next release.

Best

On Thu, Jul 7, 2016 at 1:12 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> You've got to satisfy my curiosity, though.  Why would you want to run
> such a badly out-of-date version in production?  I mean, 2.0.0 is just
> about ready for release, and lagging three full releases behind, with one
> of them being a major version release, is a long way from where Spark is
> now.
>
> On Wed, Jul 6, 2016 at 11:12 PM, Niranda Perera <niranda.per...@gmail.com>
> wrote:
>
>> Thanks Reynold
>>
>> On Thu, Jul 7, 2016 at 11:40 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Yes definitely.
>>>
>>>
>>> On Wed, Jul 6, 2016 at 11:08 PM, Niranda Perera <
>>> niranda.per...@gmail.com> wrote:
>>>
>>>> Thanks Reynold for the prompt response. Do you think we could use a
>>>> 1.4-branch latest build in a production environment?
>>>>
>>>>
>>>>
>>>> On Thu, Jul 7, 2016 at 11:33 AM, Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>>
>>>>> I think last time I tried I had some trouble releasing it because the
>>>>> release scripts no longer work with branch-1.4. You can build from the
>>>>> branch yourself, but it might be better to upgrade to the later versions.
>>>>>
>>>>> On Wed, Jul 6, 2016 at 11:02 PM, Niranda Perera <
>>>>> niranda.per...@gmail.com> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> May I know if you have halted development in the Spark 1.4 branch? I
>>>>>> see that there is a release tag for 1.4.2 but it was never released.
>>>>>>
>>>>>> Can we expect a 1.4.x bug fixing release anytime soon?
>>>>>>
>>>>>> Best
>>>>>> --
>>>>>> Niranda
>>>>>> @n1r44 <https://twitter.com/N1R44>
>>>>>> +94-71-554-8430
>>>>>> https://pythagoreanscript.wordpress.com/
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Niranda
>>>> @n1r44 <https://twitter.com/N1R44>
>>>> +94-71-554-8430
>>>> https://pythagoreanscript.wordpress.com/
>>>>
>>>
>>>
>>
>>
>> --
>> Niranda
>> @n1r44 <https://twitter.com/N1R44>
>> +94-71-554-8430
>> https://pythagoreanscript.wordpress.com/
>>
>
>


-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Re: Latest spark release in the 1.4 branch

2016-07-07 Thread Niranda Perera
Thanks Reynold

On Thu, Jul 7, 2016 at 11:40 AM, Reynold Xin <r...@databricks.com> wrote:

> Yes definitely.
>
>
> On Wed, Jul 6, 2016 at 11:08 PM, Niranda Perera <niranda.per...@gmail.com>
> wrote:
>
>> Thanks Reynold for the prompt response. Do you think we could use a
>> 1.4-branch latest build in a production environment?
>>
>>
>>
>> On Thu, Jul 7, 2016 at 11:33 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> I think last time I tried I had some trouble releasing it because the
>>> release scripts no longer work with branch-1.4. You can build from the
>>> branch yourself, but it might be better to upgrade to the later versions.
>>>
>>> On Wed, Jul 6, 2016 at 11:02 PM, Niranda Perera <
>>> niranda.per...@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> May I know if you have halted development in the Spark 1.4 branch? I
>>>> see that there is a release tag for 1.4.2 but it was never released.
>>>>
>>>> Can we expect a 1.4.x bug fixing release anytime soon?
>>>>
>>>> Best
>>>> --
>>>> Niranda
>>>> @n1r44 <https://twitter.com/N1R44>
>>>> +94-71-554-8430
>>>> https://pythagoreanscript.wordpress.com/
>>>>
>>>
>>>
>>
>>
>> --
>> Niranda
>> @n1r44 <https://twitter.com/N1R44>
>> +94-71-554-8430
>> https://pythagoreanscript.wordpress.com/
>>
>
>


-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Re: Latest spark release in the 1.4 branch

2016-07-07 Thread Niranda Perera
Thanks Reynold for the prompt response. Do you think we could use a
1.4-branch latest build in a production environment?



On Thu, Jul 7, 2016 at 11:33 AM, Reynold Xin <r...@databricks.com> wrote:

> I think last time I tried I had some trouble releasing it because the
> release scripts no longer work with branch-1.4. You can build from the
> branch yourself, but it might be better to upgrade to the later versions.
>
> On Wed, Jul 6, 2016 at 11:02 PM, Niranda Perera <niranda.per...@gmail.com>
> wrote:
>
>> Hi guys,
>>
>> May I know if you have halted development in the Spark 1.4 branch? I see
>> that there is a release tag for 1.4.2 but it was never released.
>>
>> Can we expect a 1.4.x bug fixing release anytime soon?
>>
>> Best
>> --
>> Niranda
>> @n1r44 <https://twitter.com/N1R44>
>> +94-71-554-8430
>> https://pythagoreanscript.wordpress.com/
>>
>
>


-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Latest spark release in the 1.4 branch

2016-07-07 Thread Niranda Perera
Hi guys,

May I know if you have halted development in the Spark 1.4 branch? I see
that there is a release tag for 1.4.2 but it was never released.

Can we expect a 1.4.x bug fixing release anytime soon?

Best
-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Re: Possible deadlock in registering applications in the recovery mode

2016-04-22 Thread Niranda Perera
Hi guys,

Any update on this?

Best

On Wed, Apr 20, 2016 at 3:00 AM, Niranda Perera <niranda.per...@gmail.com>
wrote:

> Hi Reynold,
>
> I have created a JIRA for this [1]. I have also created a PR for the same
> issue [2].
>
> Would be very grateful if you could look into this, because this is a
> blocker in our spark deployment, which uses number of spark custom
> extension.
>
> thanks
> best
>
> [1] https://issues.apache.org/jira/browse/SPARK-14736
> [2] https://github.com/apache/spark/pull/12506
>
> On Mon, Apr 18, 2016 at 9:02 AM, Reynold Xin <r...@databricks.com> wrote:
>
>> I haven't looked closely at this, but I think your proposal makes sense.
>>
>>
>> On Sun, Apr 17, 2016 at 6:40 PM, Niranda Perera <niranda.per...@gmail.com
>> > wrote:
>>
>>> Hi guys,
>>>
>>> Any update on this?
>>>
>>> Best
>>>
>>> On Tue, Apr 12, 2016 at 12:46 PM, Niranda Perera <
>>> niranda.per...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have encountered a small issue in the standalone recovery mode.
>>>>
>>>> Let's say there was an application A running in the cluster. Due to
>>>> some issue, the entire cluster, together with the application A goes down.
>>>>
>>>> Then later on, cluster comes back online, and the master then goes into
>>>> the 'recovering' mode, because it sees some apps, workers and drivers have
>>>> already been in the cluster from Persistence Engine. While in the recovery
>>>> process, the application comes back online, but now it would have a
>>>> different ID, let's say B.
>>>>
>>>> But then, as per the master, application registration logic, this
>>>> application B will NOT be added to the 'waitingApps' with the message
>>>> ""Attempted to re-register application at same address". [1]
>>>>
>>>>   private def registerApplication(app: ApplicationInfo): Unit = {
>>>> val appAddress = app.driver.address
>>>> if (addressToApp.contains(appAddress)) {
>>>>   logInfo("Attempted to re-register application at same address: "
>>>> + appAddress)
>>>>   return
>>>> }
>>>>
>>>>
>>>> The problem here is, master is trying to recover application A, which
>>>> is not in there anymore. Therefore after the recovery process, app A will
>>>> be dropped. However app A's successor, app B was also omitted from the
>>>> 'waitingApps' list because it had the same address as App A previously.
>>>>
>>>> This creates a deadlock in the cluster, app A nor app B is available in
>>>> the cluster.
>>>>
>>>> When the master is in the RECOVERING mode, shouldn't it add all the
>>>> registering apps to a list first, and then after the recovery is completed
>>>> (once the unsuccessful recoveries are removed), deploy the apps which are
>>>> new?
>>>>
>>>> This would sort this deadlock IMO?
>>>>
>>>> look forward to hearing from you.
>>>>
>>>> best
>>>>
>>>> [1]
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L834
>>>>
>>>> --
>>>> Niranda
>>>> @n1r44 <https://twitter.com/N1R44>
>>>> +94-71-554-8430
>>>> https://pythagoreanscript.wordpress.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Niranda
>>> @n1r44 <https://twitter.com/N1R44>
>>> +94-71-554-8430
>>> https://pythagoreanscript.wordpress.com/
>>>
>>
>>
>
>
> --
> Niranda
> @n1r44 <https://twitter.com/N1R44>
> +94-71-554-8430
> https://pythagoreanscript.wordpress.com/
>



-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Re: Possible deadlock in registering applications in the recovery mode

2016-04-19 Thread Niranda Perera
Hi Reynold,

I have created a JIRA for this [1]. I have also created a PR for the same
issue [2].

Would be very grateful if you could look into this, because this is a
blocker in our Spark deployment, which uses a number of custom Spark
extensions.

thanks
best

[1] https://issues.apache.org/jira/browse/SPARK-14736
[2] https://github.com/apache/spark/pull/12506

On Mon, Apr 18, 2016 at 9:02 AM, Reynold Xin <r...@databricks.com> wrote:

> I haven't looked closely at this, but I think your proposal makes sense.
>
>
> On Sun, Apr 17, 2016 at 6:40 PM, Niranda Perera <niranda.per...@gmail.com>
> wrote:
>
>> Hi guys,
>>
>> Any update on this?
>>
>> Best
>>
>> On Tue, Apr 12, 2016 at 12:46 PM, Niranda Perera <
>> niranda.per...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have encountered a small issue in the standalone recovery mode.
>>>
>>> Let's say there was an application A running in the cluster. Due to some
>>> issue, the entire cluster, together with the application A goes down.
>>>
>>> Then later on, cluster comes back online, and the master then goes into
>>> the 'recovering' mode, because it sees some apps, workers and drivers have
>>> already been in the cluster from Persistence Engine. While in the recovery
>>> process, the application comes back online, but now it would have a
>>> different ID, let's say B.
>>>
>>> But then, as per the master, application registration logic, this
>>> application B will NOT be added to the 'waitingApps' with the message
>>> ""Attempted to re-register application at same address". [1]
>>>
>>>   private def registerApplication(app: ApplicationInfo): Unit = {
>>> val appAddress = app.driver.address
>>> if (addressToApp.contains(appAddress)) {
>>>   logInfo("Attempted to re-register application at same address: " +
>>> appAddress)
>>>   return
>>> }
>>>
>>>
>>> The problem here is, master is trying to recover application A, which is
>>> not in there anymore. Therefore after the recovery process, app A will be
>>> dropped. However app A's successor, app B was also omitted from the
>>> 'waitingApps' list because it had the same address as App A previously.
>>>
>>> This creates a deadlock in the cluster, app A nor app B is available in
>>> the cluster.
>>>
>>> When the master is in the RECOVERING mode, shouldn't it add all the
>>> registering apps to a list first, and then after the recovery is completed
>>> (once the unsuccessful recoveries are removed), deploy the apps which are
>>> new?
>>>
>>> This would sort this deadlock IMO?
>>>
>>> look forward to hearing from you.
>>>
>>> best
>>>
>>> [1]
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L834
>>>
>>> --
>>> Niranda
>>> @n1r44 <https://twitter.com/N1R44>
>>> +94-71-554-8430
>>> https://pythagoreanscript.wordpress.com/
>>>
>>
>>
>>
>> --
>> Niranda
>> @n1r44 <https://twitter.com/N1R44>
>> +94-71-554-8430
>> https://pythagoreanscript.wordpress.com/
>>
>
>


-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Re: Possible deadlock in registering applications in the recovery mode

2016-04-17 Thread Niranda Perera
Hi guys,

Any update on this?

Best

On Tue, Apr 12, 2016 at 12:46 PM, Niranda Perera <niranda.per...@gmail.com>
wrote:

> Hi all,
>
> I have encountered a small issue in the standalone recovery mode.
>
> Let's say there was an application A running in the cluster. Due to some
> issue, the entire cluster, together with the application A goes down.
>
> Then later on, cluster comes back online, and the master then goes into
> the 'recovering' mode, because it sees some apps, workers and drivers have
> already been in the cluster from Persistence Engine. While in the recovery
> process, the application comes back online, but now it would have a
> different ID, let's say B.
>
> But then, as per the master, application registration logic, this
> application B will NOT be added to the 'waitingApps' with the message
> ""Attempted to re-register application at same address". [1]
>
>   private def registerApplication(app: ApplicationInfo): Unit = {
> val appAddress = app.driver.address
> if (addressToApp.contains(appAddress)) {
>   logInfo("Attempted to re-register application at same address: " +
> appAddress)
>   return
> }
>
>
> The problem here is, master is trying to recover application A, which is
> not in there anymore. Therefore after the recovery process, app A will be
> dropped. However app A's successor, app B was also omitted from the
> 'waitingApps' list because it had the same address as App A previously.
>
> This creates a deadlock in the cluster, app A nor app B is available in
> the cluster.
>
> When the master is in the RECOVERING mode, shouldn't it add all the
> registering apps to a list first, and then after the recovery is completed
> (once the unsuccessful recoveries are removed), deploy the apps which are
> new?
>
> This would sort this deadlock IMO?
>
> look forward to hearing from you.
>
> best
>
> [1]
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L834
>
> --
> Niranda
> @n1r44 <https://twitter.com/N1R44>
> +94-71-554-8430
> https://pythagoreanscript.wordpress.com/
>



-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Possible deadlock in registering applications in the recovery mode

2016-04-12 Thread Niranda Perera
Hi all,

I have encountered a small issue in the standalone recovery mode.

Let's say there was an application A running in the cluster. Due to some
issue, the entire cluster, together with application A, goes down.

Later on, the cluster comes back online, and the master goes into the
'recovering' mode, because it sees from the persistence engine that some
apps, workers and drivers were already in the cluster. While the recovery
process is ongoing, the application comes back online, but now it would
have a different ID, let's say B.

But then, as per the master's application registration logic, this
application B will NOT be added to 'waitingApps', with the message
"Attempted to re-register application at same address". [1]

  private def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }


The problem here is, the master is trying to recover application A, which is
not there anymore. Therefore, after the recovery process, app A will be
dropped. However, app A's successor, app B, was also omitted from the
'waitingApps' list because it had the same address as app A previously.

This creates a deadlock in the cluster: neither app A nor app B is available
in the cluster.

When the master is in the RECOVERING mode, shouldn't it add all the
registering apps to a list first, and then, after the recovery is completed
(once the unsuccessful recoveries are removed), deploy the apps which are
new?

This would resolve the deadlock, IMO.
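
Something along the following lines is what I have in mind (a rough sketch,
not actual Spark code; pendingApps is a hypothetical buffer):

  private def registerApplication(app: ApplicationInfo): Unit = {
    if (state == RecoveryState.RECOVERING) {
      // buffer registrations that arrive during recovery instead of dropping them
      pendingApps += app
      return
    }
    // ... existing registration logic ...
  }

  private def completeRecovery(): Unit = {
    // ... existing removal of the unsuccessful recoveries ...
    pendingApps.foreach(registerApplication)  // now deploy the apps that are genuinely new
    pendingApps.clear()
  }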

look forward to hearing from you.

best

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L834

-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Control the stdout and stderr streams in an executor JVM

2016-02-28 Thread Niranda Perera
Hi all,

Is there any possibility to control the stdout and stderr streams in an
executor JVM?

I understand that there are some configurations provided in the Spark
conf, as follows:
spark.executor.logs.rolling.maxRetainedFiles
spark.executor.logs.rolling.maxSize
spark.executor.logs.rolling.strategy
spark.executor.logs.rolling.time.interval

But is there a possibility to have more fine-grained control over these,
like we do with a log4j appender, with a property file?
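
For reference, the coarse-grained options above can be set through the
SparkConf, along these lines (a sketch; the values are arbitrary):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.logs.rolling.strategy", "size")
  .set("spark.executor.logs.rolling.maxSize", "134217728")    // 128 MB per file, arbitrary
  .set("spark.executor.logs.rolling.maxRetainedFiles", "5")   // keep the 5 most recent files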

Rgds
-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


Re: spark job scheduling

2016-01-27 Thread Niranda Perera
Sorry, I made some typos. Let me rephrase:

1. As I understand, the smallest unit of work an executor can perform is a
'task'. In the 'FAIR' scheduler mode, let's say a job is submitted to the
Spark ctx which has a considerable amount of work to do in a single task.
While such a 'big' task is running, can we still submit another smaller job
(from a separate thread) and get it done? Or does that smaller job have to
wait till the bigger task finishes and the resources are freed from the
executor?
(Essentially, what I'm asking is: in the FAIR scheduler mode, jobs are
scheduled fairly, but at the task granularity are they still FIFO?)

2. When a job is submitted without setting a scheduler pool, the 'default'
scheduler pool is assigned to it, which employs FIFO scheduling. But what
happens when we have spark.scheduler.mode set to FAIR, and I submit jobs
without specifying a (FAIR-scheduled) scheduler pool? Would the jobs still
run in FIFO mode in the default pool?
Essentially, for us to really get FAIR scheduling, do we have to assign a
FAIR scheduler pool to the job as well?
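
For context, this is roughly how the setup looks (a sketch; "poolA" and the
allocation file path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")                                                  // placeholder
  .setAppName("scheduling-test")                                          // placeholder
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")   // placeholder path

val sc = new SparkContext(conf)

// per thread: assign the jobs submitted from this thread to a pool
sc.setLocalProperty("spark.scheduler.pool", "poolA")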

On Thu, Jan 28, 2016 at 8:47 AM, Chayapan Khannabha <chaya...@gmail.com>
wrote:

> I think the smallest unit of work is a "Task", and an "Executor" is
> responsible for getting the work done? Would like to understand more about
> the scheduling system too. Scheduling strategy like FAIR or FIFO do have
> significant impact on a Spark cluster architecture design decision.
>
> Best,
>
> Chayapan (A)
>
> On Thu, Jan 28, 2016 at 10:07 AM, Niranda Perera <niranda.per...@gmail.com
> > wrote:
>
>> hi all,
>>
>> I have a few questions on spark job scheduling.
>>
>> 1. As I understand, the smallest unit of work an executor can perform. In
>> the 'fair' scheduler mode, let's say  a job is submitted to the spark ctx
>> which has a considerable amount of work to do in a task. While such a 'big'
>> task is running, can we still submit another smaller job (from a separate
>> thread) and get it done? or does that smaller job has to wait till the
>> bigger task finishes and the resources are freed from the executor?
>>
>> 2. When a job is submitted without setting a scheduler pool, the default
>> scheduler pool is assigned to it, which employs FIFO scheduling. but what
>> happens when we have the spark.scheduler.mode as FAIR, and if I submit jobs
>> without specifying a scheduler pool (which has FAIR scheduling)? would the
>> jobs still run in FIFO mode with the default pool?
>> essentially, for us to really set FAIR scheduling, do we have to assign a
>> FAIR scheduler pool?
>>
>> best
>>
>> --
>> Niranda
>> @n1r44 <https://twitter.com/N1R44>
>> +94-71-554-8430
>> https://pythagoreanscript.wordpress.com/
>>
>
>


-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


spark job scheduling

2016-01-27 Thread Niranda Perera
hi all,

I have a few questions on spark job scheduling.

1. As I understand, the smallest unit of work an executor can perform. In
the 'fair' scheduler mode, let's say  a job is submitted to the spark ctx
which has a considerable amount of work to do in a task. While such a 'big'
task is running, can we still submit another smaller job (from a separate
thread) and get it done? or does that smaller job has to wait till the
bigger task finishes and the resources are freed from the executor?

2. When a job is submitted without setting a scheduler pool, the default
scheduler pool is assigned to it, which employs FIFO scheduling. but what
happens when we have the spark.scheduler.mode as FAIR, and if I submit jobs
without specifying a scheduler pool (which has FAIR scheduling)? would the
jobs still run in FIFO mode with the default pool?
essentially, for us to really set FAIR scheduling, do we have to assign a
FAIR scheduler pool?

best

-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


taking the heap dump when an executor goes OOM

2015-10-11 Thread Niranda Perera
Hi all,

Is there a way for me to get the heap dump (hprof) of an executor JVM when
it goes out of memory?

Is this currently supported, or do I have to change some configurations?
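
For what it's worth, the standard JVM flags can be passed to the executors
like this (a sketch; the dump path is a placeholder), but I'd like to know
whether this is the recommended way:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/executor-dumps")  // placeholder path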

cheers

-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


passing an AbstractFunction1 to sparkContext().runJob instead of a Closure

2015-10-09 Thread Niranda Perera
hi all,

I want to run a job in the Spark context, and since I am running the system
in the Java environment, I cannot use a closure in
sparkContext().runJob. Instead, I am passing an AbstractFunction1
extension.

While the jobs run without an issue, I constantly get the following
WARN message:

TID: [-1234] [] [2015-10-06 04:39:43,387]  WARN
{org.apache.spark.util.ClosureCleaner} -  Expected a closure; got
org.wso2.carbon.analytics.spark.core.sources.AnalyticsWritingFunction
{org.apache.spark.util.ClosureCleaner}


I want to know what the implications of this approach are.
Could this WARN cause issues in the functionality later on?

rgds
-- 
Niranda
@n1r44 
+94-71-554-8430
https://pythagoreanscript.wordpress.com/


adding jars to the classpath with the relative path to spark home

2015-09-08 Thread Niranda Perera
Hi,

Is it possible to add jars to the Spark executor/driver classpath using the
relative path of the jar (relative to the Spark home)?
I need to set the following settings in the Spark conf:
- spark.driver.extraClassPath
- spark.executor.extraClassPath

The reason I need to use a relative path is that otherwise, in a Spark
cluster, all the jars need to be kept at the same absolute folder path on
every node.

I know we can pass the jars using the --jars option, but I'd prefer this
approach.
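
To make the ask concrete, what I would like to be able to write is roughly
the following, with the paths resolved against the Spark home on each node
(my-extension.jar is a hypothetical jar; whether relative paths are
supported here is exactly my question):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.extraClassPath", "extensions/my-extension.jar")    // relative to the Spark home (desired)
  .set("spark.executor.extraClassPath", "extensions/my-extension.jar")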

cheers
-- 
Niranda
@n1r44 
https://pythagoreanscript.wordpress.com/


Re: taking an n number of rows from an RDD starting from an index

2015-09-02 Thread Niranda Perera
Hi all,

Thank you for your responses.

After taking a look at the implementation of rdd.collect(), I thought of
using the runJob(...) method:

for (int i = 0; i < dataFrame.rdd().partitions().length; i++) {
    dataFrame.sqlContext().sparkContext().runJob(dataFrame.rdd(),
        someFunction /* the function applied to each partition */,
        { i }, false, ClassTag$.MODULE$.Unit());
}

This iterates through the partitions of the DataFrame, one at a time.

I would like to know whether this is an accepted way of iterating through
DataFrame partitions while preserving the order of rows encapsulated by the
DataFrame.

cheers


On Wed, Sep 2, 2015 at 12:33 PM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> Maybe you could use zipWithIndex and filter to skip the first elements.
> For example starting from
>
> scala> sc.parallelize(100 to 120, 4).zipWithIndex.collect
> res12: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3),
> (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11),
> (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18),
> (119,19), (120,20))
>
> we can get the 3 first elements starting from the 4th (counting from 0) as
>
> scala> sc.parallelize(100 to 120, 4).zipWithIndex.filter(_._2 >=4).take(3)
> res14: Array[(Int, Long)] = Array((104,4), (105,5), (106,6))
>
> Hope that helps
>
>
> 2015-09-02 8:52 GMT+02:00 Hemant Bhanawat <hemant9...@gmail.com>:
>
>> I think rdd.toLocalIterator is what you want. But it will keep one
>> partition's data in-memory.
>>
>> On Wed, Sep 2, 2015 at 10:05 AM, Niranda Perera <niranda.per...@gmail.com
>> > wrote:
>>
>>> Hi all,
>>>
>>> I have a large set of data which would not fit into the memory. So, I
>>> wan to take n number of data from the RDD given a particular index. for an
>>> example, take 1000 rows starting from the index 1001.
>>>
>>> I see that there is a  take(num: Int): Array[T] method in the RDD, but
>>> it only returns the 'first n number of rows'.
>>>
>>> the simplest use case of this, requirement is, say, I write a custom
>>> relation provider with a custom relation extending the InsertableRelation.
>>>
>>> say I submit this query,
>>> "insert into table abc select * from xyz sort by x asc"
>>>
>>> in my custom relation, I have implemented the def insert(data:
>>> DataFrame, overwrite: Boolean): Unit
>>> method. here, since the data is large, I can not call methods such as
>>> DataFrame.collect(). Instead, I could do, DataFrame.foreachpartition(...).
>>> As you could see, the resultant DF from the "select * from xyz sort by x
>>> asc" is sorted, and if I sun, foreachpartition on that DF and implement the
>>> insert method, this sorted order would be affected, since the inserting
>>> operation would be done in parallel in each partition.
>>>
>>> in order to handle this, my initial idea was to take rows from the RDD
>>> in batches and do the insert operation, and for that I was looking for a
>>> method to take n number of rows starting from a given index.
>>>
>>> is there any better way to handle this, in RDDs?
>>>
>>> your assistance in this regard is highly appreciated.
>>>
>>> cheers
>>>
>>> --
>>> Niranda
>>> @n1r44 <https://twitter.com/N1R44>
>>> https://pythagoreanscript.wordpress.com/
>>>
>>
>>
>


-- 
Niranda
@n1r44 <https://twitter.com/N1R44>
https://pythagoreanscript.wordpress.com/


Spark SQL sort by and collect by in multiple partitions

2015-09-02 Thread Niranda Perera
Hi all,

I have been using SORT BY and ORDER BY in Spark SQL, and I observed the
following.

When using SORT BY and collecting the results, the results get sorted
partition by partition.
Example:
if we have 1, 2, ..., 12 in 4 partitions and I want to sort them in
descending order,
partition 0 (p0) would have 12, 8, 4
p1 = 11, 7, 3
p2 = 10, 6, 2
p3 = 9, 5, 1

so collect() would return 12, 8, 4, 11, 7, 3, 10, 6, 2, 9, 5, 1

BUT when I use ORDER BY and collect the results,
p0 = 12, 11, 10
p1 = 9, 8, 7
...
so collect() would return 12, 11, ..., 1, which is the desirable result.

Is this the intended behavior of SORT BY and ORDER BY, or is there something
I'm missing?
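
For reference, a minimal sketch of the two cases (assuming an existing
sqlContext and a registered temp table t with an integer column v):

// SORT BY: rows are sorted within each partition only
val sortBy = sqlContext.sql("SELECT v FROM t SORT BY v DESC")
sortBy.collect()   // e.g. 12, 8, 4, 11, 7, 3, 10, 6, 2, 9, 5, 1

// ORDER BY: a total ordering across all partitions
val orderBy = sqlContext.sql("SELECT v FROM t ORDER BY v DESC")
orderBy.collect()  // 12, 11, 10, ..., 1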

cheers

-- 
Niranda
@n1r44 
https://pythagoreanscript.wordpress.com/


taking an n number of rows from an RDD starting from an index

2015-09-01 Thread Niranda Perera
Hi all,

I have a large set of data which would not fit into memory. So, I want to
take n rows from the RDD starting at a particular index. For example, take
1000 rows starting from index 1001.

I see that there is a take(num: Int): Array[T] method on the RDD, but it
only returns the 'first n rows'.

The simplest use case of this requirement is, say, I write a custom
relation provider with a custom relation extending InsertableRelation.

Say I submit this query:
"insert into table abc select * from xyz sort by x asc"

In my custom relation, I have implemented the def insert(data: DataFrame,
overwrite: Boolean): Unit method. Here, since the data is large, I cannot
call methods such as DataFrame.collect(). Instead, I could use
DataFrame.foreachPartition(...). As you can see, the resultant DF from
"select * from xyz sort by x asc" is sorted, and if I run foreachPartition
on that DF and implement the insert method, this sorted order would be
affected, since the insert operation would be done in parallel in each
partition.

In order to handle this, my initial idea was to take rows from the RDD in
batches and do the insert operation, and for that I was looking for a
method to take n rows starting from a given index.

Is there any better way to handle this with RDDs?
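
To make it concrete, this is the kind of helper I am looking for, sketched
here with zipWithIndex and filter (takeRange is a hypothetical name, and
this version still scans the preceding partitions):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// hypothetical helper: take n rows starting at global index `start`
def takeRange[T: ClassTag](rdd: RDD[T], start: Long, n: Int): Array[T] =
  rdd.zipWithIndex()                                        // (element, globalIndex)
     .filter { case (_, idx) => idx >= start && idx < start + n }
     .map(_._1)
     .collect()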

your assistance in this regard is highly appreciated.

cheers

-- 
Niranda
@n1r44 
https://pythagoreanscript.wordpress.com/


dynamically update the master list of a worker or a spark context

2015-07-27 Thread Niranda Perera
Hi all,

I have been developing a custom recovery implementation for Spark masters
and workers using Hazelcast clustering.

In the Spark worker code [1], we see that a list of masters needs to be
provided at worker startup in order to achieve high availability.
This effectively means that one should know the URLs of possible masters
before spawning a worker. The same applies to the Spark context (please
correct me if I'm wrong).

In our implementation we are planning to add masters dynamically. For
example, say the elected leader goes down; then we would spawn another
master in the cluster, and all the workers connected to the previous master
should connect to the newly spawned master. To do this, we need provision
to dynamically update the master list of an already spawned worker.

Can this be achieved with the current Spark implementation?

rgds

[1]
https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L538

-- 
Niranda
@n1r44 https://twitter.com/N1R44
https://pythagoreanscript.wordpress.com/


databases currently supported by Spark SQL JDBC

2015-07-09 Thread Niranda Perera
Hi,

I'm planning to use the Spark SQL JDBC datasource provider with various
RDBMS databases.

What databases are currently supported by the Spark JDBC relation provider?

rgds

-- 
Niranda
@n1r44 https://twitter.com/N1R44
https://pythagoreanscript.wordpress.com/


Re: Error in invoking a custom StandaloneRecoveryModeFactory in java env (Spark v1.3.0)

2015-07-05 Thread Niranda Perera
Hi Josh,

I tried using the spark 1.4.0 upgrade.

here is the class I'm trying to use

package org.wso2.carbon.analytics.spark.core.util.master

import akka.serialization.Serialization
import org.apache.spark.SparkConf
import org.apache.spark.deploy.master._

class AnalyticsRecoveryModeFactoryScala(conf: SparkConf, serializer: Serialization)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  override def createPersistenceEngine(): PersistenceEngine =
    new AnalyticsPersistenceEngine(conf, serializer)

  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
    new AnalyticsLeaderElectionAgent(master)
}

object AnalyticsRecoveryModeFactoryScala {
}

When I invoke this factory from the master, I get a similar error to the one before:

[2015-07-05 17:06:55,384] ERROR {akka.actor.OneForOneStrategy} -
org.wso2.carbon.analytics.spark.core.util.master.AnalyticsRecoveryModeFactory.<init>(org.apache.spark.SparkConf,
akka.serialization.Serialization)
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodException:
org.wso2.carbon.analytics.spark.core.util.master.AnalyticsRecoveryModeFactory.<init>(org.apache.spark.SparkConf,
akka.serialization.Serialization)
at java.lang.Class.getConstructor0(Class.java:2810)
at java.lang.Class.getConstructor(Class.java:1718)
at org.apache.spark.deploy.master.Master.preStart(Master.scala:168)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:470)
at org.apache.spark.deploy.master.Master.aroundPreStart(Master.scala:52)
at akka.actor.ActorCell.create(ActorCell.scala:580)
... 9 more


what could be the reason for this?

rgds

On Thu, Jun 25, 2015 at 11:42 AM, Niranda Perera niranda.per...@gmail.com
wrote:

 thanks Josh.

 this looks very similar to my problem.

 On Thu, Jun 25, 2015 at 11:32 AM, Josh Rosen rosenvi...@gmail.com wrote:

 This sounds like https://issues.apache.org/jira/browse/SPARK-7436, which
 has been fixed in Spark 1.4+ and in branch-1.3 (for Spark 1.3.2).

 On Wed, Jun 24, 2015 at 10:57 PM, Niranda Perera 
 niranda.per...@gmail.com wrote:

 Hi all,

 I'm trying to implement a custom StandaloneRecoveryModeFactory in the
 Java environment. Pls find the implementation here. [1] . I'm new to Scala,
 hence I'm trying to use Java environment as much as possible.

 when I start a master with spark.deploy.recoveryMode.factory property to
 be CUSTOM, I encounter a NoSuchMethodException for my custom class's
 constructor.
 it has the following constructor.

  public AnalyticsStandaloneRecoveryModeFactory(SparkConf conf,
 Serialization serializer)

 but from the Master, it looks for a constructor for,
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization$)

 I see in the Spark source code for Master, that it uses reflection to
 get the custom recovery mode factory class.

 case CUSTOM =
 val clazz =
 Class.forName(conf.get(spark.deploy.recoveryMode.factory))
 val factory = clazz.getConstructor(conf.getClass,
 Serialization.getClass)
   .newInstance(conf, SerializationExtension(context.system))
   .asInstanceOf[StandaloneRecoveryModeFactory]
 (factory.createPersistenceEngine(),
 factory.createLeaderElectionAgent(this))

 here, Serialization.getClass returns a akka.serialization.Serialization$
 object, where as my custom class's constructor
 accepts akka.serialization.Serialization object.

 so I would like to know,
 1. if this happens because I'm using this in the Java environment?
 2. what is the workaround to this?

 thanks

 Please find the full stack trace of the error below.

 [2015-06-25 10:59:01,095] ERROR {akka.actor.OneForOneStrategy} -
  
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization$)
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
 at akka.actor.ActorCell.create(ActorCell.scala:596)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala

Re: Error in invoking a custom StandaloneRecoveryModeFactory in java env (Spark v1.3.0)

2015-07-05 Thread Niranda Perera
Hi,

Sorry, this was a class-loading issue on my side. I've sorted it out.

Sorry if I caused any inconvenience

Rgds

Niranda Perera
+94 71 554 8430
On Jul 5, 2015 17:08, Niranda Perera niranda.per...@gmail.com wrote:

 Hi Josh,

 I tried using the spark 1.4.0 upgrade.

 here is the class I'm trying to use

 package org.wso2.carbon.analytics.spark.core.util.master

 import akka.serialization.Serialization
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.master._

 class AnalyticsRecoveryModeFactoryScala(conf: SparkConf, serializer:
 Serialization)
   extends StandaloneRecoveryModeFactory(conf, serializer) {

   override def createPersistenceEngine(): PersistenceEngine = new
   AnalyticsPersistenceEngine(conf, serializer)

   override def createLeaderElectionAgent(master: LeaderElectable):
 LeaderElectionAgent = new
   AnalyticsLeaderElectionAgent(master)
 }

 object AnalyticsRecoveryModeFactoryScala {

 }

 when I invoke this factory from the master, I get a similar error as
 before

 [2015-07-05 17:06:55,384] ERROR {akka.actor.OneForOneStrategy} -
  
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization)
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
 at akka.actor.ActorCell.create(ActorCell.scala:596)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.NoSuchMethodException:
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization)
 at java.lang.Class.getConstructor0(Class.java:2810)
 at java.lang.Class.getConstructor(Class.java:1718)
 at org.apache.spark.deploy.master.Master.preStart(Master.scala:168)
 at akka.actor.Actor$class.aroundPreStart(Actor.scala:470)
 at org.apache.spark.deploy.master.Master.aroundPreStart(Master.scala:52)
 at akka.actor.ActorCell.create(ActorCell.scala:580)
 ... 9 more


 what could be the reason for this?

 rgds

 On Thu, Jun 25, 2015 at 11:42 AM, Niranda Perera niranda.per...@gmail.com
  wrote:

 thanks Josh.

 this looks very similar to my problem.

 On Thu, Jun 25, 2015 at 11:32 AM, Josh Rosen rosenvi...@gmail.com
 wrote:

 This sounds like https://issues.apache.org/jira/browse/SPARK-7436,
 which has been fixed in Spark 1.4+ and in branch-1.3 (for Spark 1.3.2).

 On Wed, Jun 24, 2015 at 10:57 PM, Niranda Perera 
 niranda.per...@gmail.com wrote:

 Hi all,

 I'm trying to implement a custom StandaloneRecoveryModeFactory in the
 Java environment. Pls find the implementation here. [1] . I'm new to Scala,
 hence I'm trying to use Java environment as much as possible.

 when I start a master with spark.deploy.recoveryMode.factory property
 to be CUSTOM, I encounter a NoSuchMethodException for my custom class's
 constructor.
 it has the following constructor.

  public AnalyticsStandaloneRecoveryModeFactory(SparkConf conf,
 Serialization serializer)

 but from the Master, it looks for a constructor for,
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization$)

 I see in the Spark source code for Master, that it uses reflection to
 get the custom recovery mode factory class.

 case CUSTOM =
 val clazz =
 Class.forName(conf.get(spark.deploy.recoveryMode.factory))
 val factory = clazz.getConstructor(conf.getClass,
 Serialization.getClass)
   .newInstance(conf, SerializationExtension(context.system))
   .asInstanceOf[StandaloneRecoveryModeFactory]
 (factory.createPersistenceEngine(),
 factory.createLeaderElectionAgent(this))

 here, Serialization.getClass returns
 a akka.serialization.Serialization$ object, where as my custom class's
 constructor accepts akka.serialization.Serialization object.

 so I would like to know,
 1. if this happens because I'm using this in the Java environment?
 2. what is the workaround to this?

 thanks

 Please find the full stack trace of the error below.

 [2015-06-25 10:59:01,095] ERROR {akka.actor.OneForOneStrategy} -
  
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization

Re: Error in invoking a custom StandaloneRecoveryModeFactory in java env (Spark v1.3.0)

2015-06-25 Thread Niranda Perera
thanks Josh.

this looks very similar to my problem.

On Thu, Jun 25, 2015 at 11:32 AM, Josh Rosen rosenvi...@gmail.com wrote:

 This sounds like https://issues.apache.org/jira/browse/SPARK-7436, which
 has been fixed in Spark 1.4+ and in branch-1.3 (for Spark 1.3.2).

 On Wed, Jun 24, 2015 at 10:57 PM, Niranda Perera niranda.per...@gmail.com
  wrote:

 Hi all,

 I'm trying to implement a custom StandaloneRecoveryModeFactory in the
 Java environment. Pls find the implementation here. [1] . I'm new to Scala,
 hence I'm trying to use Java environment as much as possible.

 when I start a master with spark.deploy.recoveryMode.factory property to
 be CUSTOM, I encounter a NoSuchMethodException for my custom class's
 constructor.
 it has the following constructor.

  public AnalyticsStandaloneRecoveryModeFactory(SparkConf conf,
 Serialization serializer)

 but from the Master, it looks for a constructor for,
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization$)

 I see in the Spark source code for Master, that it uses reflection to get
 the custom recovery mode factory class.

 case CUSTOM =
 val clazz =
 Class.forName(conf.get(spark.deploy.recoveryMode.factory))
 val factory = clazz.getConstructor(conf.getClass,
 Serialization.getClass)
   .newInstance(conf, SerializationExtension(context.system))
   .asInstanceOf[StandaloneRecoveryModeFactory]
 (factory.createPersistenceEngine(),
 factory.createLeaderElectionAgent(this))

 here, Serialization.getClass returns a akka.serialization.Serialization$
 object, where as my custom class's constructor
 accepts akka.serialization.Serialization object.

 so I would like to know,
 1. if this happens because I'm using this in the Java environment?
 2. what is the workaround to this?

 thanks

 Please find the full stack trace of the error below.

 [2015-06-25 10:59:01,095] ERROR {akka.actor.OneForOneStrategy} -
  
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization$)
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
 at akka.actor.ActorCell.create(ActorCell.scala:596)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.NoSuchMethodException:
 org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.init(org.apache.spark.SparkConf,
 akka.serialization.Serialization$)
 at java.lang.Class.getConstructor0(Class.java:2810)
 at java.lang.Class.getConstructor(Class.java:1718)
 at org.apache.spark.deploy.master.Master.preStart(Master.scala:165)
 at akka.actor.Actor$class.aroundPreStart(Actor.scala:470)
 at org.apache.spark.deploy.master.Master.aroundPreStart(Master.scala:52)
 at akka.actor.ActorCell.create(ActorCell.scala:580)
 ... 9 more



 [1]
 https://github.com/nirandaperera/carbon-analytics/blob/spark_master_persistance/components/analytics-processors/org.wso2.carbon.analytics.spark.core/src/main/java/org/wso2/carbon/analytics/spark/core/util/master/AnalyticsStandaloneRecoveryModeFactory.java
 https://github.com/nirandaperera/carbon-analytics/blob/spark_master_persistance/components/analytics-processors/org.wso2.carbon.analytics.spark.core/src/main/java/org/wso2/carbon/analytics/spark/core/util/master/AnalyticsStandaloneRecoveryModeFactory.java

 --
 Niranda
 @n1r44 https://twitter.com/N1R44
 https://pythagoreanscript.wordpress.com/





-- 
Niranda
@n1r44 https://twitter.com/N1R44
https://pythagoreanscript.wordpress.com/


Error in invoking a custom StandaloneRecoveryModeFactory in java env (Spark v1.3.0)

2015-06-24 Thread Niranda Perera
Hi all,

I'm trying to implement a custom StandaloneRecoveryModeFactory in the Java
environment. Please find the implementation here [1]. I'm new to Scala,
hence I'm trying to use the Java environment as much as possible.

When I start a master with the spark.deploy.recoveryMode.factory property
set to CUSTOM, I encounter a NoSuchMethodException for my custom class's
constructor. It has the following constructor:
it has the following constructor.

 public AnalyticsStandaloneRecoveryModeFactory(SparkConf conf,
Serialization serializer)

but from the Master, it looks for a constructor
org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.<init>(org.apache.spark.SparkConf,
akka.serialization.Serialization$)

I see in the Spark source code for the Master that it uses reflection to get
the custom recovery mode factory class:

case "CUSTOM" =>
  val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
  val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
    .newInstance(conf, SerializationExtension(context.system))
    .asInstanceOf[StandaloneRecoveryModeFactory]
  (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))

Here, Serialization.getClass returns an akka.serialization.Serialization$
object, whereas my custom class's constructor accepts an
akka.serialization.Serialization object.

So I would like to know:
1. Does this happen because I'm using this in the Java environment?
2. What is the workaround for this?

thanks

Please find the full stack trace of the error below.

[2015-06-25 10:59:01,095] ERROR {akka.actor.OneForOneStrategy} -
org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.<init>(org.apache.spark.SparkConf,
akka.serialization.Serialization$)
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodException:
org.wso2.carbon.analytics.spark.core.util.master.AnalyticsStandaloneRecoveryModeFactory.<init>(org.apache.spark.SparkConf,
akka.serialization.Serialization$)
at java.lang.Class.getConstructor0(Class.java:2810)
at java.lang.Class.getConstructor(Class.java:1718)
at org.apache.spark.deploy.master.Master.preStart(Master.scala:165)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:470)
at org.apache.spark.deploy.master.Master.aroundPreStart(Master.scala:52)
at akka.actor.ActorCell.create(ActorCell.scala:580)
... 9 more



[1]
https://github.com/nirandaperera/carbon-analytics/blob/spark_master_persistance/components/analytics-processors/org.wso2.carbon.analytics.spark.core/src/main/java/org/wso2/carbon/analytics/spark/core/util/master/AnalyticsStandaloneRecoveryModeFactory.java
https://github.com/nirandaperera/carbon-analytics/blob/spark_master_persistance/components/analytics-processors/org.wso2.carbon.analytics.spark.core/src/main/java/org/wso2/carbon/analytics/spark/core/util/master/AnalyticsStandaloneRecoveryModeFactory.java

-- 
Niranda
@n1r44 https://twitter.com/N1R44
https://pythagoreanscript.wordpress.com/


custom REST port from spark-defaults.conf

2015-06-23 Thread Niranda Perera
Hi,

Is there a configuration setting to set a custom port number for the master
REST URL? Can that be included in spark-defaults.conf?

cheers
-- 
Niranda
@n1r44 https://twitter.com/N1R44
https://pythagoreanscript.wordpress.com/


Re: Tentative due dates for Spark 1.3.2 release

2015-05-17 Thread Niranda Perera
Hi Reynold,

Sorry, my mistake. I can do that. Thanks.

On Mon, May 18, 2015 at 9:51 AM, Reynold Xin r...@databricks.com wrote:

 You can just look at this branch, can't you?
 https://github.com/apache/spark/tree/branch-1.3


 On Sun, May 17, 2015 at 9:20 PM, Niranda Perera niranda.per...@gmail.com
 wrote:

 Hi Patrick,

 Is there a separate location where I could download all the patches of
 each branch to this date, so that i could apply it locally?

 rgds

 On Fri, May 15, 2015 at 11:52 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 Hi Niranda,

 Maintenance releases are not done on a predetermined schedule but
 instead according to which fixes show up and their severity. Since we
 just did a 1.3.1 release I'm not sure I see 1.3.2 on the immediate
 horizon.

 However, the maintenance releases are simply builds at the head of the
 respective release branches (in this case branch-1.3). They never
 introduce new API's. If you have a particular bug fix you are waiting
 for, you can always build Spark off of that branch.

 - Patrick

 On Fri, May 15, 2015 at 12:46 AM, Niranda Perera
 niranda.per...@gmail.com wrote:
  Hi,
 
  May I know the tentative release dates for spark 1.3.2?
 
  rgds
 
  --
  Niranda




 --
 Niranda





-- 
Niranda


Re: Tentative due dates for Spark 1.3.2 release

2015-05-17 Thread Niranda Perera
Hi Patrick,

Is there a separate location where I could download all the patches for each
branch to date, so that I could apply them locally?

rgds

On Fri, May 15, 2015 at 11:52 PM, Patrick Wendell pwend...@gmail.com
wrote:

 Hi Niranda,

 Maintenance releases are not done on a predetermined schedule but
 instead according to which fixes show up and their severity. Since we
 just did a 1.3.1 release I'm not sure I see 1.3.2 on the immediate
 horizon.

 However, the maintenance releases are simply builds at the head of the
 respective release branches (in this case branch-1.3). They never
 introduce new API's. If you have a particular bug fix you are waiting
 for, you can always build Spark off of that branch.

 - Patrick

 On Fri, May 15, 2015 at 12:46 AM, Niranda Perera
 niranda.per...@gmail.com wrote:
  Hi,
 
  May I know the tentative release dates for spark 1.3.2?
 
  rgds
 
  --
  Niranda




-- 
Niranda


Tentative due dates for Spark 1.3.2 release

2015-05-15 Thread Niranda Perera
Hi,

May I know the tentative release dates for spark 1.3.2?

rgds

-- 
Niranda


Re: Custom PersistenceEngine and LeaderAgent implementation in Java

2015-05-01 Thread Niranda Perera
Hi Reynold,

Pls find the PR here [1]

[1] https://github.com/apache/spark/pull/5832

On Thu, Apr 30, 2015 at 11:34 AM, Reynold Xin r...@databricks.com wrote:

 We should change the trait to abstract class, and then your problem will
 go away.

 Do you want to submit a pull request?


 On Wed, Apr 29, 2015 at 11:02 PM, Niranda Perera niranda.per...@gmail.com
  wrote:

 Hi,

 this follows the following feature in this feature [1]

 I'm trying to implement a custom persistence engine and a leader agent in
 the Java environment.

 vis-a-vis scala, when I implement the PersistenceEngine trait in java, I
 would have to implement methods such as readPersistedData, removeDriver,
 etc together with read, persist and unpersist methods.

 but the issue here is, methods such as readPersistedData etc are 'final
 def's, hence can not be overridden in the java environment.

 I am new to scala, but is there any workaround to implement the above
 traits in java?

 look forward to hear from you.

 [1] https://issues.apache.org/jira/browse/SPARK-1830

 --
 Niranda





-- 
Niranda


Custom PersistanceEngine and LeaderAgent implementation in Java

2015-04-30 Thread Niranda Perera
Hi,

This follows up on the feature described in [1].

I'm trying to implement a custom persistence engine and a leader agent in
the Java environment.

Vis-à-vis Scala, when I implement the PersistenceEngine trait in Java, I
would have to implement methods such as readPersistedData, removeDriver,
etc. together with the read, persist and unpersist methods.

But the issue here is that methods such as readPersistedData are 'final
def's, and hence cannot be overridden from the Java environment.

I am new to Scala, but is there any workaround to implement the above
traits in Java?

I look forward to hearing from you.

[1] https://issues.apache.org/jira/browse/SPARK-1830

-- 
Niranda


Migrating from 1.2.1 to 1.3.0 - org.apache.spark.sql.api.java.Row

2015-04-01 Thread Niranda Perera
Hi,

previously in 1.2.1, the result row from a Spark SQL query was
a org.apache.spark.sql.api.java.Row.

In 1.3.0 I do not see a sql.api.java package, so does that mean that even the
SQL query result row is an implementation of org.apache.spark.sql.Row, such
as GenericRow?
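
In other words, is the following 1.3-style Java usage the expected pattern? A
rough sketch, assuming org.apache.spark.sql.Row is now the type handed back to
Java callers (the query and column types are illustrative):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// sqlContext is an existing org.apache.spark.sql.SQLContext
DataFrame df = sqlContext.sql("SELECT name, age FROM people");
Row[] rows = df.collect();            // plain org.apache.spark.sql.Row instances
String name = rows[0].getString(0);   // typed accessors live on Row itself
long age = rows[0].getLong(1);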

-- 
Niranda


Connecting a worker to the master after a spark context is made

2015-03-20 Thread Niranda Perera
Hi,

Please consider the following scenario.

I've started the spark master by invoking
the org.apache.spark.deploy.master.Master.startSystemAndActor method in
Java code and connected a worker to it using
the org.apache.spark.deploy.worker.Worker.startSystemAndActor method, and
then I have successfully created a Java Spark SQL context and performed
SQL queries.

My question is, can I change this order?
Can I start the master first, then create a spark context... and later on
connect a worker to the master?

While trying out this scenario, I have successfully started the master.
Please see the screenshot here.



But when I create a Spark context, it terminates automatically. Is it
because the master is not connected to a worker?

cheers


-- 
Niranda


Re: Fixed worker ports in the spark worker

2015-03-18 Thread Niranda Perera
Thanks Arush.

This is governed by the conf/spark-defaults.conf configuration, isn't it?
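
If I understand the docs correctly, the worker daemon's own port is set
through conf/spark-env.sh rather than spark-defaults.conf; a rough sketch,
assuming the 1.2-era settings (worth double-checking against the networking
section linked above):

# conf/spark-env.sh (worker daemon and its web UI)
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081

# conf/spark-defaults.conf (ports the executors open at runtime)
spark.blockManager.port   38000
spark.executor.port       38010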

On Wed, Mar 18, 2015 at 1:30 PM, Arush Kharbanda ar...@sigmoidanalytics.com
 wrote:

 You can fix the ports in the configuration -

 http://spark.apache.org/docs/1.2.0/configuration.html#networking

 On Wed, Mar 18, 2015 at 11:10 AM, Niranda Perera niranda.per...@gmail.com
  wrote:

 Hi all,

 I see that spark server opens up random ports, especially in the workers.

 is there any way to fix these ports or give an set of ports for the worker
 to choose from?

 cheers

 --
 Niranda




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com




-- 
Niranda


Fixed worker ports in the spark worker

2015-03-17 Thread Niranda Perera
Hi all,

I see that the Spark server opens up random ports, especially in the workers.

Is there any way to fix these ports, or to give a set of ports for the worker
to choose from?

cheers

-- 
Niranda


Deploying master and worker programmatically in Java

2015-03-03 Thread Niranda Perera
Hi,

I want to start a Spark standalone cluster programmatically in Java.

I have been checking these classes,
- org.apache.spark.deploy.master.Master
- org.apache.spark.deploy.worker.Worker

I successfully started a master with this simple main class.

 public static void main(String[] args) {
     SparkConf conf = new SparkConf();
     // host, port, web UI port, conf
     Master.startSystemAndActor("localhost", 4500, 8080, conf);
 }


But I'm finding it hard to carry out a similar approach for the worker.

Can anyone give an example of how to pass a value to the workerNumber
parameter of the Worker.startSystemAndActor method (in the Java environment)?
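
What I have so far is roughly the following sketch. The parameter list
(host, port, webUiPort, cores, memory, masterUrls, workDir, workerNumber,
conf) is my assumption from reading the 1.2-era sources, so it may need
adjusting; the part I am unsure about is building the Scala Option from Java:

import org.apache.spark.SparkConf;
import org.apache.spark.deploy.worker.Worker;
import scala.Option;

public class StartWorker {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // Option.empty() would let the worker pick its own number;
        // Option.apply(...) pins it explicitly (Scala Int boxes to Object here).
        Option<Object> workerNumber = Option.apply((Object) Integer.valueOf(1));
        Worker.startSystemAndActor(
                "localhost",                              // host
                4501,                                     // port
                8081,                                     // web UI port
                2,                                        // cores
                2048,                                     // memory in MB
                new String[] {"spark://localhost:4500"},  // master URLs
                null,                                     // work dir (null: worker default)
                workerNumber,
                conf);
    }
}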

Cheers
-- 
Niranda


OSGI bundles for spark project..

2015-02-20 Thread Niranda Perera
Hi,

I am interested in a Spark OSGI bundle.

While checking the Maven repository, I found that one has not yet been
implemented.

Can we see an OSGI bundle being released soon? Is it in the Spark Project
roadmap?

Rgds
-- 
Niranda


Re: OSGI bundles for spark project..

2015-02-20 Thread Niranda Perera
Hi Sean,

Does this mean that embedding Spark in other products is discouraged?

On Fri, Feb 20, 2015 at 3:29 PM, Sean Owen so...@cloudera.com wrote:

 I don't think an OSGI bundle makes sense for Spark. It's part JAR,
 part lifecycle manager. Spark has its own lifecycle  management and is
 not generally embeddable. Packaging is generally 'out of scope' for
 the core project beyond the standard Maven and assembly releases.

 On Fri, Feb 20, 2015 at 8:33 AM, Niranda Perera
 niranda.per...@gmail.com wrote:
  Hi,
 
  I am interested in a Spark OSGI bundle.
 
  While checking the maven repository I found out that it is still not
 being
  implemented.
 
  Can we see an OSGI bundle being released soon? Is it in the Spark Project
  roadmap?
 
  Rgds
  --
  Niranda




-- 
Niranda


Re: Replacing Jetty with TomCat

2015-02-19 Thread Niranda Perera
Hi Sean,
The issue we have here is that all our products are based on a single
platform, and we try to keep all our products coherent with that platform as
much as possible. So having two web servers in one instance would not be a
very elegant solution. That is why we were seeking a way to switch to Tomcat.
But as I understand it, that is not readily supported, so we will have to
accept it as it is.

If we are not using the Spark UIs, is it possible to disable them and
prevent the Jetty server from starting, while still using the core Spark
functionality?
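
For example, a minimal sketch of what we would like to do, assuming the
spark.ui.enabled flag is what controls this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Turn off the web UI (and the Jetty instance backing it) while keeping core Spark usable.
SparkConf conf = new SparkConf()
        .setAppName("no-ui-app")
        .setMaster("local[2]")
        .set("spark.ui.enabled", "false");
JavaSparkContext sc = new JavaSparkContext(conf);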

Hi Corey,
Thank you for your ideas. Our biggest concern here was that it starts a new
web server inside Spark; opening up new ports etc. might be seen as a
security threat when it comes to commercial distributions.

cheers



On Wed, Feb 18, 2015 at 3:25 PM, Sean Owen so...@cloudera.com wrote:

 I do not think it makes sense to make the web server configurable.
 Mostly because there's no real problem in running an HTTP service
 internally based on Netty while you run your own HTTP service based on
 something else like Tomcat. What's the problem?

 On Wed, Feb 18, 2015 at 3:14 AM, Niranda Perera
 niranda.per...@gmail.com wrote:
  Hi Sean,
  The main issue we have is, running two web servers in a single product.
 we
  think it would not be an elegant solution.
 
  Could you please point me to the main areas where jetty server is tightly
  coupled or extension points where I could plug tomcat instead of jetty?
  If successful I could contribute it to the spark project. :-)
 
  cheers
 
 
 
  On Mon, Feb 16, 2015 at 4:51 PM, Sean Owen so...@cloudera.com wrote:
 
  There's no particular reason you have to remove the embedded Jetty
  server, right? it doesn't prevent you from using it inside another app
  that happens to run in Tomcat. You won't be able to switch it out
  without rewriting a fair bit of code, no, but you don't need to.
 
  On Mon, Feb 16, 2015 at 5:08 AM, Niranda Perera
  niranda.per...@gmail.com wrote:
   Hi,
  
   We are thinking of integrating Spark server inside a product. Our
   current
   product uses Tomcat as its webserver.
  
   Is it possible to switch the Jetty webserver in Spark to Tomcat
   off-the-shelf?
  
   Cheers
  
   --
   Niranda
 
 
 
 
  --
  Niranda




-- 
Niranda


Re: Replacing Jetty with TomCat

2015-02-17 Thread Niranda Perera
Hi Sean,
The main issue we have is running two web servers in a single product; we
think it would not be an elegant solution.

Could you please point me to the main areas where the Jetty server is tightly
coupled, or to extension points where I could plug in Tomcat instead of Jetty?
If successful, I could contribute it to the Spark project. :-)

cheers



On Mon, Feb 16, 2015 at 4:51 PM, Sean Owen so...@cloudera.com wrote:

 There's no particular reason you have to remove the embedded Jetty
 server, right? it doesn't prevent you from using it inside another app
 that happens to run in Tomcat. You won't be able to switch it out
 without rewriting a fair bit of code, no, but you don't need to.

 On Mon, Feb 16, 2015 at 5:08 AM, Niranda Perera
 niranda.per...@gmail.com wrote:
  Hi,
 
  We are thinking of integrating Spark server inside a product. Our current
  product uses Tomcat as its webserver.
 
  Is it possible to switch the Jetty webserver in Spark to Tomcat
  off-the-shelf?
 
  Cheers
 
  --
  Niranda




-- 
Niranda


Re: Replacing Jetty with TomCat

2015-02-15 Thread Niranda Perera
Hi Reynold,

Thank you for the response. Could you please clarify the need for the Jetty
server inside Spark? Is it used for core Spark functionality, or is it there
for the Spark job UI?

cheers

On Mon, Feb 16, 2015 at 10:47 AM, Reynold Xin r...@databricks.com wrote:

 Most likely no. We are using the embedded mode of Jetty, rather than using
 servlets.

 Even if it is possible, you probably wouldn't want to embed Spark in your
 application server ...


 On Sun, Feb 15, 2015 at 9:08 PM, Niranda Perera niranda.per...@gmail.com
 wrote:

 Hi,

 We are thinking of integrating Spark server inside a product. Our current
 product uses Tomcat as its webserver.

 Is it possible to switch the Jetty webserver in Spark to Tomcat
 off-the-shelf?

 Cheers

 --
 Niranda





-- 
Niranda


Replacing Jetty with TomCat

2015-02-15 Thread Niranda Perera
Hi,

We are thinking of integrating Spark server inside a product. Our current
product uses Tomcat as its webserver.

Is it possible to switch the Jetty webserver in Spark to Tomcat
off-the-shelf?

Cheers

-- 
Niranda


create a SchemaRDD from a custom datasource

2015-01-13 Thread Niranda Perera
Hi,

We have a custom datasources API, which connects to various data sources
and exposes them through a common API. We are now trying to implement the
Spark data sources API released in 1.2.0 to connect it to Spark for analytics.

Looking at the sources API, we figured out that we should extend a scan
class (table scan etc.). While doing so, we would have to implement the
'schema' and 'buildScan' methods.

Say we can infer the schema of the underlying data and read the data out as
Row elements. Is there any way we could create the RDD[Row] (needed by the
buildScan method) from these Row elements?
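
What we have in mind is roughly the following sketch; toRdd is just a
hypothetical helper name, and the element type T stands for whichever Row
class the scan interface actually expects (an assumption worth checking,
since the Scala side uses a type alias for it in 1.2):

import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

// Hypothetical helper: turn already-materialized row objects into the RDD
// that buildScan must return.
public static <T> RDD<T> toRdd(JavaSparkContext jsc, List<T> elements) {
    return jsc.parallelize(elements).rdd();
}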

Cheers
-- 
Niranda


Guava 11 dependency issue in Spark 1.2.0

2015-01-06 Thread Niranda Perera
Hi,

I have been running a simple Spark app on a local spark cluster and I came
across this error.

Exception in thread main java.lang.NoSuchMethodError:
com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at org.apache.spark.util.collection.OpenHashSet.org
$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
at
org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
at
org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
at
org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
at
org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at
org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at
org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at
org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
at
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
at
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
at
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:695)
at
com.databricks.spark.avro.AvroRelation.buildScan$lzycompute(AvroRelation.scala:45)
at
com.databricks.spark.avro.AvroRelation.buildScan(AvroRelation.scala:44)
at
org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)
at
org.apache.spark.sql.api.java.JavaSchemaRDD.collect(JavaSchemaRDD.scala:114)


While looking into this, I found that Guava was downgraded to version 11
in this PR:
https://github.com/apache/spark/pull/1610

In that PR, the hashInt call at OpenHashSet.scala:261 was changed to hashLong.
But when I actually run my app, the java.lang.NoSuchMethodError:
com.google.common.hash.HashFunction.hashInt error still occurs,
which is understandable because hashInt is not available before Guava 12.

So I'm wondering why this occurs?

Cheers
-- 
Niranda Perera


Re: Can the Scala classes in the spark source code, be inherited in Java classes?

2014-12-02 Thread Niranda Perera
Thanks.

And @Reynold, sorry my bad, Guess I should have used something like
Stackoverflow!

On Tue, Dec 2, 2014 at 12:18 PM, Reynold Xin r...@databricks.com wrote:

 Oops my previous response wasn't sent properly to the dev list. Here you
 go for archiving.


 Yes you can. Scala classes are compiled down to classes in bytecode. Take
 a look at this: https://twitter.github.io/scala_school/java.html

 Note that questions like this are not exactly what this dev list is meant
 for  ...

 On Mon, Dec 1, 2014 at 9:22 PM, Niranda Perera nira...@wso2.com wrote:

 Hi,

 Can the Scala classes in the spark source code, be inherited (and other
 OOP
 concepts) in Java classes?

 I want to customize some part of the code, but I would like to do it in a
 Java environment.

 Rgds

 --
 *Niranda Perera*
 Software Engineer, WSO2 Inc.
 Mobile: +94-71-554-8430
 Twitter: @n1r44 https://twitter.com/N1R44





-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 https://twitter.com/N1R44


Re: Creating a SchemaRDD from an existing API

2014-12-01 Thread Niranda Perera
Hi Michael,

About this new data source API, what types of data sources does it support?
Does it necessarily have to be an RDBMS?

Cheers

On Sat, Nov 29, 2014 at 12:57 AM, Michael Armbrust mich...@databricks.com
wrote:

 You probably don't need to create a new kind of SchemaRDD.  Instead I'd
 suggest taking a look at the data sources API that we are adding in Spark
 1.2.  There is not a ton of documentation, but the test cases show how to
 implement the various interfaces
 https://github.com/apache/spark/tree/master/sql/core/src/test/scala/org/apache/spark/sql/sources,
 and there is an example library for reading Avro data
 https://github.com/databricks/spark-avro.

 On Thu, Nov 27, 2014 at 10:31 PM, Niranda Perera nira...@wso2.com wrote:

 Hi,

 I am evaluating Spark for an analytic component where we do batch
 processing of data using SQL.

 So, I am particularly interested in Spark SQL and in creating a SchemaRDD
 from an existing API [1].

 This API exposes elements in a database as datasources. Using the methods
 allowed by this data source, we can access and edit data.

 So, I want to create a custom SchemaRDD using the methods and provisions
 of
 this API. I tried going through Spark documentation and the Java Docs, but
 unfortunately, I was unable to come to a final conclusion if this was
 actually possible.

 I would like to ask the Spark Devs,
 1. As of the current Spark release, can we make a custom SchemaRDD?
 2. What is the extension point to a custom SchemaRDD? or are there
 particular interfaces?
 3. Could you please point me the specific docs regarding this matter?

 Your help in this regard is highly appreciated.

 Cheers

 [1]

 https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics

 --
 *Niranda Perera*
 Software Engineer, WSO2 Inc.
 Mobile: +94-71-554-8430
 Twitter: @n1r44 https://twitter.com/N1R44





-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 https://twitter.com/N1R44


Can the Scala classes in the spark source code, be inherited in Java classes?

2014-12-01 Thread Niranda Perera
Hi,

Can the Scala classes in the Spark source code be inherited (and other OOP
concepts applied) in Java classes?

I want to customize some part of the code, but I would like to do it in a
Java environment.
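
For what it's worth, a minimal non-Spark illustration of what I mean, using
scala.runtime.AbstractFunction1 only because it is a convenient Scala
abstract class to extend from Java:

import scala.runtime.AbstractFunction1;

// A Java class extending a Scala abstract class and overriding its abstract method.
public class AddOne extends AbstractFunction1<Integer, Integer> {
    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }

    public static void main(String[] args) {
        System.out.println(new AddOne().apply(41));   // prints 42
    }
}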

Rgds

-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 https://twitter.com/N1R44


Creating a SchemaRDD from an existing API

2014-11-27 Thread Niranda Perera
Hi,

I am evaluating Spark for an analytic component where we do batch
processing of data using SQL.

So, I am particularly interested in Spark SQL and in creating a SchemaRDD
from an existing API [1].

This API exposes elements in a database as datasources. Using the methods
allowed by this data source, we can access and edit data.

So, I want to create a custom SchemaRDD using the methods and provisions of
this API. I tried going through the Spark documentation and the Java docs, but
unfortunately, I was unable to come to a conclusion as to whether this is
actually possible.

I would like to ask the Spark Devs,
1. As of the current Spark release, can we make a custom SchemaRDD?
2. What is the extension point for a custom SchemaRDD? Or are there
particular interfaces?
3. Could you please point me to the specific docs regarding this matter?

Your help in this regard is highly appreciated.

Cheers

[1]
https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics

-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 https://twitter.com/N1R44


Getting the execution times of spark job

2014-09-02 Thread Niranda Perera
Hi,

I have been playing around with spark for a couple of days. I am
using spark-1.0.1-bin-hadoop1 and the Java API. The main idea of the
implementation is to run Hive queries on Spark. I used JavaHiveContext to
achieve this (As per the examples).

I have 2 questions.
1. I am wondering how I could get the execution times of a spark job? Does
Spark provide monitoring facilities in the form of an API?

2. I used a layman's way to get the execution times by wrapping a
JavaHiveContext.hql call between System.nanoTime() calls, as follows:

long start, end;
JavaHiveContext hiveCtx;      // initialized elsewhere
JavaSchemaRDD hiveResult;

start = System.nanoTime();
hiveResult = hiveCtx.hql(query);
end = System.nanoTime();
System.out.println(end - start);   // elapsed time in nanoseconds

But the result I got is drastically different from the execution times
recorded in the Spark UI. Can you please explain this disparity?
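
If hql() mostly builds the JavaSchemaRDD and the distributed work only runs
when an action such as collect() is called, then a fairer measurement would
probably be something like the following sketch:

long start = System.nanoTime();
JavaSchemaRDD result = hiveCtx.hql(query);   // builds the plan / schema only
List<Row> rows = result.collect();           // the Spark job actually runs here
long end = System.nanoTime();
System.out.println("elapsed ms: " + (end - start) / 1000000L);
// Row here is org.apache.spark.sql.api.java.Row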

Look forward to hearing from you.

rgds

-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 https://twitter.com/N1R44


Storage Handlers in Spark SQL

2014-08-21 Thread Niranda Perera
Hi,

I have been playing around with Spark for the past few days, evaluating
the possibility of migrating from Hive/Hadoop to Spark (Spark SQL).

I am working on the WSO2 Business Activity Monitor (WSO2 BAM,
https://docs.wso2.com/display/BAM241/WSO2+Business+Activity+Monitor+Documentation
) which currently employs Hive. We are considering Spark as a
successor to Hive, given its performance improvements.

We currently employ several custom storage handlers in Hive.
Example:
WSO2 JDBC and Cassandra storage handlers:
https://docs.wso2.com/display/BAM241/JDBC+Storage+Handler+for+Hive
https://docs.wso2.com/display/BAM241/Creating+Hive+Queries+to+Analyze+Data#CreatingHiveQueriestoAnalyzeData-cas

I would like to know whether Spark SQL can work with these storage
handlers (perhaps while using HiveContext)?

Best regards
-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 https://twitter.com/N1R44