Re: [Spark Launcher] How to launch parallel jobs?

2017-02-13 Thread Egor Pahomov
About the second problem: as I understand it, this can happen in two cases: (1) one job
prevents the other from getting resources for its executors, or (2) the
bottleneck is reading from disk, so you cannot really parallelize that. I
have no experience with the second case, but it's easy to verify the first one:
just look at your Hadoop UI and check that both jobs get enough resources.
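If it does turn out to be the first case, one thing to try - just a sketch, with made-up caps - is to limit each submission so that two jobs can get YARN containers at the same time, then compare with what the YARN UI reports:

import org.apache.spark.SparkConf

// Hypothetical caps for one of the two jobs: with, say, 16 vcores in the queue,
// keeping each job at <= 4 executors x 1 core leaves room for the other job
// to get containers in parallel instead of waiting behind the first one.
val conf = new SparkConf()
  .setAppName("job-a")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "4")
  .set("spark.executor.cores", "1")
  .set("spark.executor.memory", "1g")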

2017-02-13 11:07 GMT-08:00 Egor Pahomov <pahomov.e...@gmail.com>:

> "But if i increase only executor-cores the finish time is the same". More
> experienced ones can correct me, if I'm wrong, but as far as I understand
> that: one partition processed by one spark task. Task is always running on
> 1 core and not parallelized among cores. So if you have 5 partitions and
> you increased totall number of cores among cluster from 7 to 10 for example
> - you have not gained anything. But if you repartition you give an
> opportunity to process thing in more threads, so now more tasks can execute
> in parallel.
>
> 2017-02-13 7:05 GMT-08:00 Cosmin Posteuca <cosmin.poste...@gmail.com>:
>
>> Hi,
>>
>> I think i don't understand enough how to launch jobs.
>>
>> I have one job which takes 60 seconds to finish. I run it with following
>> command:
>>
>> spark-submit --executor-cores 1 \
>>  --executor-memory 1g \
>>  --driver-memory 1g \
>>  --master yarn \
>>  --deploy-mode cluster \
>>  --conf spark.dynamicAllocation.enabled=true \
>>  --conf spark.shuffle.service.enabled=true \
>>  --conf spark.dynamicAllocation.minExecutors=1 \
>>  --conf spark.dynamicAllocation.maxExecutors=4 \
>>  --conf spark.dynamicAllocation.initialExecutors=4 \
>>  --conf spark.executor.instances=4 \
>>
>> If i increase number of partitions from code and number of executors the app 
>> will finish faster, which it's ok. But if i increase only executor-cores the 
>> finish time is the same, and i don't understand why. I expect the time to be 
>> lower than initial time.
>>
>> My second problem is if i launch twice above code i expect that both jobs to 
>> finish in 60 seconds, but this don't happen. Both jobs finish after 120 
>> seconds and i don't understand why.
>>
>> I run this code on AWS EMR, on 2 instances(4 cpu each, and each cpu has 2 
>> threads). From what i saw in default EMR configurations, yarn is set on 
>> FIFO(default) mode with CapacityScheduler.
>>
>> What do you think about this problems?
>>
>> Thanks,
>>
>> Cosmin
>>
>>
>
>
> --
>
>
> *Sincerely yoursEgor Pakhomov*
>



-- 


*Sincerely yours*
*Egor Pakhomov*


Re: [Spark Launcher] How to launch parallel jobs?

2017-02-13 Thread Egor Pahomov
"But if i increase only executor-cores the finish time is the same". More
experienced ones can correct me, if I'm wrong, but as far as I understand
that: one partition processed by one spark task. Task is always running on
1 core and not parallelized among cores. So if you have 5 partitions and
you increased totall number of cores among cluster from 7 to 10 for example
- you have not gained anything. But if you repartition you give an
opportunity to process thing in more threads, so now more tasks can execute
in parallel.
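To make the idea concrete, a minimal sketch (the path and numbers are made up): with only 5 partitions at most 5 tasks run at once, however many cores you add; repartitioning raises the possible parallelism.

import org.apache.spark.{SparkConf, SparkContext}

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repartition-sketch"))

    // Hypothetical input read into 5 partitions: only 5 tasks can run in
    // parallel, no matter how many executor cores the cluster has.
    val events = sc.textFile("hdfs:///tmp/events", 5)

    // After repartitioning to 40, up to 40 tasks can run at once, so adding
    // cores or executors can actually shorten the job.
    val wider = events.repartition(40)
    println(wider.count())

    sc.stop()
  }
}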

2017-02-13 7:05 GMT-08:00 Cosmin Posteuca :

> Hi,
>
> I think i don't understand enough how to launch jobs.
>
> I have one job which takes 60 seconds to finish. I run it with following
> command:
>
> spark-submit --executor-cores 1 \
>  --executor-memory 1g \
>  --driver-memory 1g \
>  --master yarn \
>  --deploy-mode cluster \
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.shuffle.service.enabled=true \
>  --conf spark.dynamicAllocation.minExecutors=1 \
>  --conf spark.dynamicAllocation.maxExecutors=4 \
>  --conf spark.dynamicAllocation.initialExecutors=4 \
>  --conf spark.executor.instances=4 \
>
> If i increase number of partitions from code and number of executors the app 
> will finish faster, which it's ok. But if i increase only executor-cores the 
> finish time is the same, and i don't understand why. I expect the time to be 
> lower than initial time.
>
> My second problem is if i launch twice above code i expect that both jobs to 
> finish in 60 seconds, but this don't happen. Both jobs finish after 120 
> seconds and i don't understand why.
>
> I run this code on AWS EMR, on 2 instances(4 cpu each, and each cpu has 2 
> threads). From what i saw in default EMR configurations, yarn is set on 
> FIFO(default) mode with CapacityScheduler.
>
> What do you think about this problems?
>
> Thanks,
>
> Cosmin
>
>


-- 


*Sincerely yours*
*Egor Pakhomov*


Re: Union of DStream and RDD

2017-02-11 Thread Egor Pahomov
Interestingly, I just faced the same problem. By any chance, do you
want to process old files in the directory as well as new ones? That's my
motivation, and checkpointing is my problem as well.

2017-02-08 22:02 GMT-08:00 Amit Sela <amitsel...@gmail.com>:

> Not with checkpointing.
>
> On Thu, Feb 9, 2017, 04:58 Egor Pahomov <pahomov.e...@gmail.com> wrote:
>
>> Just guessing here, but would http://spark.apache.org/
>> docs/latest/streaming-programming-guide.html#basic-sources "*Queue of
>> RDDs as a Stream*" work? Basically create DStream from your RDD and than
>> union with other DStream.
>>
>> 2017-02-08 12:32 GMT-08:00 Amit Sela <amitsel...@gmail.com>:
>>
>> Hi all,
>>
>> I'm looking to union a DStream and RDD into a single stream.
>> One important note is that the RDD has to be added to the DStream just
>> once.
>>
>> Ideas ?
>>
>> Thanks,
>> Amit
>>
>>
>>
>>
>>
>> --
>>
>>
>> *Sincerely yoursEgor Pakhomov*
>>
>


-- 


*Sincerely yours*
*Egor Pakhomov*


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Egor Pahomov
Got it, thanks!

2017-02-11 0:56 GMT-08:00 Sam Elamin <hussam.ela...@gmail.com>:

> Here's a link to the thread
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-
> Streaming-Dropping-Duplicates-td20884.html
>
> On Sat, 11 Feb 2017 at 08:47, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
>> Hey Egor
>>
>>
>> You can use for each writer or you can write a custom sink. I personally
>> went with a custom sink since I get a dataframe per batch
>>
>> https://github.com/samelamin/spark-bigquery/blob/master/
>> src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala
>>
>> You can have a look at how I implemented something similar to file sink
>> that in the event if a failure skips batches already written
>>
>>
>> Also have a look at Micheals reply to me a few days ago on exactly the
>> same topic. The email subject was called structured streaming. Dropping
>> duplicates
>>
>>
>> Regards
>>
>> Sam
>>
>> On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> "Something like that" I've never tried it out myself so I'm only
>> guessing having a brief look at the API.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov <pahomov.e...@gmail.com>
>> wrote:
>> > Jacek, so I create cache in ForeachWriter, in all "process()" I write
>> to it
>> > and on close I flush? Something like that?
>> >
>> > 2017-02-09 12:42 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
>> >>
>> >> Hi,
>> >>
>> >> Yes, that's ForeachWriter.
>> >>
>> >> Yes, it works with element by element. You're looking for mapPartition
>> >> and ForeachWriter has partitionId that you could use to implement a
>> >> similar thing.
>> >>
>> >> Pozdrawiam,
>> >> Jacek Laskowski
>> >> 
>> >> https://medium.com/@jaceklaskowski/
>> >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> >> Follow me at https://twitter.com/jaceklaskowski
>> >>
>> >>
>> >> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov <pahomov.e...@gmail.com>
>> >> wrote:
>> >> > Jacek, you mean
>> >> >
>> >> > http://spark.apache.org/docs/latest/api/scala/index.html#
>> org.apache.spark.sql.ForeachWriter
>> >> > ? I do not understand how to use it, since it passes every value
>> >> > separately,
>> >> > not every partition. And addding to table value by value would not
>> work
>> >> >
>> >> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> Have you considered foreach sink?
>> >> >>
>> >> >> Jacek
>> >> >>
>> >> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com>
>> wrote:
>> >> >>>
>> >> >>> Hi, I'm thinking of using Structured Streaming instead of old
>> >> >>> streaming,
>> >> >>> but I need to be able to save results to Hive table. Documentation
>> for
>> >> >>> file
>> >> >>> sink
>> >> >>>
>> >> >>> says(http://spark.apache.org/docs/latest/structured-
>> streaming-programming-guide.html#output-sinks):
>> >> >>> "Supports writes to partitioned tables. ". But being able to write
>> to
>> >> >>> partitioned directories is not enough to write to the table:
>> someone
>> >> >>> needs
>> >> >>> to write to Hive metastore. How can I use Structured Streaming and
>> >> >>> write to
>> >> >>> Hive table?
>> >> >>>
>> >> >>> --
>> >> >>> Sincerely yours
>> >> >>> Egor Pakhomov
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Sincerely yours
>> >> > Egor Pakhomov
>> >
>> >
>> >
>> >
>> > --
>> > Sincerely yours
>> > Egor Pakhomov
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


-- 


*Sincerely yours*
*Egor Pakhomov*


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-10 Thread Egor Pahomov
Jacek, so I create a cache in the ForeachWriter, write to it in every "process()"
call, and flush it in close()? Something like that?
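Roughly what I have in mind - just a sketch; the buffer and saveBatch() are placeholders, not a real implementation:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{ForeachWriter, Row}

class BufferedWriter extends ForeachWriter[Row] {
  private val buffer = new ArrayBuffer[Row]()

  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    true // accept this partition/epoch
  }

  override def process(value: Row): Unit = {
    // accumulate rows instead of writing them one by one
    buffer += value
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null && buffer.nonEmpty) {
      saveBatch(buffer) // flush the whole partition in one write
    }
  }

  private def saveBatch(rows: Seq[Row]): Unit = {
    // placeholder: append the rows to the target table here
  }
}

It would be wired up with something like streamingDf.writeStream.foreach(new BufferedWriter).start().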

2017-02-09 12:42 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:

> Hi,
>
> Yes, that's ForeachWriter.
>
> Yes, it works with element by element. You're looking for mapPartition
> and ForeachWriter has partitionId that you could use to implement a
> similar thing.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov <pahomov.e...@gmail.com>
> wrote:
> > Jacek, you mean
> > http://spark.apache.org/docs/latest/api/scala/index.html#
> org.apache.spark.sql.ForeachWriter
> > ? I do not understand how to use it, since it passes every value
> separately,
> > not every partition. And addding to table value by value would not work
> >
> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
> >>
> >> Hi,
> >>
> >> Have you considered foreach sink?
> >>
> >> Jacek
> >>
> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com> wrote:
> >>>
> >>> Hi, I'm thinking of using Structured Streaming instead of old
> streaming,
> >>> but I need to be able to save results to Hive table. Documentation for
> file
> >>> sink
> >>> says(http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#output-sinks):
> >>> "Supports writes to partitioned tables. ". But being able to write to
> >>> partitioned directories is not enough to write to the table: someone
> needs
> >>> to write to Hive metastore. How can I use Structured Streaming and
> write to
> >>> Hive table?
> >>>
> >>> --
> >>> Sincerely yours
> >>> Egor Pakhomov
> >
> >
> >
> >
> > --
> > Sincerely yours
> > Egor Pakhomov
>



-- 


*Sincerely yours*
*Egor Pakhomov*


Re: [Spark-SQL] Hive support is required to select over the following tables

2017-02-08 Thread Egor Pahomov
Just guessing here, but have you built your Spark with "-Phive"? By the
way, which version of Zeppelin?
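And in case it helps, a minimal sketch of what I mean by Hive support on the session side (plain spark-shell style, not Zeppelin-specific; just an assumption about your setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-check")
  .enableHiveSupport() // requires a Spark build with the hive profile on the classpath
  .getOrCreate()

spark.sql("create table foo (name string)")
spark.sql("select * from foo").show()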

2017-02-08 5:13 GMT-08:00 Daniel Haviv :

> Hi,
> I'm using Spark 2.1.0 on Zeppelin.
>
> I can successfully create a table but when I try to select from it I fail:
> spark.sql("create table foo (name string)")
> res0: org.apache.spark.sql.DataFrame = []
>
> spark.sql("select * from foo")
>
> org.apache.spark.sql.AnalysisException:
> Hive support is required to select over the following tables:
> `default`.`zibi`
> ;;
> 'Project [*]
> +- 'SubqueryAlias foo
> +- 'SimpleCatalogRelation default, CatalogTable(
> Table: `default`.`foo`
> Created: Wed Feb 08 12:52:08 UTC 2017
> Last Access: Wed Dec 31 23:59:59 UTC 1969
> Type: MANAGED
> Schema: [StructField(name,StringType,true)]
> Provider: hive
> Storage(Location: hdfs:/user/spark/warehouse/foo, InputFormat:
> org.apache.hadoop.mapred.TextInputFormat, OutputFormat:
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat))
>
>
> This is a change in behavior from 2.0.2, any idea why ?
>
> Thank you,
> Daniel
>
>


-- 


*Sincerely yours*
*Egor Pakhomov*


Re: Union of DStream and RDD

2017-02-08 Thread Egor Pahomov
Just guessing here, but would
http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
"*Queue of RDDs as a Stream*" work? Basically, create a DStream from your RDD
and then union it with the other DStream.
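A minimal sketch of what I mean (batch interval and sources are made up):

import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("union-sketch"))
val ssc = new StreamingContext(sc, Seconds(1))

// The RDD that should be added to the stream exactly once.
val oldData: RDD[String] = sc.textFile("hdfs:///tmp/old-events")

// Queue with a single RDD; with oneAtATime = true it is emitted in one batch only.
val queue = mutable.Queue(oldData)
val oldDataStream = ssc.queueStream(queue, oneAtATime = true)

val liveStream = ssc.socketTextStream("localhost", 9999) // stand-in for the real DStream
val combined = liveStream.union(oldDataStream)

combined.print()
ssc.start()
ssc.awaitTermination()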

2017-02-08 12:32 GMT-08:00 Amit Sela :

> Hi all,
>
> I'm looking to union a DStream and RDD into a single stream.
> One important note is that the RDD has to be added to the DStream just
> once.
>
> Ideas ?
>
> Thanks,
> Amit
>
>



-- 


*Sincerely yours*
*Egor Pakhomov*


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-08 Thread Egor Pahomov
Jacek, you mean
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
? I do not understand how to use it, since it passes every value
separately, not a whole partition. And adding values to the table one by one
would not work.

2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:

> Hi,
>
> Have you considered foreach sink?
>
> Jacek
>
> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com> wrote:
>
>> Hi, I'm thinking of using Structured Streaming instead of old streaming,
>> but I need to be able to save results to Hive table. Documentation for file
>> sink says(http://spark.apache.org/docs/latest/structured-streamin
>> g-programming-guide.html#output-sinks): "Supports writes to partitioned
>> tables. ". But being able to write to partitioned directories is not
>> enough to write to the table: someone needs to write to Hive metastore. How
>> can I use Structured Streaming and write to Hive table?
>>
>> --
>>
>>
>> *Sincerely yoursEgor Pakhomov*
>>
>


-- 


*Sincerely yours*
*Egor Pakhomov*


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-06 Thread Egor Pahomov
I have a stream of files on HDFS with JSON events. I need to convert it to
Parquet in real time, process some fields, and store the result in a simple
Hive table so people can query it. People might even want to query it with
Impala, so it's important that it is a real Hive-metastore-based table. How can I
do that?
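To make the requirement concrete, a stripped-down sketch of the streaming part (paths and schema are made up); it writes partitioned Parquet, but nothing in it updates the Hive metastore, which is exactly the missing piece:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("json-to-parquet")
  .enableHiveSupport()
  .getOrCreate()

// Streaming JSON sources need an explicit schema.
val schema = new StructType()
  .add("event_time", StringType)
  .add("user_id", StringType)
  .add("payload", StringType)

val events = spark.readStream.schema(schema).json("hdfs:///data/events/json")

// File sink: writes Parquet files, but the Hive metastore never learns about them.
val query = events.writeStream
  .format("parquet")
  .option("path", "hdfs:///warehouse/events_pq")
  .option("checkpointLocation", "hdfs:///checkpoints/events_pq")
  .start()

query.awaitTermination()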

2017-02-06 14:25 GMT-08:00 Burak Yavuz <brk...@gmail.com>:

> Hi Egor,
>
> Structured Streaming handles all of its metadata itself, which files are
> actually valid, etc. You may use the "create table" syntax in SQL to treat
> it like a hive table, but it will handle all partitioning information in
> its own metadata log. Is there a specific reason that you want to store the
> information in the Hive Metastore?
>
> Best,
> Burak
>
> On Mon, Feb 6, 2017 at 11:39 AM, Egor Pahomov <pahomov.e...@gmail.com>
> wrote:
>
>> Hi, I'm thinking of using Structured Streaming instead of old streaming,
>> but I need to be able to save results to Hive table. Documentation for file
>> sink says(http://spark.apache.org/docs/latest/structured-streamin
>> g-programming-guide.html#output-sinks): "Supports writes to partitioned
>> tables. ". But being able to write to partitioned directories is not
>> enough to write to the table: someone needs to write to Hive metastore. How
>> can I use Structured Streaming and write to Hive table?
>>
>> --
>>
>>
>> *Sincerely yoursEgor Pakhomov*
>>
>
>


-- 


*Sincerely yours*
*Egor Pakhomov*


[Structured Streaming] Using File Sink to store to hive table.

2017-02-06 Thread Egor Pahomov
Hi, I'm thinking of using Structured Streaming instead of the old streaming API,
but I need to be able to save results to a Hive table. The documentation for the
file sink (
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)
says: "Supports writes to partitioned tables.". But being able to write to
partitioned directories is not enough to write to the table: someone needs
to update the Hive metastore. How can I use Structured Streaming and write to
a Hive table?

-- 


*Sincerely yours*
*Egor Pakhomov*


Logs of spark driver in yarn-client mode.

2016-07-06 Thread Egor Pahomov
Hi, I have the following issue:

I have Zeppelin, which is set up in yarn-client mode. A notebook stays in the
Running state for a long period of time at 0% done, and I do not see even an
accepted application in YARN.

To understand what's going on, I need the logs of the Spark driver, which is
trying to connect to Hadoop, but zeppelin/logs/* does not have enough
information. Where should I look for these logs?

-- 


*Sincerely yours*
*Egor Pakhomov*


Re: Thrift JDBC server - why only one per machine and only yarn-client

2016-07-01 Thread Egor Pahomov
What about yarn-cluster mode?

2016-07-01 11:24 GMT-07:00 Egor Pahomov <pahomov.e...@gmail.com>:

> Separate bad users with bad quires from good users with good quires. Spark
> do not provide no scope separation out of the box.
>
> 2016-07-01 11:12 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:
>
>> I think so, any reason you want to deploy multiple thrift server on one
>> machine ?
>>
>> On Fri, Jul 1, 2016 at 10:59 AM, Egor Pahomov <pahomov.e...@gmail.com>
>> wrote:
>>
>>> Takeshi, of course I used different HIVE_SERVER2_THRIFT_PORT
>>> Jeff, thanks, I would try, but from your answer I'm getting the feeling,
>>> that I'm trying some very rare case?
>>>
>>> 2016-07-01 10:54 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:
>>>
>>>> This is not a bug, because these 2 processes use the same SPARK_PID_DIR
>>>> which is /tmp by default.  Although you can resolve this by using
>>>> different SPARK_PID_DIR, I suspect you would still have other issues like
>>>> port conflict. I would suggest you to deploy one spark thrift server per
>>>> machine for now. If stick to deploy multiple spark thrift server on one
>>>> machine, then define different SPARK_CONF_DIR, SPARK_LOG_DIR and
>>>> SPARK_PID_DIR for your 2 instances of spark thrift server. Not sure if
>>>> there's other conflicts. but please try first.
>>>>
>>>>
>>>> On Fri, Jul 1, 2016 at 10:47 AM, Egor Pahomov <pahomov.e...@gmail.com>
>>>> wrote:
>>>>
>>>>> I get
>>>>>
>>>>> "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 running as
>>>>> process 28989.  Stop it first."
>>>>>
>>>>> Is it a bug?
>>>>>
>>>>> 2016-07-01 10:10 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:
>>>>>
>>>>>> I don't think the one instance per machine is true.  As long as you
>>>>>> resolve the conflict issue such as port conflict, pid file, log file and
>>>>>> etc, you can run multiple instances of spark thrift server.
>>>>>>
>>>>>> On Fri, Jul 1, 2016 at 9:32 AM, Egor Pahomov <pahomov.e...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, I'm using Spark Thrift JDBC server and 2 limitations are really
>>>>>>> bother me -
>>>>>>>
>>>>>>> 1) One instance per machine
>>>>>>> 2) Yarn client only(not yarn cluster)
>>>>>>>
>>>>>>> Are there any architectural reasons for such limitations? About
>>>>>>> yarn-client I might understand in theory - master is the same process 
>>>>>>> as a
>>>>>>> server, so it makes some sense, but it's really inconvenient - I need a 
>>>>>>> lot
>>>>>>> of memory on my driver machine. Reasons for one instance per machine I 
>>>>>>> do
>>>>>>> not understand.
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>>
>>>>>>> *Sincerely yoursEgor Pakhomov*
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>
>>>>> *Sincerely yoursEgor Pakhomov*
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>>
>>> --
>>>
>>>
>>> *Sincerely yoursEgor Pakhomov*
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
>
>
> *Sincerely yoursEgor Pakhomov*
>



-- 


*Sincerely yours*
*Egor Pakhomov*


Re: Thrift JDBC server - why only one per machine and only yarn-client

2016-07-01 Thread Egor Pahomov
To separate bad users with bad queries from good users with good queries. Spark
does not provide any such isolation out of the box.

2016-07-01 11:12 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:

> I think so, any reason you want to deploy multiple thrift server on one
> machine ?
>
> On Fri, Jul 1, 2016 at 10:59 AM, Egor Pahomov <pahomov.e...@gmail.com>
> wrote:
>
>> Takeshi, of course I used different HIVE_SERVER2_THRIFT_PORT
>> Jeff, thanks, I would try, but from your answer I'm getting the feeling,
>> that I'm trying some very rare case?
>>
>> 2016-07-01 10:54 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:
>>
>>> This is not a bug, because these 2 processes use the same SPARK_PID_DIR
>>> which is /tmp by default.  Although you can resolve this by using
>>> different SPARK_PID_DIR, I suspect you would still have other issues like
>>> port conflict. I would suggest you to deploy one spark thrift server per
>>> machine for now. If stick to deploy multiple spark thrift server on one
>>> machine, then define different SPARK_CONF_DIR, SPARK_LOG_DIR and
>>> SPARK_PID_DIR for your 2 instances of spark thrift server. Not sure if
>>> there's other conflicts. but please try first.
>>>
>>>
>>> On Fri, Jul 1, 2016 at 10:47 AM, Egor Pahomov <pahomov.e...@gmail.com>
>>> wrote:
>>>
>>>> I get
>>>>
>>>> "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 running as
>>>> process 28989.  Stop it first."
>>>>
>>>> Is it a bug?
>>>>
>>>> 2016-07-01 10:10 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:
>>>>
>>>>> I don't think the one instance per machine is true.  As long as you
>>>>> resolve the conflict issue such as port conflict, pid file, log file and
>>>>> etc, you can run multiple instances of spark thrift server.
>>>>>
>>>>> On Fri, Jul 1, 2016 at 9:32 AM, Egor Pahomov <pahomov.e...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, I'm using Spark Thrift JDBC server and 2 limitations are really
>>>>>> bother me -
>>>>>>
>>>>>> 1) One instance per machine
>>>>>> 2) Yarn client only(not yarn cluster)
>>>>>>
>>>>>> Are there any architectural reasons for such limitations? About
>>>>>> yarn-client I might understand in theory - master is the same process as 
>>>>>> a
>>>>>> server, so it makes some sense, but it's really inconvenient - I need a 
>>>>>> lot
>>>>>> of memory on my driver machine. Reasons for one instance per machine I do
>>>>>> not understand.
>>>>>>
>>>>>> --
>>>>>>
>>>>>>
>>>>>> *Sincerely yoursEgor Pakhomov*
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>>
>>>> *Sincerely yoursEgor Pakhomov*
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>>
>> --
>>
>>
>> *Sincerely yoursEgor Pakhomov*
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 


*Sincerely yours*
*Egor Pakhomov*


Re: Thrift JDBC server - why only one per machine and only yarn-client

2016-07-01 Thread Egor Pahomov
Takeshi, of course I used a different HIVE_SERVER2_THRIFT_PORT.
Jeff, thanks, I will try, but from your answer I'm getting the feeling
that I'm trying a very rare case?

2016-07-01 10:54 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:

> This is not a bug, because these 2 processes use the same SPARK_PID_DIR
> which is /tmp by default.  Although you can resolve this by using
> different SPARK_PID_DIR, I suspect you would still have other issues like
> port conflict. I would suggest you to deploy one spark thrift server per
> machine for now. If stick to deploy multiple spark thrift server on one
> machine, then define different SPARK_CONF_DIR, SPARK_LOG_DIR and
> SPARK_PID_DIR for your 2 instances of spark thrift server. Not sure if
> there's other conflicts. but please try first.
>
>
> On Fri, Jul 1, 2016 at 10:47 AM, Egor Pahomov <pahomov.e...@gmail.com>
> wrote:
>
>> I get
>>
>> "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 running as
>> process 28989.  Stop it first."
>>
>> Is it a bug?
>>
>> 2016-07-01 10:10 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:
>>
>>> I don't think the one instance per machine is true.  As long as you
>>> resolve the conflict issue such as port conflict, pid file, log file and
>>> etc, you can run multiple instances of spark thrift server.
>>>
>>> On Fri, Jul 1, 2016 at 9:32 AM, Egor Pahomov <pahomov.e...@gmail.com>
>>> wrote:
>>>
>>>> Hi, I'm using Spark Thrift JDBC server and 2 limitations are really
>>>> bother me -
>>>>
>>>> 1) One instance per machine
>>>> 2) Yarn client only(not yarn cluster)
>>>>
>>>> Are there any architectural reasons for such limitations? About
>>>> yarn-client I might understand in theory - master is the same process as a
>>>> server, so it makes some sense, but it's really inconvenient - I need a lot
>>>> of memory on my driver machine. Reasons for one instance per machine I do
>>>> not understand.
>>>>
>>>> --
>>>>
>>>>
>>>> *Sincerely yoursEgor Pakhomov*
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>>
>> --
>>
>>
>> *Sincerely yoursEgor Pakhomov*
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 


*Sincerely yours*
*Egor Pakhomov*


Re: Thrift JDBC server - why only one per machine and only yarn-client

2016-07-01 Thread Egor Pahomov
I get

"org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 running as
process 28989.  Stop it first."

Is it a bug?

2016-07-01 10:10 GMT-07:00 Jeff Zhang <zjf...@gmail.com>:

> I don't think the one instance per machine is true.  As long as you
> resolve the conflict issue such as port conflict, pid file, log file and
> etc, you can run multiple instances of spark thrift server.
>
> On Fri, Jul 1, 2016 at 9:32 AM, Egor Pahomov <pahomov.e...@gmail.com>
> wrote:
>
>> Hi, I'm using Spark Thrift JDBC server and 2 limitations are really
>> bother me -
>>
>> 1) One instance per machine
>> 2) Yarn client only(not yarn cluster)
>>
>> Are there any architectural reasons for such limitations? About
>> yarn-client I might understand in theory - master is the same process as a
>> server, so it makes some sense, but it's really inconvenient - I need a lot
>> of memory on my driver machine. Reasons for one instance per machine I do
>> not understand.
>>
>> --
>>
>>
>> *Sincerely yoursEgor Pakhomov*
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 


*Sincerely yours*
*Egor Pakhomov*


Thrift JDBC server - why only one per machine and only yarn-client

2016-07-01 Thread Egor Pahomov
Hi, I'm using the Spark Thrift JDBC server and 2 limitations really bother
me:

1) One instance per machine
2) Yarn client only (not yarn cluster)

Are there any architectural reasons for such limitations? The yarn-client one
I might understand in theory - the master is the same process as the server, so
it makes some sense - but it's really inconvenient: I need a lot of memory
on my driver machine. The reason for one instance per machine I do not
understand.

-- 


*Sincerely yours*
*Egor Pakhomov*


Re: 1.6.0: Standalone application: Getting ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

2016-01-14 Thread Egor Pahomov
My fault, I should have read the documentation more carefully -
http://spark.apache.org/docs/latest/sql-programming-guide.html says precisely
that I need to add these 3 jars to the classpath in case I need them. We
cannot include them in the fat jar, because they are OSGi bundles and require
plugin.xml and META-INF/MANIFEST.MF in the root of the jar. The problem is that
there are 3 of them and every one has its own plugin.xml. You could include all
of this in the fat jar if you were able to merge the plugin.xml files, but
currently there is no tool to do so: the maven-assembly-plugin simply has no
such merger, and the maven-shade-plugin has an XmlAppendingTransformer, but for
some reason it doesn't work. And that is it - you just have to live with the
fact that you have a fat jar with all dependencies except these 3. The good news
is that if you are in yarn-client mode you only need to add them to the
classpath of your driver; you do not have to call addJar(). That is really good
news, since it's hard to do addJar() properly in an Oozie job.

2016-01-12 17:01 GMT-08:00 Egor Pahomov <pahomov.e...@gmail.com>:

> Hi, I'm moving my infrastructure from 1.5.2 to 1.6.0 and experiencing
> serious issue. I successfully updated spark thrift server from 1.5.2 to
> 1.6.0. But I have standalone application, which worked fine with 1.5.2 but
> failing on 1.6.0 with:
>
> *NestedThrowables:*
> *java.lang.ClassNotFoundException:
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory*
> * at
> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)*
> * at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)*
> * at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)*
>
> Inside this application I work with hive table, which have data in json
> format.
>
> When I add
>
> 
> org.datanucleus
> datanucleus-core
> 4.0.0-release
> 
>
> 
> org.datanucleus
> datanucleus-api-jdo
> 4.0.0-release
> 
>
> 
> org.datanucleus
> datanucleus-rdbms
> 3.2.9
> 
>
> I'm getting:
>
> *Caused by: org.datanucleus.exceptions.NucleusUserException: Persistence
> process has been specified to use a ClassLoaderResolver of name
> "datanucleus" yet this has not been found by the DataNucleus plugin
> mechanism. Please check your CLASSPATH and plugin specification.*
> * at
> org.datanucleus.AbstractNucleusContext.(AbstractNucleusContext.java:102)*
> * at
> org.datanucleus.PersistenceNucleusContextImpl.(PersistenceNucleusContextImpl.java:162)*
>
> I have CDH 5.5. I build spark with
>
> *./make-distribution.sh -Pyarn -Phadoop-2.6
> -Dhadoop.version=2.6.0-cdh5.5.0 -Phive -DskipTests*
>
> Than I publish fat jar locally:
>
> *mvn org.apache.maven.plugins:maven-install-plugin:2.3.1:install-file
> -Dfile=./spark-assembly.jar -DgroupId=org.spark-project
> -DartifactId=my-spark-assembly -Dversion=1.6.0-SNAPSHOT -Dpackaging=jar*
>
> Than I include dependency on this fat jar:
>
> 
> org.spark-project
> my-spark-assembly
> 1.6.0-SNAPSHOT
> 
>
> Than I build my application with assembly plugin:
>
> 
> org.apache.maven.plugins
> maven-shade-plugin
> 
> 
> 
> *:*
> 
> 
> 
> 
> *:*
> 
> META-INF/*.SF
> META-INF/*.DSA
> META-INF/*.RSA
> 
> 
> 
> 
> 
> 
> package
> 
> shade
> 
> 
> 
>  
> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>  
> implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
> 
> META-INF/services/org.apache.hadoop.fs.FileSystem
> 
>  
> implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
> reference.conf
> 
>  
> implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
> log4j.properties
> 
>  
> implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
>  
> implementation="org.apache.maven.plugins.shade.resource.Apac

1.6.0: Standalone application: Getting ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

2016-01-12 Thread Egor Pahomov
Hi, I'm moving my infrastructure from 1.5.2 to 1.6.0 and experiencing a
serious issue. I successfully updated the Spark Thrift server from 1.5.2 to
1.6.0, but I have a standalone application which worked fine with 1.5.2 and is
failing on 1.6.0 with:

*NestedThrowables:*
*java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory*
* at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)*
* at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)*
* at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)*

Inside this application I work with a Hive table, which has data in JSON
format.

When I add

<dependency>
    <groupId>org.datanucleus</groupId>
    <artifactId>datanucleus-core</artifactId>
    <version>4.0.0-release</version>
</dependency>

<dependency>
    <groupId>org.datanucleus</groupId>
    <artifactId>datanucleus-api-jdo</artifactId>
    <version>4.0.0-release</version>
</dependency>

<dependency>
    <groupId>org.datanucleus</groupId>
    <artifactId>datanucleus-rdbms</artifactId>
    <version>3.2.9</version>
</dependency>

I'm getting:

*Caused by: org.datanucleus.exceptions.NucleusUserException: Persistence
process has been specified to use a ClassLoaderResolver of name
"datanucleus" yet this has not been found by the DataNucleus plugin
mechanism. Please check your CLASSPATH and plugin specification.*
* at
org.datanucleus.AbstractNucleusContext.(AbstractNucleusContext.java:102)*
* at
org.datanucleus.PersistenceNucleusContextImpl.(PersistenceNucleusContextImpl.java:162)*

I have CDH 5.5. I built Spark with

*./make-distribution.sh -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.5.0
-Phive -DskipTests*

Then I publish the fat jar locally:

*mvn org.apache.maven.plugins:maven-install-plugin:2.3.1:install-file
-Dfile=./spark-assembly.jar -DgroupId=org.spark-project
-DartifactId=my-spark-assembly -Dversion=1.6.0-SNAPSHOT -Dpackaging=jar*

Then I include a dependency on this fat jar:

<dependency>
    <groupId>org.spark-project</groupId>
    <artifactId>my-spark-assembly</artifactId>
    <version>1.6.0-SNAPSHOT</version>
</dependency>

Then I build my application assembly with the maven-shade-plugin:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <configuration>
        <artifactSet>
            <includes>
                <include>*:*</include>
            </includes>
        </artifactSet>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                        <resource>log4j.properties</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
The shade-plugin configuration is copy-pasted from the Spark assembly pom.

This workflow worked for 1.5.2 and broke for 1.6.0. If my approach to creating
this standalone application is not good, please recommend another approach,
but spark-submit does not work for me - it is hard for me to connect it to Oozie.

Any suggestion would be appreciated - I'm stuck.

My spark config:

lazy val sparkConf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName(appName)
  .set("spark.yarn.queue", "jenkins")
  .set("spark.executor.memory", "10g")
  .set("spark.yarn.executor.memoryOverhead", "2000")
  .set("spark.executor.cores", "3")
  .set("spark.driver.memory", "4g")
  .set("spark.shuffle.io.numConnectionsPerPeer", "5")
  .set("spark.sql.autoBroadcastJoinThreshold", "200483647")
  .set("spark.network.timeout", "1000s")
  .set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=2g")
  .set("spark.driver.maxResultSize", "2g")
  .set("spark.rpc.lookupTimeout", "1000s")
  .set("spark.sql.hive.convertMetastoreParquet", "false")
  .set("spark.kryoserializer.buffer.max", "200m")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.yarn.driver.memoryOverhead", "1000")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.sql.tungsten.enabled", "false")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "100s")
.setJars(List(this.getClass.getProtectionDomain().getCodeSource().getLocation().toURI().getPath()))

-- 



*Sincerely yours*
*Egor Pakhomov*


Elastic allocation(spark.dynamicAllocation.enabled) results in task never being executed.

2014-11-14 Thread Egor Pahomov
Hi.
I run an IPython notebook + pyspark with spark.dynamicAllocation.enabled =
true. The task never ends.
Code:

import sys
from random import random
from operator import add

partitions = 10
n = 10 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
print "Pi is roughly %f" % (4.0 * count / n)



Run notebook:

IPYTHON_ARGS="notebook --profile=ydf --port $IPYTHON_PORT --port-retries=0 --ip='*' --no-browser"
pyspark \
--verbose \
--master yarn-client \
--conf spark.driver.port=$((RANDOM_PORT + 2)) \
--conf spark.broadcast.port=$((RANDOM_PORT + 3)) \
--conf spark.replClassServer.port=$((RANDOM_PORT + 4)) \
--conf spark.blockManager.port=$((RANDOM_PORT + 5)) \
--conf spark.executor.port=$((RANDOM_PORT + 6)) \
--conf spark.fileserver.port=$((RANDOM_PORT + 7)) \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=10 \
--conf spark.ui.port=$SPARK_UI_PORT


The Spark/IPython log is in the attachment.

-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*


ipython_out
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Elastic allocation(spark.dynamicAllocation.enabled) results in task never being executed.

2014-11-14 Thread Egor Pahomov
It's successful without dynamic allocation. I can provide the Spark log for
that scenario if it helps.

2014-11-14 21:36 GMT+02:00 Sandy Ryza sandy.r...@cloudera.com:

 Hi Egor,

 Is it successful without dynamic allocation? From your log, it looks like
 the job is unable to acquire resources from YARN, which could be because
 other jobs are using up all the resources.

 -Sandy

 On Fri, Nov 14, 2014 at 11:32 AM, Egor Pahomov pahomov.e...@gmail.com
 wrote:

 Hi.
 I execute ipython notebook + pyspark with spark.dynamicAllocation.enabled
 = true. Task never ends.
 Code:

 import sys
 from random import random
 from operator import add
 partitions = 10
 n = 10 * partitions

 def f(_):
 x = random() * 2 - 1
 y = random() * 2 - 1
 return 1 if x ** 2 + y ** 2  1 else 0

 count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
 print Pi is roughly %f % (4.0 * count / n)



 Run notebook:

 IPYTHON_ARGS=notebook --profile=ydf --port $IPYTHON_PORT --port-retries=0 
 --ip='*' --no-browser
 pyspark \
 --verbose \
 --master yarn-client \
 --conf spark.driver.port=$((RANDOM_PORT + 2)) \
 --conf spark.broadcast.port=$((RANDOM_PORT + 3)) \
 --conf spark.replClassServer.port=$((RANDOM_PORT + 4)) \
 --conf spark.blockManager.port=$((RANDOM_PORT + 5)) \
 --conf spark.executor.port=$((RANDOM_PORT + 6)) \
 --conf spark.fileserver.port=$((RANDOM_PORT + 7)) \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.dynamicAllocation.maxExecutors=10 \
 --conf spark.ui.port=$SPARK_UI_PORT


 Spark/Ipython log is in attachment.

 --



 *Sincerely yoursEgor PakhomovScala Developer, Yandex*


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*


java.io.FileNotFoundException in usercache

2014-09-25 Thread Egor Pahomov
I work with Spark on an unstable cluster with bad administration.
A couple of days ago I started getting

14/09/25 15:29:56 ERROR storage.DiskBlockObjectWriter: Uncaught
exception while reverting partial writes to file
/local/hd2/yarn/local/usercache/epahomov/appcache/application_1411219858924_15501/spark-local-20140925151931-a4c3/3a/shuffle_4_30_174

java.io.FileNotFoundException:
/local/hd2/yarn/local/usercache/epahomov/appcache/application_1411219858924_15501/spark-local-20140925151931-a4c3/3a/shuffle_4_30_174
(No such file or directory)

After this error the Spark context shut down. I'm aware that there are some
problems with the distributed cache on the cluster: some people add too much
data to it.

I don't fully understand what's going on, but I'm willing to understand it deeply.

1) Does Spark somehow rely on the YARN localization mechanism?
2) What is the usercache directory about?
3) Is there a quick way to work around the problem?
4) Isn't shutting down the Spark context an overreaction to this error?


-- 



*Sincerely yours*
*Egor Pakhomov*
*Developer, Yandex*


Re: SPARK 1.1.0 on yarn-cluster and external JARs

2014-09-25 Thread Egor Pahomov
SparkContext.addJar()?

Why didn't you like the fat-jar approach?
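That is, something like the following, assuming sc is your SparkContext and the jars are already uploaded to HDFS (paths are hypothetical):

// addJar ships each jar to the executors so the classes are available at task run time.
sc.addJar("hdfs:///libs/mongo-java-driver.jar")
sc.addJar("hdfs:///libs/algebird-core_2.10.jar")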

2014-09-25 16:25 GMT+04:00 rzykov rzy...@gmail.com:

 We build some SPARK jobs with external jars. I compile jobs by including
 them
 in one assembly.
 But look for an approach to put all external jars into HDFS.

 We have already put  spark jar in a HDFS folder and set up the variable
 SPARK_JAR.
 What is the best way to do that for other external jars (MongoDB, algebird
 and so on)?

 Thanks in advance





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-1-1-0-on-yarn-cluster-and-external-JARs-tp15136.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*


pyspark + yarn: how everything works.

2014-07-04 Thread Egor Pahomov
Hi, I want to use PySpark with YARN, but the documentation doesn't give me a full
understanding of what's going on, and I simply don't understand the code. So:

1) How is Python shipped to the cluster? Should the machines in the cluster
already have Python?
2) What happens when I write some Python code in a map function - is it
shipped to the cluster and just executed there? How does it figure out all the
dependencies my code needs and ship them there? If I use Math in my
code in map, does it mean that I would ship the Math class, or would some Python
Math on the cluster be used?
3) I have compiled C++ code. Can I ship this executable with addPyFile
and just call it with an exec function from Python? Would it work?



-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*


K-means faster on Mahout than on Spark

2014-03-25 Thread Egor Pahomov
Hi, I'm running a benchmark which compares Mahout and Spark MLlib. For now I
have the following results for k-means:
Number of iterations= 10, number of elements = 1000, mahouttime= 602,
spark time = 138
Number of iterations= 40, number of elements = 1000, mahouttime= 1917,
spark time = 330
Number of iterations= 70, number of elements = 1000, mahouttime= 3203,
spark time = 388
Number of iterations= 10, number of elements = 1, mahouttime= 1235,
spark time = 2226
Number of iterations= 40, number of elements = 1, mahouttime= 2755,
spark time = 6388
Number of iterations= 70, number of elements = 1, mahouttime= 4107,
spark time = 10967
Number of iterations= 10, number of elements = 10, mahouttime=
7070, spark time = 25268

Time is in seconds. It runs on a YARN cluster with about 40 machines. The elements
for clustering are randomly generated. When I changed the persistence level
from MEMORY_ONLY to MEMORY_AND_DISK, Spark started to work faster on the big data.

What am I missing?

See my benchmarking code in the attachment.


-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*

package ru.yandex.spark.examples

import scala.util.Random
import scala.collection.mutable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import ru.yandex.spark.benchmark.Job
import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{LoggerFactory, Logger}
import org.apache.spark.storage.StorageLevel

object KMeansBenchMark {

  private final val log: Logger = LoggerFactory.getLogger(this.getClass)

  val benchPath: Path = new Path("/tmp/benchmark")
  val inputDataPath: Path = new Path("/tmp/benchmark/testdata")
  val outputDataPath: Path = new Path("/tmp/benchmark/output")

  val configuration = new Configuration()
  val fs = FileSystem.get(FileSystem.getDefaultUri(configuration), configuration)

  def main(args: Array[String]) {

    type MahoutTime = Long
    type SparkTime = Long
    type NumberOfIterations = Int
    type NumberOfElements = Long

    val result = new mutable.MutableList[(NumberOfIterations, NumberOfElements, MahoutTime, SparkTime)]

    System.setProperty("SPARK_YARN_APP_JAR", SparkContext.jarOfClass(this.getClass).head)
    System.setProperty("SPARK_JAR", SparkContext.jarOfClass(SparkContext.getClass).head)
    System.setProperty("spark.driver.port", "49014")

    val conf = new SparkConf()
    conf.setAppName("serp-api")
    conf.setMaster("yarn-client")
    conf.set("spark.httpBroadcast.port", "35660")
    conf.set("spark.fileserver.port", "35661")
    conf.setJars(SparkContext.jarOfClass(this.getClass))

    val numbers = List(1000L, 1L, 10L, 10L)

    for (numberOfElements: NumberOfElements <- numbers) {
      for (numberOfIterations: NumberOfIterations <- 10 until 80 by 30) {
        println(s"- ${numberOfElements} ${numberOfIterations}")
        prepareData(numberOfElements)

        val sparkStart = System.currentTimeMillis()
        val spark = new SparkContext(conf)
        val input = spark.textFile(inputDataPath.toString).map(s => s.split(" ").map(number => number.toDouble)).persist(StorageLevel.DISK_ONLY)
        KMeans.train(input, 10, numberOfIterations, 1, KMeans.RANDOM).clusterCenters
        spark.stop()
        val sparkEnd = System.currentTimeMillis()

        val mahaoutStart = System.currentTimeMillis()
        Job.run(configuration, inputDataPath, outputDataPath, new EuclideanDistanceMeasure, 10, 0.5, numberOfIterations)
        val mahaoutEnd = System.currentTimeMillis()

        val mahaoutTime: MahoutTime = (mahaoutEnd - mahaoutStart) / 1000
        val sparkTime: SparkTime = (sparkEnd - sparkStart) / 1000
        result += ((numberOfIterations, numberOfElements, mahaoutTime, sparkTime))
        for (i <- result) {
          log.info(s"Number of iterations= ${i._1}, number of elements = ${i._2}, mahouttime= ${i._3}, spark time = ${i._4}")
        }
        for (i <- result) {
          println(s"Number of iterations= ${i._1}, number of elements = ${i._2}, mahouttime= ${i._3}, spark time = ${i._4}")
        }
      }
    }

  }

  def prepareData(numberOfElements: Long) = {
    fs.delete(benchPath, true)
    fs.mkdirs(benchPath)
    val output = fs.create(inputDataPath)
    for (i <- 0L until numberOfElements) {
      output.writeBytes(nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + " " + nextRandom + "\n")
    }
    output.close()
  }

  def nextRandom = {
    Random.nextGaussian() * 10e5 - Random.nextInt(10) * 10e4
  }

}


Re: K-means faster on Mahout than on Spark

2014-03-25 Thread Egor Pahomov
Mahout uses MapReduce and runs one MR job per iteration, so it worked as
predicted. My question is more about why Spark was so slow. I will try
MEMORY_AND_DISK_SER.
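For reference, in the attached benchmark the change would look roughly like this (reusing the spark context and inputDataPath from that code):

import org.apache.spark.storage.StorageLevel

// Serialized in-memory storage with disk spill instead of DISK_ONLY.
val input = spark.textFile(inputDataPath.toString)
  .map(s => s.split(" ").map(_.toDouble))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)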


2014-03-25 17:58 GMT+04:00 Suneel Marthi suneel_mar...@yahoo.com:

 Mahout does have a kmeans which can be executed in mapreduce and iterative
 modes.

 Sent from my iPhone

 On Mar 25, 2014, at 9:25 AM, Prashant Sharma scrapco...@gmail.com wrote:

 I think Mahout uses FuzzyKmeans, which is different algorithm and it is
 not iterative.

 Prashant Sharma


 On Tue, Mar 25, 2014 at 6:50 PM, Egor Pahomov pahomov.e...@gmail.comwrote:

 Hi, I'm running benchmark, which compares Mahout and SparkML. For now I
 have next results for k-means:
 Number of iterations= 10, number of elements = 1000, mahouttime= 602,
 spark time = 138
 Number of iterations= 40, number of elements = 1000, mahouttime=
 1917, spark time = 330
 Number of iterations= 70, number of elements = 1000, mahouttime=
 3203, spark time = 388
 Number of iterations= 10, number of elements = 1, mahouttime=
 1235, spark time = 2226
 Number of iterations= 40, number of elements = 1, mahouttime=
 2755, spark time = 6388
 Number of iterations= 70, number of elements = 1, mahouttime=
 4107, spark time = 10967
 Number of iterations= 10, number of elements = 10, mahouttime=
 7070, spark time = 25268

 Time in seconds. It runs on Yarn cluster with about 40 machines. Elements
 for clusterization are randomly created. When I changed persistence level
 from Memory to Memory_and_disk, on big data spark started to work faster.

 What am I missing?

 See my benchmarking code in attachment.


 --



 *Sincerely yours Egor PakhomovScala Developer, Yandex*





-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*


[Powered by] Yandex Islands powered by Spark

2014-03-16 Thread Egor Pahomov
Hi, page https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Sparksays
I need write here, if want my project to be added there.

In Yandex (www.yandex.com) now we using spark for project Yandex Islands (
http://www.searchenginejournal.com/yandex-islands-markup-issues-implementation/71891/).
We process islands which come from search robot with spark.

-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*


Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-02-28 Thread Egor Pahomov
Spark 0.9 uses protobuf 2.5.0.
Hadoop 2.2 uses protobuf 2.5.0.
protobuf 2.5.0 can read messages serialized with protobuf 2.4.1.
So there is no reason why you can't read messages from Hadoop 2.2
with protobuf 2.5.0; probably you somehow have 2.4.1 in your classpath. Of
course it's very bad that you have both 2.4.1 and 2.5.0 in your classpath.
Use excludes or whatever to get rid of 2.4.1.

Personally, I spent 3 days moving my project from protobuf 2.4.1 to 2.5.0,
but it has to be done for your whole project.

2014-02-28 21:49 GMT+04:00 Aureliano Buendia buendia...@gmail.com:

 Doesn't hadoop 2.2 also depend on protobuf 2.4?


 On Fri, Feb 28, 2014 at 5:45 PM, Ognen Duzlevski 
 og...@plainvanillagames.com wrote:

 A stupid question, by the way, you did compile Spark with Hadoop 2.2.0
 support?

 Ognen

 On 2/28/14, 10:51 AM, Prasad wrote:

 Hi
 I am getting the protobuf error while reading HDFS file using spark
 0.9.0 -- i am running on hadoop 2.2.0 .

 When i look thru, i find that i have both 2.4.1 and 2.5 and some blogs
 suggest that there is some incompatability issues betwen 2.4.1 and 2.5

 hduser@prasadHdp1:~/spark-0.9.0-incubating$ find ~/ -name
 protobuf-java*.jar
 /home/hduser/.m2/repository/com/google/protobuf/protobuf-
 java/2.4.1/protobuf-java-2.4.1.jar
 /home/hduser/.m2/repository/org/spark-project/protobuf/
 protobuf-java/2.4.1-shaded/protobuf-java-2.4.1-shaded.jar
 /home/hduser/spark-0.9.0-incubating/lib_managed/
 bundles/protobuf-java-2.5.0.jar
 /home/hduser/spark-0.9.0-incubating/lib_managed/jars/
 protobuf-java-2.4.1-shaded.jar
 /home/hduser/.ivy2/cache/com.google.protobuf/protobuf-java/
 bundles/protobuf-java-2.5.0.jar
 /home/hduser/.ivy2/cache/org.spark-project.protobuf/
 protobuf-java/jars/protobuf-java-2.4.1-shaded.jar


 Can someone please let me know if you faced these issues and how u fixed
 it.

 Thanks
 Prasad.
 Caused by: java.lang.VerifyError: class
 org.apache.hadoop.security.proto.SecurityProtos$
 GetDelegationTokenRequestProto
 overrides final method
 getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
  at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.getDeclaredMethods0(Native Method)
  at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
  at java.lang.Class.privateGetPublicMethods(Class.java:2651)
  at java.lang.Class.privateGetPublicMethods(Class.java:2661)
  at java.lang.Class.getMethods(Class.java:1467)
  at
 sun.misc.ProxyGenerator.generateClassFile(ProxyGenerator.java:426)
  at
 sun.misc.ProxyGenerator.generateProxyClass(ProxyGenerator.java:323)
  at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:636)
  at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:722)
  at
 org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(
 ProtobufRpcEngine.java:92)
  at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:537)


 Caused by: java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(
 NativeMethodAccessorImpl.java:57)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(
 DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)










 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-
 0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


 --
 Some people, when confronted with a problem, think I know, I'll use
 regular expressions. Now they have two problems.
 -- Jamie Zawinski





-- 



*Sincerely yours*
*Egor Pakhomov*
*Scala Developer, Yandex*


Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-02-28 Thread Egor Pahomov
In that same pom

<profile>
  <id>yarn</id>
  <properties>
    <hadoop.major.version>2</hadoop.major.version>
    <hadoop.version>2.2.0</hadoop.version>
    <protobuf.version>2.5.0</protobuf.version>
  </properties>
  <modules>
    <module>yarn</module>
  </modules>
</profile>



2014-02-28 23:46 GMT+04:00 Aureliano Buendia buendia...@gmail.com:




 On Fri, Feb 28, 2014 at 7:17 PM, Egor Pahomov pahomov.e...@gmail.comwrote:

 Spark 0.9 uses protobuf 2.5.0


 Spark 0.9 uses 2.4.1:


 https://github.com/apache/incubator-spark/blob/4d880304867b55a4f2138617b30600b7fa013b14/pom.xml#L118

 Is there another pom for when hadoop 2.2 is used? I don't see another
 branch for hadooop 2.2.


 Hadoop 2.2 uses protobuf 2.5.0
 protobuf 2.5.0 can read massages serialized with protobuf 2.4.1


 Protobuf java code generated by ptotoc 2.4 does not compile with protobuf
 library 2.5. This is what the OP's error message is about.


 So there is not any reason why you can't read some messages from hadoop
 2.2 with protobuf 2.5.0, probably you somehow have 2.4.1 in your class
 path. Of course it's very bad, that you have both 2.4.1 and 2.5.0 in your
 classpath. Use excludes or whatever to get rid of 2.4.1.

 Personally, I spend 3 days to move my project to protobuf 2.5.0 from
 2.4.1. But it has to be done for the whole your project.

 2014-02-28 21:49 GMT+04:00 Aureliano Buendia buendia...@gmail.com:

 Doesn't hadoop 2.2 also depend on protobuf 2.4?


 On Fri, Feb 28, 2014 at 5:45 PM, Ognen Duzlevski 
 og...@plainvanillagames.com wrote:

 A stupid question, by the way, you did compile Spark with Hadoop 2.2.0
 support?

 Ognen

 On 2/28/14, 10:51 AM, Prasad wrote:

 Hi
 I am getting the protobuf error while reading HDFS file using spark
 0.9.0 -- i am running on hadoop 2.2.0 .

 When i look thru, i find that i have both 2.4.1 and 2.5 and some blogs
 suggest that there is some incompatability issues betwen 2.4.1 and 2.5

 hduser@prasadHdp1:~/spark-0.9.0-incubating$ find ~/ -name
 protobuf-java*.jar
 /home/hduser/.m2/repository/com/google/protobuf/protobuf-
 java/2.4.1/protobuf-java-2.4.1.jar
 /home/hduser/.m2/repository/org/spark-project/protobuf/
 protobuf-java/2.4.1-shaded/protobuf-java-2.4.1-shaded.jar
 /home/hduser/spark-0.9.0-incubating/lib_managed/
 bundles/protobuf-java-2.5.0.jar
 /home/hduser/spark-0.9.0-incubating/lib_managed/jars/
 protobuf-java-2.4.1-shaded.jar
 /home/hduser/.ivy2/cache/com.google.protobuf/protobuf-java/
 bundles/protobuf-java-2.5.0.jar
 /home/hduser/.ivy2/cache/org.spark-project.protobuf/
 protobuf-java/jars/protobuf-java-2.4.1-shaded.jar


 Can someone please let me know if you faced these issues and how u
 fixed it.

 Thanks
 Prasad.
 Caused by: java.lang.VerifyError: class
 org.apache.hadoop.security.proto.SecurityProtos$
 GetDelegationTokenRequestProto
 overrides final method
 getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
  at
 java.security.SecureClassLoader.defineClass(
 SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.
 java:449)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.getDeclaredMethods0(Native Method)
  at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
  at java.lang.Class.privateGetPublicMethods(Class.java:2651)
  at java.lang.Class.privateGetPublicMethods(Class.java:2661)
  at java.lang.Class.getMethods(Class.java:1467)
  at
 sun.misc.ProxyGenerator.generateClassFile(ProxyGenerator.java:426)
  at
 sun.misc.ProxyGenerator.generateProxyClass(ProxyGenerator.java:323)
  at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:636)
  at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:722)
  at
 org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(
 ProtobufRpcEngine.java:92)
  at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:537)


 Caused by: java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(
 NativeMethodAccessorImpl.java:57)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(
 DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)










 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Error-reading-HDFS