Re: Data Duplication Bug Found - Structured Streaming Versions 3.4.1, 3.2.4, and 3.3.2

2023-09-18 Thread Jerry Peng
Hi Craig,

Thank you for sending us more information.  Can you answer my previous
question, which I don't think the document addresses: how did you determine
that there were duplicates in the output, and how was the output data read?
The FileStreamSink provides exactly-once writes ONLY if you read the output
with the FileStreamSource (streaming) or the FileSource (batch).  A commit
log records which data files have been committed, and those sources use
that log to read the data "exactly once".  So there may be duplicated data
written on disk.  If you simply read the data files written to disk, you
may see duplicates when there are failures.  However, if you read the
output location with Spark, you should get exactly-once results (unless
there is a bug), since Spark knows how to use the commit log to determine
which data files are committed and which are not.
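
For example (a minimal PySpark sketch; the Parquet format and the /data/out
path are just assumptions for illustration):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reading the sink directory as a whole lets Spark pick up the _spark_metadata
# commit log written by the FileStreamSink, so files from uncommitted or
# failed batches are skipped and the result is exactly-once.
committed_df = spark.read.format("parquet").load("/data/out")

# Globbing the data files directly bypasses the commit log, so duplicates
# left behind by failures can show up.
raw_df = spark.read.format("parquet").load("/data/out/part-*")
```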

Best,

Jerry

On Mon, Sep 18, 2023 at 1:18 PM Craig Alfieri 
wrote:

> Hi Russell/Jerry/Mich,
>
>
>
> Appreciate your patience on this.
>
>
>
> Attached are more details on how this duplication “error” was found.
>
> Since we’re still unsure, I am using “error” in quotes.
>
>
>
> We’d love the opportunity to work with any of you directly and/or the
> wider Spark community to triage this or get a better understanding of the
> nature of what we’re experiencing.
>
>
>
> Our platform provides the ability to fully reproduce this.
>
>
>
> Once you have had the chance to review the attached draft, let us know if
> there are any questions in the meantime. Again, we welcome the opportunity
> to work with the teams on this.
>
>
>
> Best-
>
> Craig
>
>
>
>
>
>
>
> *From: *Craig Alfieri 
> *Date: *Thursday, September 14, 2023 at 8:45 PM
> *To: *russell.spit...@gmail.com 
> *Cc: *Jerry Peng , Mich Talebzadeh <
> mich.talebza...@gmail.com>, user@spark.apache.org ,
> connor.mc...@antithesis.com 
> *Subject: *Re: Data Duplication Bug Found - Structured Streaming Versions
> 3.4.1, 3.2.4, and 3.3.2
>
> Hi Russell et al,
>
>
>
> Acknowledging receipt; we’ll get these answers back to the group.
>
>
>
> Follow-up forthcoming.
>
>
>
> Craig
>
>
>
>
>
>
>
> On Sep 14, 2023, at 6:38 PM, russell.spit...@gmail.com wrote:
>
> Exactly-once should be output-sink dependent; what sink was being used?
>
> Sent from my iPhone
>
>
>
> On Sep 14, 2023, at 4:52 PM, Jerry Peng 
> wrote:
>
> 
>
> Craig,
>
>
>
> Thanks! Please let us know the result!
>
>
>
> Best,
>
>
>
> Jerry
>
>
>
> On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
>
> Hi Craig,
>
>
>
> Can you please clarify what this bug is and provide sample code causing
> this issue?
>
>
>
> HTH
>
>
> Mich Talebzadeh,
>
> Distinguished Technologist, Solutions Architect & Engineer
>
> London
>
> United Kingdom
>
>
>
>  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 14 Sept 2023 at 17:48, Craig Alfieri 
> wrote:
>
> Hello Spark Community-
>
>
>
> As part of a research effort, our team here at Antithesis tests for
> correctness/fault tolerance of major OSS projects.
>
> Our team recently was testing Spark’s Structured Streaming, and we came
> across a data duplication bug we’d like to work with the teams on to
> resolve.
>
>
>
> Our intention is to utilize this as a future case study for our platform,
> but prior to doing so we’d like to have a resolution in place so that an
> announcement isn’t alarming to the user base.
>
>
>
> Attached is a high level .pdf that reviews the High Availability set-up
> put under test.
>
> This was also tested across the three latest versions, and the same
> behavior was observed.
>
>
>
> We can reproduce this error readily since our environment is fully
> deterministic; we are just not Spark experts and would like to work with
> someone in the community to resolve this.
>
>
>
> Please let us know at your earliest convenience.
>
>
>
> Best
>
>
>
>

Re: Data Duplication Bug Found - Structured Streaming Versions 3.4.1, 3.2.4, and 3.3.2

2023-09-14 Thread Jerry Peng
Craig,

Thanks! Please let us know the result!

Best,

Jerry

On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh 
wrote:

>
> Hi Craig,
>
> Can you please clarify what this bug is and provide sample code causing
> this issue?
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 14 Sept 2023 at 17:48, Craig Alfieri 
> wrote:
>
>> Hello Spark Community-
>>
>>
>>
>> As part of a research effort, our team here at Antithesis tests for
>> correctness/fault tolerance of major OSS projects.
>>
>> Our team recently was testing Spark’s Structured Streaming, and we came
>> across a data duplication bug we’d like to work with the teams on to
>> resolve.
>>
>>
>>
>> Our intention is to utilize this as a future case study for our platform,
>> but prior to doing so we’d like to have a resolution in place so that an
>> announcement isn’t alarming to the user base.
>>
>>
>>
>> Attached is a high level .pdf that reviews the High Availability set-up
>> put under test.
>>
>> This was also tested across the three latest versions, and the same
>> behavior was observed.
>>
>>
>>
>> We can reproduce this error readily since our environment is fully
>> deterministic; we are just not Spark experts and would like to work with
>> someone in the community to resolve this.
>>
>>
>>
>> Please let us know at your earliest convenience.
>>
>>
>>
>> Best
>>
>>
>>
>>
>> *Craig Alfieri*
>>
>> c: 917.841.1652
>>
>> craig.alfi...@antithesis.com
>>
>> New York, NY.
>>
>> Antithesis.com
>> <http://www.antithesis.com/>
>>
>>
>>
>> We can't talk about most of the bugs that we've found for our customers,
>>
>> but some customers like to speak about their work with us:
>>
>> https://github.com/mongodb/mongo/wiki/Testing-MongoDB-with-Antithesis
>>
>>
>>
>>
>>
>>
>> *-*
>> *This email and any files transmitted with it are confidential and
>> intended solely for the use of the individual or entity for whom they are
>> addressed. If you received this message in error, please notify the sender
>> and remove it from your system.*
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Jerry Vinokurov
Sure, I think it makes sense in many cases to break things up like this.
Looking at your other example I'd say that you might want to break up
extractHiveData into several fixtures (one for session, one for config, one
for the df) because in my experience fixtures like those are reused
constantly across a test suite. In general I try to keep my fixtures to one
concrete task only, so that if I find myself repeating a pattern I just
factor it out into another fixture.
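
For instance, a rough sketch of that kind of split (the fixture names are
just illustrative, and s, ctest and config are the helper/config objects
from your own modules):

```
import pytest
from pyspark.sql.functions import col

# s, ctest and config are assumed to come from your project,
# exactly as in your existing fixtures

@pytest.fixture(scope="session")
def spark_session():
    # one SparkSession shared across the whole test run
    return s.spark_session(ctest['common']['appName'])

@pytest.fixture(scope="session")
def source_table_name():
    # fully qualified Hive table name built from config
    return config['hiveVariables']['DSDB'] + '.' + config['GCPVariables']['sourceTable']

@pytest.fixture(scope="session")
def house_df(spark_session, source_table_name):
    # only the sampled DataFrame lives here; session and config concerns don't
    df = s.loadTableFromHiveJDBC(spark_session, source_table_name)
    num_rows = int(ctest['statics']['read_df_rows'] / 2)
    return (df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows)
              .unionAll(df.filter(col("regionname") == "City of Westminster").limit(num_rows)))
```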

On Tue, Feb 9, 2021 at 11:14 AM Mich Talebzadeh 
wrote:

> Thanks Jerry for your comments.
>
> The easiest option, and I concur, is to have all the fixture files
> currently under the fixtures package lumped together in conftest.py under
> the *tests* package.
>
> Then you can do away with the fixtures package altogether and it works.
> However, I gather plug and play becomes less manageable when you have a
> large number of fixtures (large being relative here). My main modules (not
> tests) are designed to do ETL from any database that supports JDBC
> connections (bar Google BigQuery, which only works correctly with the Spark
> API). You specify your source DB and target DB in a yml file for any
> pluggable JDBC database.
>
> Going back to Pytest, please  check this reference below for the reason
> for fixtures packaging
>
> How to modularize your py.test fixtures (github.com)
> <https://gist.github.com/peterhurford/09f7dcda0ab04b95c026c60fa49c2a68>
>
> With regard to your other point on fixtures (a fixture in each file), I
> have this fixture loadIntoMysqlTable(), which uses the data frame created
> in extractHiveData: it reads sample records from Hive and populates the
> MySQL test table. The input it needs is the DataFrame constructed in the
> fixture module extractHiveData, which is passed to it as a parameter. This
> is the only way it seems to work through my tests.
>
>
> @pytest.fixture(scope = "session")
> def extractHiveData():
>     # read data through jdbc from Hive
>     spark_session = s.spark_session(ctest['common']['appName'])
>     tableName = config['GCPVariables']['sourceTable']
>     fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
>     house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
>     # sample data selected equally: n rows from Kensington and Chelsea
>     # and n rows from City of Westminster
>     num_rows = int(ctest['statics']['read_df_rows']/2)
>     house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows) \
>         .unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
>     return house_df
>
> @pytest.fixture(scope = "session")
> def loadIntoMysqlTable(extractHiveData):
>     try:
>         extractHiveData. \
>             write. \
>             format("jdbc"). \
>             option("url", test_url). \
>             option("dbtable", ctest['statics']['sourceTable']). \
>             option("user", ctest['statics']['user']). \
>             option("password", ctest['statics']['password']). \
>             option("driver", ctest['statics']['driver']). \
>             mode(ctest['statics']['mode']). \
>             save()
>         return True
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
> Thanks again.
>
>
> Mich
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 9 Feb 2021 at 15:47, Jerry Vinokurov 
> wrote:
>
>> Hi Mich,
>>
>> I'm a bit confused by what you mean when you say that you cannot call a
>> fixture in another fixture. The fixtures resolve dependencies among
>> themselves by means of their named parameters. So that means that if I have
>> a fixture
>>
>> @pytest.fixture
>> def fixture1():
>>     return SomeObj()
>>
>> and another fixture
>>
>> @pytest.fixture
>> def fixture2(fixture1):
>>     return do_something_with_obj(fixture1)
>>
>> my second fixture will simply receive the object created by the first. As
>> such, you do not need to "call" the second fixture at all.

Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Jerry Vinokurov
Hi Mich,

I'm a bit confused by what you mean when you say that you cannot call a
fixture in another fixture. The fixtures resolve dependencies among
themselves by means of their named parameters. So that means that if I have
a fixture

@pytest.fixture
def fixture1():
    return SomeObj()

and another fixture

@pytest.fixture
def fixture2(fixture1):
    return do_something_with_obj(fixture1)

my second fixture will simply receive the object created by the first. As
such, you do not need to "call" the second fixture at all. Of course, if
you had some use case where you were constructing an object in the second
fixture, you could have the first return a class, or you could have it
return a function. In fact, I have fixtures in a project that do both. Here
they are:

@pytest.fixture
def func():

    def foo(x, y, z):

        return (x + y) * z

    return foo

That's a fixture that returns a function, and any test using the func
fixture would receive that actual function as a value, which could then be
invoked by calling e.g. func(1, 2, 3). Here's another fixture that's more
like what you're doing:


@pytest.fixture
def data_frame():

    return pd.DataFrame.from_records([(1, 2, 3), (4, 5, 6)],
                                     columns=['x', 'y', 'z'])

This one just returns a data frame that can be operated on.

Looking at your setup, I don't want to say that it's wrong per se (it could
be very appropriate to your specific project to split things up among these
many files) but I would say that it's not idiomatic usage of pytest
fixtures, in my experience. It feels to me like you're jumping through a
lot of hoops to set up something that could be done quite easily and
compactly in conftest.py. I do want to emphasize that there is no
limitation on how fixtures can be used within functions or within other
fixtures (which are also just functions), since the result of the fixture
call is just some Python object.

Hope this helps,
Jerry

On Tue, Feb 9, 2021 at 10:18 AM Mich Talebzadeh 
wrote:

> I was a bit confused with the use of fixtures in Pytest with the
> dataframes passed as an input pipeline from one fixture to another. I wrote
> this after spending some time on it. As usual it is heuristic rather than
> anything overtly by the book so to speak.
>
> In PySpark and PyCharm you can ETL from Hive to BigQuery or from Oracle
> to Hive etc. However, for PyTest, I decided to use MySql as the database of
> choice for testing, with a small sample of data (200 rows). I mentioned
> Fixtures. Simply put, "Fixtures are functions, which will run before each
> test function to which they are applied, to prepare data. Fixtures are used
> to feed some data to the tests such as database connections". If you have
> ordering like read data (Extract), do something with it (Transform) and
> save it somewhere (Load), using Spark, then these all happen in memory
> with data frames feeding each other.
>
> The crucial thing to remember is that fixtures pass functions to each
> other as parameters not by invoking them directly!
>
> Example:
>
>     ## This is correct
>     @pytest.fixture(scope = "session")
>     def transformData(readSourceData):   ## fixture passed as parameter
>
>         # this is incorrect (cannot call a fixture in another fixture)
>         read_df = readSourceData()
>
>         # So this operation becomes
>         transformation_df = readSourceData. \
>             select( \
>
> Say in PyCharm under tests package, you create a package "fixtures" (just
> a name nothing to do with "fixture") and in there you put your ETL python
> modules that prepare data for you. Example
>
>     ### file --> saveData.py
>     @pytest.fixture(scope = "session")
>     def saveData(transformData):
>         # Write to test target table
>         try:
>             transformData. \
>                 write. \
>                 format("jdbc"). \
>
>
> You then drive this test by creating a file called *conftest.py* under the
> *tests* package. You can then instantiate your fixture files by referencing
> them in this file as below:
>
>     import pytest
>     from tests.fixtures.extractHiveData import extractHiveData
>     from tests.fixtures.loadIntoMysqlTable import loadIntoMysqlTable
>     from tests.fixtures.readSavedData import readSavedData
>     from tests.fixtures.readSourceData import readSourceData
>     from tests.fixtures.transformData import transformData
>     from tests.fixtures.saveData import saveData
>     from tests.fixtures.readSavedData import readSavedData
>
> Then you have your test Python file, say *test_oracle.py*, under the tests
> package and then put assertions there:
>
>     import pytest
>     from src.config import ctest
>
>     @pytest.mark.usefixtures("extractHiveData")
>     def test_extract(extractHiveData):
>         assert extractHiveData.count() > 0
>
>     @pytest.mark.usefixtures("loadIntoMysqlTable")
>     def test_loadIntoMysqlTable(loadIntoMysqlTable):
>         assert loadIntoMysqlTable
>
>     @py

Re: Calling HTTP Rest APIs from Spark Job

2020-05-14 Thread Jerry Vinokurov
I believe that if you do this within the context of an operation that is
already parallelized such as a map, the work will be distributed to
executors and they will do it in parallel. I could be wrong about this as I
never investigated this specific use case, though.
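
For example, something along these lines (a PySpark sketch using the
`requests` library and a made-up endpoint; the same shape works from Scala
with mapPartitions):

```
import requests  # assumed to be installed on the executors

def fetch_partition(rows):
    # one HTTP session per partition, reused for every employee in it
    with requests.Session() as http:
        for row in rows:
            resp = http.get("https://api.example.com/employees/%s" % row.employee_id)
            yield (row.employee_id, row.employee_name, resp.text)

# the GET calls run on the executors, in parallel across partitions;
# employees_df stands for the DataFrame described in the question
result_df = employees_df.rdd.mapPartitions(fetch_partition) \
    .toDF(["employee_id", "employee_name", "response_json"])
```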

On Thu, May 14, 2020 at 5:24 PM Chetan Khatri 
wrote:

> Thanks for the quick response.
>
> I am curious to know whether it would pull data in parallel for 100+
> HTTP requests or whether it will only run on the driver node. The post body
> would be part of the DataFrame. Think of it as: I have a data frame of
> employee_id, employee_name; now the HTTP GET call has to be made for each
> employee_id, and the DataFrame is dynamic for each Spark job run.
>
> Does it make sense?
>
> Thanks
>
>
> On Thu, May 14, 2020 at 5:12 PM Jerry Vinokurov 
> wrote:
>
>> Hi Chetan,
>>
>> You can pretty much use any client to do this. When I was using Spark at
>> a previous job, we used OkHttp, but I'm sure there are plenty of others. In
>> our case, we had a startup phase in which we gathered metadata via a REST
>> API and then broadcast it to the workers. I think if you need all the
>> workers to have access to whatever you're getting from the API, that's the
>> way to do it.
>>
>> Jerry
>>
>> On Thu, May 14, 2020 at 5:03 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> How can I invoke the Rest API call from Spark Code which is not only
>>> running on Spark Driver but distributed / parallel?
>>>
>>> Spark with Scala is my tech stack.
>>>
>>> Thanks
>>>
>>>
>>>
>>
>> --
>> http://www.google.com/profiles/grapesmoker
>>
>

-- 
http://www.google.com/profiles/grapesmoker


Re: Calling HTTP Rest APIs from Spark Job

2020-05-14 Thread Jerry Vinokurov
Hi Chetan,

You can pretty much use any client to do this. When I was using Spark at a
previous job, we used OkHttp, but I'm sure there are plenty of others. In
our case, we had a startup phase in which we gathered metadata via a REST
API and then broadcast it to the workers. I think if you need all the
workers to have access to whatever you're getting from the API, that's the
way to do it.
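
A rough sketch of that startup-phase pattern, here in PySpark with the
`requests` library and a made-up endpoint just to illustrate (we used OkHttp
on the JVM):

```
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# startup phase: one REST call on the driver for the shared metadata
metadata = requests.get("https://api.example.com/metadata").json()

# broadcast it so each executor holds a single read-only copy
metadata_bc = spark.sparkContext.broadcast(metadata)

# tasks read metadata_bc.value locally instead of calling the API again
keys = spark.sparkContext.parallelize(["k1", "k2", "k3"])
looked_up = keys.map(lambda k: (k, metadata_bc.value.get(k))).collect()
```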

Jerry

On Thu, May 14, 2020 at 5:03 PM Chetan Khatri 
wrote:

> Hi Spark Users,
>
> How can I invoke the Rest API call from Spark Code which is not only
> running on Spark Driver but distributed / parallel?
>
> Spark with Scala is my tech stack.
>
> Thanks
>
>
>

-- 
http://www.google.com/profiles/grapesmoker


Re: Any way to make catalyst optimise away join

2019-11-29 Thread Jerry Vinokurov
This seems like a suboptimal situation for a join. How can Spark know in
advance that all the fields are present and the tables have the same number
of rows? I suppose you could just sort the two frames by id and concatenate
them, but I'm not sure what join optimization is available here.

On Fri, Nov 29, 2019, 4:51 AM jelmer  wrote:

> I have 2 dataframes , lets call them A and B,
>
> A is made up out of [unique_id, field1]
> B is made up out of [unique_id, field2]
>
> The have the exact same number of rows, and every id in A is also present
> in B
>
> if I execute a join like this A.join(B,
> Seq("unique_id")).select($"unique_id", $"field1") then spark will do an
> expensive join even though it does not have to because all the fields it
> needs are in A. is there some trick I can use so that catalyst will
> optimise this join away ?
>


Re: Using Percentile in Spark SQL

2019-11-11 Thread Jerry Vinokurov
I don't think the Spark configuration is what you want to focus on. It's
hard to say without knowing the specifics of the job or the data volume,
but you should be able to accomplish this with the percent_rank function in
SparkSQL and a smart partitioning of the data. If your data has a lot of
skew, you can end up with a situation in which some executors are waiting
around to do work while others are stuck with processing larger partitions,
so you'll need to take a look at the actual stats of your data and figure
out if there's a more efficient partitioning strategy that you can use.
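
For example (a PySpark sketch; df and the region/value column names are
placeholders):

```
from pyspark.sql import Window
from pyspark.sql.functions import percent_rank, expr

# percent_rank over a window; partitioning by a well-distributed key keeps
# the sort work spread across executors instead of piling up on a few
w = Window.partitionBy("region").orderBy("value")
ranked = df.withColumn("pct_rank", percent_rank().over(w))

# if an approximate percentile is acceptable, it is usually much cheaper
# than an exact one
medians = df.groupBy("region").agg(
    expr("percentile_approx(value, 0.5)").alias("approx_median"))
```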

On Mon, Nov 11, 2019 at 10:34 AM Tzahi File  wrote:

> Currently, I'm using the percentile approx function with Hive.
> I'm looking for a better way to run this function or another way to get
> the same result with spark, but faster and not using gigantic instances..
>
> I'm trying to optimize this job by changing the Spark configuration. If
> you have any ideas how to approach this, it would be great (like instance
> type, number of instances, number of executers etc.)
>
>
> On Mon, Nov 11, 2019 at 5:16 PM Patrick McCarthy 
> wrote:
>
>> Depending on your tolerance for error you could also use
>> percentile_approx().
>>
>> On Mon, Nov 11, 2019 at 10:14 AM Jerry Vinokurov 
>> wrote:
>>
>>> Do you mean that you are trying to compute the percent rank of some
>>> data? You can use the SparkSQL percent_rank function for that, but I don't
>>> think that's going to give you any improvement over calling the percentRank
>>> function on the data frame. Are you currently using a user-defined function
>>> for this task? Because I bet that's what's slowing you down.
>>>
>>> On Mon, Nov 11, 2019 at 9:46 AM Tzahi File 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Currently, I'm using hive huge cluster(m5.24xl * 40 workers) to run a
>>>> percentile function. I'm trying to improve this job by moving it to run
>>>> with spark SQL.
>>>>
>>>> Any suggestions on how to use a percentile function in Spark?
>>>>
>>>>
>>>> Thanks,
>>>> --
>>>> Tzahi File
>>>> Data Engineer
>>>> ironSource <http://www.ironsrc.com/>
>>>>
>>>> email tzahi.f...@ironsrc.com
>>>> mobile +972-546864835
>>>> fax +972-77-5448273
>>>> ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
>>>> ironsrc.com <http://www.ironsrc.com/>
>>>> linkedin <https://www.linkedin.com/company/ironsource> | twitter
>>>> <https://twitter.com/ironsource> | facebook
>>>> <https://www.facebook.com/ironSource> | googleplus
>>>> <https://plus.google.com/+ironsrc>
>>>> This email (including any attachments) is for the sole use of the
>>>> intended recipient and may contain confidential information which may be
>>>> protected by legal privilege. If you are not the intended recipient, or the
>>>> employee or agent responsible for delivering it to the intended recipient,
>>>> you are hereby notified that any use, dissemination, distribution or
>>>> copying of this communication and/or its content is strictly prohibited. If
>>>> you are not the intended recipient, please immediately notify us by reply
>>>> email or by telephone, delete this email and destroy any copies. Thank you.
>>>>
>>>
>>>
>>> --
>>> http://www.google.com/profiles/grapesmoker
>>>
>>
>>
>> --
>>
>>
>> *Patrick McCarthy  *
>>
>> Senior Data Scientist, Machine Learning Engineering
>>
>> Dstillery
>>
>> 470 Park Ave South, 17th Floor, NYC 10016
>>
>
>
> --
> Tzahi File
> Data Engineer
> ironSource <http://www.ironsrc.com/>
>
> email tzahi.f...@ironsrc.com
> mobile +972-546864835
> fax +972-77-5448273
> ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
> ironsrc.com <http://www.ironsrc.com/>
> linkedin <https://www.linkedin.com/company/ironsource> | twitter
> <https://twitter.com/ironsource> | facebook
> <https://www.facebook.com/ironSource> | googleplus
> <https://plus.google.com/+ironsrc>
> This email (including any attachments) is for the sole use of the intended
> recipient and may contain confidential information which may be protected
> by legal privilege. If you are not the intended recipient, or the employee
> or agent responsible for delivering it to the intended recipient, you are
> hereby notified that any use, dissemination, distribution or copying of
> this communication and/or its content is strictly prohibited. If you are
> not the intended recipient, please immediately notify us by reply email or
> by telephone, delete this email and destroy any copies. Thank you.
>


-- 
http://www.google.com/profiles/grapesmoker


Re: Using Percentile in Spark SQL

2019-11-11 Thread Jerry Vinokurov
Do you mean that you are trying to compute the percent rank of some data?
You can use the SparkSQL percent_rank function for that, but I don't think
that's going to give you any improvement over calling the percentRank
function on the data frame. Are you currently using a user-defined function
for this task? Because I bet that's what's slowing you down.

On Mon, Nov 11, 2019 at 9:46 AM Tzahi File  wrote:

> Hi,
>
> Currently, I'm using hive huge cluster(m5.24xl * 40 workers) to run a
> percentile function. I'm trying to improve this job by moving it to run
> with spark SQL.
>
> Any suggestions on how to use a percentile function in Spark?
>
>
> Thanks,
> --
> Tzahi File
> Data Engineer
>
> email tzahi.f...@ironsrc.com
> mobile +972-546864835
> fax +972-77-5448273
> ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
> ironsrc.com 
> 
> This email (including any attachments) is for the sole use of the intended
> recipient and may contain confidential information which may be protected
> by legal privilege. If you are not the intended recipient, or the employee
> or agent responsible for delivering it to the intended recipient, you are
> hereby notified that any use, dissemination, distribution or copying of
> this communication and/or its content is strictly prohibited. If you are
> not the intended recipient, please immediately notify us by reply email or
> by telephone, delete this email and destroy any copies. Thank you.
>


-- 
http://www.google.com/profiles/grapesmoker


Re: intermittent Kryo serialization failures in Spark

2019-09-25 Thread Jerry Vinokurov
Hi Julien,

Thanks for the suggestion. If we don't do a broadcast, that would
presumably affect the performance of the job, as the model that is failing
to be broadcast is something that we need to be shared across the cluster.
But it may be worth it if the trade-off is not having things run properly.
Vadim's suggestions did not make a difference for me (still hitting this
error several times a day) but I'll try with disabling broadcast and see if
that does anything.

thanks,
Jerry

On Fri, Sep 20, 2019 at 10:00 AM Julien Laurenceau <
julien.laurenc...@pepitedata.com> wrote:

> Hi,
> Did you try without the broadcast ?
> Regards
> JL
>
> Le jeu. 19 sept. 2019 à 06:41, Vadim Semenov 
> a écrit :
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>> kryo.register(Class.forName("[[B")) // byte[][]
>> kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>at 
>>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>>at 
>>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>>at 
>>>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>>at 
>>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>>at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>at 
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati

Re: intermittent Kryo serialization failures in Spark

2019-09-18 Thread Jerry Vinokurov
Hi Vadim,

Thanks for your suggestion. We do preregister the classes, like so:

object KryoRegistrar {
>
>   val classesToRegister: Array[Class[_]] = Array(
> classOf[MyModel],
>[etc]
> ) }
>

And then we do:

val sparkConf = new SparkConf()
>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>

 I notice that this is a bit different from your code and I'm wondering
whether there's any functional difference or if these are two ways to get
to the same end. Our code is directly adapted from the Spark documentation
on how to use the Kryo serializer but maybe there's some subtlety here that
I'm missing.

With regard to the settings, it looks like we currently have the default
settings, which is to say that referenceTracking is true,
registrationRequired is false, unsafe is false, and buffer.max is 64m (none
of our objects are anywhere near that size but... who knows). I will try it
with your suggestions and see if it solves the problem.

thanks,
Jerry

On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov  wrote:

> Pre-register your classes:
>
> ```
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo): Unit = {
> kryo.register(Class.forName("[[B")) // byte[][]
> kryo.register(classOf[java.lang.Class[_]])
>   }
> }
> ```
>
> then run with
>
> 'spark.kryo.referenceTracking': 'false',
> 'spark.kryo.registrationRequired': 'false',
> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
> 'spark.kryo.unsafe': 'false',
> 'spark.kryoserializer.buffer.max': '256m',
>
> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
> wrote:
>
>> Hi folks,
>>
>> Posted this some time ago but the problem continues to bedevil us. I'm
>> including a (slightly edited) stack trace that results from this error. If
>> anyone can shed any light on what exactly is happening here and what we can
>> do to avoid it, that would be much appreciated.
>>
>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>> at 
>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>> at 
>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>> at 
>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>> at 
>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>> at 
>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>> at 
>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>> at 
>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>> at 
>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>> at 
>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>> at 
>>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>> at 
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> at 
>>> org.apache.spark.rdd.RDDOpera

Re: intermittent Kryo serialization failures in Spark

2019-09-17 Thread Jerry Vinokurov
nonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>   at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>   at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>   at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
> [our code that writes data to CSV]
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>   ... 132 more
>
>
On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov 
wrote:

> Hi all,
>
> I am experiencing a strange intermittent failure of my Spark job that
> results from serialization issues in Kryo. Here is the stack trace:
>
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Class.java:348)
>>  at 
>> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>  at 
>> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>  at 
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>  at 
>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>  ... 204 more
>>
>> (I've edited the company and model name since this is proprietary code)
>
> This error does not surface every time the job is run; I would say it
> probably shows up once in every 10 runs or so, and there isn't anything
> about the input data that triggers this, as I've been able to
> (nondeterministically) reproduce the error by simply rerunning the job with
> the same inputs over and over again. The model itself is just a plain Scala
> case class whose fields are strings and integers, so there's no custom
> serialization logic or anything like that. As I understand, this seems
> related to an issue previously documented here
> <https://issues.apache.org/jira/browse/SPARK-21928>, but allegedly this was
> fixed long ago. I'm running this job on an AWS EMR cluster and have
> confirmed that the version of Spark running there is 2.4.0, with the patch
> that is linked in the above issue being part of the code.
>
> A suggested solution has been to set the extraClasspath config settings on
> the driver and executor, but that has not fixed the problem. I'm out of
> ideas for how to tackle this and would love to hear if anyone has any
> suggestions or strategies for fixing this.
>
> thanks,
> Jerry
>
> --
> http://www.google.com/profiles/grapesmoker
>


-- 
http://www.google.com/profiles/grapesmoker


Low cache hit ratio when running Spark on Alluxio

2019-08-28 Thread Jerry Yan
 Hi,

We are running Spark jobs on an Alluxio cluster which is serving 13
gigabytes of data, with 99% of the data in memory. I was hoping to speed
up the Spark jobs by reading the in-memory data in Alluxio, but found the
Alluxio local hit rate is only 1.68%, while the Alluxio remote hit rate is
98.32%. By monitoring the network IO across all worker nodes with the
"dstat" command, I found that only two nodes had about 1GB of recv or send
traffic over the whole run, and that this 1GB is sent or received during
the Spark shuffle stage. Are there any metrics I could check or
configuration I could tune?


Best,

Jerry


Re: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Jerry Vinokurov
Maybe I'm not understanding something about this use case, but why is
precomputation not an option? Is it because the matrices themselves change?
Because if the matrices are constant, then I think precomputation would
work for you even if the users request random correlations. You can just
store the resulting column with the matrix id, row, and column as the key
for retrieval.

My general impression is that while you could do this in Spark, it's
probably not the correct framework for carrying out this kind of operation.
This feels more like a job for something like OpenMP than for Spark.


On Wed, Jul 17, 2019 at 3:42 PM Gautham Acharya 
wrote:

> As I said in the my initial message, precomputing is not an option.
>
>
>
> Retrieving only the top/bottom N most correlated is an option – would that
> speed up the results?
>
>
>
> Our SLAs are soft – slight variations (+- 15 seconds) will not cause
> issues.
>
>
>
> --gautham
>
> *From:* Patrick McCarthy [mailto:pmccar...@dstillery.com]
> *Sent:* Wednesday, July 17, 2019 12:39 PM
> *To:* Gautham Acharya 
> *Cc:* Bobby Evans ; Steven Stetzler <
> steven.stetz...@gmail.com>; user@spark.apache.org
> *Subject:* Re: [Beginner] Run compute on large matrices and return the
> result in seconds?
>
>
>
>
> Do you really need the results of all 3MM computations, or only the top-
> and bottom-most correlation coefficients? Could correlations be computed on
> a sample and from that estimate a distribution of coefficients? Would it
> make sense to precompute offline and instead focus on fast key-value
> retrieval, like ElasticSearch or ScyllaDB?
>
>
>
> Spark is a compute framework rather than a serving backend, I don't think
> it's designed with retrieval SLAs in mind and you may find those SLAs
> difficult to maintain.
>
>
>
> On Wed, Jul 17, 2019 at 3:14 PM Gautham Acharya <
> gauth...@alleninstitute.org> wrote:
>
> Thanks for the reply, Bobby.
>
>
>
> I’ve received notice that we can probably tolerate response times of up to
> 30 seconds. Would this be more manageable? 5 seconds was an initial ask,
> but 20-30 seconds is also a reasonable response time for our use case.
>
>
>
> With the new SLA, do you think that we can easily perform this computation
> in spark?
>
> --gautham
>
>
>
> *From:* Bobby Evans [mailto:reva...@gmail.com]
> *Sent:* Wednesday, July 17, 2019 7:06 AM
> *To:* Steven Stetzler 
> *Cc:* Gautham Acharya ; user@spark.apache.org
> *Subject:* Re: [Beginner] Run compute on large matrices and return the
> result in seconds?
>
>
>
>
> Let's do a few quick rules of thumb to get an idea of what kind of
> processing power you will need in general to do what you want.
>
>
>
> You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends
> up being about 560 GB that you need to fully process in 5 seconds.
>
>
>
> If you are reading this from spinning disks (which average about 80 MB/s)
> you would need at least 1,450 disks to just read the data in 5 seconds
> (that number can vary a lot depending on the storage format and your
> compression ratio).
>
> If you are reading the data over a network (let's say 10GigE even though
> in practice you cannot get that in the cloud easily) you would need about
> 90 NICs just to read the data in 5 seconds, again depending on the
> compression ratio this may be lower.
>
> If you assume you have a cluster where it all fits in main memory and have
> cached all of the data in memory (which in practice I have seen on most
> modern systems at somewhere between 12 and 16 GB/sec) you would need
> between 7 and 10 machines just to read through the data once in 5 seconds.
> Spark also stores cached data compressed so you might need less as well.
>
>
>
> All the numbers fit with things that spark should be able to handle, but a
> 5 second SLA is very tight for this amount of data.
>
>
>
> Can you make this work with Spark?  probably. Does spark have something
> built in that will make this fast and simple for you?  I doubt it; you have
> some very tight requirements and will likely have to write something custom
> to make it work the way you want.
>
>
>
>
>
> On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler 
> wrote:
>
> Hi Gautham,
>
>
>
> I am a beginner spark user too and I may not have a complete understanding
> of your question, but I thought I would start a discussion anyway. Have you
> looked into using Spark's built in Correlation function? (
> https://spark.apache.org/docs/latest/ml-statistics.html
> 

Re: Spark Newbie question

2019-07-11 Thread Jerry Vinokurov
Hi Ajay,

When a Spark SQL statement references a table, that table has to be
"registered" first. Usually the way this is done is by reading in a
DataFrame, then calling the createOrReplaceTempView (or one of a few other
functions) on that data frame, with the argument being the name under which
you'd like to register that table. You can then use the table in SQL
statements. As far as I know, you cannot directly refer to any external
data store without reading it in first.
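
For example (a small PySpark sketch; the path and view name are made up):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read the external data into a DataFrame first...
df = spark.read.parquet("/data/events")

# ...then register it under a name that SQL statements can reference;
# a temp view only lives inside this SparkSession/application
df.createOrReplaceTempView("events")

spark.sql("select count(*) from events").show()
```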

Jerry

On Thu, Jul 11, 2019 at 1:27 PM infa elance  wrote:

> Sorry, I guess I hit the send button too soon.
>
> This question is regarding a spark stand-alone cluster. My understanding
> is spark is an execution engine and not a storage layer.
> Spark processes data in memory but when someone refers to a spark table
> created through sparksql(df/rdd) what exactly are they referring to?
>
> Could it be a Hive table? If yes, is it the same hive store that spark
> uses?
> Is it a table in memory? If yes, how can an external app access this
> in-memory table? If JDBC, what driver?
>
> On a databricks cluster -- could they be referring spark table created
> through sparksql(df/rdd) as hive or deltalake table?
>
> Spark version with hadoop : spark-2.0.2-bin-hadoop2.7
>
> Thanks and appreciate your help!!
> Ajay.
>
>
>
> On Thu, Jul 11, 2019 at 12:19 PM infa elance 
> wrote:
>
>> This is stand-alone spark cluster. My understanding is spark is an
>> execution engine and not a storage layer.
>> Spark processes data in memory but when someone refers to a spark table
>> created through sparksql(df/rdd) what exactly are they referring to?
>>
>> Could it be a Hive table? If yes, is it the same hive store that spark
>> uses?
>> Is it a table in memory? If yes, how can an external app
>>
>> Spark version with hadoop : spark-2.0.2-bin-hadoop2.7
>>
>> Thanks and appreciate your help!!
>> Ajay.
>>
>

-- 
http://www.google.com/profiles/grapesmoker


intermittent Kryo serialization failures in Spark

2019-07-10 Thread Jerry Vinokurov
Hi all,

I am experiencing a strange intermittent failure of my Spark job that
results from serialization issues in Kryo. Here is the stack trace:

Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>   ... 204 more
>
> (I've edited the company and model name since this is proprietary code)

This error does not surface every time the job is run; I would say it
probably shows up once in every 10 runs or so, and there isn't anything
about the input data that triggers this, as I've been able to
(nondeterministically) reproduce the error by simply rerunning the job with
the same inputs over and over again. The model itself is just a plain Scala
case class whose fields are strings and integers, so there's no custom
serialization logic or anything like that. As I understand, this is seems
related to an issue previously documented here
<https://issues.apache.org/jira/browse/SPARK-21928>but allegedly this was
fixed long ago. I'm running this job on an AWS EMR cluster and have
confirmed that the version of Spark running there is 2.4.0, with the patch
that is linked in the above issue being part of the code.

A suggested solution has been to set the extraClasspath config settings on
the driver and executor, but that has not fixed the problem. I'm out of
ideas for how to tackle this and would love to hear if anyone has any
suggestions or strategies for fixing this.

thanks,
Jerry

-- 
http://www.google.com/profiles/grapesmoker


Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Jerry Lam
Hi Koert,

Thank you for your help! GOT IT!

Best Regards,

Jerry

On Wed, Feb 1, 2017 at 6:24 PM, Koert Kuipers <ko...@tresata.com> wrote:

> you can still use it as Dataset[Set[X]]. all transformations should work
> correctly.
>
> however dataset.schema will show binary type, and dataset.show will show
> bytes (unfortunately).
>
> for example:
>
> scala> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
> setEncoder: [X]=> org.apache.spark.sql.Encoder[Set[X]]
>
> scala> val x = Seq(Set(1,2,3)).toDS
> x: org.apache.spark.sql.Dataset[scala.collection.immutable.Set[Int]] =
> [value: binary]
>
> scala> x.map(_ + 4).collect
> res17: Array[scala.collection.immutable.Set[Int]] = Array(Set(1, 2, 3, 4))
>
> scala> x.show
> ++
> |   value|
> ++
> |[2A 01 03 02 02 0...|
> ++
>
>
> scala> x.schema
> res19: org.apache.spark.sql.types.StructType =
> StructType(StructField(value,BinaryType,true))
>
>
> On Wed, Feb 1, 2017 at 12:03 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Koert,
>>
>> Thanks for the tips. I tried to do that but the column's type is now
>> Binary. Do I get the Set[X] back in the Dataset?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> set is currently not supported. you can use kryo encoder. there is no
>>> other work around that i know of.
>>>
>>> import org.apache.spark.sql.{ Encoder, Encoders }
>>> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>>>
>>> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I got an exception like the following, when I tried to implement a user
>>>> defined aggregation function.
>>>>
>>>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>>>> No Encoder found for Set[(scala.Long, scala.Long)]
>>>>
>>>> The Set[(Long, Long)] is a field in the case class which is the output
>>>> type for the aggregation.
>>>>
>>>> Is there a workaround for this?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>
>>>
>>
>


using withWatermark on Dataset

2017-02-01 Thread Jerry Lam
Hi everyone,

Anyone knows how to use withWatermark  on Dataset?

I have tried the following but hit this exception:

dataset org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
cannot be cast to "MyType"

The code looks like the following:

dataset
.withWatermark("timestamp", "5 seconds")
.groupBy("timestamp", "customer_id")
.agg(MyAggregator)
.writeStream

Where dataset has MyType for each row.
Where MyType is:
case class MyType(customer_id: Long, timestamp: Timestamp, product_id: Long)

MyAggregator which takes MyType as the input type did some maths on the
product_id and outputs a set of product_ids.

Best Regards,

Jerry


Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Jerry Lam
Hi Koert,

Thanks for the tips. I tried to do that but the column's type is now
Binary. Do I get the Set[X] back in the Dataset?

Best Regards,

Jerry


On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers <ko...@tresata.com> wrote:

> set is currently not supported. you can use kryo encoder. there is no
> other work around that i know of.
>
> import org.apache.spark.sql.{ Encoder, Encoders }
> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>
> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I got an exception like the following, when I tried to implement a user
>> defined aggregation function.
>>
>>  Exception in thread "main" java.lang.UnsupportedOperationException: No
>> Encoder found for Set[(scala.Long, scala.Long)]
>>
>> The Set[(Long, Long)] is a field in the case class which is the output
>> type for the aggregation.
>>
>> Is there a workaround for this?
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-01-31 Thread Jerry Lam
Hi guys,

I got an exception like the following, when I tried to implement a user
defined aggregation function.

 Exception in thread "main" java.lang.UnsupportedOperationException: No
Encoder found for Set[(scala.Long, scala.Long)]

The Set[(Long, Long)] is a field in the case class which is the output type
for the aggregation.

Is there a workaround for this?

Best Regards,

Jerry


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hi Sean,

I agree there is no need for that if the implementation actually assigns
c=1 for all missing ratings but from the current implementation of ALS, I
don't think it is doing that.
The idea is that for missing ratings, they are assigned to c=1 (in the
paper) and they do contribute to the optimization of equation (3).

The lines of code that I'm referring to is:

{code}
if (implicitPrefs) {
  // Extension to the original paper to handle b < 0. confidence is a function
  // of |b| instead so that it is never negative. c1 is confidence - 1.0.
  val c1 = alpha * math.abs(rating)
  // For rating <= 0, the corresponding preference is 0. So the term below is
  // only added for rating > 0. Because YtY is already added, we need to
  // adjust the scaling here.
  if (rating > 0) {
    numExplicits += 1
    ls.add(srcFactor, (c1 + 1.0) / c1, c1)
  }
} else {
  ls.add(srcFactor, rating)
  numExplicits += 1
}
{code}

Regards,

Jerry


On Mon, Dec 5, 2016 at 3:27 PM, Sean Owen <so...@cloudera.com> wrote:

> That doesn't mean this 0 value is literally included in the input. There's
> no need for that.
>
> On Tue, Dec 6, 2016 at 4:24 AM Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Sean,
>>
>> I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
>> " However, with implicit feedback it would be natural to assign values to
>> all rui variables. If no action was observed rui is set to zero, thus
>> meaning in our examples zero watching time, or zero purchases on record."
>>
>> In the implicit setting, apparently there should have values for all
>> pairs (u, i) instead of just the observed ones according to the paper. This
>> is also true for other implicit feedback papers I read.
>>
>> In section 4, when r=0, p=0 BUT c=1. Therefore, when we optimize, the
>> term for this pair is (x^T y)^2 plus the regularization term.
>> Do I misunderstand the paper?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>> What are you referring to in what paper? implicit input would never
>> materialize 0s for missing values.
>>
>> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam <chiling...@gmail.com> wrote:
>>
>> Hello spark users and developers,
>>
>> I read the paper from Yahoo about CF with implicit feedback and other
>> papers using implicit feedbacks. Their implementation require to set the
>> missing rating with 0. That is for unobserved ratings, the confidence for
>> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
>> matrix.
>>
>> I read the source code of the ALS implementation in spark (version 1.6.x)
>> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
>> in ALS.scala). It could be a mistake or it could be an optimization. Just
>> want to see if anyone steps on this yet.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hi Sean,

I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
" However, with implicit feedback it would be natural to assign values to
all rui variables. If no action was observed rui is set to zero, thus
meaning in our examples zero watching time, or zero purchases on record."

In the implicit setting, there should apparently be values for all pairs
(u, i), not just the observed ones, according to the paper. This is also true
for the other implicit feedback papers I have read.

In section 4, when r=0, p=0 BUT c=1. Therefore, when we optimize, this pair
still contributes (x^T y)^2 plus the regularization term.

Do I misunderstand the paper?

Best Regards,

Jerry


On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen <so...@cloudera.com> wrote:

> What are you referring to in what paper? implicit input would never
> materialize 0s for missing values.
>
> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hello spark users and developers,
>>
>> I read the paper from Yahoo about CF with implicit feedback and other
>> papers using implicit feedbacks. Their implementation require to set the
>> missing rating with 0. That is for unobserved ratings, the confidence for
>> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
>> matrix.
>>
>> I read the source code of the ALS implementation in spark (version 1.6.x)
>> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
>> in ALS.scala). It could be a mistake or it could be an optimization. Just
>> want to see if anyone steps on this yet.
>>
>> Best Regards,
>>
>> Jerry
>>
>


Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hello spark users and developers,

I read the paper from Yahoo about CF with implicit feedback and other papers
using implicit feedback. Their implementations require setting the missing
ratings to 0. That is, for unobserved ratings the confidence is set to 1
(c=1). Therefore, the matrix to be factorized is a dense matrix.

I read the source code of the ALS implementation in Spark (version 1.6.x) for
implicit feedback. Apparently, it ignores ratings that are 0 (line 1159 in
ALS.scala). It could be a mistake or it could be an optimization. I just want
to see if anyone has stumbled on this yet.

Best Regards,

Jerry


Spark SQL: org.apache.spark.sql.AnalysisException: cannot resolve "some columns" given input columns.

2016-06-07 Thread Jerry Wong
Hi,

I have two JSON files, but one of them is missing some columns, for example:

{"firstName": "Jack", "lastName": "Nelson"}
{"firstName": "Landy", "middleName": "Ken", "lastName": "Yong"}

sqlContext.sql("select firstName as first_name, middleName as middle_name,
lastName as last_name from jsonTable")

But I get an error:
org.apache.spark.sql.AnalysisException: cannot resolve 'middleName' given
input columns firstName, lastName;

Can anybody give me your wisdom or any suggestions?

Thanks!
Jerry


Spark SQL Nested Array of JSON with empty field

2016-06-03 Thread Jerry Wong
Hi,

I ran into a problem with a missing field in a nested JSON file with Spark
SQL. For instance, there are two records in the JSON file as follows:

{
"firstname": "Jack",
"lastname": "Nelson",
"address": {
"state": "New York",
"city": "New York"
}
}{
"firstname": "Landy",
"middlename": "Ken",
"lastname": "Yong",
"address": {
"state": "California",
"city": "Los Angles"
}
}

I use Spark SQL to get the files like,
val row = sqlContext.sql("SELECT firstname, middlename, lastname,
address.state, address.city FROM jsontable")
The query fails on the first record with an error that there is no
"middlename". How do I handle this case in Spark SQL?
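
One possible workaround (sketched below; the file path is illustrative, and
the json source is assumed to see one JSON record per line) is to supply an
explicit schema, so that records missing "middlename" simply come back with a
null in that column:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("firstname", StringType, nullable = true),
  StructField("middlename", StringType, nullable = true),
  StructField("lastname", StringType, nullable = true),
  StructField("address", StructType(Seq(
    StructField("state", StringType, nullable = true),
    StructField("city", StringType, nullable = true)
  )), nullable = true)
))

// With the schema given up front there is nothing to infer, so the missing
// "middlename" field surfaces as null instead of failing analysis.
val people = sqlContext.read.schema(schema).json("people.json")
people.registerTempTable("jsontable")
sqlContext.sql("SELECT firstname, middlename, lastname, address.state, address.city FROM jsontable").show()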

Many thanks in advance!
Jerry


Re: Missing data in Kafka Consumer

2016-05-05 Thread Jerry
Hi David,

Thank you for your response.
Before inserting into Cassandra, I checked that the data was already missing
in HDFS (my second step is to load data from HDFS and then insert it into
Cassandra).

Can you send me the link about this bug in 0.8.2?

Thank you!
Jerry

On Thu, May 5, 2016 at 12:38 PM, david.lewis [via Apache Spark User List] <
ml-node+s1001560n26888...@n3.nabble.com> wrote:

> It's possible Kafka is throwing an exception and erroneously returning
> acks (there is a known bug in 0.8.2 that I encountered when my harddisk
> that was keeping log files and holding the temporary snappy library was
> full).
> It's also possible that your messages are not unique when they are put
> into cassandra. Are all of your messages unique in they primary keys in
> your cassandra table?
>
> On Thu, May 5, 2016 at 10:18 AM, Jerry [via Apache Spark User List] wrote:
>
>> Hi,
>>
>> Does anybody give me an idea why the data is lost at the Kafka Consumer
>> side? I use Kafka 0.8.2 and Spark (streaming) version is 1.5.2. Sometimes,
>> I found out I could not receive the same number of data with Kafka
>> producer. Exp) I sent 1000 data to Kafka Broker via Kafka Producer and
>> confirmed the same number in the Broker. But when I checked either HDFS or
>> Cassandra, the number is just 363. The data is not always lost, just
>> sometimes... That's wired and annoying to me.
>> Can anybody give me some reasons?
>>
>> Thanks!
>> Jerry
>>
>>
>
>
>
> --
> -David Lewis
>
> *Blyncsy, Inc.* |www.Blyncsy.com  <http://www.blyncsy.com/>
>
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Missing-data-in-Kafka-Consumer-tp26887p26890.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Missing data in Kafka Consumer

2016-05-05 Thread Jerry
Hi,

Can anybody give me an idea why data is lost on the Kafka consumer side? I
use Kafka 0.8.2 and Spark (Streaming) 1.5.2. Sometimes I find that I do not
receive the same number of records that the Kafka producer sent. For example,
I sent 1000 records to the Kafka broker via the Kafka producer and confirmed
the same number in the broker, but when I checked either HDFS or Cassandra,
the number was just 363. The data is not always lost, just sometimes... That's
weird and annoying to me.
Can anybody suggest some reasons?

Thanks!
Jerry  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Missing-data-in-Kafka-Consumer-tp26887.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark SQL]: UDF with Array[Double] as input

2016-04-01 Thread Jerry Lam
Hi spark users and developers,

Has anyone tried to pass an Array[Double] as an input to a UDF? I tried for
many hours, reading the Spark SQL code, but I still couldn't figure out a way
to do this.
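
A minimal sketch of the approach that usually works (the column and function
names are illustrative): declare the UDF parameter as Seq[Double], since
Spark hands an array column to a Scala UDF as a Seq rather than an
Array[Double].

import org.apache.spark.sql.functions.udf

// Sums an array<double> column; the elements arrive as a Seq (WrappedArray).
val arraySum = udf { (xs: Seq[Double]) => xs.sum }

// Illustrative usage, assuming df has an array<double> column named "features":
// df.select(arraySum(df("features")).as("features_sum"))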

Best Regards,

Jerry


Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi guys,

Another point: if this is unsupported, shouldn't it throw an exception
instead of giving the wrong answer? I mean, if
d1.join(d2, "id").select(d2("label")) should not work at all, the proper
behaviour is to throw an analysis exception. Instead it currently returns a
wrong answer.

As I said, this is just the tip of the iceberg. I have experienced worse than
this. For example, you might think renaming fields will work, but in some
cases it still returns wrong results.

Best Regards,

Jerry

On Tue, Mar 29, 2016 at 7:38 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Divya,
>
> This is not a self-join. d1 and d2 contain totally different rows. They
> are derived from the same table. The transformation that are applied to
> generate d1 and d2 should be able to disambiguate the labels in the
> question.
>
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Mar 29, 2016 at 2:43 AM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>>
>> def join(right: DataFrame, joinExprs: Column, joinType: String):
>> DataFrame = {
>> // Note that in this function, we introduce a hack in the case of
>> self-join to automatically
>> // resolve ambiguous join conditions into ones that might make sense
>> [SPARK-6231].
>> // Consider this case: df.join(df, df("key") === df("key"))
>> // Since df("key") === df("key") is a trivially true condition, this
>> actually becomes a
>> // cartesian join. However, most likely users expect to perform a self
>> join using "key".
>> // With that assumption, this hack turns the trivially true condition
>> into equality on join
>> // keys that are resolved to both sides.
>> // Trigger analysis so in the case of self-join, the analyzer will clone
>> the plan.
>> // After the cloning, left and right side will have distinct expression
>> ids.
>>
>> On 29 March 2016 at 14:33, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> I have another example to illustrate the issue. I think the problem is
>>> pretty nasty.
>>>
>>> val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50
>>> to 99)).toDF("id", "label")
>>> val d1 = base.where($"label" < 60)
>>> val d2 = base.where($"label" === 60)
>>> d1.join(d2, "id").show
>>> +---+-+-+
>>> | id|label|label|
>>> +---+-+-+
>>> | 40|   40|   60|
>>> +---+-+-+
>>>
>>> d1.join(d2, "id").select(d1("label")).show
>>> +-+
>>> |label|
>>> +-+
>>> |   40|
>>> +-+
>>> (expected answer: 40, right!)
>>>
>>> d1.join(d2, "id").select(d2("label")).show
>>> +-+
>>> |label|
>>> +-+
>>> |   40|
>>> +-+
>>> (expected answer: 60, wrong!)
>>>
>>> d1.join(d2, "id").select(d2("label")).explain
>>> == Physical Plan ==
>>> TungstenProject [label#15]
>>>  SortMergeJoin [id#14], [id#30]
>>>   TungstenSort [id#14 ASC], false, 0
>>>TungstenExchange hashpartitioning(id#14)
>>> TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
>>>  Filter (_2#13 < 60)
>>>   Scan PhysicalRDD[_1#12,_2#13]
>>>   TungstenSort [id#30 ASC], false, 0
>>>TungstenExchange hashpartitioning(id#30)
>>> TungstenProject [_1#12 AS id#30]
>>>  Filter (_2#13 = 60)
>>>   Scan PhysicalRDD[_1#12,_2#13]
>>>
>>> Again, this is just a tip of the iceberg. I have spent hours to find out
>>> this weird behaviour.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>> On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi Sunitha,
>>>>
>>>> Thank you for the reference Jira. It looks like this is the bug I'm
>>>> hitting. Most of the bugs related to this seems to associate with
>>>> dataframes derived from the one dataframe (base in this case). In SQL, this
>>>> is a self-join and dropping d2.label should not affect d1.label. There are
>>>> other bugs I found these three days that are associated with this type of
>>>> joins. In one case, if I don't drop the duplicate column BEFORE the join,
>>>> spark has preferences on the columns from d2 da

Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi Divya,

This is not a self-join. d1 and d2 contain totally different rows; they are
derived from the same table. The transformations that are applied to generate
d1 and d2 should be enough to disambiguate the labels in question.


Best Regards,

Jerry


On Tue, Mar 29, 2016 at 2:43 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

>
> def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
> = {
> // Note that in this function, we introduce a hack in the case of
> self-join to automatically
> // resolve ambiguous join conditions into ones that might make sense
> [SPARK-6231].
> // Consider this case: df.join(df, df("key") === df("key"))
> // Since df("key") === df("key") is a trivially true condition, this
> actually becomes a
> // cartesian join. However, most likely users expect to perform a self
> join using "key".
> // With that assumption, this hack turns the trivially true condition into
> equality on join
> // keys that are resolved to both sides.
> // Trigger analysis so in the case of self-join, the analyzer will clone
> the plan.
> // After the cloning, left and right side will have distinct expression
> ids.
>
> On 29 March 2016 at 14:33, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have another example to illustrate the issue. I think the problem is
>> pretty nasty.
>>
>> val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50
>> to 99)).toDF("id", "label")
>> val d1 = base.where($"label" < 60)
>> val d2 = base.where($"label" === 60)
>> d1.join(d2, "id").show
>> +---+-+-+
>> | id|label|label|
>> +---+-+-+
>> | 40|   40|   60|
>> +---+-+-+
>>
>> d1.join(d2, "id").select(d1("label")).show
>> +-+
>> |label|
>> +-+
>> |   40|
>> +-+
>> (expected answer: 40, right!)
>>
>> d1.join(d2, "id").select(d2("label")).show
>> +-+
>> |label|
>> +-+
>> |   40|
>> +-+
>> (expected answer: 60, wrong!)
>>
>> d1.join(d2, "id").select(d2("label")).explain
>> == Physical Plan ==
>> TungstenProject [label#15]
>>  SortMergeJoin [id#14], [id#30]
>>   TungstenSort [id#14 ASC], false, 0
>>TungstenExchange hashpartitioning(id#14)
>> TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
>>  Filter (_2#13 < 60)
>>   Scan PhysicalRDD[_1#12,_2#13]
>>   TungstenSort [id#30 ASC], false, 0
>>TungstenExchange hashpartitioning(id#30)
>> TungstenProject [_1#12 AS id#30]
>>  Filter (_2#13 = 60)
>>   Scan PhysicalRDD[_1#12,_2#13]
>>
>> Again, this is just a tip of the iceberg. I have spent hours to find out
>> this weird behaviour.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> Best Regards,
>>
>> Jerry
>>
>> On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Sunitha,
>>>
>>> Thank you for the reference Jira. It looks like this is the bug I'm
>>> hitting. Most of the bugs related to this seems to associate with
>>> dataframes derived from the one dataframe (base in this case). In SQL, this
>>> is a self-join and dropping d2.label should not affect d1.label. There are
>>> other bugs I found these three days that are associated with this type of
>>> joins. In one case, if I don't drop the duplicate column BEFORE the join,
>>> spark has preferences on the columns from d2 dataframe. I will see if I can
>>> replicate in a small program like above.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <
>>> skambha...@gmail.com> wrote:
>>>
>>>> Hi Jerry,
>>>>
>>>> I think you are running into an issue similar to SPARK-14040
>>>> https://issues.apache.org/jira/browse/SPARK-14040
>>>>
>>>> One way to resolve it is to use alias.
>>>>
>>>> Here is an example that I tried on trunk and I do not see any
>>>> exceptions.
>>>>
>>>> val d1=base.where($"label" === 0) as("d1")
>>>> val d2=base.where($"label" === 1).as("d2")
>>>>
>>>> d1.join(d2, $"d1.id" === $"d2.id", 
>>>> "left_outer").drop($"d2.label").select($"d1.label")
>>>>
>>>>
>>>> Hope this helps some.
>>>>
>>>> Best regards,
>>>> Sunitha.
>>>>
>>>> On Mar 28, 2016, at 2:34 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>
>>>> Hi spark users and developers,
>>>>
>>>> I'm using spark 1.5.1 (I have no choice because this is what we used).
>>>> I ran into some very unexpected behaviour when I did some join operations
>>>> lately. I cannot post my actual code here and the following code is not for
>>>> practical reasons but it should demonstrate the issue.
>>>>
>>>> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
>>>> 99).map((_,1))).toDF("id", "label")
>>>> val d1=base.where($"label" === 0)
>>>> val d2=base.where($"label" === 1)
>>>> d1.join(d2, d1("id") === d2("id"),
>>>> "left_outer").drop(d2("label")).select(d1("label"))
>>>>
>>>>
>>>> The above code will throw an exception saying the column label is not
>>>> found. Do you have a reason for throwing an exception when the column has
>>>> not been dropped for d1("label")?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>
>>
>


Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi Sunitha,

Thank you for the reference JIRA. It looks like this is the bug I'm hitting.
Most of the bugs related to this seem to be associated with dataframes
derived from the same dataframe (base in this case). In SQL, this is a
self-join, and dropping d2.label should not affect d1.label. There are other
bugs I found over these three days that are associated with this type of
join. In one case, if I don't drop the duplicate column BEFORE the join,
Spark prefers the columns from the d2 dataframe. I will see if I can
replicate it in a small program like the one above.

Best Regards,

Jerry


On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <skambha...@gmail.com>
wrote:

> Hi Jerry,
>
> I think you are running into an issue similar to SPARK-14040
> https://issues.apache.org/jira/browse/SPARK-14040
>
> One way to resolve it is to use alias.
>
> Here is an example that I tried on trunk and I do not see any exceptions.
>
> val d1=base.where($"label" === 0) as("d1")
> val d2=base.where($"label" === 1).as("d2")
>
> d1.join(d2, $"d1.id" === $"d2.id", 
> "left_outer").drop($"d2.label").select($"d1.label")
>
>
> Hope this helps some.
>
> Best regards,
> Sunitha.
>
> On Mar 28, 2016, at 2:34 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
> Hi spark users and developers,
>
> I'm using spark 1.5.1 (I have no choice because this is what we used). I
> ran into some very unexpected behaviour when I did some join operations
> lately. I cannot post my actual code here and the following code is not for
> practical reasons but it should demonstrate the issue.
>
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"),
> "left_outer").drop(d2("label")).select(d1("label"))
>
>
> The above code will throw an exception saying the column label is not
> found. Do you have a reason for throwing an exception when the column has
> not been dropped for d1("label")?
>
> Best Regards,
>
> Jerry
>
>
>


[Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi spark users and developers,

I'm using Spark 1.5.1 (I have no choice because this is what we use). I ran
into some very unexpected behaviour when I did some join operations lately. I
cannot post my actual code here, and the following code is not practical, but
it should demonstrate the issue.

val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
99).map((_,1))).toDF("id", "label")
val d1=base.where($"label" === 0)
val d2=base.where($"label" === 1)
d1.join(d2, d1("id") === d2("id"),
"left_outer").drop(d2("label")).select(d1("label"))


The above code throws an exception saying the column label is not found. Is
there a reason for throwing an exception when d1("label") has not been
dropped?

Best Regards,

Jerry


Pattern Matching over a Sequence of rows using Spark

2016-02-28 Thread Jerry Lam
Hi spark users and developers,

Does anyone have experience developing pattern matching over a sequence of
rows using Spark? I'm talking about functionality similar to matchpath in
Hive or MATCH_RECOGNIZE in Oracle. It is used for path analysis on
clickstream data. If you know of any libraries that do this, please share
your findings!
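
For concreteness, a minimal sketch of this kind of sequence matching using
plain RDD operations (all names, the Click type, and the funnel pattern are
illustrative):

case class Click(userId: String, ts: Long, page: String)

// A user "matches" when their time-ordered page sequence contains the funnel
// home -> product -> checkout as a contiguous slice.
def matchesFunnel(pages: Seq[String]): Boolean =
  pages.containsSlice(Seq("home", "product", "checkout"))

// clicks: RDD[Click] is assumed to already exist.
val matchingUsers = clicks
  .groupBy(_.userId)
  .mapValues(events => events.toSeq.sortBy(_.ts).map(_.page))
  .filter { case (_, pages) => matchesFunnel(pages) }
  .keys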

Thank you,

Jerry


Fast way to parse JSON in Spark

2016-02-23 Thread Jerry
Hi, 
I have a Java parser using GSON, packaged as a Java lib (e.g.
messageparserLib.jar). I use this lib in Spark Streaming to parse the
incoming JSON messages. This is very slow, and there is a lot of lag in
parsing/inserting messages into Cassandra.
What is a fast way to parse JSON messages in Spark on the fly? My JSON
messages are complex; I want to extract over 30 fields, wrap them in a case
class, and then store them in Cassandra in a structured format.
Some candidate solutions come to mind:
(1) Use Spark SQL to register a temp table and then select the fields I want
to wrap in the case class (see the sketch below).
(2) Use the Scala standard library, e.g.
"scala.util.parsing.json.JSON.parseFull", to parse and extract the fields to
map into the case class.
(3) Use third-party libraries such as play-json or lift-json to parse and
extract the fields to map into the case class.
The JSON messages come from a Kafka consumer at over 1,500 messages per
second, so the message processing (parsing and writing to Cassandra) also
needs to keep up at that rate (1,500/second).
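
For concreteness, candidate (1) might look roughly like the following
(assumes the Spark 1.4+ DataFrame API and the Spark Cassandra connector's
data source; the stream, keyspace, table, and field names are illustrative):

// kafkaStream: DStream[String] of raw JSON messages is assumed to exist.
kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // For throughput, consider passing an explicit schema instead of letting
    // read.json infer one on every micro-batch (inference costs an extra pass).
    val df = sqlContext.read.json(rdd)
    df.select("field1", "field2" /* ... the ~30 fields ... */)
      .write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
      .mode("append")
      .save()
  }
}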

Thanks in advance.
Jerry

I would appreciate any help and advice.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fast-way-to-parse-JSON-in-Spark-tp26306.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming with broadcast joins

2016-02-19 Thread Jerry Lam
Hi guys,

I also encountered the broadcast dataframe issue, not for streaming jobs but
for a regular dataframe join. In my case, the executors died, probably due to
OOM, although I don't think it should use that much memory. Anyway, I'm going
to craft an example and send it here to see if it is a bug or something I've
misunderstood.

Best Regards,

Jerry

Sent from my iPhone

> On 19 Feb, 2016, at 10:20 am, Sebastian Piu <sebastian@gmail.com> wrote:
> 
> I don't have the code with me now, and I ended moving everything to RDD in 
> the end and using map operations to do some lookups, i.e. instead of 
> broadcasting a Dataframe I ended broadcasting a Map 
> 
> 
>> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>> It didn't fail. It wasn't broadcasting. I just ran the test again and here 
>> are the logs.
>> Every batch is reading the metadata file.
>> 
>>  16/02/19 06:27:02 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:0+27
>>  16/02/19 06:27:02 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:27+28
>> 
>>  16/02/19 06:27:40 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:27+28
>>  16/02/19 06:27:40 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:0+27
>> 
>> If I remember, foreachRDD is executed in the driver's context. Not sure how 
>> we'll be able to achieve broadcast in this approach(unless we use SQL 
>> broadcast hint again)
>> 
>> When you say "it worked before",  was it with an older version of spark? I'm 
>> trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI that 
>> broadcast join is being used. Also, if the files are read and broadcasted 
>> each batch??
>> 
>> Thanks for the help!
>> 
>> 
>>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com> 
>>> wrote:
>>> I don't see anything obviously wrong on your second approach, I've done it 
>>> like that before and it worked. When you say that it didn't work what do 
>>> you mean? did it fail? it didnt broadcast? 
>>> 
>>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>> Code with SQL broadcast hint. This worked and I was able to see that 
>>>> broadcastjoin was performed.
>>>> 
>>>>val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>> .schema(schema).load("file:///shared/data/test-data.txt") 
>>>> 
>>>>val lines = ssc.socketTextStream("DevNode", )
>>>> 
>>>>lines.foreachRDD((rdd, timestamp) => {
>>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, 
>>>> l(1))).toDF()
>>>>val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>> }
>>>> 
>>>> But for every batch this file was read and broadcast was performed. 
>>>> Evaluating the entire DAG I guess.
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:0+27
>>>> 
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:0+27
>>>> 
>>>> 
>>>> Then I changed code to broadcast the dataframe. This didn't work either. 
>>>> Not sure if this is what you meant by broadcasting a dataframe.
>>>> 
>>>>val testDF = 
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>> .schema(schema).load("file:///shared/data/test-data.txt") 
>>>> )
>>>> 
>>>>val lines = ssc.socketTextStream("DevNode", )
>>>> 
>>>>lines.foreachRDD((rdd, timestamp) => {
>>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, 
>>>> l(1))).toDF()
>>>>val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>

Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

2016-02-17 Thread Jerry
Rado,

Yes, you are correct. A lot of messages are created at almost the same time
(even at millisecond resolution). I changed the key to use
"UUID.randomUUID()", with which all messages can be inserted into the
Cassandra table without being overwritten.
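
Roughly, the change amounts to the following sketch of the original snippet
(it assumes the same connector imports as before and that the table's key
column type was changed to hold the UUID string):

import java.util.UUID

@transient val kstreams = (1 to numConsumers.toInt).map { _ =>
  KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2.toString)
    // Key by a random UUID instead of System.currentTimeMillis(), so messages
    // produced within the same millisecond no longer collide on the primary key.
    .map(log => (UUID.randomUUID().toString, log))
}
@transient val unifiedMessage = ssc.union(kstreams)

unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
  SomeColumns("createdtime", "log"))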

Thank you very much!
Jerry Wong

On Wed, Feb 17, 2016 at 1:50 AM, radoburansky [via Apache Spark User List] <
ml-node+s1001560n26246...@n3.nabble.com> wrote:

> Hi Jerry,
>
> How do you know that only 100 messages are inserted? What is the primary
> key of the "tableOfTopicA" Cassandra table? Isn't it possible that you
> map more messages to the same primamary key and therefore they overwrite
> each other in Cassandra?
>
> Regards
>
> Rado
>
> On Tue, Feb 16, 2016 at 10:29 PM, Jerry [via Apache Spark User List] wrote:
>
>> Hello,
>>
>> I have questions using Spark streaming to consume data from Kafka and
>> insert to Cassandra database.
>>
>> 5 AWS instances (each one does have 8 cores, 30GB memory) for Spark,
>> Hadoop, Cassandra
>> Scala: 2.10.5
>> Spark: 1.2.2
>> Hadoop: 1.2.1
>> Cassandra 2.0.18
>>
>> 3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB
>> memory)
>> Kafka: 0.8.2.1
>> Zookeeper: 3.4.6
>>
>> Other configurations:
>> batchInterval = 6 Seconds
>> blockInterval = 1500 millis
>> spark.locality.wait = 500 millis
>> #Consumers = 10
>>
>> There are two columns in the cassandra table
>> keySpaceOfTopicA.tableOfTopicA, "createdtime" and "log".
>>
>> Here is a piece of codes,
>>
>> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
>> KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),
>>  StorageLevel.MEMORY_AND_DISK_SER)
>> .map(_._2.toString).map(Tuple1(_))
>> .map{case(log) => (System.currentTimeMillis(), log)}
>> }
>> @transient val unifiedMessage = ssc.union(kstreams)
>>
>> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
>> SomeColumns("createdtime", "log"))
>>
>> I created a producer and send messages to Brokers (1000 messages/per
>> time)
>>
>> But the Cassandra can only be inserted about 100 messages in each round
>> of test.
>> Can anybody give me advices why the other messages (about 900 message)
>> can't be consumed?
>> How do I configure and tune the parameters in order to improve the
>> throughput of consumers?
>>
>> Thank you very much for your reading and suggestions in advances.
>>
>> Jerry Wong
>>
>>
>
>
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244p26252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

2016-02-16 Thread Jerry
Hello, 

I have questions about using Spark Streaming to consume data from Kafka and
insert it into a Cassandra database.

5 AWS instances (each one does have 8 cores, 30GB memory) for Spark, Hadoop,
Cassandra
Scala: 2.10.5
Spark: 1.2.2
Hadoop: 1.2.1
Cassandra 2.0.18

3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB memory)
Kafka: 0.8.2.1
Zookeeper: 3.4.6

Other configurations:
batchInterval = 6 Seconds
blockInterval = 1500 millis
spark.locality.wait = 500 millis
#Consumers = 10

There are two columns in the cassandra table keySpaceOfTopicA.tableOfTopicA,
"createdtime" and "log".

Here is a piece of codes,

@transient val kstreams = (1 to numConsumers.toInt).map { _ =>
  KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2.toString).map(Tuple1(_))
    .map { case (log) => (System.currentTimeMillis(), log) }
}
@transient val unifiedMessage = ssc.union(kstreams)

unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
  SomeColumns("createdtime", "log"))

I created a producer and sent messages to the brokers (1,000 messages per run).

But only about 100 messages get inserted into Cassandra in each round of
testing.
Can anybody advise why the other messages (about 900) aren't consumed?
How do I configure and tune the parameters in order to improve the consumer
throughput?

Thank you very much for reading, and for your suggestions in advance.

Jerry Wong



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Convert Iterable to RDD

2016-02-12 Thread Jerry Lam
I'm not sure I understand your problem well, but why don't you create the
file locally and then upload it to HDFS?
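
A rough sketch of that approach (it assumes the elements are
java.io.Serializable, that the local disk can hold the file, and that
hdfsPath is a full hdfs:// URI; note the output is a plain Java-serialized
file, not the SequenceFile layout that sc.objectFile expects):

import java.io.{BufferedOutputStream, FileOutputStream, ObjectOutputStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def saveIteratorToHdfs[T <: java.io.Serializable](
    elements: Iterator[T], localPath: String, hdfsPath: String): Unit = {
  // Stream the elements to local disk without holding them all in memory.
  val out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(localPath)))
  try {
    var written = 0L
    elements.foreach { e =>
      out.writeObject(e)
      written += 1
      // Clear the stream's back-reference table periodically so its memory
      // footprint stays bounded for millions of objects.
      if (written % 10000 == 0) out.reset()
    }
  } finally {
    out.close()
  }
  // Copy the finished file into HDFS.
  val fs = FileSystem.get(new URI(hdfsPath), new Configuration())
  fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath))
}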

Sent from my iPhone

> On 12 Feb, 2016, at 9:09 am, "seb.arzt"  wrote:
> 
> I have an Iterator of several million elements, which unfortunately won't fit
> into the driver memory at the same time. I would like to save them as object
> file in HDFS:
> 
> Doing so I am running out of memory on the driver:
> 
> Using a stream
> 
> also won't work. I cannot further increase the driver memory. Why doesn't it
> work out of the box? Shouldn't lazy evaluation and garbage collection
> prevent the program from running out of memory? I could manually split the
> Iterator into chunks and serialize each chunk, but it feels wrong. What is
> going wrong here?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Convert-Iterable-to-RDD-tp16882p26211.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.5.2 memory error

2016-02-03 Thread Jerry Lam
Hi guys,

I was processing 300GB of data with a lot of joins today. I have a
combination of RDD -> DataFrame -> RDD due to legacy code. I had memory
issues at the beginning. After fine-tuning the configurations that many have
already suggested above, it runs with 0 failed tasks. I think it is fair to
say any memory-intensive application would face similar memory issues. It is
not very fair to say it sucks just because it has memory issues. The memory
issue comes in many forms: 1. bad framework, 2. bad code, 3. bad framework
and bad code. I usually blame bad code first, then the bad framework. If it
truly fails because of the bad framework (mesos + spark + fine-grained mode =
disaster), then make the code changes to adapt to the bad framework.

I have never seen code that can magically run to 100% completion on data
close to a terabyte without some serious engineering effort. A framework can
only help a bit; you are still responsible for making conscious decisions
about how much memory and data you are working with. For instance, with a k-v
pair where v is 100GB and you allocate 1GB per executor, it is going to blow
up no matter how many times you execute it.

The memory per core is what I fine-tune most: making sure each task/core has
enough memory to execute to completion. Sometimes you really don't know how
much data you keep in memory until you profile your application (calculating
some statistics helps).
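
For illustration only (the numbers below are assumptions, not values from
this thread), the knobs in question boil down to memory per executor divided
by the number of concurrent tasks on it:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "16g")                 // heap per executor
  .set("spark.executor.cores", "4")                    // ~4g of heap per concurrent task
  .set("spark.memory.fraction", "0.6")                 // unified execution/storage fraction (Spark 1.6+)
  .set("spark.yarn.executor.memoryOverhead", "2048")   // off-heap headroom when running on YARN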

Best Regards,

Jerry



On Wed, Feb 3, 2016 at 4:58 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> About OP.
>
> How many cores you assign per executor? May be reducing that number will
> give more portion of executor memory to each task being executed on that
> executor. Others please comment if that make sense.
>
>
>
> On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> I know it;s a strong word but when I have a case open for that with MapR
>> and Databricks for a month and their only solution to change to DataFrame
>> it frustrate you. I know DataFrame/Sql catalyst has internal optimizations
>> but it requires lot of code change. I think there's something fundamentally
>> wrong (or different from hadoop) in framework that is not allowing it to do
>> robust memory management. I know my job is memory hogger, it does a groupBy
>> and perform combinatorics in reducer side; uses additional datastructures
>> at task levels. May be spark is running multiple heavier tasks on same
>> executor and collectively they cause OOM. But suggesting DataFrame is NOT a
>> Solution for me (and most others who already invested time with RDD and
>> loves the type safety it provides). Not even sure if changing to DataFrame
>> will for sure solve the issue.
>>
>> On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller <moham...@glassbeam.com>
>> wrote:
>>
>>> Nirav,
>>>
>>> Sorry to hear about your experience with Spark; however, sucks is a very
>>> strong word. Many organizations are processing a lot more than 150GB of
>>> data  with Spark.
>>>
>>>
>>>
>>> Mohammed
>>>
>>> Author: Big Data Analytics with Spark
>>> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>>>
>>>
>>>
>>> *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
>>> *Sent:* Wednesday, February 3, 2016 11:31 AM
>>> *To:* Stefan Panayotov
>>> *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
>>>
>>> *Subject:* Re: Spark 1.5.2 memory error
>>>
>>>
>>>
>>> Hi Stefan,
>>>
>>>
>>>
>>> Welcome to the OOM - heap space club. I have been struggling with
>>> similar errors (OOM and yarn executor being killed) and failing job or
>>> sending it in retry loops. I bet the same job will run perfectly fine with
>>> less resource on Hadoop MapReduce program. I have tested it for my program
>>> and it does work.
>>>
>>>
>>>
>>> Bottomline from my experience. Spark sucks with memory management when
>>> job is processing large (not huge) amount of data. It's failing for me with
>>> 16gb executors, 10 executors, 6 threads each. And data its processing is
>>> only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
>>> 1 million rows.
>>>
>>>
>>>
>>> Hope that saves you some trouble.
>>>
>>>
>>>
>>> Nirav
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov <spanayo...@msn.com>
>>> wrote:
>>>
&g

Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-02 Thread Jerry Lam
I think the Spark DataFrame supports more than just SQL. It is more like a
pandas dataframe. (I rarely use the SQL feature.)
There are a lot of novelties in DataFrame, so I think it is quite optimized
for many tasks. The in-memory data structure is very memory efficient. I just
changed a very slow RDD program to use DataFrame; the performance gain is
about 2 times while using less CPU. Of course, if you are very good at
optimizing your code, then use pure RDDs.


On Tue, Feb 2, 2016 at 8:08 PM, Koert Kuipers  wrote:

> Dataset will have access to some of the catalyst/tungsten optimizations
> while also giving you scala and types. However that is currently
> experimental and not yet as efficient as it could be.
> On Feb 2, 2016 7:50 PM, "Nirav Patel"  wrote:
>
>> Sure, having a common distributed query and compute engine for all kind
>> of data source is alluring concept to market and advertise and to attract
>> potential customers (non engineers, analyst, data scientist). But it's
>> nothing new!..but darn old school. it's taking bits and pieces from
>> existing sql and no-sql technology. It lacks many panache of robust sql
>> engine. I think what put spark aside from everything else on market is RDD!
>> and flexibility and scala-like programming style given to developers which
>> is simply much more attractive to write then sql syntaxes, schema and
>> string constants that falls apart left and right. Writing sql is old
>> school. period.  good luck making money though :)
>>
>> On Tue, Feb 2, 2016 at 4:38 PM, Koert Kuipers  wrote:
>>
>>> To have a product databricks can charge for their sql engine needs to be
>>> competitive. That's why they have these optimizations in catalyst. RDD is
>>> simply no longer the focus.
>>> On Feb 2, 2016 7:17 PM, "Nirav Patel"  wrote:
>>>
 so latest optimizations done on spark 1.4 and 1.5 releases are mostly
 from project Tungsten. Docs says it usues sun.misc.unsafe to convert
 physical rdd structure into byte array at some point for optimized GC and
 memory. My question is why is it only applicable to SQL/Dataframe and not
 RDD? RDD has types too!


 On Mon, Jan 25, 2016 at 11:10 AM, Nirav Patel 
 wrote:

> I haven't gone through much details of spark catalyst optimizer and
> tungston project but we have been advised by databricks support to use
> DataFrame to resolve issues with OOM error that we are getting during Join
> and GroupBy operations. We use spark 1.3.1 and looks like it can not
> perform external sort and blows with OOM.
>
> https://forums.databricks.com/questions/2082/i-got-the-akka-frame-size-exceeded-exception.html
>
> Now it's great that it has been addressed in spark 1.5 release but why
> databricks advocating to switch to DataFrames? It may make sense for batch
> jobs or near real-time jobs but not sure if they do when you are 
> developing
> real time analytics where you want to optimize every millisecond that you
> can. Again I am still knowledging myself with DataFrame APIs and
> optimizations and I will benchmark it against RDD for our batch and
> real-time use case as well.
>
> On Mon, Jan 25, 2016 at 9:47 AM, Mark Hamstra  > wrote:
>
>> What do you think is preventing you from optimizing your
>> own RDD-level transformations and actions?  AFAIK, nothing that has been
>> added in Catalyst precludes you from doing that.  The fact of the matter
>> is, though, that there is less type and semantic information available to
>> Spark from the raw RDD API than from using Spark SQL, DataFrames or
>> DataSets.  That means that Spark itself can't optimize for raw RDDs the
>> same way that it can for higher-level constructs that can leverage
>> Catalyst; but if you want to write your own optimizations based on your 
>> own
>> knowledge of the data types and semantics that are hiding in your raw 
>> RDDs,
>> there's no reason that you can't do that.
>>
>> On Mon, Jan 25, 2016 at 9:35 AM, Nirav Patel 
>> wrote:
>>
>>> Hi,
>>>
>>> Perhaps I should write a blog about this that why spark is focusing
>>> more on writing easier spark jobs and hiding underlaying performance
>>> optimization details from a seasoned spark users. It's one thing to 
>>> provide
>>> such abstract framework that does optimization for you so you don't 
>>> have to
>>> worry about it as a data scientist or data analyst but what about
>>> developers who do not want overhead of SQL and Optimizers and 
>>> unnecessary
>>> abstractions ! Application designer who knows their data and queries 
>>> should
>>> be able to optimize at RDD level transformations and actions. Does spark
>>> provides a way to achieve same 

Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-02 Thread Jerry Lam
Hi Michael,

Is there a section in the Spark documentation that demonstrates how to
serialize arbitrary objects in a DataFrame? The last time I did this, I used
a user-defined type (copied from VectorUDT).

Best Regards,

Jerry

On Tue, Feb 2, 2016 at 8:46 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> A principal difference between RDDs and DataFrames/Datasets is that the
>> latter have a schema associated to them. This means that they support only
>> certain types (primitives, case classes and more) and that they are
>> uniform, whereas RDDs can contain any serializable object and must not
>> necessarily be uniform. These properties make it possible to generate very
>> efficient serialization and other optimizations that cannot be achieved
>> with plain RDDs.
>>
>
> You can use Encoder.kryo() as well to serialize arbitrary objects, just
> like with RDDs.
>


Union of RDDs without the overhead of Union

2016-02-02 Thread Jerry Lam
Hi Spark users and developers,

Does anyone know how to union two RDDs without the overhead of it?

Say rdd1.union(rdd2).saveAsTextFile(...). This requires a stage to union the
two RDDs before saveAsTextFile (2 stages). Is there a way to skip the union
step but still have the contents of the two RDDs saved to the same output
text file?

Thank you!

Jerry


Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-02 Thread Jerry Lam
Hi Nirav,
I'm sure you read this?
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

There is a benchmark in the article showing that a dataframe "can" outperform
an RDD implementation by 2 times. Of course, benchmarks can be "made". But
from the code snippet you wrote, I "think" the dataframe will choose between
different join implementations based on the data statistics.

I cannot comment on the beauty of it because "beauty is in the eye of the
beholder" LOL.
Regarding the comment about it being error prone, can you say why you think
that is the case? Relative to what other approaches?

Best Regards,

Jerry


On Tue, Feb 2, 2016 at 8:59 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I dont understand why one thinks RDD of case object doesn't have
> types(schema) ? If spark can convert RDD to DataFrame which means it
> understood the schema. SO then from that point why one has to use SQL
> features to do further processing? If all spark need for optimizations is
> schema then what this additional SQL features buys ? If there is a way to
> avoid SQL feature using DataFrame I don't mind it. But looks like I have to
> convert all my existing transformation to things like
> df1.join(df2,df1('abc') == df2('abc'), 'left_outer') .. that's plain ugly
> and error prone in my opinion.
>
> On Tue, Feb 2, 2016 at 5:49 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> Is there a section in the spark documentation demonstrate how to
>> serialize arbitrary objects in Dataframe? The last time I did was using
>> some User Defined Type (copy from VectorUDT).
>>
>> Best Regards,
>>
>> Jerry
>>
>> On Tue, Feb 2, 2016 at 8:46 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> A principal difference between RDDs and DataFrames/Datasets is that the
>>>> latter have a schema associated to them. This means that they support only
>>>> certain types (primitives, case classes and more) and that they are
>>>> uniform, whereas RDDs can contain any serializable object and must not
>>>> necessarily be uniform. These properties make it possible to generate very
>>>> efficient serialization and other optimizations that cannot be achieved
>>>> with plain RDDs.
>>>>
>>>
>>> You can use Encoder.kryo() as well to serialize arbitrary objects, just
>>> like with RDDs.
>>>
>>
>>
>
>
>
> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>
> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
> <https://twitter.com/Xactly>  [image: Facebook]
> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
> <http://www.youtube.com/xactlycorporation>
>


Re: Spark, Mesos, Docker and S3

2016-01-26 Thread Jerry Lam
Hi Mao,

Can you try --jars to include those jars?

Best Regards,

Jerry

Sent from my iPhone

> On 26 Jan, 2016, at 7:02 pm, Mao Geng <m...@sumologic.com> wrote:
> 
> Hi there, 
> 
> I am trying to run Spark on Mesos using a Docker image as executor, as 
> mentioned 
> http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-docker-support.
>  
> 
> I built a docker image using the following Dockerfile (which is based on 
> https://github.com/apache/spark/blob/master/docker/spark-mesos/Dockerfile):
> 
> FROM mesosphere/mesos:0.25.0-0.2.70.ubuntu1404
> 
> # Update the base ubuntu image with dependencies needed for Spark
> RUN apt-get update && \
> apt-get install -y python libnss3 openjdk-7-jre-headless curl
> 
> RUN curl 
> http://www.carfab.com/apachesoftware/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
>  | tar -xzC /opt && \
> ln -s /opt/spark-1.6.0-bin-hadoop2.6 /opt/spark
> ENV SPARK_HOME /opt/spark
> ENV MESOS_NATIVE_JAVA_LIBRARY /usr/local/lib/libmesos.so
> 
> Then I successfully ran spark-shell via this docker command:
> docker run --rm -it --net=host /: 
> /opt/spark/bin/spark-shell --master mesos://:5050 --conf 
> /: 
> 
> So far so good. Then I wanted to call sc.textFile to load a file from S3, but 
> I was blocked by some issues which I couldn't figure out. I've read 
> https://dzone.com/articles/uniting-spark-parquet-and-s3-as-an-alternative-to 
> and 
> http://blog.encomiabile.it/2015/10/29/apache-spark-amazon-s3-and-apache-mesos,
>  learned that I need to add hadood-aws-2.7.1 and aws-java-sdk-2.7.4 into the 
> executor and driver's classpaths, in order to access s3 files. 
> 
> So, I added following lines into Dockerfile and build a new image. 
> RUN curl 
> https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
>  -o /opt/spark/lib/aws-java-sdk-1.7.4.jar
> RUN curl 
> http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.1/hadoop-aws-2.7.1.jar
>  -o /opt/spark/lib/hadoop-aws-2.7.1.jar
> 
> Then I started spark-shell again with below command: 
> docker run --rm -it --net=host /: 
> /opt/spark/bin/spark-shell --master mesos://:5050 --conf 
> /: --conf 
> spark.executor.extraClassPath=/opt/spark/lib/hadoop-aws-2.7.1.jar:/opt/spark/lib/aws-java-sdk-1.7.4.jar
>  --conf 
> spark.driver.extraClassPath=/opt/spark/lib/hadoop-aws-2.7.1.jar:/opt/spark/lib/aws-java-sdk-1.7.4.jar
> 
> But below command failed when I ran it in spark-shell: 
> scala> sc.textFile("s3a:///").count()
> [Stage 0:>  (0 + 2) / 
> 2]16/01/26 23:05:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
> ip-172-16-14-203.us-west-2.compute.internal): java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: Class 
> org.apache.hadoop.fs.s3a.S3AFileSystem not found
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
>   at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
>   at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>   at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
>   at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
>   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>   at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:107)
>   at 
> org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:237)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: 

Re: sqlContext.cacheTable("tableName") vs dataFrame.cache()

2016-01-19 Thread Jerry Lam
Is cacheTable similar to asTempTable before? 

Sent from my iPhone

> On 19 Jan, 2016, at 4:18 am, George Sigletos  wrote:
> 
> Thanks Kevin for your reply.
> 
> I was suspecting the same thing as well, although it still does not make much 
> sense to me why would you need to do both:
> myData.cache()
> sqlContext.cacheTable("myData")
> 
> in case you are using both sqlContext and dataframes to execute queries
> 
> dataframe.select(...) and sqlContext.sql("select ...") are equivalent, as far 
> as I understand
> 
> Kind regards,
> George
> 
>> On Fri, Jan 15, 2016 at 6:15 PM, Kevin Mellott  
>> wrote:
>> Hi George,
>> 
>> I believe that sqlContext.cacheTable("tableName") is to be used when you 
>> want to cache the data that is being used within a Spark SQL query. For 
>> example, take a look at the code below.
>>  
>>> val myData = sqlContext.load("com.databricks.spark.csv", Map("path" -> 
>>> "hdfs://somepath/file", "header" -> "false").toDF("col1", "col2")
>>> myData.registerTempTable("myData")  
>> 
>> Here, the usage of cache() will affect ONLY the myData.select query. 
>>> myData.cache() 
>>> myData.select("col1", "col2").show() 
>>  
>> Here, the usage of cacheTable will affect ONLY the sqlContext.sql query.
>>> sqlContext.cacheTable("myData")
>>> sqlContext.sql("SELECT col1, col2 FROM myData").show()
>> 
>> Thanks,
>> Kevin
>> 
>>> On Fri, Jan 15, 2016 at 7:00 AM, George Sigletos  
>>> wrote:
>>> According to the documentation they are exactly the same, but in my queries 
>>> 
>>> dataFrame.cache() 
>>> 
>>> results in much faster execution times vs doing 
>>> 
>>> sqlContext.cacheTable("tableName")
>>> 
>>> Is there any explanation about this? I am not caching the RDD prior to 
>>> creating the dataframe. Using Pyspark on Spark 1.5.2
>>> 
>>> Kind regards,
>>> George
> 


[Spark-SQL] from_unixtime with user-specified timezone

2016-01-18 Thread Jerry Lam
Hi spark users and developers,

What do you do if you want the from_unixtime function in Spark SQL to return
the time in a timezone of your choice instead of the system timezone?

Best Regards,

Jerry


Re: [Spark-SQL] from_unixtime with user-specified timezone

2016-01-18 Thread Jerry Lam
Thanks Alex:

So you suggested something like:
from_utc_timestamp(to_utc_timestamp(from_unixtime(1389802875),'America/Montreal'),
'America/Los_Angeles')?

This is a lot of conversion :)

Is there a particular reason for from_unixtime not to take timezone
information?

I think I will make a UDF if this is the only way out of the box.
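
A minimal sketch of such a UDF (the function name and output pattern are
illustrative):

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
import org.apache.spark.sql.functions.udf

// fromUnixtimeTz(epochSeconds, tzId) formats an epoch in the given timezone,
// e.g. fromUnixtimeTz(lit(1389802875L), lit("America/Montreal")).
val fromUnixtimeTz = udf { (epoch: Long, tz: String) =>
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")  // new instance per call keeps it thread-safe
  fmt.setTimeZone(TimeZone.getTimeZone(tz))
  fmt.format(new Date(epoch * 1000L))
}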

Thanks!

Jerry

On Mon, Jan 18, 2016 at 2:32 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> Look at
> to_utc_timestamp
>
> from_utc_timestamp
> On Jan 18, 2016 9:39 AM, "Jerry Lam" <chiling...@gmail.com> wrote:
>
>> Hi spark users and developers,
>>
>> what do you do if you want the from_unixtime function in spark sql to
>> return the timezone you want instead of the system timezone?
>>
>> Best Regards,
>>
>> Jerry
>>
>


Re: DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-15 Thread Jerry Lam
Hi Michael,

Thanks for sharing the tip. It will help with the write path of the
partitioned table.
Do you have a similar suggestion for reading the partitioned table back when
there are a million distinct values in the partition field (for example, user
id)? Last time I had trouble reading a partitioned table because it took very
long (hours on S3) to execute
sqlContext.read.parquet("partitioned_table").

Best Regards,

Jerry

Sent from my iPhone

> On 15 Jan, 2016, at 3:59 pm, Michael Armbrust <mich...@databricks.com> wrote:
> 
> See here for some workarounds: 
> https://issues.apache.org/jira/browse/SPARK-12546
> 
>> On Thu, Jan 14, 2016 at 6:46 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi Arkadiusz,
>> 
>> the partitionBy is not designed to have many distinct value the last time I 
>> used it. If you search in the mailing list, I think there are couple of 
>> people also face similar issues. For example, in my case, it won't work over 
>> a million distinct user ids. It will require a lot of memory and very long 
>> time to read the table back. 
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>>> On Thu, Jan 14, 2016 at 2:31 PM, Arkadiusz Bicz <arkadiusz.b...@gmail.com> 
>>> wrote:
>>> Hi
>>> 
>>> What is the proper configuration for saving a parquet partition with a
>>> large number of repeated keys?
>>> 
>>> In the code below I load 500 million rows of data and partition it on a
>>> column with not so many different values.
>>> 
>>> Using spark-shell with 30g per executor and driver and 3 executor cores
>>> 
>>> sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")
>>> 
>>> 
>>> Job failed because not enough memory in executor :
>>> 
>>> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
>>> YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
>>> used. Consider boosting spark.yarn.executor.memoryOverhead.
>>> 16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
>>> datanode2.babar.poc: Container killed by YARN for exceeding memory
>>> limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
>>> spark.yarn.executor.memoryOverhead.
>>> 
> 
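
One hedged workaround for the slow read path, when only a few of the million
partition values are actually needed: point the reader at those specific
partition directories instead of the table root, so the initial partition
discovery does not have to list every directory. The bucket, table path, and
user ids below are placeholders, and the partition column has to be
re-attached by hand since it only exists in the directory name:

import org.apache.spark.sql.functions.lit

val u1 = sqlContext.read.parquet("s3a://somebucket/partitioned_table/user_id=123")
  .withColumn("user_id", lit(123))
val u2 = sqlContext.read.parquet("s3a://somebucket/partitioned_table/user_id=456")
  .withColumn("user_id", lit(456))
val subset = u1.unionAll(u2)   // unionAll in the 1.x DataFrame API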


Re: How To Save TF-IDF Model In PySpark

2016-01-15 Thread Jerry Lam
Can you save it to parquet with the vector in one field?

Sent from my iPhone

> On 15 Jan, 2016, at 7:33 pm, Andy Davidson  
> wrote:
> 
> Are you using 1.6.0 or an older version?
> 
> I think I remember something in 1.5.1 saying save was not implemented in 
> python.
> 
> 
> The current doc does not say anything about save()
> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
> 
> http://spark.apache.org/docs/latest/ml-guide.html#saving-and-loading-pipelines
> "Often times it is worth it to save a model or a pipeline to disk for later 
> use. In Spark 1.6, a model import/export functionality was added to the 
> Pipeline API. Most basic transformers are supported as well as some of the 
> more basic ML models. Please refer to the algorithm’s API documentation to 
> see if saving and loading is supported."
> 
> andy
> 
> 
> 
> 
> From: Asim Jalis 
> Date: Friday, January 15, 2016 at 4:02 PM
> To: "user @spark" 
> Subject: How To Save TF-IDF Model In PySpark
> 
> Hi,
> 
> I am trying to save a TF-IDF model in PySpark. Looks like this is not
> supported. 
> 
> Using `model.save()` causes:
> 
> AttributeError: 'IDFModel' object has no attribute 'save'
> 
> Using `pickle` causes:
> 
> TypeError: can't pickle lock objects
> 
> Does anyone have suggestions? 
> 
> Thanks!
> 
> Asim
> 
> Here is the full repro. Start pyspark shell and then run this code in
> it.
> 
> ```
> # Imports
> from pyspark import SparkContext
> from pyspark.mllib.feature import HashingTF
> 
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.linalg import Vectors
> from pyspark.mllib.feature import IDF
> 
> # Create some data
> n = 4
> freqs = [
> Vectors.sparse(n, (1, 3), (1.0, 2.0)), 
> Vectors.dense([0.0, 1.0, 2.0, 3.0]), 
> Vectors.sparse(n, [1], [1.0])]
> data = sc.parallelize(freqs)
> idf = IDF()
> model = idf.fit(data)
> tfidf = model.transform(data)
> 
> # View
> for r in tfidf.collect(): print(r)
> 
> # Try to save it
> model.save("foo.model")
> 
> # Try to save it with Pickle
> import pickle
> pickle.dump(model, open("model.p", "wb"))
> pickle.dumps(model)
> ```
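
A hedged sketch of Jerry's parquet suggestion, shown in Scala for brevity (a
PySpark version would mirror it): write the transformed TF-IDF vectors out as
a single array<double> column. Note this persists the transformed output, not
the IDFModel's internal state, and the output path is a placeholder.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.IDF
import sqlContext.implicits._

// Rebuild the TF-IDF output from the repro above, then persist the vectors.
val data = sc.parallelize(Seq(
  Vectors.sparse(4, Array(1, 3), Array(1.0, 2.0)),
  Vectors.dense(0.0, 1.0, 2.0, 3.0),
  Vectors.sparse(4, Array(1), Array(1.0))))
val tfidf = new IDF().fit(data).transform(data)

// One array<double> field per row; reading the file back returns those arrays.
tfidf.map(v => Tuple1(v.toArray)).toDF("tfidf").write.parquet("hdfs://somepath/tfidf.parquet")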


Re: DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-14 Thread Jerry Lam
Hi Arkadiusz,

partitionBy was not designed for many distinct values, at least the last time I
used it. If you search the mailing list, I think a couple of other people have
faced similar issues. For example, in my case, it won't work over a million
distinct user ids. It requires a lot of memory and a very long time to read
the table back.

Best Regards,

Jerry

On Thu, Jan 14, 2016 at 2:31 PM, Arkadiusz Bicz <arkadiusz.b...@gmail.com>
wrote:

> Hi
>
> What is the proper configuration for saving a parquet partition with a
> large number of repeated keys?
>
> In the code below I load 500 million rows of data and partition it on a
> column with not so many different values.
>
> Using spark-shell with 30g per executor and driver and 3 executor cores
>
>
> sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")
>
>
> Job failed because not enough memory in executor :
>
> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
> YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
> datanode2.babar.poc: Container killed by YARN for exceeding memory
> limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
>
>
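
For the write-side failure quoted above, the YARN message itself names the
first knob to try. A spark-submit sketch with purely illustrative values (the
right overhead depends on the job, and this does not remove the underlying
cost of writing many partitions):

spark-submit \
  --executor-memory 30g \
  --executor-cores 3 \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  your-job.jar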


[Spark SQL]: Issues with writing dataframe with Append Mode to Parquet

2016-01-12 Thread Jerry Lam
Hi spark users and developers,

I wonder if the following observed behaviour is expected. I'm writing
dataframe to parquet into s3. I'm using append mode when I'm writing to it.
Since I'm using org.apache.spark.sql.
parquet.DirectParquetOutputCommitter as
the spark.sql.parquet.output.committer.class, I expected that no _temporary
files will be generated.

I appended the same dataframe twice to the same directory. The first
"append" works as expected; no _temporary files are generated because of
the DirectParquetOutputCommitter but the second "append" does generate
_temporary files and then it moved the files under the _temporary to the
output directory.

Is this behavior expected? Or is it a bug?

I'm using Spark 1.5.2.

Best Regards,

Jerry


Re: [Spark SQL]: Issues with writing dataframe with Append Mode to Parquet

2016-01-12 Thread Jerry Lam
Hi Michael,

Thanks for the hint! So if I turn off speculation, consecutive appends like
above will not produce temporary files right?
Which class is responsible for disabling the use of DirectOutputCommitter?

Thank you,

Jerry


On Tue, Jan 12, 2016 at 4:12 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> There can be dataloss when you are using the DirectOutputCommitter and
> speculation is turned on, so we disable it automatically.
>
> On Tue, Jan 12, 2016 at 1:11 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi spark users and developers,
>>
>> I wonder if the following observed behaviour is expected. I'm writing
>> dataframe to parquet into s3. I'm using append mode when I'm writing to it.
>> Since I'm using org.apache.spark.sql.
>> parquet.DirectParquetOutputCommitter as
>> the spark.sql.parquet.output.committer.class, I expected that no _temporary
>> files will be generated.
>>
>> I appended the same dataframe twice to the same directory. The first
>> "append" works as expected; no _temporary files are generated because of
>> the DirectParquetOutputCommitter but the second "append" does generate
>> _temporary files and then it moved the files under the _temporary to the
>> output directory.
>>
>> Is this behavior expected? Or is it a bug?
>>
>> I'm using Spark 1.5.2.
>>
>> Best Regards,
>>
>> Jerry
>>
>
>
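
A hedged spark-defaults.conf sketch of the combination being discussed: the
direct committer class from the original message together with speculation
left off, since Spark disables the direct committer when speculation is on.
Whether this also avoids the _temporary directory on a second append is
exactly the open question in the thread:

spark.speculation                         false
spark.sql.parquet.output.committer.class  org.apache.spark.sql.parquet.DirectParquetOutputCommitter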


Re: SparkSQL integration issue with AWS S3a

2016-01-06 Thread Jerry Lam
Hi Kostiantyn,

Yes. If security is a concern, then this approach cannot satisfy it. The keys 
are visible in the properties files. If the goal is to hide them, you might be 
able to go a bit further with this approach. Have you looked at the Spark security page?

Best Regards,

Jerry 

Sent from my iPhone

> On 6 Jan, 2016, at 8:49 am, Kostiantyn Kudriavtsev 
> <kudryavtsev.konstan...@gmail.com> wrote:
> 
> Hi guys,
> 
> the only one big issue with this approach:
>>> spark.hadoop.s3a.access.key  is now visible everywhere, in logs, in spark 
>>> webui and is not secured at all...
> 
>> On Jan 2, 2016, at 11:13 AM, KOSTIANTYN Kudriavtsev 
>> <kudryavtsev.konstan...@gmail.com> wrote:
>> 
>> thanks Jerry, it works!
>> really appreciate your help 
>> 
>> Thank you,
>> Konstantin Kudryavtsev
>> 
>>> On Fri, Jan 1, 2016 at 4:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>> Hi Kostiantyn,
>>> 
>>> You should be able to use spark.conf to specify s3a keys.
>>> 
>>> I don't remember exactly but you can add hadoop properties by prefixing 
>>> spark.hadoop.*
>>> * is the s3a properties. For instance,
>>> 
>>> spark.hadoop.s3a.access.key wudjgdueyhsj
>>> 
>>> Of course, you need to make sure the property key is right. I'm using my 
>>> phone so I cannot easily verifying.
>>> 
>>> Then you can specify different user using different spark.conf via 
>>> --properties-file when spark-submit
>>> 
>>> HTH,
>>> 
>>> Jerry
>>> 
>>> Sent from my iPhone
>>> 
>>>> On 31 Dec, 2015, at 2:06 pm, KOSTIANTYN Kudriavtsev 
>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>> 
>>>> Hi Jerry,
>>>> 
>>>> what you suggested looks to be working (I put hdfs-site.xml into 
>>>> $SPARK_HOME/conf folder), but could you shed some light on how it can be 
>>>> federated per user?
>>>> Thanks in advance!
>>>> 
>>>> Thank you,
>>>> Konstantin Kudryavtsev
>>>> 
>>>>> On Wed, Dec 30, 2015 at 2:37 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>> Hi Kostiantyn,
>>>>> 
>>>>> I want to confirm that it works first by using hdfs-site.xml. If yes, you 
>>>>> could define different spark-{user-x}.conf and source them during 
>>>>> spark-submit. let us know if hdfs-site.xml works first. It should.
>>>>> 
>>>>> Best Regards,
>>>>> 
>>>>> Jerry
>>>>> 
>>>>> Sent from my iPhone
>>>>> 
>>>>>> On 30 Dec, 2015, at 2:31 pm, KOSTIANTYN Kudriavtsev 
>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Jerry,
>>>>>> 
>>>>>> I want to run different jobs on different S3 buckets - different AWS 
>>>>>> creds - on the same instances. Could you shed some light if it's 
>>>>>> possible to achieve with hdfs-site?
>>>>>> 
>>>>>> Thank you,
>>>>>> Konstantin Kudryavtsev
>>>>>> 
>>>>>>> On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>>>> Hi Kostiantyn,
>>>>>>> 
>>>>>>> Can you define those properties in hdfs-site.xml and make sure it is 
>>>>>>> visible in the class path when you spark-submit? It looks like a conf 
>>>>>>> sourcing issue to me. 
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> 
>>>>>>> Sent from my iPhone
>>>>>>> 
>>>>>>>> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>>>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Chris,
>>>>>>>> 
>>>>>>>> thanks for the hint with IAM roles, but in my case I need to run 
>>>>>>>> different jobs with different S3 permissions on the same cluster, so 
>>>>>>>> this approach doesn't work for me as far as I understood it
>>>>>>>> 
>>>>>>>> Thank you,
>>>>>>>> Konstantin Kudryavtsev
>>>>>>>> 
>>>>>>>>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly <

Re: SparkSQL integration issue with AWS S3a

2016-01-01 Thread Jerry Lam
Hi Kostiantyn,

You should be able to use spark.conf to specify s3a keys.

I don't remember exactly, but you can add hadoop properties by prefixing them with 
spark.hadoop.*
where * is the s3a property name. For instance,

spark.hadoop.s3a.access.key wudjgdueyhsj

Of course, you need to make sure the property key is right. I'm using my phone 
so I cannot easily verify it.

Then you can specify a different user by using a different spark.conf via 
--properties-file when running spark-submit.

HTH,

Jerry

Sent from my iPhone

> On 31 Dec, 2015, at 2:06 pm, KOSTIANTYN Kudriavtsev 
> <kudryavtsev.konstan...@gmail.com> wrote:
> 
> Hi Jerry,
> 
> what you suggested looks to be working (I put hdfs-site.xml into 
> $SPARK_HOME/conf folder), but could you shed some light on how it can be 
> federated per user?
> Thanks in advance!
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 2:37 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi Kostiantyn,
>> 
>> I want to confirm that it works first by using hdfs-site.xml. If yes, you 
>> could define different spark-{user-x}.conf and source them during 
>> spark-submit. let us know if hdfs-site.xml works first. It should.
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>> Sent from my iPhone
>> 
>>> On 30 Dec, 2015, at 2:31 pm, KOSTIANTYN Kudriavtsev 
>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>> 
>>> Hi Jerry,
>>> 
>>> I want to run different jobs on different S3 buckets - different AWS creds 
>>> - on the same instances. Could you shed some light if it's possible to 
>>> achieve with hdfs-site?
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
>>>> On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>> Hi Kostiantyn,
>>>> 
>>>> Can you define those properties in hdfs-site.xml and make sure it is 
>>>> visible in the class path when you spark-submit? It looks like a conf 
>>>> sourcing issue to me. 
>>>> 
>>>> Cheers,
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>>> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>> 
>>>>> Chris,
>>>>> 
>>>>> thanks for the hint with IAM roles, but in my case I need to run 
>>>>> different jobs with different S3 permissions on the same cluster, so this 
>>>>> approach doesn't work for me as far as I understood it
>>>>> 
>>>>> Thank you,
>>>>> Konstantin Kudryavtsev
>>>>> 
>>>>>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>>>> couple things:
>>>>>> 
>>>>>> 1) switch to IAM roles if at all possible - explicitly passing AWS 
>>>>>> credentials is a long and lonely road in the end
>>>>>> 
>>>>>> 2) one really bad workaround/hack is to run a job that hits every worker 
>>>>>> and writes the credentials to the proper location (~/.awscredentials or 
>>>>>> whatever)
>>>>>> 
>>>>>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
>>>>>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>>>>>> 
>>>>>> if you switch to IAM roles, things become a lot easier as you can 
>>>>>> authorize all of the EC2 instances in the cluster - and handles 
>>>>>> autoscaling very well - and at some point, you will want to autoscale.
>>>>>> 
>>>>>>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>> Chris,
>>>>>>> 
>>>>>>>  good question; as you can see from the code, I set them up on the driver, 
>>>>>>> so I expect they will be propagated to all nodes, won't they?
>>>>>>> 
>>>>>>> Thank you,
>>>>>>> Konstantin Kudryavtsev
>>>>>>> 
>>>>>>>> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>>>>>> are the credentials visible from each Worker node to all the Executor 
>>>>>>>> JVMs on each Worker?
>>>>>>>> 
>>>>>>>>> On Dec 30, 2015, at 12:45 PM, K
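
A hedged sketch of the per-user setup Jerry describes: one properties file per
set of credentials, selected at submit time. All values below are placeholders,
and the exact fs.s3a.* key names should be double-checked against the Hadoop
S3A docs. As Kostiantyn notes above, the keys remain visible in logs and the
web UI, so this separates credentials per job but does not hide them.

# spark-user-a.conf (placeholder values)
spark.hadoop.fs.s3a.impl         org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key   AKIAEXAMPLEKEYA
spark.hadoop.fs.s3a.secret.key   exampleSecretForUserA

# pick the file per user/job at submit time
spark-submit --properties-file spark-user-a.conf --class com.example.JobA job-a.jar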

Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Jerry Lam
Hi Kostiantyn,

Can you define those properties in hdfs-site.xml and make sure it is visible in 
the class path when you spark-submit? It looks like a conf sourcing issue to 
me. 

Cheers,

Sent from my iPhone

> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Chris,
> 
> thanks for the hint with IAM roles, but in my case I need to run different 
> jobs with different S3 permissions on the same cluster, so this approach 
> doesn't work for me as far as I understood it
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly  wrote:
>> couple things:
>> 
>> 1) switch to IAM roles if at all possible - explicitly passing AWS 
>> credentials is a long and lonely road in the end
>> 
>> 2) one really bad workaround/hack is to run a job that hits every worker and 
>> writes the credentials to the proper location (~/.awscredentials or whatever)
>> 
>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>> 
>> if you switch to IAM roles, things become a lot easier as you can authorize 
>> all of the EC2 instances in the cluster - and handles autoscaling very well 
>> - and at some point, you will want to autoscale.
>> 
>>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>>>  wrote:
>>> Chris,
>>> 
>>>  good question; as you can see from the code, I set them up on the driver, so I 
>>> expect they will be propagated to all nodes, won't they?
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
 On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:
 are the credentials visible from each Worker node to all the Executor JVMs 
 on each Worker?
 
> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Dear Spark community,
> 
> I faced the following issue with trying accessing data on S3a, my code is 
> the following:
> 
> val sparkConf = new SparkConf()
> 
> val sc = new SparkContext(sparkConf)
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
> val sqlContext = SQLContext.getOrCreate(sc)
> val df = sqlContext.read.parquet(...)
> df.count
> 
> It results in the following exception and log messages:
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from BasicAWSCredentialsProvider: Access key or secret key is 
> null
> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
> metadata service at URL: 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from InstanceProfileCredentialsProvider: The requested 
> metadata is not found at 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 
> 3)
> com.amazonaws.AmazonClientException: Unable to load AWS credentials from 
> any provider in the chain
>   at 
> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
> 
> I run standalone spark 1.5.2 and using hadoop 2.7.1
> 
> any ideas/workarounds?
> 
> AWS credentials are correct for this bucket
> 
> Thank you,
> Konstantin Kudryavtsev
>> 
>> 
>> 
>> -- 
>> 
>> Chris Fregly
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
> 
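
A minimal sketch of the hdfs-site.xml approach suggested here, with placeholder
values. The file goes under $SPARK_HOME/conf (or anywhere on the driver and
executor classpath), and the property names should be checked against the
Hadoop S3A documentation for the Hadoop version in use:

<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <value>AKIAEXAMPLEKEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>exampleSecret</value>
  </property>
</configuration>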


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Jerry Lam
Hi Kostiantyn,

I want to confirm that it works first by using hdfs-site.xml. If yes, you could 
define different spark-{user-x}.conf and source them during spark-submit. let 
us know if hdfs-site.xml works first. It should.

Best Regards,

Jerry

Sent from my iPhone

> On 30 Dec, 2015, at 2:31 pm, KOSTIANTYN Kudriavtsev 
> <kudryavtsev.konstan...@gmail.com> wrote:
> 
> Hi Jerry,
> 
> I want to run different jobs on different S3 buckets - different AWS creds - 
> on the same instances. Could you shed some light if it's possible to achieve 
> with hdfs-site?
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi Kostiantyn,
>> 
>> Can you define those properties in hdfs-site.xml and make sure it is visible 
>> in the class path when you spark-submit? It looks like a conf sourcing issue 
>> to me. 
>> 
>> Cheers,
>> 
>> Sent from my iPhone
>> 
>>> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>> 
>>> Chris,
>>> 
>>> thanks for the hint with IAM roles, but in my case I need to run different 
>>> jobs with different S3 permissions on the same cluster, so this approach 
>>> doesn't work for me as far as I understood it
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
>>>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>> couple things:
>>>> 
>>>> 1) switch to IAM roles if at all possible - explicitly passing AWS 
>>>> credentials is a long and lonely road in the end
>>>> 
>>>> 2) one really bad workaround/hack is to run a job that hits every worker 
>>>> and writes the credentials to the proper location (~/.awscredentials or 
>>>> whatever)
>>>> 
>>>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
>>>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>>>> 
>>>> if you switch to IAM roles, things become a lot easier as you can 
>>>> authorize all of the EC2 instances in the cluster - and handles 
>>>> autoscaling very well - and at some point, you will want to autoscale.
>>>> 
>>>>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>> Chris,
>>>>> 
>>>>>  good question; as you can see from the code, I set them up on the driver, so 
>>>>> I expect they will be propagated to all nodes, won't they?
>>>>> 
>>>>> Thank you,
>>>>> Konstantin Kudryavtsev
>>>>> 
>>>>>> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>>>> are the credentials visible from each Worker node to all the Executor 
>>>>>> JVMs on each Worker?
>>>>>> 
>>>>>>> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Dear Spark community,
>>>>>>> 
>>>>>>> I faced the following issue with trying accessing data on S3a, my code 
>>>>>>> is the following:
>>>>>>> 
>>>>>>> val sparkConf = new SparkConf()
>>>>>>> 
>>>>>>> val sc = new SparkContext(sparkConf)
>>>>>>> sc.hadoopConfiguration.set("fs.s3a.impl", 
>>>>>>> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>>>>>> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
>>>>>>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>>>>>>> val sqlContext = SQLContext.getOrCreate(sc)
>>>>>>> val df = sqlContext.read.parquet(...)
>>>>>>> df.count
>>>>>>> 
>>>>>>> It results in the following exception and log messages:
>>>>>>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>>>>>>> credentials from BasicAWSCredentialsProvider: Access key or secret key 
>>>>>>> is null
>>>>>>> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
>>>>>>> metadata service at URL: 
>>>>>>> h

Re: ideal number of executors per machine

2015-12-15 Thread Jerry Lam
Hi Veljko,

I usually ask the following questions: “How much memory per task?” and “How 
many CPUs per task?” Then I calculate based on the memory and CPU requirements 
per task. You might be surprised (maybe not you, but at least I am :) ) that 
many OOM issues are actually because of this. 

Best Regards,

Jerry

> On Dec 15, 2015, at 5:18 PM, Jakob Odersky <joder...@gmail.com> wrote:
> 
> Hi Veljko,
> I would assume keeping the number of executors per machine to a minimum is 
> best for performance (as long as you consider memory requirements as well).
> Each executor is a process that can run tasks in multiple threads. On a 
> kernel/hardware level, thread switches are much cheaper than process switches 
> and therefore having a single executor with multiple threads gives a better 
> overall performance than multiple executors with fewer threads.
> 
> --Jakob
> 
> On 15 December 2015 at 13:07, Veljko Skarich <veljko.skar...@gmail.com 
> <mailto:veljko.skar...@gmail.com>> wrote:
> Hi, 
> 
> I'm looking for suggestions on the ideal number of executors per machine. I 
> run my jobs on 64G 32 core machines, and at the moment I have one executor 
> running per machine, on the spark standalone cluster.
> 
>  I could not find many guidelines for figuring out the ideal number of 
> executors; the Spark official documentation merely recommends not having more 
> than 64G per executor to avoid GC issues. Anyone have any advice on this?
> 
> thank you. 
> 
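
A worked example of the per-task arithmetic, with purely illustrative numbers
for a 64 GB / 32-core node like the one described above (leaving a few GB and
a couple of cores for the OS and overhead):

assume ~1 core and ~2 GB per task:
  1 executor  x 30 cores  -> ~60 GB heap + overhead  (one big JVM, near the 64 GB guideline)
  2 executors x 15 cores  -> ~30 GB heap each        (smaller heaps, usually easier on GC)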



Re: spark-ec2 vs. EMR

2015-12-02 Thread Jerry Lam
Hi Dana,

Yes, we got VPC + EMR working, but I'm not the person who deploys it. It is
related to the subnet, as Alex points out.

Just to add another point: spark-ec2 is nice to keep and improve
because it allows users to run any version of Spark (a nightly build, for
example). EMR does not allow you to do that without a manual process.

Best Regards,

Jerry

On Wed, Dec 2, 2015 at 1:02 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> Do you think it's a security issue if EMR started in VPC with a subnet
> having Auto-assign Public IP: Yes
>
> you can remove all Inbound rules having 0.0.0.0/0 Source in master and
> slave Security Group
> So, master and slave boxes will be accessible only for users who are on VPN
>
>
>
>
> On Wed, Dec 2, 2015 at 9:44 AM, Dana Powers <dana.pow...@gmail.com> wrote:
>
>> EMR was a pain to configure on a private VPC last I tried. Has anyone had
>> success with that? I found spark-ec2 easier to use w private networking,
>> but also agree that I would use for prod.
>>
>> -Dana
>> On Dec 1, 2015 12:29 PM, "Alexander Pivovarov" <apivova...@gmail.com>
>> wrote:
>>
>>> 1. Emr 4.2.0 has Zeppelin as an alternative to DataBricks Notebooks
>>>
>>> 2. Emr has Ganglia 3.6.0
>>>
>>> 3. Emr has hadoop fs settings to make s3 work fast (direct.EmrFileSystem)
>>>
>>> 4. EMR has s3 keys in hadoop configs
>>>
>>> 5. EMR allows to resize cluster on fly.
>>>
>>> 6. EMR has aws sdk in spark classpath. Helps to reduce app assembly jar
>>> size
>>>
>>> 7. ec2 script installs all in /root, EMR has dedicated users: hadoop,
>>> zeppelin, etc. EMR is similar to Cloudera or Hortonworks
>>>
>>> 8. There are at least 3 spark-ec2 projects. (in apache/spark, in mesos,
>>> in amplab). Master branch in spark has outdated ec2 script. Other projects
>>> have broken links in readme. WHAT A MESS!
>>>
>>> 9. ec2 script has bad documentation and non informative error messages.
>>> e.g. readme does not say anything about --private-ips option. If you did
>>> not add the flag it will connect to empty string host (localhost) instead
>>> of master. Fixed only last week. Not sure if fixed in all branches
>>>
>>> 10. I think Amazon will include spark-jobserver to EMR soon.
>>>
>>> 11. You do not need to be aws expert to start EMR cluster. Users can use
>>> EMR web ui to start cluster to run some jobs or work in Zeppelun during the
>>> day
>>>
>>> 12. EMR cluster starts in abour 8 min. Ec2 script works longer and you
>>> need to be online.
>>> On Dec 1, 2015 9:22 AM, "Jerry Lam" <chiling...@gmail.com> wrote:
>>>
>>>> Simply put:
>>>>
>>>> EMR = Hadoop Ecosystem (Yarn, HDFS, etc) + Spark + EMRFS + Amazon EMR
>>>> API + Selected Instance Types + Amazon EC2 Friendly (bootstrapping)
>>>> spark-ec2 = HDFS + Yarn (Optional) + Spark (Standalone Default) + Any
>>>> Instance Type
>>>>
>>>> I use spark-ec2 for prototyping and I have never use it for production.
>>>>
>>>> just my $0.02
>>>>
>>>>
>>>>
>>>> On Dec 1, 2015, at 11:15 AM, Nick Chammas <nicholas.cham...@gmail.com>
>>>> wrote:
>>>>
>>>> Pinging this thread in case anyone has thoughts on the matter they want
>>>> to share.
>>>>
>>>> On Sat, Nov 21, 2015 at 11:32 AM Nicholas Chammas <[hidden email]>
>>>> wrote:
>>>>
>>>>> Spark has come bundled with spark-ec2
>>>>> <http://spark.apache.org/docs/latest/ec2-scripts.html> for many
>>>>> years. At the same time, EMR has been capable of running Spark for a 
>>>>> while,
>>>>> and earlier this year it added "official" support
>>>>> <https://aws.amazon.com/blogs/aws/new-apache-spark-on-amazon-emr/>.
>>>>>
>>>>> If you're looking for a way to provision Spark clusters, there are
>>>>> some clear differences between these 2 options. I think the biggest one
>>>>> would be that EMR is a "production" solution backed by a company, whereas
>>>>> spark-ec2 is not really intended for production use (as far as I know).
>>>>>
>>>>> That particular difference in intended use may or may not matter to
>>>>> you, but I'm curious:
>>>>>
>>>>> What are some of the other differences between the 2 that do matter to
>>>>> you? If you were considering these 2 solutions for your use case at one
>>>>> point recently, why did you choose one over the other?
>>>>>
>>>>> I'd be especially interested in hearing about why people might choose
>>>>> spark-ec2 over EMR, since the latter option seems to have shaped up nicely
>>>>> this year.
>>>>>
>>>>> Nick
>>>>>
>>>>>
>>>> --
>>>> View this message in context: Re: spark-ec2 vs. EMR
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Re-spark-ec2-vs-EMR-tp25538.html>
>>>> Sent from the Apache Spark User List mailing list archive
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>>
>>>>
>>>>
>


Re: Very slow startup for jobs containing millions of tasks

2015-11-14 Thread Jerry Lam
Hi Ted, 

That looks like exactly what happens. It has been 5 hrs now. The code was built for 
1.4. Thank you very much! 

Best Regards,

Jerry

Sent from my iPhone

> On 14 Nov, 2015, at 11:21 pm, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Which release are you using ?
> If older than 1.5.0, you miss some fixes such as SPARK-9952
> 
> Cheers
> 
>> On Sat, Nov 14, 2015 at 6:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi spark users and developers,
>> 
>> Has anyone experienced slow startup of a job when it contains a stage 
>> with over 4 million tasks? 
>> The job has been pending for 1.4 hours without doing anything (please refer 
>> to the attached pictures). However, the driver is busy doing something. 
>> I ran jstack on the driver and found the following relevant trace:
>> 
>> ```
>> "dag-scheduler-event-loop" daemon prio=10 tid=0x7f24a8c59800 nid=0x454 
>> runnable [0x7f23b3e29000]
>>java.lang.Thread.State: RUNNABLE
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1399)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1373)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:911)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:910)
>> at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at 
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:910)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:834)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:837)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:836)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:836)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:818)
>> at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1453)
>> at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1445)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> ```
>> 
>> It seems that it takes a long time for the driver to create/schedule the DAG 
>> for that many tasks. Is there a way to speed it up? 
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>> 
> 


Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi Zhan,

Thank you for providing a workaround! 
I will try this out, but I agree with Ted: there should be a better way to 
capture the exception and handle it by just initializing SQLContext instead of 
HiveContext, and WARN the user that something is wrong with their Hive setup. 

Having a spark.sql.hive.enabled=false configuration would be lovely too. :)
An additional bonus is that it requires less memory (~100-200 MB, from a rough 
observation) if we don’t use HiveContext on the driver side. 

Thanks and have a nice weekend!

Jerry


> On Nov 6, 2015, at 5:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> I would suggest adding a config parameter that allows bypassing 
> initialization of HiveContext in case of SQLException
> 
> Cheers
> 
> On Fri, Nov 6, 2015 at 2:50 PM, Zhan Zhang <zzh...@hortonworks.com 
> <mailto:zzh...@hortonworks.com>> wrote:
> Hi Jerry,
> 
> OK. Here is an ugly workaround.
> 
> Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get 
> a bunch of exceptions because of HiveContext initialization failure, but you 
> can initialize your SQLContext on your own.
> 
> scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> sqlContext: org.apache.spark.sql.SQLContext = 
> org.apache.spark.sql.SQLContext@4a5cc2e8
> 
> scala> import sqlContext.implicits._
> import sqlContext.implicits._
> 
> 
> for example
> 
> HW11188:spark zzhang$ more conf/hive-site.xml
> 
> <configuration>
>   <property>
>     <name>hive.metastore.uris</name>
>     <value>thrift://zzhang-yarn11:9083</value>
>   </property>
> </configuration>
> 
> HW11188:spark zzhang$
> 
> By the way, I don’t know whether there is any caveat for this workaround.
> 
> Thanks.
> 
> Zhan Zhang
> 
> 
> 
> 
> 
> On Nov 6, 2015, at 2:40 PM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> 
>> Hi Zhan,
>> 
>> I don’t use HiveContext features at all. I use mostly DataFrame API. It is 
>> sexier and much less typo. :)
>> Also, HiveContext requires metastore database setup (derby by default). The 
>> problem is that I cannot have 2 spark-shell sessions running at the same 
>> time in the same host (e.g. /home/jerry directory). It will give me an 
>> exception like below. 
>> 
>> Since I don’t use HiveContext, I don’t see the need to maintain a database. 
>> 
>> What is interesting is that pyspark shell is able to start more than 1 
>> session at the same time. I wonder what pyspark has done better than 
>> spark-shell?
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>>> On Nov 6, 2015, at 5:28 PM, Zhan Zhang <zzh...@hortonworks.com 
>>> <mailto:zzh...@hortonworks.com>> wrote:
>>> 
>>> If you assembly jar have hive jar included, the HiveContext will be used. 
>>> Typically, HiveContext has more functionality than SQLContext. In what case 
>>> you have to use SQLContext that cannot be done by HiveContext?
>>> 
>>> Thanks.
>>> 
>>> Zhan Zhang
>>> 
>>> On Nov 6, 2015, at 10:43 AM, Jerry Lam <chiling...@gmail.com 
>>> <mailto:chiling...@gmail.com>> wrote:
>>> 
>>>> What is interesting is that pyspark shell works fine with multiple session 
>>>> in the same host even though multiple HiveContext has been created. What 
>>>> does pyspark does differently in terms of starting up the shell?
>>>> 
>>>>> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com 
>>>>> <mailto:yuzhih...@gmail.com>> wrote:
>>>>> 
>>>>> In SQLContext.scala :
>>>>> // After we have populated SQLConf, we call setConf to populate other 
>>>>> confs in the subclass
>>>>> // (e.g. hiveconf in HiveContext).
>>>>> properties.foreach {
>>>>>   case (key, value) => setConf(key, value)
>>>>> }
>>>>> 
>>>>> I don't see config of skipping the above call.
>>>>> 
>>>>> FYI
>>>>> 
>>>>> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
>>>>> <mailto:chiling...@gmail.com>> wrote:
>>>>> Hi spark users and developers,
>>>>> 
>>>>> Is it possible to disable HiveContext from being instantiated when using 
>>>>> spark-shell? I got the following errors when I have more than one session 
>>>>> starts. Since I don't use HiveContext, it would be great if I ca

Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi Zhan,

I don’t use HiveContext features at all. I mostly use the DataFrame API. It is 
sexier and much less typo-prone. :)
Also, HiveContext requires a metastore database setup (Derby by default). The 
problem is that I cannot have 2 spark-shell sessions running at the same time 
on the same host (e.g. in the /home/jerry directory). It will give me an 
exception like the one below. 

Since I don’t use HiveContext, I don’t see the need to maintain a database. 

What is interesting is that the pyspark shell is able to start more than 1 session 
at the same time. I wonder what pyspark does differently from spark-shell?

Best Regards,

Jerry

> On Nov 6, 2015, at 5:28 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
> 
> If you assembly jar have hive jar included, the HiveContext will be used. 
> Typically, HiveContext has more functionality than SQLContext. In what case 
> you have to use SQLContext that cannot be done by HiveContext?
> 
> Thanks.
> 
> Zhan Zhang
> 
> On Nov 6, 2015, at 10:43 AM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> 
>> What is interesting is that pyspark shell works fine with multiple session 
>> in the same host even though multiple HiveContext has been created. What 
>> does pyspark does differently in terms of starting up the shell?
>> 
>>> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com 
>>> <mailto:yuzhih...@gmail.com>> wrote:
>>> 
>>> In SQLContext.scala :
>>> // After we have populated SQLConf, we call setConf to populate other 
>>> confs in the subclass
>>> // (e.g. hiveconf in HiveContext).
>>> properties.foreach {
>>>   case (key, value) => setConf(key, value)
>>> }
>>> 
>>> I don't see config of skipping the above call.
>>> 
>>> FYI
>>> 
>>> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
>>> <mailto:chiling...@gmail.com>> wrote:
>>> Hi spark users and developers,
>>> 
>>> Is it possible to disable HiveContext from being instantiated when using 
>>> spark-shell? I got the following errors when I have more than one session 
>>> starts. Since I don't use HiveContext, it would be great if I can have more 
>>> than 1 spark-shell start at the same time. 
>>> 
>>> Exception in thread "main" java.lang.RuntimeException: 
>>> java.lang.RuntimeException: Unable to instantiate 
>>> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
>>> toreClient
>>> at 
>>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>>> at 
>>> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
>>> Method)
>>> at 
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>> at 
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>> at 
>>> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
>>> at 
>>> org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
>>> at 
>>> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
>>> at 
>>> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at 
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
>>> Method)
>>> at 
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct

Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi Ted,

I was trying to set spark.sql.dialect to sql so as to specify that I only need 
“SQLContext”, not HiveContext. It didn’t work; it still instantiates HiveContext. 
I don’t use HiveContext, and I don’t want to start a MySQL database just to have 
more than 1 spark-shell session running simultaneously. Is there an easy way to 
get around it? More of the exception here:

Caused by: java.sql.SQLException: Unable to open a test connection to the given 
database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, 
username = APP. T
erminating connection pool (set lazyInit to true if you expect to start your 
database after your app). Original Exception: --^M
java.sql.SQLException: Failed to start database 'metastore_db' with class 
loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@53a39109, 
see the next exc
eption for details.
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown 
Source)
at org.apache.derby.impl.jdbc.EmbedConnection.(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection40.(Unknown Source)
at org.apache.derby.jdbc.Driver40.getNewEmbedConnection(Unknown Source)
at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
at org.apache.derby.jdbc.Driver20.connect(Unknown Source)
at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
at java.sql.DriverManager.getConnection(DriverManager.java:571)

Best Regards,

Jerry

> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> In SQLContext.scala :
> // After we have populated SQLConf, we call setConf to populate other 
> confs in the subclass
> // (e.g. hiveconf in HiveContext).
> properties.foreach {
>   case (key, value) => setConf(key, value)
> }
> 
> I don't see config of skipping the above call.
> 
> FYI
> 
> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> Hi spark users and developers,
> 
> Is it possible to disable HiveContext from being instantiated when using 
> spark-shell? I got the following errors when I have more than one session 
> starts. Since I don't use HiveContext, it would be great if I can have more 
> than 1 spark-shell start at the same time. 
> 
> Exception in thread "main" java.lang.RuntimeException: 
> java.lang.RuntimeException: Unable to instantiate 
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
> toreClient
> at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
> at 
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
> at 
> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
> at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.refl

[Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi spark users and developers,

Is it possible to disable HiveContext from being instantiated when using
spark-shell? I got the following errors when more than one session
starts. Since I don't use HiveContext, it would be great if I could have more
than 1 spark-shell started at the same time.

Exception in thread "main" java.lang.RuntimeException:
java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
toreClient
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
at
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
at
org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
at
org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
at
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
at
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
at
org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
at
org.apache.spark.repl.SparkILoopExt.importSpark(SparkILoopExt.scala:154)
at
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply$mcZ$sp(SparkILoopExt.scala:127)
at
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)
at
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)

Best Regards,

Jerry


Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
What is interesting is that the pyspark shell works fine with multiple sessions on 
the same host even though multiple HiveContexts have been created. What does 
pyspark do differently in terms of starting up the shell?

> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> In SQLContext.scala :
> // After we have populated SQLConf, we call setConf to populate other 
> confs in the subclass
> // (e.g. hiveconf in HiveContext).
> properties.foreach {
>   case (key, value) => setConf(key, value)
> }
> 
> I don't see config of skipping the above call.
> 
> FYI
> 
> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> Hi spark users and developers,
> 
> Is it possible to disable HiveContext from being instantiated when using 
> spark-shell? I got the following errors when I have more than one session 
> starts. Since I don't use HiveContext, it would be great if I can have more 
> than 1 spark-shell start at the same time. 
> 
> Exception in thread "main" java.lang.RuntimeException: 
> java.lang.RuntimeException: Unable to instantiate 
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
> toreClient
> at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
> at 
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
> at 
> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
> at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at 
> org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
> at 
> org.apache.spark.repl.SparkILoopExt.importSpark(SparkILoopExt.scala:154)
> at 
> org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply$mcZ$sp(SparkILoopExt.scala:127)
> at 
> org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)
> at 
> org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)
> 
> Best Regards,
> 
> Jerry
> 



Re: Spark EC2 script on Large clusters

2015-11-05 Thread Jerry Lam
Does Qubole use Yarn or Mesos for resource management?

Sent from my iPhone

> On 5 Nov, 2015, at 9:02 pm, Sabarish Sasidharan 
>  wrote:
> 
> Qubole




Re: Please reply if you use Mesos fine grained mode

2015-11-03 Thread Jerry Lam
We "used" Spark on Mesos to build an interactive data analysis platform
because the interactive sessions could be long and might not use Spark for
the entire session. It would be very wasteful of resources if we used the
coarse-grained mode, because it keeps resources for the entire session.
Therefore, fine-grained mode was used.

Knowing that Spark now supports dynamic resource allocation with
coarse-grained mode, we were thinking about using it. However, we decided to
switch to YARN because, in addition to dynamic allocation, it has better
support for security.

On Tue, Nov 3, 2015 at 7:22 PM, Soren Macbeth  wrote:

> we use fine-grained mode. coarse-grained mode keeps JVMs around which
> often leads to OOMs, which in turn kill the entire executor, causing entire
> stages to be retried. In fine-grained mode, only the task fails and
> subsequently gets retried without taking out an entire stage or worse.
>
> On Tue, Nov 3, 2015 at 3:54 PM, Reynold Xin  wrote:
>
>> If you are using Spark with Mesos fine grained mode, can you please
>> respond to this email explaining why you use it over the coarse grained
>> mode?
>>
>> Thanks.
>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
I agree the max date will satisfy the latest-date requirement, but it does
not satisfy the second-to-last-date requirement you mentioned.

Just for your information, before you invest in the partitioned table too
much, I want to warn you that it has memory issues (both on the executor and
driver side). A simple experiment shows that if you have over 10 years
of dates (3650 directories), it takes a long time to initialize. I got to
know the limitation after I tried to partition user events by their
user_id. It was a disaster (>1 user_id).

I hope the Spark developers can address the memory limitations because
partitioned tables are very useful in many cases.

Cheers ~



On Sun, Nov 1, 2015 at 4:39 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i was going for the distinct approach, since i want it to be general
> enough to also solve other related problems later. the max-date is likely
> to be faster though.
>
> On Sun, Nov 1, 2015 at 4:36 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Koert,
>>
>> You should be able to see if it requires scanning the whole data by
>> "explain" the query. The physical plan should say something about it. I
>> wonder if you are trying the distinct-sort-by-limit approach or the
>> max-date approach?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> it seems pretty fast, but if i have 2 partitions and 10mm records i do
>>> have to dedupe (distinct) 10mm records
>>>
>>> a direct way to just find out what the 2 partitions are would be much
>>> faster. spark knows it, but its not exposed.
>>>
>>> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> it seems to work but i am not sure if its not scanning the whole
>>>> dataset. let me dig into tasks a a bit
>>>>
>>>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>
>>>>> Hi Koert,
>>>>>
>>>>> If the partitioned table is implemented properly, I would think
>>>>> "select distinct(date) as dt from table order by dt DESC limit 1" would
>>>>> return the latest dates without scanning the whole dataset. I haven't try
>>>>> it that myself. It would be great if you can report back if this actually
>>>>> works or not. :)
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>>
>>>>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> hello all,
>>>>>> i am trying to get familiar with spark sql partitioning support.
>>>>>>
>>>>>> my data is partitioned by date, so like this:
>>>>>> data/date=2015-01-01
>>>>>> data/date=2015-01-02
>>>>>> data/date=2015-01-03
>>>>>> ...
>>>>>>
>>>>>> lets say i would like a batch process to read data for the latest
>>>>>> date only. how do i proceed?
>>>>>> generally the latest date will be yesterday, but it could be a day
>>>>>> older or maybe 2.
>>>>>>
>>>>>> i understand that i will have to do something like:
>>>>>> df.filter(df("date") === some_date_string_here)
>>>>>>
>>>>>> however i do now know what some_date_string_here should be. i would
>>>>>> like to inspect the available dates and pick the latest. is there an
>>>>>> efficient way to  find out what the available partitions are?
>>>>>>
>>>>>> thanks! koert
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

You should be able to see whether it requires scanning the whole dataset by
running "explain" on the query. The physical plan should say something about
it. I wonder whether you are trying the distinct-sort-by-limit approach or
the max-date approach?

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers <ko...@tresata.com> wrote:

> it seems pretty fast, but if i have 2 partitions and 10mm records i do
> have to dedupe (distinct) 10mm records
>
> a direct way to just find out what the 2 partitions are would be much
> faster. spark knows it, but its not exposed.
>
> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> it seems to work but i am not sure if its not scanning the whole dataset.
>> let me dig into tasks a a bit
>>
>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Koert,
>>>
>>> If the partitioned table is implemented properly, I would think "select
>>> distinct(date) as dt from table order by dt DESC limit 1" would return the
>>> latest dates without scanning the whole dataset. I haven't try it that
>>> myself. It would be great if you can report back if this actually works or
>>> not. :)
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> hello all,
>>>> i am trying to get familiar with spark sql partitioning support.
>>>>
>>>> my data is partitioned by date, so like this:
>>>> data/date=2015-01-01
>>>> data/date=2015-01-02
>>>> data/date=2015-01-03
>>>> ...
>>>>
>>>> lets say i would like a batch process to read data for the latest date
>>>> only. how do i proceed?
>>>> generally the latest date will be yesterday, but it could be a day
>>>> older or maybe 2.
>>>>
>>>> i understand that i will have to do something like:
>>>> df.filter(df("date") === some_date_string_here)
>>>>
>>>> however i do now know what some_date_string_here should be. i would
>>>> like to inspect the available dates and pick the latest. is there an
>>>> efficient way to  find out what the available partitions are?
>>>>
>>>> thanks! koert
>>>>
>>>>
>>>>
>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

the physical plan looks like it is doing the right thing:

it reads the partitioned table hdfs://user/koert/test, gets the dates from
the directory names, hash-partitions and aggregates them to find the
distinct dates, and finally shuffles the dates for the sort and limit-1
operations.

This is my understanding of the physical plan; you can navigate the actual
execution in the web UI to see how much data is actually read to satisfy
this request. I hope it only requires a few bytes for a few dates.

Best Regards,

Jerry
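
As a hypothetical illustration of the "explain" suggestion (this snippet is
not from the original message; it assumes an existing sqlContext, and the path
and temporary table name are placeholders), the plan for the
distinct-sort-by-limit query can be printed like this:

val df = sqlContext.read.parquet("hdfs:///user/koert/test")
df.registerTempTable("events")

// For a properly partitioned table, the printed physical plan should show
// that only the partition column (taken from the directory names) is read.
sqlContext.sql(
  "SELECT DISTINCT(date) AS dt FROM events ORDER BY dt DESC LIMIT 1").explain()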


On Sun, Nov 1, 2015 at 5:56 PM, Jerry Lam <chiling...@gmail.com> wrote:

> I agreed the max date will satisfy the latest date requirement but it does
> not satisfy the second last date requirement you mentioned.
>
> Just for your information, before you invested in the partitioned table
> too much, I want to warn you that it has memory issues (both on executors
> and driver side). A simple experiment can show that if you have over 10
> years of date (3650 directories), it takes a long time to initialize. I got
> to know the limitation after I tried to partition user events per their
> user_id. It was a disaster (>1 user_id).
>
> I hope the spark developer can address the memory limitations because
> partitioned table is very useful in many cases.
>
> Cheers ~
>
>
>
> On Sun, Nov 1, 2015 at 4:39 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i was going for the distinct approach, since i want it to be general
>> enough to also solve other related problems later. the max-date is likely
>> to be faster though.
>>
>> On Sun, Nov 1, 2015 at 4:36 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Koert,
>>>
>>> You should be able to see if it requires scanning the whole data by
>>> "explain" the query. The physical plan should say something about it. I
>>> wonder if you are trying the distinct-sort-by-limit approach or the
>>> max-date approach?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> it seems pretty fast, but if i have 2 partitions and 10mm records i do
>>>> have to dedupe (distinct) 10mm records
>>>>
>>>> a direct way to just find out what the 2 partitions are would be much
>>>> faster. spark knows it, but its not exposed.
>>>>
>>>> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> it seems to work but i am not sure if its not scanning the whole
>>>>> dataset. let me dig into tasks a a bit
>>>>>
>>>>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam <chiling...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Koert,
>>>>>>
>>>>>> If the partitioned table is implemented properly, I would think
>>>>>> "select distinct(date) as dt from table order by dt DESC limit 1" would
>>>>>> return the latest dates without scanning the whole dataset. I haven't try
>>>>>> it that myself. It would be great if you can report back if this actually
>>>>>> works or not. :)
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Jerry
>>>>>>
>>>>>>
>>>>>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers <ko...@tresata.com>
>>>>>> wrote:
>>>>>>
>>>>>>> hello all,
>>>>>>> i am trying to get familiar with spark sql partitioning support.
>>>>>>>
>>>>>>> my data is partitioned by date, so like this:
>>>>>>> data/date=2015-01-01
>>>>>>> data/date=2015-01-02
>>>>>>> data/date=2015-01-03
>>>>>>> ...
>>>>>>>
>>>>>>> lets say i would like a batch process to read data for the latest
>>>>>>> date only. how do i proceed?
>>>>>>> generally the latest date will be yesterday, but it could be a day
>>>>>>> older or maybe 2.
>>>>>>>
>>>>>>> i understand that i will have to do something like:
>>>>>>> df.filter(df("date") === some_date_string_here)
>>>>>>>
>>>>>>> however i do now know what some_date_string_here should be. i would
>>>>>>> like to inspect the available dates and pick the latest. is there an
>>>>>>> efficient way to  find out what the available partitions are?
>>>>>>>
>>>>>>> thanks! koert
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

If the partitioned table is implemented properly, I would think "select
distinct(date) as dt from table order by dt DESC limit 1" would return the
latest date without scanning the whole dataset. I haven't tried it myself.
It would be great if you could report back whether this actually works. :)

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers <ko...@tresata.com> wrote:

> hello all,
> i am trying to get familiar with spark sql partitioning support.
>
> my data is partitioned by date, so like this:
> data/date=2015-01-01
> data/date=2015-01-02
> data/date=2015-01-03
> ...
>
> lets say i would like a batch process to read data for the latest date
> only. how do i proceed?
> generally the latest date will be yesterday, but it could be a day older
> or maybe 2.
>
> i understand that i will have to do something like:
> df.filter(df("date") === some_date_string_here)
>
> however i do now know what some_date_string_here should be. i would like
> to inspect the available dates and pick the latest. is there an efficient
> way to  find out what the available partitions are?
>
> thanks! koert
>
>
>


Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required

2015-10-29 Thread Jerry Wong
I used Spark 1.3.1 to populate the event logs to Cassandra, but there is an
exception whose cause I could not figure out. Can anybody give me any help?

Exception in thread "main" java.lang.IllegalArgumentException: Positive
number of slices required
 at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
 at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
 at org.apache.spark.rdd.RDD.collect(RDD.scala:797)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4$$anonfun$apply$3.apply$mcV$sp(EventLogClusterIngestor.scala:155)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:145)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:144)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(EventLogClusterIngestor.scala:144)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:139)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:132)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(EventLogClusterIngestor.scala:132)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:125)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:115)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:115)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:107)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.processEventLogMapStreamDiff2(EventLogClusterIngestor.scala:107)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.main(EventLogClusterIngestor.scala:573)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor.main(EventLogClusterIngestor.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This happens in an RDD foreach. I pasted the lines where it breaks below:

132  difFileRDD.filter(a => a.size > 0).collect().foreach(file => {
...    // ...
135    val lines = sc.textFile("file:///" + file)
136    val elogs = lines.flatMap(_.split("\n"))
137    val numOfel = elogs.count()
138    // ...
139    breakable {
140      if (numOfel <= 0) {
141        // ..
142        break
143      } else {
144
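
For context (this note is not part of the original message): in Spark 1.x this
IllegalArgumentException is raised by ParallelCollectionRDD.slice when an RDD
is created with a non-positive number of partitions, so it is worth checking
any place where a partition count of zero can be computed. A minimal,
hypothetical reproduction, assuming an existing SparkContext sc:

// A partition count of 0 (for example, one derived from an empty collection)
// triggers the same error lazily, when the partitions are first computed.
val numSlices = 0
val rdd = sc.parallelize(Seq(1, 2, 3), numSlices)
rdd.collect()  // java.lang.IllegalArgumentException: Positive number of slices required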

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Jerry Lam
Hi Bryan,

Did you read the email I sent a few days ago? There are more issues with 
partitionBy down the road: 
https://www.mail-archive.com/user@spark.apache.org/msg39512.html

Best Regards,

Jerry

> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> 
> The second issue I'm seeing is an OOM issue when writing partitioned data.  I 
> am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive libraries 
> packaged with Spark.  Spark was compiled using the following:  mvn 
> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
> -Phive-thriftserver package
> 
> Given a case class like the following:
> 
> case class HiveWindowsEvent(
>  targetEntity: String,
>  targetEntityType: String,
>  dateTimeUtc: Timestamp,
>  eventid: String,
>  eventData: Map[String, String],
>  description: String,
>  eventRecordId: String,
>  level: String,
>  machineName: String,
>  sequenceNumber: String,
>  source: String,
>  sourceMachineName: String,
>  taskCategory: String,
>  user: String,
>  machineIp: String,
>  additionalData: Map[String, String],
>  windowseventtimebin: Long
>  )
> 
> The command to write data works fine (and when queried via Beeline data is 
> correct):
> 
> val hc = new HiveContext(sc)
> import hc.implicits._
> 
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
> .write
> .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
> 
> Once I add the partitioning (few partitions - three or less):
> 
> val hc = new HiveContext(sc)
> import hc.implicits._
> 
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
> .write
> .partitionBy("windowseventtimebin")
> .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
> 
> I see the following error when writing to (3) partitions:
> 
> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
> 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
> at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
> at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at 
> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
> at 
> parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:68)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:48)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
> at 
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
> at 
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
> at 
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:178)
> at 
> parque

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Jerry Lam
Hi Bryan,

I think they fixed some memory issues in 1.4 for the partitioned table 
implementation. 1.5 does much better in terms of executor memory usage when 
generating partitioned tables. However, if your table has more than a thousand 
partitions, reading the partitions can be challenging: it takes a while to 
initialize the partitioned table and it requires a lot of memory on the driver. 
I would not use it if the number of partitions goes over a few hundred. 

Hope this helps,

Jerry

Sent from my iPhone

> On 28 Oct, 2015, at 6:33 pm, Bryan <bryan.jeff...@gmail.com> wrote:
> 
> Jerry,
> 
> Thank you for the note. It sounds like you were able to get further than I 
> have been - any insight? Just a Spark 1.4.1 vs Spark 1.5?
> 
> Regards,
> 
> Bryan Jeffrey
> From: Jerry Lam
> Sent: ‎10/‎28/‎2015 6:29 PM
> To: Bryan Jeffrey
> Cc: Susan Zhang; user
> Subject: Re: Spark -- Writing to Partitioned Persistent Table
> 
> Hi Bryan,
> 
> Did you read the email I sent few days ago. There are more issues with 
> partitionBy down the road: 
> https://www.mail-archive.com/user@spark.apache.org/msg39512.html
> 
> Best Regards,
> 
> Jerry
> 
>> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>> 
>> The second issue I'm seeing is an OOM issue when writing partitioned data.  
>> I am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive 
>> libraries packaged with Spark.  Spark was compiled using the following:  mvn 
>> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
>> -Phive-thriftserver package
>> 
>> Given a case class like the following:
>> 
>> case class HiveWindowsEvent(
>>  targetEntity: String,
>>  targetEntityType: String,
>>  dateTimeUtc: Timestamp,
>>  eventid: String,
>>  eventData: Map[String, String],
>>  description: String,
>>  eventRecordId: String,
>>  level: String,
>>  machineName: String,
>>  sequenceNumber: String,
>>  source: String,
>>  sourceMachineName: String,
>>  taskCategory: String,
>>  user: String,
>>  machineIp: String,
>>  additionalData: Map[String, String],
>>  windowseventtimebin: Long
>>  )
>> 
>> The command to write data works fine (and when queried via Beeline data is 
>> correct):
>> 
>> val hc = new HiveContext(sc)
>> import hc.implicits._
>> 
>> val partitioner = new HashPartitioner(5)
>> hiveWindowsEvents.foreachRDD(rdd => {
>>   val eventsDF = rdd.toDF()
>>   eventsDF
>> .write
>> .mode(SaveMode.Append).saveAsTable("windows_event9")
>> })
>> 
>> Once I add the partitioning (few partitions - three or less):
>> 
>> val hc = new HiveContext(sc)
>> import hc.implicits._
>> 
>> val partitioner = new HashPartitioner(5)
>> hiveWindowsEvents.foreachRDD(rdd => {
>>   val eventsDF = rdd.toDF()
>>   eventsDF
>> .write
>> .partitionBy("windowseventtimebin")
>> .mode(SaveMode.Append).saveAsTable("windows_event9")
>> })
>> 
>> I see the following error when writing to (3) partitions:
>> 
>> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
>> 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.ru

[Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-27 Thread Jerry Lam
Hi Spark users and developers,

Has anyone experienced issues setting Hadoop configurations after the
SparkContext is initialized? I'm using Spark 1.5.1.

I'm trying to use s3a, which requires the access and secret keys to be set in
the Hadoop configuration. I tried to set the properties in the Hadoop
configuration from the SparkContext.

sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.parquet("s3a://parquetfiles")

So far so good, I saw a job has been submitted to get the parquet schema
and it returns successfully.

and then I tried to do:

df.count

This failed with AmazonClientException:

com.amazonaws.AmazonClientException: Unable to load AWS credentials from
any provider in the chain
at
com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at
com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at
com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:384)
at
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157)
at
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
at
org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:155)
at org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any idea why it can read the schema from the parquet file but cannot process
the file? It feels like the Hadoop configuration is not sent to the executors
for some reason...

Thanks,

Jerry


Re: [Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-27 Thread Jerry Lam
Hi Marcelo,

Thanks for the advice. I understand that we could set the configurations
before creating the SparkContext. My question is that
SparkContext.hadoopConfiguration.set("key", "value") doesn't seem to
propagate to all subsequent SQLContext jobs. Note that, as I mentioned, I can
load the parquet file but I cannot perform a count on it because of the
AmazonClientException. It means that the credentials are used during the
loading of the parquet file but not when we are processing it. How can this
happen?

Best Regards,

Jerry


On Tue, Oct 27, 2015 at 2:05 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> On Tue, Oct 27, 2015 at 10:43 AM, Jerry Lam <chiling...@gmail.com> wrote:
> > Anyone experiences issues in setting hadoop configurations after
> > SparkContext is initialized? I'm using Spark 1.5.1.
> >
> > I'm trying to use s3a which requires access and secret key set into
> hadoop
> > configuration. I tried to set the properties in the hadoop configuration
> > from sparktcontext.
> >
> > sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
> > sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)
>
> Try setting "spark.hadoop.fs.s3a.access.key" and
> "spark.hadoop.fs.s3a.secret.key" in your SparkConf before creating the
> SparkContext.
>
> --
> Marcelo
>


Re: [Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-27 Thread Jerry Lam
Hi Marcelo,

I tried setting the properties before instantiating the SparkContext via
SparkConf, and it works fine.
Originally, the code read Hadoop configurations from hdfs-site.xml, which
works perfectly fine as well.
Therefore, can I conclude that sparkContext.hadoopConfiguration.set("key",
"value") does not propagate through all SQL jobs within the same
SparkContext? I haven't tried it with Spark Core so I cannot tell.

Is there a workaround, given that it seems to be broken? I need to do this
programmatically after the SparkContext is instantiated, not before...

Best Regards,

Jerry
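
For reference, a minimal sketch (not from the original exchange) of the
approach Marcelo suggests below, setting the s3a credentials through SparkConf
with the spark.hadoop. prefix before the SparkContext is created; the
credential values and the bucket path are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Placeholder credentials; in practice these might come from the environment.
val awsAccessKeyId = sys.env("AWS_ACCESS_KEY_ID")
val awsSecretKey = sys.env("AWS_SECRET_ACCESS_KEY")

val conf = new SparkConf()
  .setAppName("s3a-example")
  .set("spark.hadoop.fs.s3a.access.key", awsAccessKeyId)
  .set("spark.hadoop.fs.s3a.secret.key", awsSecretKey)

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// With the keys in SparkConf they reach the executors as well, so both the
// schema read and the count should succeed.
val df = sqlContext.read.parquet("s3a://parquetfiles")
df.count()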

On Tue, Oct 27, 2015 at 2:30 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> If setting the values in SparkConf works, there's probably some bug in
> the SQL code; e.g. creating a new Configuration object instead of
> using the one in SparkContext. But I'm not really familiar with that
> code.
>
> On Tue, Oct 27, 2015 at 11:22 AM, Jerry Lam <chiling...@gmail.com> wrote:
> > Hi Marcelo,
> >
> > Thanks for the advice. I understand that we could set the configurations
> > before creating SparkContext. My question is
> > SparkContext.hadoopConfiguration.set("key","value") doesn't seem to
> > propagate to all subsequent SQLContext jobs. Note that I mentioned I can
> > load the parquet file but I cannot perform a count on the parquet file
> > because of the AmazonClientException. It means that the credential is
> used
> > during the loading of the parquet but not when we are processing the
> parquet
> > file. How this can happen?
> >
> > Best Regards,
> >
> > Jerry
> >
> >
> > On Tue, Oct 27, 2015 at 2:05 PM, Marcelo Vanzin <van...@cloudera.com>
> wrote:
> >>
> >> On Tue, Oct 27, 2015 at 10:43 AM, Jerry Lam <chiling...@gmail.com>
> wrote:
> >> > Anyone experiences issues in setting hadoop configurations after
> >> > SparkContext is initialized? I'm using Spark 1.5.1.
> >> >
> >> > I'm trying to use s3a which requires access and secret key set into
> >> > hadoop
> >> > configuration. I tried to set the properties in the hadoop
> configuration
> >> > from sparktcontext.
> >> >
> >> > sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
> >> > sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)
> >>
> >> Try setting "spark.hadoop.fs.s3a.access.key" and
> >> "spark.hadoop.fs.s3a.secret.key" in your SparkConf before creating the
> >> SparkContext.
> >>
> >> --
> >> Marcelo
> >
> >
>
>
>
> --
> Marcelo
>


Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-26 Thread Jerry Lam
Hi Fengdong,

Why does it need more memory on the driver side when there are many partitions? 
It seems the implementation can only support use cases with a dozen or so 
partitions; when it goes over 100, it falls apart. It is also quite slow to 
initialize the loading of partitioned tables when the number of partitions is over 100. 

Best Regards,

Jerry

Sent from my iPhone

> On 26 Oct, 2015, at 2:50 am, Fengdong Yu <fengdo...@everstring.com> wrote:
> 
> How many partitions you generated?
> if Millions generated, then there is a huge memory consumed.
> 
> 
> 
> 
> 
>> On Oct 26, 2015, at 10:58 AM, Jerry Lam <chiling...@gmail.com> wrote:
>> 
>> Hi guys,
>> 
>> I mentioned that the partitions are generated so I tried to read the 
>> partition data from it. The driver is OOM after few minutes. The stack trace 
>> is below. It looks very similar to the the jstack above (note on the refresh 
>> method). Thanks!
>> 
>> Name: java.lang.OutOfMemoryError
>> Message: GC overhead limit exceeded
>> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
>> java.lang.StringBuilder.append(StringBuilder.java:132)
>> org.apache.hadoop.fs.Path.toString(Path.java:384)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
>> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> scala.Option.getOrElse(Option.scala:120)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
>> org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:31)
>> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
>> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>> 
>>> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>> Hi Josh,
>>> 
>>> No I don't have speculation enabled. The driver took about few hours until 
>>> it was OOM. Interestingly, all partitions are generated successfully 
>>> (_SUCCESS file is written in the output directory). Is there a reason why 
>>> the driver needs so much memory? The jstack revealed that it called refresh 
>>> some file statuses. Is there a way to avoid OutputCommitCoordinator to use 
>>> so much memory? 
>>> 
>>> Ultimately, I choose to use partitions because most of the queries I have 
>>> will execute based the partition fiel

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi Josh,

No, I don't have speculation enabled. The driver took about a few hours until
it was OOM. Interestingly, all partitions are generated successfully
(the _SUCCESS file is written in the output directory). Is there a reason why
the driver needs so much memory? The jstack revealed that it called refresh on
some file statuses. Is there a way to keep the OutputCommitCoordinator from
using so much memory?

Ultimately, I chose to use partitions because most of the queries I have
will execute based on the partition field. For example, "SELECT events FROM
customer WHERE customer_id = 1234". If the partitioning is based on
customer_id, all events for a customer can be easily retrieved without
filtering the entire dataset, which is much more efficient (I hope).
However, I notice that the implementation of the partitioning logic does not
seem to allow this type of use case without using a lot of memory, which is
a bit odd in my opinion. Any help will be greatly appreciated.

Best Regards,

Jerry
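
To make the intended access pattern concrete, here is an illustrative sketch
(not code from the original message), assuming an existing sqlContext, a
DataFrame named events, and placeholder paths:

import org.apache.spark.sql.functions.col

// Write: one directory per customer_id, mirroring the use case described above.
events.write
  .partitionBy("customer_id")
  .parquet("hdfs:///data/events")

// Read: filtering on the partition column should only touch the matching
// directory instead of the whole dataset; this is the partition pruning the
// discussion relies on.
val customerEvents = sqlContext.read.parquet("hdfs:///data/events")
  .filter(col("customer_id") === 1234)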



On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:

> Hi Jerry,
>
> Do you have speculation enabled? A write which produces one million files
> / output partitions might be using tons of driver memory via the
> OutputCommitCoordinator's bookkeeping data structures.
>
> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi spark guys,
>>
>> I think I hit the same issue SPARK-8890
>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>> resolved. However it is not. I have over a million output directories for 1
>> single column in partitionBy. Not sure if this is a regression issue? Do I
>> need to set some parameters to make it more memory efficient?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>>
>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> After waiting for a day, it actually causes OOM on the spark driver. I
>>> configure the driver to have 6GB. Note that I didn't call refresh myself.
>>> The method was called when saving the dataframe in parquet format. Also I'm
>>> using partitionBy() on the DataFrameWriter to generate over 1 million
>>> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
>>> the output folder.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi Spark users and developers,
>>>>
>>>> Does anyone encounter any issue when a spark SQL job produces a lot of
>>>> files (over 1 millions), the job hangs on the refresh method? I'm using
>>>> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
>>>> but the driver is doing something very intensively (it uses all the cpus).
>>>> Does it mean Spark SQL cannot be used to produce over 1 million files in a
>>>> single job?
>>>>
>>>> Thread 528: (state = BLOCKED)
>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled
>>>> frame)
>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43,
>>>> line=130 (Compiled frame)
>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12,
>>>> line=114 (Compiled frame)
>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>>>> line=415 (Compiled frame)
>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>>>> (Compiled frame)
>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>>>> frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>>>> @bci=4, line=447 (Compiled frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>>>> @bci=5, line=447 (Compiled frame)
>>>>  -
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>> @bci=9, line=244 (Compiled frame)
>>>>  -
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>> @bci=2, line=244 (Compiled frame)
>>>>  -
>>>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>  - scala.collection.mutable.Arr

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi guys,

I mentioned that the partitions are generated, so I tried to read the
partition data from them. The driver is OOM after a few minutes. The stack
trace is below. It looks very similar to the jstack above (note the refresh
method). Thanks!

Name: java.lang.OutOfMemoryError
Message: GC overhead limit exceeded
StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
java.lang.StringBuilder.append(StringBuilder.java:132)
org.apache.hadoop.fs.Path.toString(Path.java:384)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:31)
org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)


On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Josh,
>
> No I don't have speculation enabled. The driver took about few hours until
> it was OOM. Interestingly, all partitions are generated successfully
> (_SUCCESS file is written in the output directory). Is there a reason why
> the driver needs so much memory? The jstack revealed that it called refresh
> some file statuses. Is there a way to avoid OutputCommitCoordinator to
> use so much memory?
>
> Ultimately, I choose to use partitions because most of the queries I have
> will execute based the partition field. For example, "SELECT events from
> customer where customer_id = 1234". If the partition is based on
> customer_id, all events for a customer can be easily retrieved without
> filtering the entire dataset which is much more efficient (I hope).
> However, I notice that the implementation of the partition logic does not
> seem to allow this type of use cases without using a lot of memory which is
> a bit odd in my opinion. Any help will be greatly appreciated.
>
> Best Regards,
>
> Jerry
>
>
>
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>
>> Hi Jerry,
>>
>> Do you have speculation enabled? A write which produces one million files
>> / output partitions might be using tons of driver memory via the
>> OutputCommitCoordinator's bookkeeping data structures.
>>
>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi spark guys,
>>>
>>> I think I hit the same issue SPARK-8890
>>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>>

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi spark guys,

I think I hit the same issue as SPARK-8890
(https://issues.apache.org/jira/browse/SPARK-8890). It is marked as resolved;
however, it is not. I have over a million output directories for a single
column in partitionBy. Not sure if this is a regression? Do I need to set
some parameters to make it more memory efficient?

Best Regards,

Jerry




On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi guys,
>
> After waiting for a day, it actually causes OOM on the spark driver. I
> configure the driver to have 6GB. Note that I didn't call refresh myself.
> The method was called when saving the dataframe in parquet format. Also I'm
> using partitionBy() on the DataFrameWriter to generate over 1 million
> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
> the output folder.
>
> Best Regards,
>
> Jerry
>
>
> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Spark users and developers,
>>
>> Does anyone encounter any issue when a spark SQL job produces a lot of
>> files (over 1 millions), the job hangs on the refresh method? I'm using
>> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
>> but the driver is doing something very intensively (it uses all the cpus).
>> Does it mean Spark SQL cannot be used to produce over 1 million files in a
>> single job?
>>
>> Thread 528: (state = BLOCKED)
>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130
>> (Compiled frame)
>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12,
>> line=114 (Compiled frame)
>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>> line=415 (Compiled frame)
>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>> (Compiled frame)
>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>> frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>> @bci=4, line=447 (Compiled frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>> @bci=5, line=447 (Compiled frame)
>>  -
>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>> @bci=9, line=244 (Compiled frame)
>>  -
>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>> @bci=2, line=244 (Compiled frame)
>>  -
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> @bci=2, line=108 (Compiled frame)
>>  -
>> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
>> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
>> (Compiled frame)
>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
>> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
>> @bci=279, line=447 (Interpreted frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
>> @bci=8, line=453 (Interpreted frame)
>>  - 
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
>> @bci=26, line=465 (Interpreted frame)
>>  - 
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
>> @bci=12, line=463 (Interpreted frame)
>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
>> line=540 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
>> @bci=1, line=204 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
>> @bci=392, line=152 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>> @bci=1, line=108 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>> @bci=1, line=108 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQL

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi guys,

After waiting for a day, it actually causes an OOM on the Spark driver. I
configured the driver to have 6GB. Note that I didn't call refresh myself;
the method was called when saving the dataframe in parquet format. Also, I'm
using partitionBy() on the DataFrameWriter to generate over 1 million files.
Not sure why it OOMs the driver after the job is marked _SUCCESS in the
output folder.

Best Regards,

Jerry


On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Spark users and developers,
>
> Does anyone encounter any issue when a spark SQL job produces a lot of
> files (over 1 millions), the job hangs on the refresh method? I'm using
> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
> but the driver is doing something very intensively (it uses all the cpus).
> Does it mean Spark SQL cannot be used to produce over 1 million files in a
> single job?
>
> Thread 528: (state = BLOCKED)
>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130
> (Compiled frame)
>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12,
> line=114 (Compiled frame)
>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
> line=415 (Compiled frame)
>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
> (Compiled frame)
>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
>  -
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
> @bci=4, line=447 (Compiled frame)
>  -
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
> @bci=5, line=447 (Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
> @bci=9, line=244 (Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
> @bci=2, line=244 (Compiled frame)
>  -
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> scala.Function1) @bci=22, line=33 (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> @bci=2, line=108 (Compiled frame)
>  -
> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
> (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
> @bci=279, line=447 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
> @bci=8, line=453 (Interpreted frame)
>  - 
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
> @bci=26, line=465 (Interpreted frame)
>  - 
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
> @bci=12, line=463 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
> line=540 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
> @bci=1, line=204 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
> @bci=392, line=152 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
> @bci=1, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
> @bci=1, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
> org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96,
> line=56 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
> @bci=718, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute()
> @bci=20, line=57 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult()
> @bci=15, line=57 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12,
> line=69 (Interpreted frame)
>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
> @bci=11, line=140 (Interpreted frame)
>  - org.apache.spark.sql.execution.Spa

Spark SQL: Issues with using DirectParquetOutputCommitter with APPEND mode and OVERWRITE mode

2015-10-22 Thread Jerry Lam
Hi Spark users and developers,

I read the ticket [SPARK-8578] (Should ignore user defined output committer
when appending data), which ignores DirectParquetOutputCommitter if append
mode is selected. The logic was that it is unsafe to use because it is not
possible to revert a failed job in append mode using
DirectParquetOutputCommitter. Wouldn't it be better to allow users to use it
at their own risk? Say, if you use DirectParquetOutputCommitter with append
mode, the job fails immediately when a task fails. The user can then choose
to reprocess the job entirely, which is not a big deal since failures are
rare in most cases. Another approach is to provide at-least-once task
semantics for append mode using DirectParquetOutputCommitter. This will end
up producing duplicate records, but for some applications this is fine.

The second issue is that the assumption that overwrite mode works with
DirectParquetOutputCommitter in all cases is wrong, at least from the
perspective of using it with S3. S3 provides eventual consistency for
overwrite PUTs and DELETEs. So if you attempt to delete a directory and then
create the same directory again within a split second, the chance you hit
org.apache.hadoop.fs.FileAlreadyExistsException is very high, because deletes
do not take effect immediately and creating the same file before it is
deleted will result in the above exception. Might I propose to change the
code so that it actually overwrites the file instead of doing a delete
followed by a create?

Best Regards,

Jerry
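
For readers who want to experiment with the committer being discussed, the
sketch below shows roughly how it was enabled around Spark 1.4/1.5. Both the
property name and the fully qualified class name are assumptions here (the
committer's package moved between releases), and df and the output path are
placeholders, so verify everything against the exact Spark version in use:

// Assumed property and class name; check them against your Spark release.
sqlContext.setConf(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Subsequent parquet writes would then commit directly to the destination
// instead of going through a temporary directory rename.
df.write.mode("overwrite").parquet("s3a://bucket/path")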


Spark SQL: Preserving Dataframe Schema

2015-10-20 Thread Jerry Lam
Hi Spark users and developers,

I have a dataframe with the following schema (Spark 1.5.1):

StructType(StructField(type,StringType,true),
StructField(timestamp,LongType,false))

After I save the dataframe in parquet and read it back, I get the following
schema:

StructType(StructField(timestamp,LongType,true),
StructField(type,StringType,true))

As you can see, the schemas do not match: the nullable flag is set to true
for timestamp upon reading the dataframe back (and the field order changes).
Is there a way to preserve the schema so that what we write is what we read
back?

Best Regards,

Jerry
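
One workaround worth trying (an illustrative sketch, not something confirmed
in this thread) is to re-apply the original StructType when reading the files
back, since DataFrameReader accepts an explicit schema; the path below is a
placeholder, and df and sqlContext are assumed to exist as described above:

import org.apache.spark.sql.types._

// The schema described above, with timestamp declared non-nullable.
val schema = StructType(Seq(
  StructField("type", StringType, nullable = true),
  StructField("timestamp", LongType, nullable = false)))

df.write.parquet("/tmp/events.parquet")

// Reading back with an explicit schema keeps the declared nullability and
// field order instead of relying on what the parquet files report.
val restored = sqlContext.read.schema(schema).parquet("/tmp/events.parquet")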


Re: Spark executor on Mesos - how to set effective user id?

2015-10-19 Thread Jerry Lam
Can you try setting SPARK_USER at the driver? It is used to impersonate users 
on the executors. So if you have a user set up for launching Spark jobs on the 
executor machines, simply set SPARK_USER to that user name. There is another 
configuration that prevents jobs from being launched as any user other than the 
one that is configured. I don't remember its name, but it is in the 
documentation.


Sent from my iPhone

> On 19 Oct, 2015, at 8:14 am, Eugene Chepurniy  wrote:
> 
> Hi everyone!
> While we are trying to utilize Spark On Mesos cluster, we are facing an
> issue related to effective linux user id being used to start executors on
> Mesos slaves: all executors are trying to use driver's linux user id to
> start on Mesos slaves. 
> Let me explain in detail: spark driver program (which is going to spawn
> Spark on Mesos in coarse mode) is started as unprivileged linux user, for
> example 'user1'. We have Spark distribution unpacked and ready-to-use on
> every mesos slave (placed at /opt/spark, 'spark.mesos.executor.home' is
> pointing to this folder). And after attempt to run every executor fails to
> start with error log telling user 'user1' is not available. And it is really
> true - there is no 'user1' present on Mesos slaves. 
> So my question is: how can I control effective user id which will be used
> for start executors on Mesos?
> Actually I was trying to setup SPARK_USER=nobody on every slave but it
> wasn't useful. 
> Thanks for advice if any.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-on-Mesos-how-to-set-effective-user-id-tp25118.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Indexing Support

2015-10-18 Thread Jerry Lam
I'm interested in it, but I doubt there will be R-tree indexing support in the 
near future, as Spark is not a database. You might have better luck looking at 
databases with spatial indexing support out of the box. 

Cheers

Sent from my iPad

On 2015-10-18, at 17:16, Mustafa Elbehery  wrote:

> Hi All, 
> 
> I am trying to use spark to process Spatial Data. I am looking for R-Tree 
> Indexing support in best case, but I would be fine with any other indexing 
> capability as well, just to improve performance. 
> 
> Anyone had the same issue before, and is there any information regarding 
> Index support in future releases ?!!
> 
> Regards.
> 
> -- 
> Mustafa Elbehery
> EIT ICT Labs Master School
> +49(0)15750363097
> skype: mustafaelbehery87
> 


Re: Dataframes - sole data structure for parallel computations?

2015-10-08 Thread Jerry Lam
I just read the article by ogirardot, but I don't agree.
It is like saying the pandas DataFrame is the sole data structure for analyzing 
data in Python. Can a pandas DataFrame replace a NumPy array? The answer is 
simply no, from an efficiency perspective, for some computations. 

Unless there is a computer science breakthrough in terms of data structures 
(i.e. the data structure of everything), the statement about a sole data 
structure can only be treated as a joke. Just in case people get upset: I AM JOKING :) 

> On Oct 8, 2015, at 1:56 PM, Michael Armbrust  wrote:
> 
> Don't worry, the ability to work with domain objects and lambda functions is 
> not going to go away.  However, we are looking at ways to leverage Tungsten's 
> improved performance when processing structured data.
> 
> More details can be found here:
> https://issues.apache.org/jira/browse/SPARK- 
> 
> 
> On Thu, Oct 8, 2015 at 7:40 AM, Tracewski, Lukasz 
>  > wrote:
> Hi,
> 
>  
> 
> Many people interpret this slide from Databricks
> 
> https://ogirardot.files.wordpress.com/2015/05/future-of-spark.png 
> 
> as indication that Dataframes API is going to be the main processing unit of 
> Spark and sole access point to MLlib, Streaming and such. Is it true? My 
> impression was that Dataframes are an additional abstraction layer with some 
> promising optimisations coming from the Tungsten project, but that’s all. RDDs
> are there to stay. They are the natural choice when it comes to e.g. processing
> images.
> 
>  
> 
> Here is one article that advertises Dataframes as a “sole data structure for 
> parallel computations”:
> 
> https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/
>  
> 
>  (paragraph 4)
> 
>  
> 
> Cheers,
> 
> Lucas
> 
>  
> 
>  
> 
> 
> 
> 
> 



Re: spark-submit --packages using different resolver

2015-10-06 Thread Jerry Lam
This is the ticket SPARK-10951
<https://issues.apache.org/jira/browse/SPARK-10951>

Cheers~

On Tue, Oct 6, 2015 at 11:33 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Burak,
>
> Thank you for the tip.
> Unfortunately it does not work. It throws:
>
> java.net.MalformedURLException: unknown protocol: s3n]
> at
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1003)
> at
> org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> It looks like the relevant code is in createRepoResolvers, which does not
> currently support s3 repos. I will file a JIRA ticket for this.
>
> Best Regards,
>
> Jerry
>
> On Sat, Oct 3, 2015 at 12:50 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi Jerry,
>>
>> The --packages feature doesn't support private repositories right now.
>> However, in the case of s3, maybe it might work. Could you please try using
>> the --repositories flag and provide the address:
>> `$ spark-submit --packages my:awesome:package --repositories
>> s3n://$aws_ak:$aws_sak@bucket/path/to/repo`
>>
>> If that doesn't work, could you please file a JIRA?
>>
>> Best,
>> Burak
>>
>>
>> On Thu, Oct 1, 2015 at 8:58 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi spark users and developers,
>>>
>>> I'm trying to use spark-submit --packages against private s3 repository.
>>> With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
>>> wonder how can I add this resolver into spark-submit such that --packages
>>> can resolve dependencies from private repo?
>>>
>>> Thank you!
>>>
>>> Jerry
>>>
>>
>>
>


Re: spark-submit --packages using different resolver

2015-10-06 Thread Jerry Lam
Hi Burak,

Thank you for the tip.
Unfortunately it does not work. It throws:

java.net.MalformedURLException: unknown protocol: s3n]
at
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1003)
at
org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

It looks like the relevant code is in createRepoResolvers, which does not
currently support s3 repos. I will file a JIRA ticket for this.

Best Regards,

Jerry

On Sat, Oct 3, 2015 at 12:50 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi Jerry,
>
> The --packages feature doesn't support private repositories right now.
> However, in the case of s3, maybe it might work. Could you please try using
> the --repositories flag and provide the address:
> `$ spark-submit --packages my:awesome:package --repositories
> s3n://$aws_ak:$aws_sak@bucket/path/to/repo`
>
> If that doesn't work, could you please file a JIRA?
>
> Best,
> Burak
>
>
> On Thu, Oct 1, 2015 at 8:58 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi spark users and developers,
>>
>> I'm trying to use spark-submit --packages against private s3 repository.
>> With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
>> wonder how can I add this resolver into spark-submit such that --packages
>> can resolve dependencies from private repo?
>>
>> Thank you!
>>
>> Jerry
>>
>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Jerry Lam
Philip, the guy is trying to help you; calling him silly is a bit too far. He
might assume your problem is IO bound, which might not be the case. If you need
only 4 cores per job no matter what, there is little advantage to using Spark in
my opinion, because you can easily do this with a worker farm that takes each
job and processes it on a single machine. Let the scheduler figure out which
node in the farm is idle and spawn jobs on those nodes until all of them are
saturated. Call me silly, but this seems much simpler.
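That said, if you do stay on Spark, the FAIR scheduler + pools setup mentioned
further down the thread looks roughly like this (a sketch based on the
job-scheduling docs; pool and file names are illustrative, and note that pools
share cores by weight/minShare rather than hard-capping a job at 4 cores, which
I think is your sticking point):

import org.apache.spark.{SparkConf, SparkContext}

// Enable fair scheduling and point Spark at a pool definition file.
val conf = new SparkConf()
  .setAppName("long-running-multi-user-driver")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)

// Inside the thread that handles one request: pin that thread's jobs to a pool.
sc.setLocalProperty("spark.scheduler.pool", "request-pool")
// ... run the job for this request ...
sc.setLocalProperty("spark.scheduler.pool", null)  // reset when the request is done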

Sent from my iPhone

> On 3 Oct, 2015, at 12:02 am, Philip Weaver  wrote:
> 
> You can't really say 8 cores is not much horsepower when you have no idea 
> what my use case is. That's silly.
> 
>> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase  wrote:
>> Forgot to mention that you could also restrict the parallelism to 4,
>> essentially using only 4 cores at any given time; however, if your job is
>> complex, a stage might be broken into more than one task...
>> 
>> Sent from my iPhone
>> 
>> On 19 Sep 2015, at 08:30, Adrian Tanase  wrote:
>> 
>>> Reading through the docs it seems that with a combination of FAIR scheduler 
>>> and maybe pools you can get pretty far.
>>> 
>>> However the smallest unit of scheduled work is the task so probably you 
>>> need to think about the parallelism of each transformation.
>>> 
>>> I'm guessing that by increasing the level of parallelism you get many 
>>> smaller tasks that the scheduler can then run across the many jobs you 
>>> might have - as opposed to fewer, longer tasks...
>>> 
>>> Lastly, 8 cores is not that much horsepower :) 
>>> You may consider running with beefier machines or a larger cluster, to get 
>>> at least tens of cores.
>>> 
>>> Hope this helps,
>>> -adrian
>>> 
>>> Sent from my iPhone
>>> 
>>> On 18 Sep 2015, at 18:37, Philip Weaver  wrote:
>>> 
 Here's a specific example of what I want to do. My Spark application is 
 running with total-executor-cores=8. A request comes in, it spawns a 
 thread to handle that request, and starts a job. That job should use only 
 4 cores, not all 8 of the cores available to the cluster. When the first 
 job is scheduled, it should take only 4 cores, not all 8 of the cores that 
 are available to the driver.
 
 Is there any way to accomplish this? This is on mesos.
 
 In order to support the use cases described in 
 https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
 application runs for a long time and handles requests from multiple users, 
 I believe what I'm asking about is a very important feature. One of the 
 goals is to get lower latency for each request, but if the first request 
 takes all resources and we can't guarantee any free resources for the 
 second request, then that defeats the purpose. Does that make sense?
 
 Thanks in advance for any advice you can provide!
 
 - Philip
 
> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver  
> wrote:
> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
> scheduler, so I can define a long-running application capable of 
> executing multiple simultaneous spark jobs.
> 
> The kind of jobs that I'm running do not benefit from more than 4 cores, 
> but I want my application to be able to take several times that in order 
> to run multiple jobs at the same time.
> 
> I suppose my question is more basic: How can I limit the number of cores 
> used to load an RDD or DataFrame? I can immediately repartition or 
> coalesce my RDD or DataFrame to 4 partitions after I load it, but that 
> doesn't stop Spark from using more cores to load it.
> 
> Does it make sense what I am trying to accomplish, and is there any way 
> to do it?
> 
> - Philip
> 


spark-submit --packages using different resolver

2015-10-01 Thread Jerry Lam
Hi spark users and developers,

I'm trying to use spark-submit --packages against a private s3 repository.
With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
wonder how I can add this resolver to spark-submit so that --packages
can resolve dependencies from the private repo?

Thank you!

Jerry


Re: Spark SQL: Implementing Custom Data Source

2015-09-29 Thread Jerry Lam
Hi Michael and Ted,

Thank you for the reference. Is it true that once I implement a custom data
source, it can be used in all Spark-supported languages? That is, Scala,
Java, Python and R. :)
I want to take advantage of the interoperability that is already built into
Spark.
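For concreteness, my current reading of the 1.x sources API (going by spark-avro)
is roughly the sketch below; the class, package and column names are made up, and
I have not verified it compiles against 1.5:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A minimal relation that exposes one string column read from a text file.
class LineRelation(path: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(StructField("line", StringType, nullable = true) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map(Row(_))
}

// The entry point that is resolved by name via .format("com.example.lines").
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new LineRelation(parameters("path"))(sqlContext)
}

Since the source is resolved by a package-name string, the same
sqlContext.read.format("com.example.lines").load("/some/path") call should work
from Python and R as well, which is the interoperability I am after.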

Thanks!

Jerry

On Tue, Sep 29, 2015 at 11:31 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> That's a pretty advanced example that uses experimental APIs.  I'd suggest
> looking at https://github.com/databricks/spark-avro as a reference.
>
> On Mon, Sep 28, 2015 at 9:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> See this thread:
>>
>> http://search-hadoop.com/m/q3RTttmiYDqGc202
>>
>> And:
>>
>>
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
>>
>> On Sep 28, 2015, at 8:22 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>> Hi spark users and developers,
>>
>> I'm trying to learn how to implement a custom data source for Spark SQL. Is
>> there documentation that I can use as a reference? I'm not sure exactly
>> what needs to be extended/implemented. A general workflow would be greatly
>> helpful!
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>


Spark SQL: Implementing Custom Data Source

2015-09-28 Thread Jerry Lam
Hi spark users and developers,

I'm trying to learn how to implement a custom data source for Spark SQL. Is
there documentation that I can use as a reference? I'm not sure exactly
what needs to be extended/implemented. A general workflow would be greatly
helpful!

Best Regards,

Jerry


Re: Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-26 Thread Jerry Lam
Hi Michael,

Thanks for the tip. With dataframes, is it possible to explode only selected
fields of each element in purchase_items?
Since purchase_items is an array of items, and each item has a number of
fields (for example product_id and price), is it possible to explode just
these two fields directly with the dataframe API?
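Something like the following is what I had in mind (a rough sketch; assuming df
is the purchases dataframe and purchase_items is an array of structs with
product_id and price fields):

import org.apache.spark.sql.functions._

// Explode the array, then pull out only the two nested fields we care about.
val items = df.select(explode(df("purchase_items")).as("item"))
  .select("item.product_id", "item.price")

items.show(5)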

Best Regards,


Jerry

On Fri, Sep 25, 2015 at 7:53 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> The SQL parser without HiveContext is really simple, which is why I
> generally recommend users use HiveContext.  However, you can do it with
> dataframes:
>
> import org.apache.spark.sql.functions._
> table("purchases").select(explode(df("purchase_items")).as("item"))
>
>
>
> On Fri, Sep 25, 2015 at 4:21 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi sparkers,
>>
>> Does anyone know how to do LATERAL VIEW EXPLODE without HiveContext?
>> I don't want to start up a metastore and derby just because I need
>> LATERAL VIEW EXPLODE.
>>
>> I have been trying, but I always get an exception like this:
>>
>> Name: java.lang.RuntimeException
>> Message: [1.68] failure: ``union'' expected but identifier view found
>>
>> with a query that looks like:
>>
>> "select items from purchases lateral view explode(purchase_items) tbl as
>> items"
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>


Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-25 Thread Jerry Lam
Hi sparkers,

Does anyone know how to do LATERAL VIEW EXPLODE without HiveContext?
I don't want to start up a metastore and derby just because I need LATERAL
VIEW EXPLODE.

I have been trying, but I always get an exception like this:

Name: java.lang.RuntimeException
Message: [1.68] failure: ``union'' expected but identifier view found

with a query that looks like:

"select items from purchases lateral view explode(purchase_items) tbl as
items"

Best Regards,

Jerry


Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Do you have specific reasons to use Ceph? I have used Ceph before, and I'm not
too in love with it, especially when I was using the Ceph Object Gateway S3 API.
There are some incompatibilities with the AWS S3 API. You really need to try it
before making the commitment. Did you manage to install it?

On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com 
wrote:

> Hi guys,
>
> Here is the info for Ceph : http://ceph.com/
>
> We are investigating and using Ceph for distributed storage and monitoring,
> and we are specifically interested in using Ceph as the underlying file system
> storage for Spark. However, we have no experience achieving that. Has anybody
> seen such progress?
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>


Re: Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Hi Sun,

The issue with Ceph as the underlying file system for Spark is that you
lose data locality. Ceph is not designed to have Spark run directly on top
of the OSDs. I know that cephfs exposes data location information via a
hadoop-compatible API, but the last time I researched this, the integration
was experimental (just google it and you will find a lot of discussions, e.g.
http://lists.ceph.com/pipermail/ceph-users-ceph.com/2015-July/002837.html).

However, this might not be the biggest issue as long as you have great network
bandwidth like InfiniBand or 10+ Gigabit Ethernet. My guess is that the
architecture and the performance will at best be similar to S3+Spark
(with 10GE instances) if you take the networking seriously.

HTH,

Jerry

On Tue, Sep 22, 2015 at 9:59 PM, fightf...@163.com <fightf...@163.com>
wrote:

> Hi Jerry
>
> Yeah, we have already managed to run and use Ceph in a few of our production
> environments, especially with OpenStack.
>
> The reason we want to use Ceph is that we are looking for a unified storage
> layer, and the design concepts of Ceph are quite appealing. I am interested in
> work like the hadoop cephfs plugin, and we are about to run some benchmark
> tests between HDFS and cephfs.
>
> So it would be beneficial if ongoing work between Apache Spark and Ceph could
> contribute some thoughtful insights here.
>
> BTW, regarding the Ceph Object Gateway S3 REST API, agreed on the inconvenience
> and the incompatibilities. However, we have not yet researched or tested
> radosgw much, though we do have some small use cases that require the gateway.
>
> Hoping for more discussion.
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* Jerry Lam <chiling...@gmail.com>
> *Date:* 2015-09-23 09:37
> *To:* fightf...@163.com
> *CC:* user <user@spark.apache.org>
> *Subject:* Re: Spark standalone/Mesos on top of Ceph
> Do you have specific reasons to use Ceph? I have used Ceph before, and I'm not
> too in love with it, especially when I was using the Ceph Object Gateway S3 API.
> There are some incompatibilities with the AWS S3 API. You really need to try it
> before making the commitment. Did you manage to install it?
>
> On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com <fightf...@163.com>
> wrote:
>
>> Hi guys,
>>
>> Here is the info for Ceph : http://ceph.com/
>>
>> We are investigating and using Ceph for distributed storage and monitoring,
>> and we are specifically interested in using Ceph as the underlying file system
>> storage for Spark. However, we have no experience achieving that. Has anybody
>> seen such progress?
>>
>> Best,
>> Sun.
>>
>> --
>> fightf...@163.com
>>
>
>
>
>
>


Re: How does one use s3 for checkpointing?

2015-09-21 Thread Jerry Lam
Hi Amit,

Have you looked at Amazon EMR? Most people using EMR use s3 for persistence
(both as input to and output from their Spark jobs).
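If you do roll your own outside EMR, the basic wiring looks roughly like this (a
sketch only; the s3n credentials and bucket are illustrative, and your questions
about consistency and the hadoop-aws/aws-sdk jars still apply):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("s3-checkpoint-sketch"))

// Credentials for the s3n:// filesystem (or use IAM roles / embed them in the URI).
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

val ssc = new StreamingContext(sc, Seconds(30))
ssc.checkpoint("s3n://my-bucket/spark/checkpoints")  // bucket and path are illustrative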

Best Regards,

Jerry

Sent from my iPhone

> On 21 Sep, 2015, at 9:24 pm, Amit Ramesh <a...@yelp.com> wrote:
> 
> 
> A lot of places in the documentation mention using s3 for checkpointing, 
> however I haven't found any examples or concrete evidence of anyone having 
> done this.
> 1. Is this a safe/reliable option given the read-after-write consistency for
>    PUTS in s3?
> 2. Is s3 access broken for hadoop 2.6 (SPARK-7442)? If so, is it viable in 2.4?
> 3. Related to #2: I did try providing hadoop-aws-2.6.0.jar while submitting the
>    job and got the following stack trace. Is there a fix?
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: 
> Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at 
> org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
> at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:475)
> at 
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at 
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError: 
> com/amazonaws/AmazonServiceException
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> at java.lang.Class.getConstructor0(Class.java:2885)
> at java.lang.Class.newInstance(Class.java:350)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 27 more
> Caused by: java.lang.ClassNotFoundException: 
> com.amazonaws.AmazonServiceException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 32 more
> 
> Thanks!
> Amit
> 


Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Spark Developers,

I just ran some very simple operations on a dataset. I was surprised by the
execution plan of take(1), head() or first().

For your reference, this is what I did in pyspark 1.5:
df=sqlContext.read.parquet("someparquetfiles")
df.head()

The above lines take over 15 minutes. I was frustrated because I can do
better without using Spark :) Since I like Spark, I tried to figure out
why. It seems the dataframe requires 3 stages to give me the first row: it
reads all the data (about 1 billion rows) and runs Limit twice.

Instead of head(), show(1) runs much faster. Not to mention that if I do:

df.rdd.take(1) //runs much faster.

Is this expected? Why are head/first/take so slow for dataframes? Is it a bug
in the optimizer, or did I do something wrong?

Best Regards,

Jerry


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Yin,

You are right! I just tried the Scala version with the above lines, and it
works as expected.
I'm not sure whether it also happens in 1.4 for pyspark, but I thought the
pyspark code just calls the Scala code via py4j, so I didn't expect this bug
to be pyspark-specific. That actually surprises me a bit. I created a
ticket for this (SPARK-10731
<https://issues.apache.org/jira/browse/SPARK-10731>).
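For the record, the Scala check was essentially the following (a sketch of what
I ran in the 1.5 shell against the same parquet data):

// Scala counterpart of the pyspark snippet above; here head() returns quickly.
val df = sqlContext.read.parquet("someparquetfiles")
df.head()

// Comparing the physical plans is a quick way to see the difference:
df.limit(1).explain()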

Best Regards,

Jerry


On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai <yh...@databricks.com> wrote:

> btw, does 1.4 has the same problem?
>
> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai <yh...@databricks.com> wrote:
>
>> Hi Jerry,
>>
>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Spark Developers,
>>>
>>> I just ran some very simple operations on a dataset. I was surprise by
>>> the execution plan of take(1), head() or first().
>>>
>>> For your reference, this is what I did in pyspark 1.5:
>>> df=sqlContext.read.parquet("someparquetfiles")
>>> df.head()
>>>
>>> The above lines take over 15 minutes. I was frustrated because I can do
>>> better without using spark :) Since I like spark, so I tried to figure out
>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>
>>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>>
>>> df.rdd.take(1) //runs much faster.
>>>
>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>> bug in the optimizer? or I did something wrong?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
I just noticed you found that 1.4 has the same issue. I added that to the
ticket as well.

On Mon, Sep 21, 2015 at 1:43 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Yin,
>
> You are right! I just tried the scala version with the above lines, it
> works as expected.
> I'm not sure if it happens also in 1.4 for pyspark but I thought the
> pyspark code just calls the scala code via py4j. I didn't expect that this
> bug is pyspark specific. That surprises me actually a bit. I created a
> ticket for this (SPARK-10731
> <https://issues.apache.org/jira/browse/SPARK-10731>).
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai <yh...@databricks.com> wrote:
>
>> btw, does 1.4 has the same problem?
>>
>> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai <yh...@databricks.com> wrote:
>>
>>> Hi Jerry,
>>>
>>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi Spark Developers,
>>>>
>>>> I just ran some very simple operations on a dataset. I was surprise by
>>>> the execution plan of take(1), head() or first().
>>>>
>>>> For your reference, this is what I did in pyspark 1.5:
>>>> df=sqlContext.read.parquet("someparquetfiles")
>>>> df.head()
>>>>
>>>> The above lines take over 15 minutes. I was frustrated because I can do
>>>> better without using spark :) Since I like spark, so I tried to figure out
>>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>>
>>>> Instead of head(), show(1) runs much faster. Not to mention that if I
>>>> do:
>>>>
>>>> df.rdd.take(1) //runs much faster.
>>>>
>>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>>> bug in the optimizer? or I did something wrong?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>
>>>
>>
>

