Re: A proposal for creating a Knowledge Sharing Hub for Apache Spark Community

2024-03-18 Thread Reynold Xin
One of the problems in the past when something like this was brought up was that 
the ASF couldn't officially bless venues beyond the already approved ones. So 
that's something to look into.

Now of course you are welcome to run unofficial things unblessed as long as 
they follow trademark rules.

On Mon, Mar 18, 2024 at 1:53 PM, Mich Talebzadeh < mich.talebza...@gmail.com > 
wrote:

> 
> Well as long as it works.
> 
> Please all check this link from Databricks and let us know your thoughts.
> Will something similar work for us? Of course Databricks has much deeper
> pockets than our ASF community. Will it require moderation on our side to
> block spam and nutcases?
> 
> Knowledge Sharing Hub - Databricks:
> https://community.databricks.com/t5/knowledge-sharing-hub/bd-p/Knowledge-Sharing-Hub
> 
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> 
> London
> 
> United Kingdom
> 
> view my LinkedIn profile:
> https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
> 
> https://en.everybodywiki.com/Mich_Talebzadeh
> 
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one thousand
> expert opinions" (Wernher von Braun,
> https://en.wikipedia.org/wiki/Wernher_von_Braun ).
> 
> On Mon, 18 Mar 2024 at 20:31, Bjørn Jørgensen < bjornjorgen...@gmail.com > wrote:
> 
> 
>> something like this Spark community · GitHub (
>> https://github.com/Spark-community )
>> 
>> 
>> 
>> On Mon, 18 Mar 2024 at 17:26, Parsian, Mahmoud < mpars...@illumina.com.invalid > wrote:
>> 
>>> Good idea. Will be useful
>>> 
>>> +1
>>> 
>>> *From:* ashok34...@yahoo.com.INVALID < ashok34...@yahoo.com.INVALID >
>>> *Date:* Monday, March 18, 2024 at 6:36 AM
>>> *To:* user @spark < user@spark.apache.org >, Spark dev list
>>> < d...@spark.apache.org >, Mich Talebzadeh < mich.talebza...@gmail.com >
>>> *Cc:* Matei Zaharia < matei.zaha...@gmail.com >
>>> *Subject:* Re: A proposal for creating a Knowledge Sharing Hub for Apache
>>> Spark Community
>>> 
>>> Good idea. Will be useful
>>> 
>>> +1
>>> 
>>> On Monday, 18 March 2024 at 11:00:40 GMT, Mich Talebzadeh
>>> < mich.talebza...@gmail.com > wrote:
>>> 
>>> Some of you may be aware that Databricks community Home | Databricks
>>> have just launched a knowledge sharing hub. I thought it would be a
>>> good idea for the Apache Spark user group to have the same, especially
>>> for repeat questions on Spark core, Spark SQL, Spark Structured
>>> Streaming, Spark MLlib and so forth.
>>> 
>>> Apache Spark user and dev groups have been around for a good while.
>>> They are serving their purpose. We went through creating a Slack
>>> community that managed to create more heat than light. This is
>>> what the Databricks community came up with, and I quote:
>>> 
>>> "Knowledge Sharing Hub
>>> Dive into a collaborative space where members like YOU can exchange
>>> knowledge, tips, and best practices. Join the conversation today and
>>> unlock a wealth of collective wisdom to enhance your experience and
>>> drive success."
>>> 
>>> I don't know the logistics of setting it up, but I am sure that should
>>> not be that difficult. If anyone is supportive of this proposal, let
>>> the usual +1, 0, -1 decide.
>>> 
>>> HTH
>>> 
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>> 
>>> view my LinkedIn profile

Re: Stickers and Swag

2022-06-14 Thread Reynold Xin
Nice! Going to order a few items myself ...

On Tue, Jun 14, 2022 at 7:54 PM, Gengliang Wang < ltn...@gmail.com > wrote:

> 
> FYI now you can find the shopping information on
> https://spark.apache.org/community as well :)
> 
> 
> 
> Gengliang
> 
>> On Jun 14, 2022, at 7:47 PM, Hyukjin Kwon < gurwls...@gmail.com > wrote:
>> 
>> Woohoo
>> 
>> On Tue, 14 Jun 2022 at 15:04, Xiao Li < gatorsm...@gmail.com > wrote:
>> 
>> 
>>> Hi, all,
>>> 
>>> 
>>> The ASF has an official store at RedBubble (
>>> https://www.redbubble.com/people/comdev/shop ) that Apache Community
>>> Development (ComDev) runs. If you are interested in buying Spark Swag, 70
>>> products featuring the Spark logo are available:
>>> https://www.redbubble.com/shop/ap/113203780
>>> 
>>> 
>>> Go Spark!
>>> 
>>> 
>>> Xiao
>>> 
>> 
>> 
> 
>



[ANNOUNCE] Apache Spark 3.0.0

2020-06-18 Thread Reynold Xin
Hi all,

Apache Spark 3.0.0 is the first release of the 3.x line. It builds on many of 
the innovations from Spark 2.x, bringing new ideas as well as continuing 
long-term projects that have been in development. This release resolves more 
than 3400 tickets.

We'd like to thank our contributors and users for their contributions and early 
feedback to this release. This release would not have been possible without you.

To download Spark 3.0.0, head over to the download page: 
http://spark.apache.org/downloads.html

To view the release notes: 
https://spark.apache.org/releases/spark-release-3-0-0.html



Re: results of taken(3) not appearing in console window

2020-03-26 Thread Reynold Xin
bcc dev, +user

You need to print out the result. Take itself doesn't print. You only got the 
results printed to the console because the Scala REPL automatically prints the 
returned value from take.
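
For illustration, a minimal sketch (not part of the original reply) of forcing the
output yourself, assuming the `flights` Dataset defined in the quoted code below:

    // In spark-shell the REPL echoes the value returned by take(3).
    // In a compiled program, print the returned array explicitly:
    val firstThree = flights
      .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
      .take(3)
    firstThree.foreach(println)

    // Or let Spark render a few rows for you:
    // flights.filter(_.ORIGIN_COUNTRY_NAME != "Canada").show(3)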

On Thu, Mar 26, 2020 at 12:15 PM, Zahid Rahman < zahidr1...@gmail.com > wrote:

> 
> I am running the same code with the same libraries but not getting the same
> output.
> scala>  case class flight (DEST_COUNTRY_NAME: String,
>      |                      ORIGIN_COUNTRY_NAME:String,
>      |                      count: BigInt)
> defined class flight
> 
> scala>     val flightDf = spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> flightDf: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string,
> ORIGIN_COUNTRY_NAME: string ... 1 more field]
> 
> scala> val flights = flightDf.as[flight]
> flights: org.apache.spark.sql.Dataset[flight] = [DEST_COUNTRY_NAME:
> string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
> 
> scala> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !=
> "Canada").map(flight_row => flight_row).take(3)
> res0: Array[flight] = Array(flight(United States,Romania,1),
> flight(United States,Ireland,264), flight(United States,India,69))
> 
> 
>  
> 20/03/26 19:09:00 INFO SparkContext: Running Spark version 3.0.0-preview2
> 20/03/26 19:09:00 INFO ResourceUtils:
> ==
> 20/03/26 19:09:00 INFO ResourceUtils: Resources for spark.driver:
> 
> 20/03/26 19:09:00 INFO ResourceUtils:
> ==
> 20/03/26 19:09:00 INFO SparkContext: Submitted application:  chapter2
> 20/03/26 19:09:00 INFO SecurityManager: Changing view acls to: kub19
> 20/03/26 19:09:00 INFO SecurityManager: Changing modify acls to: kub19
> 20/03/26 19:09:00 INFO SecurityManager: Changing view acls groups to:
> 20/03/26 19:09:00 INFO SecurityManager: Changing modify acls groups to:
> 20/03/26 19:09:00 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users  with view permissions: Set(kub19);
> groups with view permissions: Set(); users  with modify permissions:
> Set(kub19); groups with modify permissions: Set()
> 20/03/26 19:09:00 INFO Utils: Successfully started service 'sparkDriver'
> on port 46817.
> 20/03/26 19:09:00 INFO SparkEnv: Registering MapOutputTracker
> 20/03/26 19:09:00 INFO SparkEnv: Registering BlockManagerMaster
> 20/03/26 19:09:00 INFO BlockManagerMasterEndpoint: Using
> org.apache.spark.storage.DefaultTopologyMapper for getting
> topology information
> 20/03/26 19:09:00 INFO BlockManagerMasterEndpoint:
> BlockManagerMasterEndpoint up
> 20/03/26 19:09:00 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
> 20/03/26 19:09:00 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-0baf5097-2595-4542-99e2-a192d7baf37c
> 20/03/26 19:09:00 INFO MemoryStore: MemoryStore started with capacity
> 127.2 MiB
> 20/03/26 19:09:01 INFO SparkEnv: Registering OutputCommitCoordinator
> 20/03/26 19:09:01 WARN Utils: Service 'SparkUI' could not bind on port
> 4040. Attempting port 4041.
> 20/03/26 19:09:01 INFO Utils: Successfully started service 'SparkUI' on
> port 4041.
> 20/03/26 19:09:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
> http://localhost:4041
> 20/03/26 19:09:01 INFO Executor: Starting executor ID driver on host
> localhost
> 20/03/26 19:09:01 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on
> port 38135.
> 20/03/26 19:09:01 INFO NettyBlockTransferService: Server created on
> localhost:38135
> 20/03/26 19:09:01 INFO BlockManager: Using
> org.apache.spark.storage.RandomBlockReplicationPolicy for block
> replication policy
> 20/03/26 19:09:01 INFO BlockManagerMaster: Registering BlockManager
> BlockManagerId(driver, localhost, 38135, None)
> 20/03/26 19:09:01 INFO BlockManagerMasterEndpoint: Registering block
> manager localhost:38135 with 127.2 MiB RAM, BlockManagerId(driver,
> localhost, 38135, None)
> 20/03/26 19:09:01 INFO BlockManagerMaster: Registered BlockManager
> BlockManagerId(driver, localhost, 38135, None)
> 20/03/26 19:09:01 INFO BlockManager: Initialized BlockManager:
> BlockManagerId(driver, localhost, 38135, None)
> 20/03/26 19:09:01 INFO SharedState: Setting hive.metastore.warehouse.dir
> ('null') to the value of spark.sql.warehouse.dir
> ('file:/home/kub19/spark-3.0.0-preview2-bin-hadoop2.7/projects/spark-warehouse').
> 
> 20/03/26 19:09:01 INFO SharedState: Warehouse path is
> 'file:/home/kub19/spark-3.0.0-preview2-bin-hadoop2.7/projects/spark-warehouse'.
> 
> 20/03/26 19:09:02 INFO SparkContext: Starting job: parquet at
> 

Re: FYI: The evolution on `CHAR` type behavior

2020-03-16 Thread Reynold Xin
BTW I'm not opposing us sticking to SQL standard (I'm in general for it). I was 
merely pointing out that if we deviate away from SQL standard in any way we are 
considered "wrong" or "incorrect". That argument itself is flawed when plenty 
of other popular database systems also deviate away from the standard on this 
specific behavior.

On Mon, Mar 16, 2020 at 5:29 PM, Reynold Xin < r...@databricks.com > wrote:

> 
> I looked up our usage logs (sorry I can't share this publicly) and trim
> has at least four orders of magnitude higher usage than char.
> 
> 
> 
> 
> On Mon, Mar 16, 2020 at 5:27 PM, Dongjoon Hyun < dongjoon.h...@gmail.com > wrote:
> 
>> Thank you, Stephen and Reynold.
>> 
>> 
>> To Reynold.
>> 
>> 
>> The way I see the following is a little different.
>> 
>> 
>>       > CHAR is an undocumented data type without clearly defined
>> semantics.
>> 
>> Let me describe it from the Apache Spark user's viewpoint.
>> 
>> 
>> Apache Spark started to claim `HiveContext` (and `hql/hiveql` function) at
>> Apache Spark 1.x without much documentation. In addition, there still
>> exists an effort which is trying to keep it in the 3.0.0 era.
>> 
>>        https://issues.apache.org/jira/browse/SPARK-31088
>>        Add back HiveContext and createExternalTable
>> 
>> Historically, we tried to make many SQL-based customers migrate their
>> workloads from Apache Hive into Apache Spark through `HiveContext`.
>> 
>> Although Apache Spark didn't have good documentation about the inconsistent
>> behavior among its data sources, Apache Hive has been providing its
>> documentation and many customers rely on that behavior.
>> 
>>       - https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types
>> 
>> At that time, frequently in on-prem Hadoop clusters by well-known vendors,
>> many existing huge tables were created by Apache Hive, not Apache Spark.
>> And, Apache Spark is used for boosting SQL performance with its *caching*.
>> This was true because Apache Spark was added into the Hadoop-vendor
>> products later than Apache Hive.
>> 
>> 
>> Until the turning point at Apache Spark 2.0, we tried to catch up more
>> features to be consistent at least with Hive tables in Apache Hive and
>> Apache Spark because two SQL engines share the same tables.
>> 
>> For the following, technically, while Apache Hive hasn't changed its
>> existing behavior in this part, Apache Spark has inevitably evolved by moving
>> away from the original Apache Spark behaviors one by one.
>> 
>> 
>>       >  the value is already fucked up
>> 
>> 
>> The following is the change log.
>> 
>>       - When we switched the default value of `convertMetastoreParquet`.
>> (at Apache Spark 1.2)
>>       - When we switched the default value of `convertMetastoreOrc` (at
>> Apache Spark 2.4)
>>       - When we switched `CREATE TABLE` itself. (Change `TEXT` table to
>> `PARQUET` table at Apache Spark 3.0)
>> 
>> To sum up, this has been a well-known issue in the community and among the
>> customers.
>> 
>> Bests,
>> Dongjoon.
>> 
>> On Mon, Mar 16, 2020 at 5:24 PM Stephen Coy < s...@infomedia.com.au > wrote:
>> 
>> 
>>> Hi there,
>>> 
>>> 
>>> I’m kind of new around here, but I have had experience with all of the
>>> so-called “big iron” databases such as Oracle, IBM DB2 and Microsoft SQL
>>> Server, as well as PostgreSQL.
>>> 
>>> 
>>> They all support the notion of “ANSI padding” for CHAR columns - which
>>> means that such columns are always space padded, and they default to
>>> having this enabled (for ANSI compliance).
>>> 
>>> 
>>> MySQL also supports it, but it defaults to leaving it disabled for
>>> historical reasons not unlike what we have here.
>>> 
>>> 
>>> In my opinion we should push toward standards compliance where possible
>>> and then document where it cannot work.
>>> 
>>> 
>>> If users don’t like the padding on CHAR columns then they should change to
>>> VARCHAR - I believe that was its purpose in the first place, and it does
>>> not dictate any sort of “padding".
>>> 
>>> 
>>> I can see

Re: FYI: The evolution on `CHAR` type behavior

2020-03-16 Thread Reynold Xin
I looked up our usage logs (sorry I can't share this publicly) and trim has at 
least four orders of magnitude higher usage than char.

On Mon, Mar 16, 2020 at 5:27 PM, Dongjoon Hyun < dongjoon.h...@gmail.com > 
wrote:

> 
> Thank you, Stephen and Reynold.
> 
> 
> To Reynold.
> 
> 
> The way I see the following is a little different.
> 
> 
>       > CHAR is an undocumented data type without clearly defined
> semantics.
> 
> Let me describe it from the Apache Spark user's viewpoint.
> 
> 
> Apache Spark started to claim `HiveContext` (and `hql/hiveql` function) at
> Apache Spark 1.x without much documentation. In addition, there still
> exists an effort which is trying to keep it in the 3.0.0 era.
> 
>        https://issues.apache.org/jira/browse/SPARK-31088
>        Add back HiveContext and createExternalTable
> 
> Historically, we tried to make many SQL-based customers migrate their
> workloads from Apache Hive into Apache Spark through `HiveContext`.
> 
> Although Apache Spark didn't have good documentation about the inconsistent
> behavior among its data sources, Apache Hive has been providing its
> documentation and many customers rely on that behavior.
> 
>       - https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types
> 
> At that time, frequently in on-prem Hadoop clusters by well-known vendors,
> many existing huge tables were created by Apache Hive, not Apache Spark.
> And, Apache Spark is used for boosting SQL performance with its *caching*.
> This was true because Apache Spark was added into the Hadoop-vendor
> products later than Apache Hive.
> 
> 
> Until the turning point at Apache Spark 2.0, we tried to catch up more
> features to be consistent at least with Hive tables in Apache Hive and
> Apache Spark because two SQL engines share the same tables.
> 
> For the following, technically, while Apache Hive hasn't changed its
> existing behavior in this part, Apache Spark has inevitably evolved by moving
> away from the original Apache Spark behaviors one by one.
> 
> 
>       >  the value is already fucked up
> 
> 
> The following is the change log.
> 
>       - When we switched the default value of `convertMetastoreParquet`.
> (at Apache Spark 1.2)
>       - When we switched the default value of `convertMetastoreOrc` (at
> Apache Spark 2.4)
>       - When we switched `CREATE TABLE` itself. (Change `TEXT` table to
> `PARQUET` table at Apache Spark 3.0)
> 
> To sum up, this has been a well-known issue in the community and among the
> customers.
> 
> Bests,
> Dongjoon.
> 
> On Mon, Mar 16, 2020 at 5:24 PM Stephen Coy < s...@infomedia.com.au > wrote:
> 
> 
>> Hi there,
>> 
>> 
>> I’m kind of new around here, but I have had experience with all of the
>> so-called “big iron” databases such as Oracle, IBM DB2 and Microsoft SQL
>> Server, as well as PostgreSQL.
>> 
>> 
>> They all support the notion of “ANSI padding” for CHAR columns - which
>> means that such columns are always space padded, and they default to
>> having this enabled (for ANSI compliance).
>> 
>> 
>> MySQL also supports it, but it defaults to leaving it disabled for
>> historical reasons not unlike what we have here.
>> 
>> 
>> In my opinion we should push toward standards compliance where possible
>> and then document where it cannot work.
>> 
>> 
>> If users don’t like the padding on CHAR columns then they should change to
>> VARCHAR - I believe that was its purpose in the first place, and it does
>> not dictate any sort of “padding".
>> 
>> 
>> I can see why you might “ban” the use of CHAR columns where they cannot be
>> consistently supported, but VARCHAR is a different animal and I would
>> expect it to work consistently everywhere.
>> 
>> 
>> 
>> 
>> Cheers,
>> 
>> 
>> Steve C
>> 
>> 
>>> On 17 Mar 2020, at 10:01 am, Dongjoon Hyun < dongjoon.h...@gmail.com > wrote:
>>> 
>>> Hi, Reynold.
>>> (And +Michael Armbrust)
>>> 
>>> 
>>> If you think so, do you think it's okay that we change the return value
>>> silently? Then, I'm wondering why we reverted `TRIM` functions then?
>>> 
>>> 
>>> > Are we sure "not padding" is "incorrect"?
>>> 
>>> 
>>> 
>>> Bests,
>>> Dongjoon.

Re: FYI: The evolution on `CHAR` type behavior

2020-03-16 Thread Reynold Xin
I haven't spent enough time thinking about it to give a strong opinion, but 
this is of course very different from TRIM.

TRIM is a publicly documented function with two arguments, and we silently 
swapped the two arguments. And trim is also quite commonly used from a long 
time ago.

CHAR is an undocumented data type without clearly defined semantics. It's not 
great that we are changing the value here, but the value is already fucked up. 
It depends on the underlying data source, and random configs that are seemingly 
unrelated (orc) would impact the value.

On Mon, Mar 16, 2020 at 4:01 PM, Dongjoon Hyun < dongjoon.h...@gmail.com > 
wrote:

> 
> Hi, Reynold.
> (And +Michael Armbrust)
> 
> 
> If you think so, do you think it's okay that we change the return value
> silently? Then, I'm wondering why we reverted `TRIM` functions then?
> 
> 
> > Are we sure "not padding" is "incorrect"?
> 
> 
> 
> Bests,
> Dongjoon.
> 
> 
> 
> On Sun, Mar 15, 2020 at 11:15 PM Gourav Sengupta
> < gourav.sengu...@gmail.com > wrote:
> 
> 
>> Hi,
>> 
>> 
>> 100% agree with Reynold.
>> 
>> 
>> 
>> 
>> Regards,
>> Gourav Sengupta
>> 
>> 
>> On Mon, Mar 16, 2020 at 3:31 AM Reynold Xin < r...@databricks.com > wrote:
>> 
>> 
>>> Are we sure "not padding" is "incorrect"?
>>> 
>>> 
>>> 
>>> I don't know whether ANSI SQL actually requires padding, but plenty of
>>> databases don't actually pad.
>>> 
>>> 
>>> 
>>> https://docs.snowflake.net/manuals/sql-reference/data-types-text.html :
>>> "Snowflake currently deviates from common CHAR semantics in that
>>> strings shorter than the maximum length are not space-padded at the end."
>>> 
>>> 
>>> 
>>> MySQL:
>>> https://stackoverflow.com/questions/53528645/why-char-dont-have-padding-in-mysql
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Sun, Mar 15, 2020 at 7:02 PM, Dongjoon Hyun
>>> < dongjoon.h...@gmail.com > wrote:
>>> 
>>>> Hi, Reynold.
>>>> 
>>>> 
>>>> Please see the following for the context.
>>>> 
>>>> 
>>>> https://issues.apache.org/jira/browse/SPARK-31136
>>>> "Revert SPARK-30098 Use default datasource as provider for CREATE TABLE
>>>> syntax"
>>>> 
>>>> 
>>>> I raised the above issue according to the new rubric, and the banning was
>>>> the proposed alternative to reduce the potential issue.
>>>> 
>>>> 
>>>> Please give us your opinion since it's still PR.
>>>> 
>>>> 
>>>> Bests,
>>>> Dongjoon.
>>>> 
>>>> On Sat, Mar 14, 2020 at 17:54 Reynold Xin < r...@databricks.com > wrote:
>>>> 
>>>> 
>>>>> I don’t understand this change. Wouldn’t this “ban” confuse the hell out
>>>>> of both new and old users?
>>>>> 
>>>>> 
>>>>> For old users, their old code that was working for char(3) would now stop
>>>>> working. 
>>>>> 
>>>>> 
>>>>> For new users, depending on whether the underlying metastore char(3) is
>>>>> either supported but different from ansi Sql (which is not that big of a
>>>>> deal if we explain it) or not supported. 
>>>>> 
>>>>> On Sat, Mar 14, 2020 at 3:51 PM Dongjoon Hyun
>>>>> < dongjoon.h...@gmail.com > wrote:
>>>>> 
>>>>> 
>>>>>> Hi, All.
>>>>>> 
>>>>>> Apache Spark has been suffered from a known consistency issue on `CHAR`
>>>>>> type behavio

Re: FYI: The evolution on `CHAR` type behavior

2020-03-15 Thread Reynold Xin
Are we sure "not padding" is "incorrect"?

I don't know whether ANSI SQL actually requires padding, but plenty of 
databases don't actually pad.

https://docs.snowflake.net/manuals/sql-reference/data-types-text.html : "Snowflake 
currently deviates from common CHAR semantics in that strings 
shorter than the maximum length are not space-padded at the end."

MySQL: 
https://stackoverflow.com/questions/53528645/why-char-dont-have-padding-in-mysql

On Sun, Mar 15, 2020 at 7:02 PM, Dongjoon Hyun < dongjoon.h...@gmail.com > 
wrote:

> 
> Hi, Reynold.
> 
> 
> Please see the following for the context.
> 
> 
> https://issues.apache.org/jira/browse/SPARK-31136
> "Revert SPARK-30098 Use default datasource as provider for CREATE TABLE
> syntax"
> 
> 
> I raised the above issue according to the new rubric, and the banning was
> the proposed alternative to reduce the potential issue.
> 
> 
> Please give us your opinion since it's still PR.
> 
> 
> Bests,
> Dongjoon.
> 
> On Sat, Mar 14, 2020 at 17:54 Reynold Xin < r...@databricks.com > wrote:
> 
> 
>> I don’t understand this change. Wouldn’t this “ban” confuse the hell out
>> of both new and old users?
>> 
>> 
>> For old users, their old code that was working for char(3) would now stop
>> working. 
>> 
>> 
>> For new users, depending on whether the underlying metastore char(3) is
>> either supported but different from ansi Sql (which is not that big of a
>> deal if we explain it) or not supported. 
>> 
>> On Sat, Mar 14, 2020 at 3:51 PM Dongjoon Hyun
>> < dongjoon.h...@gmail.com > wrote:
>> 
>> 
>>> Hi, All.
>>> 
>>> Apache Spark has suffered from a known consistency issue on `CHAR`
>>> type behavior among its usages and configurations. However, the evolution
>>> direction has been gradually moving forward to be consistent inside Apache
>>> Spark because we don't have `CHAR` officially. The following is the
>>> summary.
>>> 
>>> With 1.6.x ~ 2.3.x, `STORED AS PARQUET` has the following different result.
>>> (`spark.sql.hive.convertMetastoreParquet=false` provides a fallback to
>>> Hive behavior.)
>>> 
>>>     spark-sql> CREATE TABLE t1(a CHAR(3));
>>>     spark-sql> CREATE TABLE t2(a CHAR(3)) STORED AS ORC;
>>>     spark-sql> CREATE TABLE t3(a CHAR(3)) STORED AS PARQUET;
>>> 
>>>     spark-sql> INSERT INTO TABLE t1 SELECT 'a ';
>>>     spark-sql> INSERT INTO TABLE t2 SELECT 'a ';
>>>     spark-sql> INSERT INTO TABLE t3 SELECT 'a ';
>>> 
>>>     spark-sql> SELECT a, length(a) FROM t1;
>>>     a   3
>>>     spark-sql> SELECT a, length(a) FROM t2;
>>>     a   3
>>>     spark-sql> SELECT a, length(a) FROM t3;
>>>     a 2
>>> 
>>> Since 2.4.0, `STORED AS ORC` became consistent.
>>> (`spark.sql.hive.convertMetastoreOrc=false` provides a fallback to Hive
>>> behavior.)
>>> 
>>>     spark-sql> SELECT a, length(a) FROM t1;
>>>     a   3
>>>     spark-sql> SELECT a, length(a) FROM t2;
>>>     a 2
>>>     spark-sql> SELECT a, length(a) FROM t3;
>>>     a 2
>>> 
>>> Since 3.0.0-preview2, `CREATE TABLE` (without `STORED AS` clause) became
>>> consistent.
>>> (`spark.sql.legacy.createHiveTableByDefault.enabled=true` provides a
>>> fallback to Hive behavior.)
>>> 
>>>     spark-sql> SELECT a, length(a) FROM t1;
>>>     a 2
>>>     spark-sql> SELECT a, length(a) FROM t2;
>>>     a 2
>>>     spark-sql> SELECT a, length(a) FROM t3;
>>>     a 2
>>> 
>>> In addition, in 3.0.0, SPARK-31147 aims to ban `CHAR/VARCHAR` type in the
>>> following syntax to be safe.
>>> 
>>>     CREATE TABLE t(a CHAR(3));
>>>    https://github.com/apache/spark/pull/27902
>>> 
>>> This email is sent out to inform you based on the new policy we voted on.
>>> The recommendation is to always use Apache Spark's native type `String`.
>>> 
>>> Bests,
>>> Dongjoon.
>>> 
>>> References:
>>> 1. "CHAR implementation?", 2017/09/15
>>>      https://lists.apache.org/thread.html/96b004331d9762e356053b5c8c97e953e398e489d15e1b49e775702f%40%3Cdev.spark.apache.org%3E
>>> 2. "FYI: SPARK-30098 Use default datasource as provider for CREATE TABLE
>>> syntax", 2019/12/06
>>>    https://lists.apache.org/thread.html/493f88c10169680191791f9f6962fd16cd0ffa3b06726e92ed04cbe1%40%3Cdev.spark.apache.org%3E
>>> 
>> 
>> 
> 
>



Re: FYI: The evolution on `CHAR` type behavior

2020-03-14 Thread Reynold Xin
I don’t understand this change. Wouldn’t this “ban” confuse the hell out of
both new and old users?

For old users, their old code that was working for char(3) would now stop
working.

For new users, depending on whether the underlying metastore char(3) is
either supported but different from ansi Sql (which is not that big of a
deal if we explain it) or not supported.

On Sat, Mar 14, 2020 at 3:51 PM Dongjoon Hyun 
wrote:

> Hi, All.
>
> Apache Spark has suffered from a known consistency issue on `CHAR`
> type behavior among its usages and configurations. However, the evolution
> direction has been gradually moving forward to be consistent inside Apache
> Spark because we don't have `CHAR` officially. The following is the summary.
>
> With 1.6.x ~ 2.3.x, `STORED AS PARQUET` has the following different result.
> (`spark.sql.hive.convertMetastoreParquet=false` provides a fallback to
> Hive behavior.)
>
> spark-sql> CREATE TABLE t1(a CHAR(3));
> spark-sql> CREATE TABLE t2(a CHAR(3)) STORED AS ORC;
> spark-sql> CREATE TABLE t3(a CHAR(3)) STORED AS PARQUET;
>
> spark-sql> INSERT INTO TABLE t1 SELECT 'a ';
> spark-sql> INSERT INTO TABLE t2 SELECT 'a ';
> spark-sql> INSERT INTO TABLE t3 SELECT 'a ';
>
> spark-sql> SELECT a, length(a) FROM t1;
> a   3
> spark-sql> SELECT a, length(a) FROM t2;
> a   3
> spark-sql> SELECT a, length(a) FROM t3;
> a 2
>
> Since 2.4.0, `STORED AS ORC` became consistent.
> (`spark.sql.hive.convertMetastoreOrc=false` provides a fallback to Hive
> behavior.)
>
> spark-sql> SELECT a, length(a) FROM t1;
> a   3
> spark-sql> SELECT a, length(a) FROM t2;
> a 2
> spark-sql> SELECT a, length(a) FROM t3;
> a 2
>
> Since 3.0.0-preview2, `CREATE TABLE` (without `STORED AS` clause) became
> consistent.
> (`spark.sql.legacy.createHiveTableByDefault.enabled=true` provides a
> fallback to Hive behavior.)
>
> spark-sql> SELECT a, length(a) FROM t1;
> a 2
> spark-sql> SELECT a, length(a) FROM t2;
> a 2
> spark-sql> SELECT a, length(a) FROM t3;
> a 2
>
> In addition, in 3.0.0, SPARK-31147 aims to ban `CHAR/VARCHAR` type in the
> following syntax to be safe.
>
> CREATE TABLE t(a CHAR(3));
> https://github.com/apache/spark/pull/27902
>
> This email is sent out to inform you based on the new policy we voted on.
> The recommendation is to always use Apache Spark's native type `String`.
>
> Bests,
> Dongjoon.
>
> References:
> 1. "CHAR implementation?", 2017/09/15
>
> https://lists.apache.org/thread.html/96b004331d9762e356053b5c8c97e953e398e489d15e1b49e775702f%40%3Cdev.spark.apache.org%3E
> 2. "FYI: SPARK-30098 Use default datasource as provider for CREATE TABLE
> syntax", 2019/12/06
>
> https://lists.apache.org/thread.html/493f88c10169680191791f9f6962fd16cd0ffa3b06726e92ed04cbe1%40%3Cdev.spark.apache.org%3E
>




Re: Collections passed from driver to executors

2019-09-23 Thread Reynold Xin
It was done in 2014 by yours truly: https://github.com/apache/spark/pull/1498

so any modern version would have it.

On Mon, Sep 23, 2019 at 9:04 PM, Dhrubajyoti Hati < dhruba.w...@gmail.com > 
wrote:

> 
> Thanks. Could you please let me know in which version of Spark it changed?
> We are still at 2.2.
> 
> On Tue, 24 Sep, 2019, 9:17 AM Reynold Xin, < r...@databricks.com > wrote:
> 
> 
>> A while ago we changed it so the task gets broadcasted too, so I think the
>> two are fairly similar.
>> 
>> 
>> 
>> 
>> 
>> 
>> On Mon, Sep 23, 2019 at 8:17 PM, Dhrubajyoti Hati
>> < dhruba.w...@gmail.com > wrote:
>> 
>>> I was wondering if anyone could help with this question.
>>> 
>>> On Fri, 20 Sep, 2019, 11:52 AM Dhrubajyoti Hati,
>>> < dhruba.w...@gmail.com > wrote:
>>> 
>>> 
>>>> Hi,
>>>> 
>>>> 
>>>> I have a question regarding passing a dictionary from driver to executors
>>>> in spark on yarn. This dictionary is needed in an udf. I am using pyspark.
>>>> 
>>>> 
>>>> As I understand this can be passed in two ways:
>>>> 
>>>> 
>>>> 1. Broadcast the variable and then use it in the udfs
>>>> 
>>>> 
>>>> 2. Pass the dictionary in the udf itself, in something like this:
>>>> 
>>>> 
>>>>   def udf1(col1, dict):
>>>>    ..
>>>>   def udf1_fn(dict):
>>>>     return udf(lambda col_data: udf1(col_data, dict))
>>>> 
>>>> 
>>>>   df.withColumn("column_new", udf1_fn(dict)("old_column"))
>>>> 
>>>> 
>>>> Well I have tested with both the ways and it works both ways.
>>>> 
>>>> 
>>>> Now I am wondering what is fundamentally different between the two. I
>>>> understand how broadcast work but I am not sure how the data is passed
>>>> across in the 2nd way. Is the dictionary passed to each executor every
>>>> time when new task is running on that executor or they are passed only
>>>> once. Also how the data is passed to the python processes. They are python
>>>> udfs so I think they are executed natively in python.(Plz correct me if I
>>>> am wrong). So the data will be serialised and passed to python.
>>>> 
>>>> So in summary my question is which will be better/efficient way to write
>>>> the whole thing and why?
>>>> 
>>>> 
>>>> Thank you!
>>>> 
>>>> 
>>>> Regards,
>>>> Dhrub
>>>> 
>>> 
>>> 
>> 
>> 
> 
>

Re: Collections passed from driver to executors

2019-09-23 Thread Reynold Xin
A while ago we changed it so the task gets broadcasted too, so I think the two 
are fairly similar.
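
For illustration, a minimal Scala sketch (not from the thread; the question itself is
about PySpark, where the mechanics are analogous) of the two approaches being compared,
assuming an existing `spark` session and a small lookup map:

    import org.apache.spark.sql.functions.udf

    val lookup = Map("a" -> 1, "b" -> 2)

    // 1. Explicit broadcast: the map is shipped to each executor once and reused.
    val bLookup = spark.sparkContext.broadcast(lookup)
    val mapKeyBroadcast = udf((k: String) => bLookup.value.getOrElse(k, -1))

    // 2. Closure capture: the map is serialized as part of the task closure.
    //    Since tasks themselves are broadcast (the change referenced above),
    //    the two end up behaving fairly similarly for small objects.
    val mapKeyClosure = udf((k: String) => lookup.getOrElse(k, -1))

    val df = spark.createDataFrame(Seq(("a", 10), ("c", 20))).toDF("key", "value")
    df.withColumn("mapped", mapKeyBroadcast(df("key"))).show()
    df.withColumn("mapped", mapKeyClosure(df("key"))).show()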

On Mon, Sep 23, 2019 at 8:17 PM, Dhrubajyoti Hati < dhruba.w...@gmail.com > 
wrote:

> 
> I was wondering if anyone could help with this question.
> 
> On Fri, 20 Sep, 2019, 11:52 AM Dhrubajyoti Hati,
> < dhruba.w...@gmail.com > wrote:
> 
> 
>> Hi,
>> 
>> 
>> I have a question regarding passing a dictionary from driver to executors
>> in spark on yarn. This dictionary is needed in an udf. I am using pyspark.
>> 
>> 
>> As I understand this can be passed in two ways:
>> 
>> 
>> 1. Broadcast the variable and then use it in the udfs
>> 
>> 
>> 2. Pass the dictionary in the udf itself, in something like this:
>> 
>> 
>>   def udf1(col1, dict):
>>    ..
>>   def udf1_fn(dict):
>>     return udf(lambda col_data: udf1(col_data, dict))
>> 
>> 
>>   df.withColumn("column_new", udf1_fn(dict)("old_column"))
>> 
>> 
>> Well I have tested with both the ways and it works both ways.
>> 
>> 
>> Now I am wondering what is fundamentally different between the two. I
>> understand how broadcast work but I am not sure how the data is passed
>> across in the 2nd way. Is the dictionary passed to each executor every
>> time when new task is running on that executor or they are passed only
>> once. Also how the data is passed to the python processes. They are python
>> udfs so I think they are executed natively in python.(Plz correct me if I
>> am wrong). So the data will be serialised and passed to python.
>> 
>> So in summary my question is which will be better/efficient way to write
>> the whole thing and why?
>> 
>> 
>> Thank you!
>> 
>> 
>> Regards,
>> Dhrub
>> 
> 
>

Re: Help: What's the biggest length of SQL that's supported in SparkSQL?

2019-07-12 Thread Reynold Xin
No sorry I'm not at liberty to share other people's code.

On Fri, Jul 12, 2019 at 9:33 AM, Gourav Sengupta < gourav.sengu...@gmail.com > 
wrote:

> 
> Hi Reynold,
> 
> 
> I am genuinely curious about queries which are more than 1 MB and am
> stunned by tens of MB's. Any samples to share :) 
> 
> 
> Regards,
> Gourav
> 
> On Thu, Jul 11, 2019 at 5:03 PM Reynold Xin < r...@databricks.com > wrote:
> 
> 
>> There is no explicit limit but a JVM string cannot be bigger than 2G. It
>> will also at some point run out of memory with too big of a query plan
>> tree or become incredibly slow due to query planning complexity. I've seen
>> queries that are tens of MBs in size.
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Jul 11, 2019 at 5:01 AM, 李书明 < alemmont...@126.com > wrote:
>> 
>>> I have a question about the limit (biggest) of SQL's length that is
>>> supported in SparkSQL. I can't find the answer in the documents of Spark.
>>> 
>>> 
>>> Maybe Integer.MAX_VALUE or not?
>>> 
>> 
>> 
> 
>

Re: Problems running TPC-H on Raspberry Pi Cluster

2019-07-11 Thread Reynold Xin
I don't think Spark is meant to run with 1GB of memory on the entire system. 
The JVM loads almost 200MB of bytecode, and each page during query processing 
takes a min of 64MB.

Maybe on the 4GB model of raspberry pi 4.

On Wed, Jul 10, 2019 at 7:57 AM, agg212 < alexander_galaka...@brown.edu > wrote:

> 
> 
> 
> We are trying to benchmark TPC-H (scale factor 1) on a 13-node Raspberry
> Pi 3B+ cluster (1 master, 12 workers). Each node has 1GB of RAM and a
> quad-core processor, running Ubuntu Server 18.04. The cluster is using the
> Spark standalone scheduler with the *.tbl files from TPCH’s dbgen tool
> stored in HDFS.
> 
> 
> 
> We are experiencing several failures when trying to run queries. Jobs fail
> unpredictably, usually with one or many “DEAD/LOST” nodes displaying in
> the web UI. It appears that one or more nodes “hang” during query
> execution and become unreachable/timeout.
> 
> 
> 
> We have included our configuration parameters as well as the driver
> program below. Any recommendations would be greatly appreciated
> 
> 
> 
> ---
> 
> 
> 
> ---
> 
> 
> 
> Driver:
> ---
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> 
> 
> 
> 
>

Re: Help: What's the biggest length of SQL that's supported in SparkSQL?

2019-07-11 Thread Reynold Xin
There is no explicit limit but a JVM string cannot be bigger than 2G. It will 
also at some point run out of memory with too big of a query plan tree or 
become incredibly slow due to query planning complexity. I've seen queries that 
are tens of MBs in size.

On Thu, Jul 11, 2019 at 5:01 AM, 李书明 < alemmont...@126.com > wrote:

> 
> I have a question about the limit (biggest) of SQL's length that is
> supported in SparkSQL. I can't find the answer in the documents of Spark.
> 
> 
> Maybe Integer.MAX_VALUE or not?
> 
> 
> 
>

Re: Exposing JIRA issue types at GitHub PRs

2019-06-13 Thread Reynold Xin
Seems like a good idea. Can we test this with a component first?

On Thu, Jun 13, 2019 at 6:17 AM Dongjoon Hyun 
wrote:

> Hi, All.
>
> Since we use both Apache JIRA and GitHub actively for Apache Spark
> contributions, we have lots of JIRAs and PRs consequently. One specific
> thing I've been longing to see is `Jira Issue Type` in GitHub.
>
> How about exposing JIRA issue types at GitHub PRs as GitHub `Labels`?
> There are two main benefits:
> 1. It helps the communication between the contributors and reviewers with
> more information.
> (In some cases, some people only visit GitHub to see the PR and
> commits)
> 2. `Labels` is searchable. We don't need to visit Apache Jira to search
> PRs to see a specific type.
> (For example, the reviewers can see and review 'BUG' PRs first by
> using `is:open is:pr label:BUG`.)
>
> Of course, this can be done automatically without human intervention.
> Since we already have GitHub Jenkins job to access JIRA/GitHub, that job
> can add the labels from the beginning. If needed, I can volunteer to update
> the script.
>
> To show the demo, I labeled several PRs manually. You can see the result
> right now in Apache Spark PR page.
>
>   - https://github.com/apache/spark/pulls
>
> If you're surprised due to those manual activities, I want to apologize
> for that. I hope we can take advantage of the existing GitHub features to
> serve Apache Spark community in a way better than yesterday.
>
> How do you think about this specific suggestion?
>
> Bests,
> Dongjoon
>
> PS. I saw that `Request Review` and `Assign` features are already used for
> some purposes, but these feature are out of the scope in this email.
>


Re: Should python-2 be supported in Spark 3.0?

2019-05-30 Thread Reynold Xin
+1 on Xiangrui’s plan.

On Thu, May 30, 2019 at 7:55 AM shane knapp  wrote:

> I don't have a good sense of the overhead of continuing to support
>> Python 2; is it large enough to consider dropping it in Spark 3.0?
>>
>> from the build/test side, it will actually be pretty easy to continue
> support for python2.7 for spark 2.x as the feature sets won't be expanding.
>
> that being said, i will be cracking a bottle of champagne when i can
> delete all of the ansible and anaconda configs for python2.x.  :)
>
> shane
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>


Re: Koalas show data in IDE or pyspark

2019-05-14 Thread Reynold Xin
This has been fixed and was included in the release 0.3 last week. We will be 
making another release (0.4) in the next 24 hours to include more features also.

On Tue, Apr 30, 2019 at 12:42 AM, Manu Zhang < owenzhang1...@gmail.com > wrote:

> 
> Hi,
> 
> 
> It seems koalas.DataFrame can't be displayed in the terminal yet, as in
> https://github.com/databricks/koalas/issues/150 , and the workaround is
> to convert it to a pandas DataFrame.
> 
> 
> Thanks,
> Manu Zhang
> 
> On Tue, Apr 30, 2019 at 2:46 PM Achilleus 003 < achilleus...@gmail.com > wrote:
> 
> 
>> Hello Everyone,
>> 
>> 
>> I have been trying to run *koalas* on both pyspark and pyCharm IDE. 
>> 
>> 
>> When I run 
>> 
>> 
>> df = koalas.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
>> 
>> df.head(5)
>> 
>> 
>> 
>> I don't get the data back instead, I get an object.
>> 
>> 
>> 
>> 
>> I thought df.head can be used to achieve this.
>> 
>> 
>> Can anyone guide me on how we can print something on the terminal?
>> Something similar to df.show() in Spark.
>> 
> 
>

Re: [HELP WANTED] Apache Zipkin (incubating) needs Spark gurus

2019-03-21 Thread Reynold Xin
Are there specific questions you have? Might be easier to post them here
also.

On Wed, Mar 20, 2019 at 5:16 PM Andriy Redko  wrote:

> Hello Dear Spark Community!
>
> The hyper-popularity of Apache Spark made it a de-facto choice for many
> projects which need some sort of data processing capabilities. One of
> those is
> Zipkin, currently incubating at Apache [1], the widespread distributed
> tracing framework.
> The small amazing team behind the project maintains around ~40 different
> integrations
> and components, including the [2], a set of Spark jobs to reconstruct over
> time the
> service dependency graphs from the collected traces. The current
> maintainers are not
> yet savvy on Spark and the team really struggles to address the ongoing
> issues and
> answer user questions. For example, users are reporting concerns about job
> distribution
> which the Zipkin team doesn't know how to answer. It is really difficult
> to keep this
> particular component up and running due to the lack of the Spark
> expertise.
>
> Thereby this message to the community, anyone could be / is interested in
> distributed
> tracing (fascinating development by itself!) to the point to step in, help
> with Spark
> expertise and contribute? Please feel free to reach out, Gitter @ [3]
> or d...@zipkin.apache.org!
>
> [1] http://incubator.apache.org/projects/zipkin.html
> [2] https://github.com/openzipkin/zipkin-dependencies
> [3] https://gitter.im/openzipkin/zipkin
>
> Best Regards,
> Apache Zipkin (incubating) Team
>
>
>
>


Re: [PySpark] Revisiting PySpark type annotations

2019-01-25 Thread Reynold Xin
If we can make the annotation compatible with Python 2, why don’t we add
type annotation to make life easier for users of Python 3 (with type)?

On Fri, Jan 25, 2019 at 7:53 AM Maciej Szymkiewicz 
wrote:

>
> Hello everyone,
>
> I'd like to revisit the topic of adding PySpark type annotations in 3.0.
> It has been discussed before (
> http://apache-spark-developers-list.1001551.n3.nabble.com/Python-friendly-API-for-Spark-3-0-td25016.html
> and
> http://apache-spark-developers-list.1001551.n3.nabble.com/PYTHON-PySpark-typing-hints-td21560.html)
> and is tracked by SPARK-17333 (
> https://issues.apache.org/jira/browse/SPARK-17333). Is there any
> consensus here?
>
> In the spirit of full disclosure I am trying to decide if, and if yes to
> what extent, migrate my stub package (
> https://github.com/zero323/pyspark-stubs) to 3.0 and beyond. Maintaining
> such package is relatively time consuming (not being active PySpark user
> anymore, it is the least priority for me at the moment) and if there any
> official plans to make it obsolete, it would be a valuable information for
> me.
>
> If there are no plans to add native annotations to PySpark, I'd like to
> use this opportunity to ask PySpark commiters, to drop by and open issue (
> https://github.com/zero323/pyspark-stubs/issues)  when new methods are
> introduced, or there are changes in the existing API (PR's are of course
> welcomed as well). Thanks in advance.
>
> --
> Best,
> Maciej
>
>
>


Re: [ANNOUNCE] Announcing Apache Spark 2.4.0

2018-11-08 Thread Reynold Xin
Do you have a cached copy? I see it here

http://spark.apache.org/downloads.html



On Thu, Nov 8, 2018 at 4:12 PM Li Gao  wrote:

> this is wonderful !
> I noticed the official spark download site does not have 2.4 download
> links yet.
>
> On Thu, Nov 8, 2018, 4:11 PM Swapnil Shinde  wrote:
>
>> Great news.. thank you very much!
>>
>> On Thu, Nov 8, 2018, 5:19 PM Stavros Kontopoulos <
>> stavros.kontopou...@lightbend.com wrote:
>>
>>> Awesome!
>>>
>>> On Thu, Nov 8, 2018 at 9:36 PM, Jules Damji  wrote:
>>>
 Indeed!

 Sent from my iPhone
 Pardon the dumb thumb typos :)

 On Nov 8, 2018, at 11:31 AM, Dongjoon Hyun 
 wrote:

 Finally, thank you all. Especially, thanks to the release manager,
 Wenchen!

 Bests,
 Dongjoon.


 On Thu, Nov 8, 2018 at 11:24 AM Wenchen Fan 
 wrote:

> + user list
>
> On Fri, Nov 9, 2018 at 2:20 AM Wenchen Fan 
> wrote:
>
>> resend
>>
>> On Thu, Nov 8, 2018 at 11:02 PM Wenchen Fan 
>> wrote:
>>
>>>
>>>
>>> -- Forwarded message -
>>> From: Wenchen Fan 
>>> Date: Thu, Nov 8, 2018 at 10:55 PM
>>> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
>>> To: Spark dev list 
>>>
>>>
>>> Hi all,
>>>
>>> Apache Spark 2.4.0 is the fifth release in the 2.x line. This
>>> release adds Barrier Execution Mode for better integration with deep
>>> learning frameworks, introduces 30+ built-in and higher-order functions 
>>> to
>>> deal with complex data type easier, improves the K8s integration, along
>>> with experimental Scala 2.12 support. Other major updates include the
>>> built-in Avro data source, Image data source, flexible streaming sinks,
>>> elimination of the 2GB block size limitation during transfer, Pandas UDF
>>> improvements. In addition, this release continues to focus on usability,
>>> stability, and polish while resolving around 1100 tickets.
>>>
>>> We'd like to thank our contributors and users for their
>>> contributions and early feedback to this release. This release would not
>>> have been possible without you.
>>>
>>> To download Spark 2.4.0, head over to the download page:
>>> http://spark.apache.org/downloads.html
>>>
>>> To view the release notes:
>>> https://spark.apache.org/releases/spark-release-2-4-0.html
>>>
>>> Thanks,
>>> Wenchen
>>>
>>> PS: If you see any issues with the release notes, webpage or
>>> published artifacts, please contact me directly off-list.
>>>
>>
>>>
>>>
>>>
>>>


Re: Back to SQL

2018-10-03 Thread Reynold Xin
No, we used to have that (for views) but it wasn’t working well enough so we
removed it.

On Wed, Oct 3, 2018 at 6:41 PM Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> Is there any known way to go from a Spark SQL Logical Plan (optimised ?)
> Back to a SQL query ?
>
> Regards,
>
> Olivier.
>
-- 
--
excuse the brevity and lower case due to wrist injury


Re: Should python-2 be supported in Spark 3.0?

2018-09-17 Thread Reynold Xin
i'd like to second that.

if we want to communicate timeline, we can add to the release notes saying
py2 will be deprecated in 3.0, and removed in a 3.x release.

--
excuse the brevity and lower case due to wrist injury


On Mon, Sep 17, 2018 at 4:24 PM Matei Zaharia 
wrote:

> That’s a good point — I’d say there’s just a risk of creating a perception
> issue. First, some users might feel that this means they have to migrate
> now, which is before Python itself drops support; they might also be
> surprised that we did this in a minor release (e.g. might we drop Python 2
> altogether in a Spark 2.5 if that later comes out?). Second, contributors
> might feel that this means new features no longer have to work with Python
> 2, which would be confusing. Maybe it’s OK on both fronts, but it just
> seems scarier for users to do this now if we do plan to have Spark 3.0 in
> the next 6 months anyway.
>
> Matei
>
> > On Sep 17, 2018, at 1:04 PM, Mark Hamstra 
> wrote:
> >
> > What is the disadvantage to deprecating now in 2.4.0? I mean, it doesn't
> change the code at all; it's just a notification that we will eventually
> cease supporting Py2. Wouldn't users prefer to get that notification sooner
> rather than later?
> >
> > On Mon, Sep 17, 2018 at 12:58 PM Matei Zaharia 
> wrote:
> > I’d like to understand the maintenance burden of Python 2 before
> deprecating it. Since it is not EOL yet, it might make sense to only
> deprecate it once it’s EOL (which is still over a year from now).
> Supporting Python 2+3 seems less burdensome than supporting, say, multiple
> Scala versions in the same codebase, so what are we losing out?
> >
> > The other thing is that even though Python core devs might not support
> 2.x later, it’s quite possible that various Linux distros will if moving
> from 2 to 3 remains painful. In that case, we may want Apache Spark to
> continue releasing for it despite the Python core devs not supporting it.
> >
> > Basically, I’d suggest to deprecate this in Spark 3.0 and then remove it
> later in 3.x instead of deprecating it in 2.4. I’d also consider looking at
> what other data science tools are doing before fully removing it: for
> example, if Pandas and TensorFlow no longer support Python 2 past some
> point, that might be a good point to remove it.
> >
> > Matei
> >
> > > On Sep 17, 2018, at 11:01 AM, Mark Hamstra 
> wrote:
> > >
> > > If we're going to do that, then we need to do it right now, since
> 2.4.0 is already in release candidates.
> > >
> > > On Mon, Sep 17, 2018 at 10:57 AM Erik Erlandson 
> wrote:
> > > I like Mark’s concept for deprecating Py2 starting with 2.4: It may
> seem like a ways off but even now there may be some spark versions
> supporting Py2 past the point where Py2 is no longer receiving security
> patches
> > >
> > >
> > > On Sun, Sep 16, 2018 at 12:26 PM Mark Hamstra 
> wrote:
> > > We could also deprecate Py2 already in the 2.4.0 release.
> > >
> > > On Sat, Sep 15, 2018 at 11:46 AM Erik Erlandson 
> wrote:
> > > In case this didn't make it onto this thread:
> > >
> > > There is a 3rd option, which is to deprecate Py2 for Spark-3.0, and
> remove it entirely on a later 3.x release.
> > >
> > > On Sat, Sep 15, 2018 at 11:09 AM, Erik Erlandson 
> wrote:
> > > On a separate dev@spark thread, I raised a question of whether or not
> to support python 2 in Apache Spark, going forward into Spark 3.0.
> > >
> > > Python-2 is going EOL at the end of 2019. The upcoming release of
> Spark 3.0 is an opportunity to make breaking changes to Spark's APIs, and
> so it is a good time to consider support for Python-2 on PySpark.
> > >
> > > Key advantages to dropping Python 2 are:
> > >   • Support for PySpark becomes significantly easier.
> > >   • Avoid having to support Python 2 until Spark 4.0, which is
> likely to imply supporting Python 2 for some time after it goes EOL.
> > > (Note that supporting python 2 after EOL means, among other things,
> that PySpark would be supporting a version of python that was no longer
> receiving security patches)
> > >
> > > The main disadvantage is that PySpark users who have legacy python-2
> code would have to migrate their code to python 3 to take advantage of
> Spark 3.0
> > >
> > > This decision obviously has large implications for the Apache Spark
> community and we want to solicit community feedback.
> > >
> > >
> >
>
>


Re: Fw:multiple group by action

2018-08-24 Thread Reynold Xin
Use rollup and cube.
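
For illustration, a minimal sketch (not part of the original reply) of the idea in the
DataFrame API, assuming a DataFrame `users` with the columns listed in the quoted
question below:

    import org.apache.spark.sql.functions.countDistinct

    // rollup covers every prefix of the grouping columns in one job:
    // (), (company), (company, client), (company, client, country), ...
    val byPrefix = users
      .rollup("company", "client", "country", "region")
      .agg(countDistinct("userId").as("distinct_users"))
    byPrefix.show()

    // cube would additionally cover every subset of the columns, not just prefixes.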

On Fri, Aug 24, 2018 at 7:55 PM 崔苗  wrote:

>
>
>
>
>
>
>  Forwarding messages 
> From: "崔苗" 
> Date: 2018-08-25 10:54:31
> To: d...@spark.apache.org
> Subject: multiple group by action
>
> Hi,
> we have some user data with
> columns(userId,company,client,country,region,city),
> now we want to count userId by multiple column,such as :
> select count(distinct userId) group by company
> select count(distinct userId) group by company,client
> select count(distinct userId) group by company,client,country
> select count(distinct userId) group by company,client,country,region
> etc
> so each action will bring a shuffle stage, and since the column set (company, client)
> contains the column company,
> is there a way to reduce the number of shuffle stages?
> 
> Thanks for any replies
>
>
>
>


Re: Anyone knows how to build and spark on jdk9?

2017-10-26 Thread Reynold Xin
It probably depends on the Scala version we use in Spark supporting Java 9
first.

On Thu, Oct 26, 2017 at 7:22 PM Zhang, Liyun  wrote:

> Hi all:
>
> 1.   I want to build Spark on JDK 9 and test it with Hadoop in a JDK 9
> env. I searched for JIRAs related to JDK 9. I only found SPARK-13278.
> Does this mean that Spark can now build and run successfully on JDK 9?
>
>
>
>
>
> Best Regards
>
> Kelly Zhang/Zhang,Liyun
>
>
>


Re: Dataset API Question

2017-10-25 Thread Reynold Xin
It is a bit more than syntactic sugar, but not much more:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L533

BTW this is basically writing all the data out, and then creating a new
Dataset to load them in.
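
For illustration, a minimal sketch (not part of the original reply) of the point above:
checkpoint() returns a new Dataset rather than marking the original, so the returned
value is the one to keep using. Assumes an active `spark` session and some Dataset `ds`:

    // A checkpoint directory must be configured first.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    // Eager by default: the data is written out here and a new Dataset backed
    // by the checkpointed data is returned.
    val checkpointed = ds.checkpoint()

    // Keep using the returned Dataset; `ds` itself is left untouched, which is
    // consistent with ds.rdd.isCheckpointed reporting false in the quoted example.
    checkpointed.count()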


On Wed, Oct 25, 2017 at 6:51 AM, Bernard Jesop 
wrote:

> Hello everyone,
>
> I have a question about checkpointing on dataset.
>
> It seems in 2.1.0 that there is a Dataset.checkpoint(), however unlike RDD
> there is no Dataset.isCheckpointed().
>
> I wonder if Dataset.checkpoint is syntactic sugar for
> Dataset.rdd.checkpoint.
> When I do :
>
> Dataset.checkpoint; Dataset.count
> Dataset.rdd.isCheckpointed // result: false
>
> However, when I explicitly do:
> Dataset.rdd.checkpoint; Dataset.rdd.count
> Dataset.rdd.isCheckpointed // result: true
>
> Could someone explain this behavior to me, or provide some references?
>
> Best regards,
> Bernard
>


Re: SQL specific documentation for recent Spark releases

2017-08-11 Thread Reynold Xin
This PR should help you in the next release:
https://github.com/apache/spark/pull/18702



On Thu, Aug 10, 2017 at 7:46 PM, Stephen Boesch  wrote:

>
> The correct link is https://docs.databricks.com/spark/latest/spark-sql/index.html .
>
> This link does have the core syntax such as the BNF for the DDL and DML
> and SELECT.  It does *not *have a reference for  date / string / numeric
> functions: is there any such reference at this point?  It is not sufficient
> to peruse the DSL list of functions since the usage is different (and
> sometimes the names as well)  than from the DSL.
>
> thanks
> stephenb
>
> 2017-08-10 14:49 GMT-07:00 Jules Damji :
>
>> I refer to docs.databricks.com/Spark/latest/Spark-sql/index.html.
>>
>> Cheers
>> Jules
>>
>> Sent from my iPhone
>> Pardon the dumb thumb typos :)
>>
>> > On Aug 10, 2017, at 1:46 PM, Stephen Boesch  wrote:
>> >
>> >
>> > While the DataFrame/DataSets are useful in many circumstances they are
>> cumbersome for many types of complex sql queries.
>> >
>> > Is there an up to date *SQL* reference - i.e. not DataFrame DSL
>> operations - for version 2.2?
>> >
>> > An example of what is not clear:  what constructs are supported within
>> >
>> > select count( predicate) from some_table
>> >
>> > when using spark sql.
>> >
>> > But in general the reference guide and programming guide for SQL seems
>> to be difficult to locate - seemingly in favor of the DataFrame/DataSets.
>>
>>
>


Re: Question on Spark code

2017-07-23 Thread Reynold Xin
This is a standard practice used for chaining, to support

a.setStepSize(...)
  .setRegParam(...)
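
A minimal standalone sketch of why the return type is declared as this.type
(the class and setter names below are made up, not Spark's):

class Optimizer {
  private var stepSize = 0.1
  def setStepSize(step: Double): this.type = { stepSize = step; this }
}

class MyOptimizer extends Optimizer {
  private var regParam = 0.0
  def setRegParam(r: Double): this.type = { regParam = r; this }
}

// this.type means "the type of the receiver", so chaining keeps the subtype:
val opt = new MyOptimizer().setStepSize(0.01).setRegParam(0.1)
// if setStepSize returned Optimizer (or Unit), .setRegParam would not compile here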


On Sun, Jul 23, 2017 at 8:47 PM, tao zhan <zhanta...@gmail.com> wrote:

> Thank you for replying.
> But I do not get it completely, why does the "this.type“” necessary?
> why could not it be like:
>
> def setStepSize(step: Double): Unit = {
> require(step > 0,
>   s"Initial step size must be positive but got ${step}")
> this.stepSize = step
> }
>
> On Mon, Jul 24, 2017 at 11:29 AM, M. Muvaffak ONUŞ <
> onus.muvaf...@gmail.com> wrote:
>
>> Doesn't it mean the return type will be type of "this" class. So, it
>> doesn't have to be this instance of the class but it has to be type of this
>> instance of the class. When you have a stack of inheritance and call that
>> function, it will return the same type with the level that you called it.
>>
>> On Sun, Jul 23, 2017 at 8:20 PM Reynold Xin <r...@databricks.com> wrote:
>>
>>> It means the same object ("this") is returned.
>>>
>>> On Sun, Jul 23, 2017 at 8:16 PM, tao zhan <zhanta...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am new to scala and spark.
>>>> What does the "this.type" in set function for?
>>>>
>>>>
>>>> ​
>>>> https://github.com/apache/spark/blob/481f0792944d9a77f0fe8b5
>>>> e2596da1d600b9d0a/mllib/src/main/scala/org/apache/spark/
>>>> mllib/optimization/GradientDescent.scala#L48
>>>>
>>>> Thanks!
>>>>
>>>> Zhan
>>>>
>>>
>>>
>


Re: Question on Spark code

2017-07-23 Thread Reynold Xin
It means the same object ("this") is returned.

On Sun, Jul 23, 2017 at 8:16 PM, tao zhan  wrote:

> Hello,
>
> I am new to scala and spark.
> What does the "this.type" in set function for?
>
>
> ​
> https://github.com/apache/spark/blob/481f0792944d9a77f0fe8b5e2596da
> 1d600b9d0a/mllib/src/main/scala/org/apache/spark/mllib/
> optimization/GradientDescent.scala#L48
>
> Thanks!
>
> Zhan
>


Re: the dependence length of RDD, can its size be greater than 1 pleaae?

2017-06-15 Thread Reynold Xin
A join?
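
A tiny sketch, assuming an existing SparkContext sc:

val a = sc.parallelize(Seq(1 -> "x", 2 -> "y"))
val b = sc.parallelize(Seq(1 -> "p", 3 -> "q"))

// union produces an RDD with one dependency per parent
a.union(b).dependencies.length   // 2

// a.join(b) is built on a CoGroupedRDD, which likewise has one (shuffle)
// dependency per parent RDD, even though the RDD handed back to you wraps it.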

On Thu, Jun 15, 2017 at 1:11 AM 萝卜丝炒饭 <1427357...@qq.com> wrote:

> Hi all,
>
> The RDD code keeps a member as below:
> dependencies_ : seq[Dependency[_]]
>
> It is a seq, that means it can keep more than one dependency.
>
> I have an issue about this.
> Is it possible that its size is greater than one please?
> If yes, how to produce it please? Would you like show me some code please?
>
> thanks
> Robin Shao
>


Re: is dataframe thread safe?

2017-02-13 Thread Reynold Xin
Yes, your use case should be fine. Multiple threads can transform the same
data frame in parallel, since they create different data frames.
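
A small sketch of the pattern, assuming an existing SparkSession named spark:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val df = spark.range(0, 1000000).toDF("id")   // shared, immutable logical plan

// each thread derives its own new DataFrame from df; df itself is never mutated
val f1 = Future { df.filter("id % 2 = 0").count() }
val f2 = Future { df.selectExpr("id * 3 AS tripled").count() }

val results = Await.result(Future.sequence(Seq(f1, f2)), 10.minutes)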


On Sun, Feb 12, 2017 at 9:07 AM Mendelson, Assaf 
wrote:

> Hi,
>
> I was wondering if dataframe is considered thread safe. I know the spark
> session and spark context are thread safe (and actually have tools to
> manage jobs from different threads) but the question is, can I use the same
> dataframe in both threads.
>
> The idea would be to create a dataframe in the main thread and then in two
> sub threads do different transformations and actions on it.
>
> I understand that some things might not be thread safe (e.g. if I
> unpersist in one thread it would affect the other. Checkpointing would
> cause similar issues), however, I can’t find any documentation as to what
> operations (if any) are thread safe.
>
>
>
> Thanks,
>
> Assaf.
>


Re: Output Side Effects for different chain of operations

2016-12-15 Thread Reynold Xin
You can just write some files out directly (and idempotently) in your
map/mapPartitions functions. It is just a function, so you can run
arbitrary code in it, after all.
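
A rough sketch of that idempotent-write idea (the shared path, file naming
scheme, and the assumption that someRdd holds strings are all illustrative,
not from the thread):

import java.nio.file.{Files, Paths}

val withSideFiles = someRdd.mapPartitionsWithIndex { (partitionId, iter) =>
  val rows = iter.toVector
  // name the file by partition id so a re-run of the task simply overwrites it
  val out = Paths.get(s"/shared/tmp/step-b-part-$partitionId.txt")  // hypothetical location
  Files.write(out, rows.mkString("\n").getBytes("UTF-8"))
  rows.iterator
}
withSideFiles.count()   // any action forces the writes; step E can then read the files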


On Thu, Dec 15, 2016 at 11:33 AM, Chawla,Sumit 
wrote:

> Any suggestions on this one?
>
> Regards
> Sumit Chawla
>
>
> On Tue, Dec 13, 2016 at 8:31 AM, Chawla,Sumit 
> wrote:
>
>> Hi All
>>
>> I have a workflow with different steps in my program. Lets say these are
>> steps A, B, C, D.  Step B produces some temp files on each executor node.
>> How can i add another step E which consumes these files?
>>
>> I understand the easiest choice is  to copy all these temp files to any
>> shared location, and then step E can create another RDD from it and work on
>> that.  But i am trying to avoid this copy.  I was wondering if there is any
>> way i can queue up these files for E as they are getting generated on
>> executors.  Is there any possibility of creating a dummy RDD in start of
>> program, and then push these files into this RDD from each executor.
>>
>> I take my inspiration from the concept of Side Outputs in Google Dataflow:
>>
>> https://cloud.google.com/dataflow/model/par-do#emitting-to-
>> side-outputs-in-your-dofn
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>


Re: Can't read tables written in Spark 2.1 in Spark 2.0 (and earlier)

2016-11-30 Thread Reynold Xin
This should fix it: https://github.com/apache/spark/pull/16080



On Wed, Nov 30, 2016 at 10:55 AM, Timur Shenkao  wrote:

> Hello,
>
> Yes, I used hiveContext, sqlContext, sparkSession from Java, Scala,
> Python.
> Via spark-shell, spark-submit, IDE (PyCharm, Intellij IDEA).
> Everything is perfect because I have Hadoop cluster with configured &
> tuned HIVE.
>
> The reason of Michael's error is usually misconfigured or absent HIVE.
> Or may be absence of hive-site.xml in $SPARK_HOME/conf/ directory.
>
> On Wed, Nov 30, 2016 at 9:30 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi Timur,
>>
>> did you use hiveContext or sqlContext or the spark way mentioned in the
>> http://spark.apache.org/docs/latest/sql-programming-guide.html?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Wed, Nov 30, 2016 at 5:35 PM, Yin Huai  wrote:
>>
>>> Hello Michael,
>>>
>>> Thank you for reporting this issue. It will be fixed by
>>> https://github.com/apache/spark/pull/16080.
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Tue, Nov 29, 2016 at 11:34 PM, Timur Shenkao 
>>> wrote:
>>>
 Hi!

 Do you have real HIVE installation?
 Have you built Spark 2.1 & Spark 2.0 with HIVE support ( -Phive
 -Phive-thriftserver ) ?

 It seems that you use "default" Spark's HIVE 1.2.1. Your metadata is
 stored in local Derby DB which is visible to concrete Spark installation
 but not for all.

 On Wed, Nov 30, 2016 at 4:51 AM, Michael Allman 
 wrote:

> This is not an issue with all tables created in Spark 2.1, though I'm
> not sure why some work and some do not. I have found that a table created
> as such
>
> sql("create table test stored as parquet as select 1")
>
> in Spark 2.1 cannot be read in previous versions of Spark.
>
> Michael
>
>
> > On Nov 29, 2016, at 5:15 PM, Michael Allman 
> wrote:
> >
> > Hello,
> >
> > When I try to read from a Hive table created by Spark 2.1 in Spark
> 2.0 or earlier, I get an error:
> >
> > java.lang.ClassNotFoundException: Failed to load class for data
> source: hive.
> >
> > Is there a way to get previous versions of Spark to read tables
> written with Spark 2.1?
> >
> > Cheers,
> >
> > Michael
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

>>>
>>
>


Re: Bit-wise AND operation between integers

2016-11-28 Thread Reynold Xin
Bcc dev@ and add user@

The dev list is not meant for users to ask questions on how to use Spark.
For that you should use StackOverflow or the user@ list.


scala> sql("select 1 & 2").show()
+---+
|(1 & 2)|
+---+
|  0|
+---+


scala> sql("select 1 & 3").show()
+---+
|(1 & 3)|
+---+
|  1|
+---+
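
The same operator is also exposed on the Column API (a small sketch, assuming
an existing SparkSession named spark):

import spark.implicits._

val df = Seq((1, 2), (1, 3)).toDF("a", "b")
df.select($"a".bitwiseAND($"b").as("a_and_b")).show()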


On November 28, 2016 at 9:40:45 PM, Nishadi Kirielle (ndime...@gmail.com)
wrote:

Hi all,

I am trying to use bitwise AND operation between integers on top of Spark
SQL. Is this functionality supported and if so, can I have any
documentation on how to use bitwise AND operation?

Thanks & regards

-- 
Nishadi Kirielle

Undergraduate
University of Moratuwa - Sri Lanka

Mobile : +94 70 204 5934
Blog : nishadikirielle.wordpress.com


Re: Third party library

2016-11-26 Thread Reynold Xin
That's just standard JNI and has nothing to do with Spark, does it?
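
A hedged sketch of one common pattern for this; the library name comes from
the question, the object name is made up, and the lazy-val trick is just one
way to make the load idempotent per executor JVM:

object NativeBridge {
  // the C side must export a JNI symbol matching this method's mangled name
  @native def fooMethod(foo: String): String
  // a lazy val body runs at most once per JVM, i.e. once per executor process
  lazy val loaded: Unit = System.loadLibrary("foo-C-library")
}

sc.parallelize(1 to 10).mapPartitions { iter =>
  NativeBridge.loaded                      // forces the one-time library load here
  iter.map(i => NativeBridge.fooMethod(s"fooString-$i"))
}.collect()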


On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha <start.vin...@gmail.com>
wrote:

> Thanks Reynold for quick reply.
>
>  I have tried following:
>
> class MySimpleApp {
>  // ---Native methods
>   @native def fooMethod (foo: String): String
> }
>
> object MySimpleApp {
>   val flag = false
>   def loadResources() {
> System.loadLibrary("foo-C-library")
>   val flag = true
>   }
>   def main() {
> sc.parallelize(1 to 10).mapPartitions ( iter => {
>   if(flag == false){
>   MySimpleApp.loadResources()
>  val SimpleInstance = new MySimpleApp
>   }
>   SimpleInstance.fooMethod ("fooString")
>   iter
> })
>   }
> }
>
> I don't see way to invoke fooMethod which is implemented in foo-C-library.
> Is I am missing something ? If possible, can you point me to existing
> implementation which i can refer to.
>
> Thanks again.
>
> ~
>
> On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> bcc dev@ and add user@
>>
>>
>> This is more a user@ list question rather than a dev@ list question. You
>> can do something like this:
>>
>> object MySimpleApp {
>>   def loadResources(): Unit = // define some idempotent way to load
>> resources, e.g. with a flag or lazy val
>>
>>   def main() = {
>> ...
>>
>> sc.parallelize(1 to 10).mapPartitions { iter =>
>>   MySimpleApp.loadResources()
>>
>>   // do whatever you want with the iterator
>> }
>>   }
>> }
>>
>>
>>
>>
>>
>> On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha <start.vin...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to invoke C library from the Spark Stack using JNI interface
>>> (here is sample  application code)
>>>
>>>
>>> class SimpleApp {
>>>  // ---Native methods
>>> @native def foo (Top: String): String
>>> }
>>>
>>> object SimpleApp  {
>>>def main(args: Array[String]) {
>>>
>>> val conf = new SparkConf().setAppName("Simple
>>> Application").set("SPARK_LIBRARY_PATH", "lib")
>>> val sc = new SparkContext(conf)
>>>  System.loadLibrary("foolib")
>>> //instantiate the class
>>>  val SimpleAppInstance = new SimpleApp
>>> //String passing - Working
>>> val ret = SimpleAppInstance.foo("fooString")
>>>   }
>>>
>>> Above code work fines.
>>>
>>> I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,
>>> spark.executor.extraLibraryPath at worker node
>>>
>>> How can i invoke JNI library from worker node ? Where should i load it
>>> in executor ?
>>> Calling  System.loadLibrary("foolib") inside the work node gives me
>>> following error :
>>>
>>> Exception in thread "main" java.lang.UnsatisfiedLinkError:
>>>
>>> Any help would be really appreciated.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Third party library

2016-11-25 Thread Reynold Xin
bcc dev@ and add user@


This is more a user@ list question rather than a dev@ list question. You
can do something like this:

object MySimpleApp {
  def loadResources(): Unit = // define some idempotent way to load
resources, e.g. with a flag or lazy val

  def main() = {
...

sc.parallelize(1 to 10).mapPartitions { iter =>
  MySimpleApp.loadResources()

  // do whatever you want with the iterator
}
  }
}





On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha 
wrote:

> Hi,
>
> I am trying to invoke C library from the Spark Stack using JNI interface
> (here is sample  application code)
>
>
> class SimpleApp {
>  // ---Native methods
> @native def foo (Top: String): String
> }
>
> object SimpleApp  {
>def main(args: Array[String]) {
>
> val conf = new 
> SparkConf().setAppName("SimpleApplication").set("SPARK_LIBRARY_PATH",
> "lib")
> val sc = new SparkContext(conf)
>  System.loadLibrary("foolib")
> //instantiate the class
>  val SimpleAppInstance = new SimpleApp
> //String passing - Working
> val ret = SimpleAppInstance.foo("fooString")
>   }
>
> Above code work fines.
>
> I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,
> spark.executor.extraLibraryPath at worker node
>
> How can i invoke JNI library from worker node ? Where should i load it in
> executor ?
> Calling  System.loadLibrary("foolib") inside the work node gives me
> following error :
>
> Exception in thread "main" java.lang.UnsatisfiedLinkError:
>
> Any help would be really appreciated.
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Re: Re: Multiple streaming aggregations in structured streaming

2016-11-22 Thread Reynold Xin
It's just the "approx_count_distinct" aggregate function.
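
A small sketch; the column names are assumptions, and expr is used so the SQL
function name works regardless of which Scala helper your Spark version exposes:

import org.apache.spark.sql.functions.expr

// assuming `events` has columns `date` and `userId`
val approxUv = events
  .groupBy("date")
  .agg(expr("approx_count_distinct(userId)").as("approx_uv"))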


On Tue, Nov 22, 2016 at 6:51 PM, Xinyu Zhang <wsz...@163.com> wrote:

> Could you please tell me how to use the approximate count distinct? Is
> there any docs?
>
> Thanks
>
>
> At 2016-11-21 15:56:21, "Reynold Xin" <r...@databricks.com> wrote:
>
> Can you use the approximate count distinct?
>
>
> On Sun, Nov 20, 2016 at 11:51 PM, Xinyu Zhang <wsz...@163.com> wrote:
>
>>
>> MapWithState is also very useful.
>> I want to calculate UV in real time, but "distinct count" and "multiple
>> streaming aggregations" are not supported.
>> Is there any method to calculate real-time UV in the current version?
>>
>>
>>
>> At 2016-11-19 06:01:45, "Michael Armbrust" <mich...@databricks.com>
>> wrote:
>>
>> Doing this generally is pretty hard.  We will likely support algebraic
>> aggregate eventually, but this is not currently slotted for 2.2.  Instead I
>> think we will add something like mapWithState that lets users compute
>> arbitrary stateful things.  What is your use case?
>>
>>
>> On Wed, Nov 16, 2016 at 6:58 PM, wszxyh <wsz...@163.com> wrote:
>>
>>> Hi
>>>
>>> Multiple streaming aggregations are not yet supported. When will it be
>>> supported? Is it in the plan?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>
>>
>>
>>
>>
>
>
>
>
>


Re: Re: Multiple streaming aggregations in structured streaming

2016-11-20 Thread Reynold Xin
Can you use the approximate count distinct?


On Sun, Nov 20, 2016 at 11:51 PM, Xinyu Zhang  wrote:

>
> MapWithState is also very useful.
> I want to calculate UV in real time, but "distinct count" and "multiple
> streaming aggregations" are not supported.
> Is there any method to calculate real-time UV in the current version?
>
>
>
> At 2016-11-19 06:01:45, "Michael Armbrust"  wrote:
>
> Doing this generally is pretty hard.  We will likely support algebraic
> aggregate eventually, but this is not currently slotted for 2.2.  Instead I
> think we will add something like mapWithState that lets users compute
> arbitrary stateful things.  What is your use case?
>
>
> On Wed, Nov 16, 2016 at 6:58 PM, wszxyh  wrote:
>
>> Hi
>>
>> Multiple streaming aggregations are not yet supported. When will it be
>> supported? Is it in the plan?
>>
>> Thanks
>>
>>
>>
>>
>
>
>
>
>


Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-17 Thread Reynold Xin
Adding a new data type is an enormous undertaking and very invasive. I
don't think it is worth it in this case given there are clear, simple
workarounds.
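
For the blob case, a rough sketch of the from_json route that ships with 2.1
(the schema and column names are assumptions):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// one schema per JSON "shape"; rows whose blob does not match simply yield nulls
val schema = new StructType()
  .add("lo", LongType)
  .add("hi", LongType)
  .add("response", StringType)

val parsed = df.withColumn("parsed", from_json(col("json_blob").cast("string"), schema))
parsed.select(col("parsed.lo"), col("parsed.hi"), col("parsed.response")).show()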


On Thu, Nov 17, 2016 at 12:24 PM, kant kodali  wrote:

> Can we have a JSONType for Spark SQL?
>
> On Wed, Nov 16, 2016 at 8:41 PM, Nathan Lande 
> wrote:
>
>> If you are dealing with a bunch of different schemas in 1 field, figuring
>> out a strategy to deal with that will depend on your data and does not
>> really have anything to do with spark since mapping your JSON payloads to
>> tractable data structures will depend on business logic.
>>
>> The strategy of pulling out a blob into its on rdd and feeding it into
>> the JSON loader should work for any data source once you have your data
>> strategy figured out.
>>
>> On Wed, Nov 16, 2016 at 4:39 PM, kant kodali  wrote:
>>
>>> 1. I have a Cassandra Table where one of the columns is blob. And this
>>> blob contains a JSON encoded String however not all the blob's across the
>>> Cassandra table for that column are same (some blobs have difference json's
>>> than others) so In that case what is the best way to approach it? Do we
>>> need to put /group all the JSON Blobs that have same structure (same keys)
>>> into each individual data frame? For example, say if I have 5 json blobs
>>> that have same structure and another 3 JSON blobs that belongs to some
>>> other structure In this case do I need to create two data frames? (Attached
>>> is a screen shot of 2 rows of how my json looks like)
>>> 2. In my case, toJSON on RDD doesn't seem to help a lot. Attached a
>>> screen shot. Looks like I got the same data frame as my original one.
>>>
>>> Thanks much for these examples.
>>>
>>>
>>>
>>> On Wed, Nov 16, 2016 at 2:54 PM, Nathan Lande 
>>> wrote:
>>>
 I'm looking forward to 2.1 but, in the meantime, you can pull out the
 specific column into an RDD of JSON objects, pass this RDD into the
 read.json() and then join the results back onto your initial DF.

 Here is an example of what we do to unpack headers from Avro log data:

 def jsonLoad(path):
 #
 #load in the df
 raw = (sqlContext.read
 .format('com.databricks.spark.avro')
 .load(path)
 )
 #
 #define json blob, add primary key elements (hi and lo)
 #
 JSONBlob = concat(
 lit('{'),
 concat(lit('"lo":'), col('header.eventId.lo').cast('string'),
 lit(',')),
 concat(lit('"hi":'), col('header.eventId.hi').cast('string'),
 lit(',')),
 concat(lit('"response":'), decode('requestResponse.response',
 'UTF-8')),
 lit('}')
 )
 #
 #extract the JSON blob as a string
 rawJSONString = raw.select(JSONBlob).rdd.map(lambda x: str(x[0]))
 #
 #transform the JSON string into a DF struct object
 structuredJSON = sqlContext.read.json(rawJSONString)
 #
 #join the structured JSON back onto the initial DF using the hi and
 lo join keys
 final = (raw.join(structuredJSON,
 ((raw['header.eventId.lo'] == structuredJSON['lo']) &
 (raw['header.eventId.hi'] == structuredJSON['hi'])),
 'left_outer')
 .drop('hi')
 .drop('lo')
 )
 #
 #win
 return final

 On Wed, Nov 16, 2016 at 10:50 AM, Michael Armbrust <
 mich...@databricks.com> wrote:

> On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon 
> wrote:
>
>> Maybe it sounds like you are looking for from_json/to_json functions
>> after en/decoding properly.
>>
>
> Which are new built-in functions that will be released with Spark 2.1.
>


>>>
>>
>


[ANNOUNCE] Apache Spark 2.0.2

2016-11-14 Thread Reynold Xin
We are happy to announce the availability of Spark 2.0.2!

Apache Spark 2.0.2 is a maintenance release containing 90 bug fixes along
with Kafka 0.10 support and runtime metrics for Structured Streaming. This
release is based on the branch-2.0 maintenance branch of Spark. We strongly
recommend all 2.0.x users to upgrade to this stable release.

To download Apache Spark 2.0.2, visit http://spark.apache.org/downloads.html

We would like to acknowledge all community members for contributing patches
to this release.


[ANNOUNCE] Announcing Apache Spark 1.6.3

2016-11-07 Thread Reynold Xin
We are happy to announce the availability of Spark 1.6.3! This maintenance
release includes fixes across several areas of Spark, and we encourage users
on the 1.6.x line to upgrade to 1.6.3.

Head to the project's download page to download the new version:
http://spark.apache.org/downloads.html


Mark DataFrame/Dataset APIs stable

2016-10-12 Thread Reynold Xin
I took a look at all the public APIs we expose in o.a.spark.sql tonight,
and realized we still have a large number of APIs that are marked
experimental. Most of these haven't really changed, except in 2.0 we merged
DataFrame and Dataset. I think it's long overdue to mark them stable.

I'm tracking this via ticket:
https://issues.apache.org/jira/browse/SPARK-17900

*The list I've come up with to graduate are*:

Dataset/DataFrame
- functions, since 1.3
- ColumnName, since 1.3
- DataFrameNaFunctions, since 1.3.1
- DataFrameStatFunctions, since 1.4
- UserDefinedFunction, since 1.3
- UserDefinedAggregateFunction, since 1.5
- Window and WindowSpec, since 1.4

Data sources:
- DataSourceRegister, since 1.5
- RelationProvider, since 1.3
- SchemaRelationProvider, since 1.3
- CreatableRelationProvider, since 1.3
- BaseRelation, since 1.3
- TableScan, since 1.3
- PrunedScan, since 1.3
- PrunedFilteredScan, since 1.3
- InsertableRelation, since 1.3


*The list I think we should definitely keep experimental are*:

- CatalystScan in data source (tied to internal logical plans so it is not
stable by definition)
- all classes related to Structured streaming (introduced new in 2.0 and
will likely change)


*The ones that I'm not sure whether we should graduate are:*

Typed operations for Datasets, including:
- all typed methods on Dataset class
- KeyValueGroupedDataset
- o.a.s.sql.expressions.javalang.typed
- o.a.s.sql.expressions.scalalang.typed
- methods that return typed Dataset in SparkSession

Most of these were introduced in 1.6 and had gone through drastic changes
in 2.0. I think we should try very hard not to break them any more, but we
might still run into issues in the future that require changing these.


Let me know what you think.


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Reynold Xin
The show thing was the result of an optimization that short-circuited any
real Spark computation when the input is a local collection, and the result
was simply the first few rows. That's why it completed without serializing
anything.

It is somewhat inconsistent. One way to eliminate the inconsistency is to
always serialize the query plan even for local execution. We did that back
in the day for the RDD code path, and we can do similar things for the SQL
code path. However, serialization is not free, and it will slow down the
execution by a small percentage.



On Tue, Aug 9, 2016 at 5:05 AM, Hao Ren <inv...@gmail.com> wrote:

> @Reynold
>
> Some questions to make things clear:
>
> 1. As nothing is really final in the JVM, is the generated code during
> the execution of `df.show()` different from the one of `df.filter($"key"
> === 2).show()` in my snippet ?
>
> 2. When `df.show()` is being executed, it seems that the 'notSer' object
> is not serialized (since no exception), instead the Int value in it is
> serialized. Is this correct ?
> As for me, this behavior is counterintuitive.
> The analogical problem would be a `RDD.map` 's closure contains
> 'notSer.value'. For example:
> 
> rdd.map {
>   case (key, value) => value + notSer.value
> }
> rdd.count
> 
> It should thrown a "Task not serializable" exception. But for dataframe,
> it is not the case because of reflection or unsafe.
>
> 3. I am wondering whether this "feature" of scala complier makes the
> DataFrame API unpredictable ? Any roadmap on this ?
> As a user, I can not expect that a `fitler` call before `show` crashes,
> while a simple `show` on the original df works.
>
> The workaround I can imagine is just to cache and materialize `df` by
> `df.cache.count()`, and then call `df.filter(...).show()`.
> It should work, just a little bit tedious.
>
>
>
> On Mon, Aug 8, 2016 at 10:00 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> That is unfortunately the way how Scala compiler captures (and defines)
>> closures. Nothing is really final in the JVM. You can always use reflection
>> or unsafe to modify the value of fields.
>>
>> On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott <
>> simon.sc...@viavisolutions.com> wrote:
>>
>>> But does the “notSer” object have to be serialized?
>>>
>>>
>>>
>>> The object is immutable by the definition of A, so the only thing that
>>> needs to be serialized is the (immutable) Int value? And Ints are
>>> serializable?
>>>
>>>
>>>
>>> Just thinking out loud
>>>
>>>
>>>
>>> Simon Scott
>>>
>>>
>>>
>>> Research Developer @ viavisolutions.com
>>>
>>>
>>>
>>> *From:* Hao Ren [mailto:inv...@gmail.com]
>>> *Sent:* 08 August 2016 09:03
>>> *To:* Muthu Jayakumar <bablo...@gmail.com>
>>> *Cc:* user <user@spark.apache.org>; dev <d...@spark.apache.org>
>>> *Subject:* Re: [SPARK-2.0][SQL] UDF containing non-serializable object
>>> does not work as expected
>>>
>>>
>>>
>>> Yes, it is.
>>>
>>> You can define a udf like that.
>>>
>>> Basically, it's a udf Int => Int which is a closure contains a non
>>> serializable object.
>>>
>>> The latter should cause Task not serializable exception.
>>>
>>>
>>>
>>> Hao
>>>
>>>
>>>
>>> On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <bablo...@gmail.com>
>>> wrote:
>>>
>>> Hello Hao Ren,
>>>
>>>
>>>
>>> Doesn't the code...
>>>
>>>
>>>
>>> val add = udf {
>>>
>>>   (a: Int) => a + notSer.value
>>>
>>> }
>>>
>>> Mean UDF function that Int => Int ?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Muthu
>>>
>>>
>>>
>>> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <inv...@gmail.com> wrote:
>>>
>>> I am playing with spark 2.0
>>>
>>> What I tried to test is:
>>>
>>>
>>>
>>> Create a UDF in which there is a non serializable object.
>>>
>>> What I expected is when this UDF is called during materializing the
>>> dataFrame where the UDF is used in "select", an task non serializable
>>> exception should be thrown.
>>>
>>> It depends also which "action" is called on that

Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Reynold Xin
That is unfortunately the way how Scala compiler captures (and defines)
closures. Nothing is really final in the JVM. You can always use reflection
or unsafe to modify the value of fields.

On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott 
wrote:

> But does the “notSer” object have to be serialized?
>
>
>
> The object is immutable by the definition of A, so the only thing that
> needs to be serialized is the (immutable) Int value? And Ints are
> serializable?
>
>
>
> Just thinking out loud
>
>
>
> Simon Scott
>
>
>
> Research Developer @ viavisolutions.com
>
>
>
> *From:* Hao Ren [mailto:inv...@gmail.com]
> *Sent:* 08 August 2016 09:03
> *To:* Muthu Jayakumar 
> *Cc:* user ; dev 
> *Subject:* Re: [SPARK-2.0][SQL] UDF containing non-serializable object
> does not work as expected
>
>
>
> Yes, it is.
>
> You can define a udf like that.
>
> Basically, it's a udf Int => Int which is a closure contains a non
> serializable object.
>
> The latter should cause Task not serializable exception.
>
>
>
> Hao
>
>
>
> On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar 
> wrote:
>
> Hello Hao Ren,
>
>
>
> Doesn't the code...
>
>
>
> val add = udf {
>
>   (a: Int) => a + notSer.value
>
> }
>
> Mean UDF function that Int => Int ?
>
>
>
> Thanks,
>
> Muthu
>
>
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren  wrote:
>
> I am playing with spark 2.0
>
> What I tried to test is:
>
>
>
> Create a UDF in which there is a non serializable object.
>
> What I expected is when this UDF is called during materializing the
> dataFrame where the UDF is used in "select", an task non serializable
> exception should be thrown.
>
> It depends also which "action" is called on that dataframe.
>
>
>
> Here is the code for reproducing the pb:
>
>
>
> 
>
> object DataFrameSerDeTest extends App {
>
>
>
>   class A(val value: Int) // It is not serializable
>
>
>
>   def run() = {
>
> val spark = SparkSession
>
>   .builder()
>
>   .appName("DataFrameSerDeTest")
>
>   .master("local[*]")
>
>   .getOrCreate()
>
>
>
> import org.apache.spark.sql.functions.udf
>
> import spark.sqlContext.implicits._
>
>
>
> val notSer = new A(2)
>
> val add = udf {
>
>   (a: Int) => a + notSer.value
>
> }
>
> val df = spark.createDataFrame(Seq(
>
>   (1, 2),
>
>   (2, 2),
>
>   (3, 2),
>
>   (4, 2)
>
> )).toDF("key", "value")
>
>   .select($"key", add($"value").as("added"))
>
>
>
> df.show() // *It should not work because the udf contains a
> non-serializable object, but it works*
>
>
>
> df.filter($"key" === 2).show() // *It does not work as expected
> (org.apache.spark.SparkException: Task not serializable)*
>
>   }
>
>
>
>   run()
>
> }
>
> 
>
>
>
> Also, I tried collect(), count(), first(), limit(). All of them worked
> without non-serializable exceptions.
>
> It seems only filter() throws the exception. (feature or bug ?)
>
>
>
> Any ideas ? Or I just messed things up ?
>
> Any help is highly appreciated.
>
>
>
> --
>
> Hao Ren
>
>
>
> Data Engineer @ leboncoin
>
>
>
> Paris, France
>
>
>
>
>
>
>
> --
>
> Hao Ren
>
>
>
> Data Engineer @ leboncoin
>
>
>
> Paris, France
>


Re: RDD vs Dataset performance

2016-07-28 Thread Reynold Xin
The performance difference comes from the need to serialize and
deserialize the data to AnnotationText. The extra stage is probably very quick
and shouldn't have much impact.

If you try caching the RDD in serialized mode, it will slow down a lot
too.
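
For example, something along these lines (a sketch reusing the names from the
message below):

import org.apache.spark.storage.StorageLevel

val serRDD = ds.rdd.repartition(4).persist(StorageLevel.MEMORY_ONLY_SER)
serRDD.count()   // materialize the cache

// now every task pays the deserialization cost again, much like the Dataset case
serRDD.filter(a => a.text == "mutational".toLowerCase).count()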


On Thu, Jul 28, 2016 at 9:52 AM, Darin McBeath 
wrote:

> I started playing round with Datasets on Spark 2.0 this morning and I'm
> surprised by the significant performance difference I'm seeing between an
> RDD and a Dataset for a very basic example.
>
>
> I've defined a simple case class called AnnotationText that has a handful
> of fields.
>
>
> I create a Dataset[AnnotationText] with my data and repartition(4) this on
> one of the columns and cache the resulting dataset as ds (force the cache
> by executing a count action).  Everything looks good and I have more than
> 10M records in my dataset ds.
>
> When I execute the following:
>
> ds.filter(textAnnotation => textAnnotation.text ==
> "mutational".toLowerCase).count
>
> It consistently finishes in just under 3 seconds.  One of the things I
> notice is that it has 3 stages.  The first stage is skipped (as this had to
> do with creation ds and it was already cached).  The second stage appears
> to do the filtering (requires 4 tasks) but interestingly it shuffles
> output.  The third stage (requires only 1 task) appears to count the
> results of the shuffle.
>
> When I look at the cached dataset (on 4 partitions) it is 82.6MB.
>
> I then decided to convert the ds dataset to an RDD as follows,
> repartition(4) and cache.
>
> val aRDD = ds.rdd.repartition(4).cache
> aRDD.count
> So, I now have an RDD[AnnotationText]
>
> When I execute the following:
>
> aRDD.filter(textAnnotation => textAnnotation.text ==
> "mutational".toLowerCase).count
>
> It consistently finishes in just under half a second.  One of the things I
> notice is that it only has 2 stages.  The first stage is skipped (as this
> had to do with creation of aRDD and it was already cached).  The second
> stage appears to do the filtering and count(requires 4 tasks).
> Interestingly, there is no shuffle (or subsequently 3rd stage).
>
> When I look at the cached RDD (on 4 partitions) it is 2.9GB.
>
>
> I was surprised how significant the cached storage difference was between
> the Dataset (82.6MB) and the RDD (2.9GB) version of the same content.  Is
> this kind of difference to be expected?
>
> While I like the smaller size for the Dataset version, I was confused as
> to why the performance for the Dataset version was so much slower (2.5s vs
> .5s).  I suspect it might be attributed to the shuffle and third stage
> required by the Dataset version but I'm not sure. I was under the
> impression that Datasets should (would) be faster in many use cases (such
> as the one I'm using above).  Am I doing something wrong or is this to be
> expected?
>
> Thanks.
>
> Darin.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[ANNOUNCE] Announcing Apache Spark 2.0.0

2016-07-27 Thread Reynold Xin
Hi all,

Apache Spark 2.0.0 is the first release of Spark 2.x line. It includes
2500+ patches from 300+ contributors.

To download Spark 2.0, head over to the download page:
http://spark.apache.org/downloads.html

To view the release notes:
http://spark.apache.org/releases/spark-release-2-0-0.html


(note: it can take a few hours for everything to be propagated, so you
might get 404 on some download links.  If you see any issues with the
release notes or webpage *please contact me directly, off-list*)


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Reynold Xin
The presentation at Spark Summit SF was probably referring to Structured
Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the same
production stability level as Spark 1.6. There is also Kafka 0.10 support
in dstream.
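
The Kafka integration for dstreams ships as a separate artifact that can be
pulled in at submit time, for example (the exact coordinates are an assumption
and depend on your Scala and Kafka versions):

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.0 ...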

On July 25, 2016 at 10:26:49 AM, Andy Davidson (
a...@santacruzintegration.com) wrote:

Hi Kevin

Just a heads up at the recent spark summit in S.F. There was a presentation
about streaming in 2.0. They said that streaming was not going to
production ready in 2.0.

I am not sure if the older 1.6.x version will be supported. My project will
not be able to upgrade with streaming support. We also use kafka

Andy

From: Marco Mistroni 
Date: Monday, July 25, 2016 at 2:33 AM
To: kevin 
Cc: "user @spark" , "dev.spark" 
Subject: Re: where I can find spark-streaming-kafka for spark2.0

Hi Kevin
  you should not need to rebuild everything.
Instead, i believe you should launch spark-submit by specifying the kafka
jar file in your --packages... i had to follow same when integrating spark
streaming with flume

  have you checked this link ?
https://spark.apache.org/docs/latest/streaming-kafka-integration.html


hth



On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:

> I have compile it from source code
>
> 2016-07-25 12:05 GMT+08:00 kevin :
>
>> hi,all :
>> I try to run example org.apache.spark.examples.streaming.KafkaWordCount ,
>> I got error :
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka.KafkaUtils$
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 11 more
>>
>> so where I can find spark-streaming-kafka for spark2.0
>>
>
>


Re: transtition SQLContext to SparkSession

2016-07-19 Thread Reynold Xin
Yes. But in order to access methods available only in HiveContext, a user
cast is required.

On Tuesday, July 19, 2016, Maciej Bryński <mac...@brynski.pl> wrote:

> @Reynold Xin,
> How this will work with Hive Support ?
> SparkSession.sqlContext return HiveContext ?
>
> 2016-07-19 0:26 GMT+02:00 Reynold Xin <r...@databricks.com <javascript:;>
> >:
> > Good idea.
> >
> > https://github.com/apache/spark/pull/14252
> >
> >
> >
> > On Mon, Jul 18, 2016 at 12:16 PM, Michael Armbrust <
> mich...@databricks.com <javascript:;>>
> > wrote:
> >>
> >> + dev, reynold
> >>
> >> Yeah, thats a good point.  I wonder if SparkSession.sqlContext should be
> >> public/deprecated?
> >>
> >> On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers <ko...@tresata.com
> <javascript:;>> wrote:
> >>>
> >>> in my codebase i would like to gradually transition to SparkSession, so
> >>> while i start using SparkSession i also want a SQLContext to be
> available as
> >>> before (but with a deprecated warning when i use it). this should be
> easy
> >>> since SQLContext is now a wrapper for SparkSession.
> >>>
> >>> so basically:
> >>> val session = SparkSession.builder.set(..., ...).getOrCreate()
> >>> val sqlc = new SQLContext(session)
> >>>
> >>> however this doesnt work, the SQLContext constructor i am trying to use
> >>> is private. SparkSession.sqlContext is also private.
> >>>
> >>> am i missing something?
> >>>
> >>> a non-gradual switch is not very realistic in any significant codebase,
> >>> and i do not want to create SparkSession and SQLContext independendly
> (both
> >>> from same SparkContext) since that can only lead to confusion and
> >>> inconsistent settings.
> >>
> >>
> >
>
>
>
> --
> Maciek Bryński
>


Re: transtition SQLContext to SparkSession

2016-07-18 Thread Reynold Xin
Good idea.

https://github.com/apache/spark/pull/14252
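
A minimal sketch of the transitional pattern discussed here, assuming
SparkSession.sqlContext is made public by the change above:

import org.apache.spark.sql.{SQLContext, SparkSession}

val spark = SparkSession.builder()
  .enableHiveSupport()   // Hive-backed catalog, replacing what HiveContext provided
  .getOrCreate()

// legacy code can keep compiling against a SQLContext while new code uses `spark`
val sqlContext: SQLContext = spark.sqlContext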



On Mon, Jul 18, 2016 at 12:16 PM, Michael Armbrust 
wrote:

> + dev, reynold
>
> Yeah, thats a good point.  I wonder if SparkSession.sqlContext should be
> public/deprecated?
>
> On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers  wrote:
>
>> in my codebase i would like to gradually transition to SparkSession, so
>> while i start using SparkSession i also want a SQLContext to be available
>> as before (but with a deprecated warning when i use it). this should be
>> easy since SQLContext is now a wrapper for SparkSession.
>>
>> so basically:
>> val session = SparkSession.builder.set(..., ...).getOrCreate()
>> val sqlc = new SQLContext(session)
>>
>> however this doesnt work, the SQLContext constructor i am trying to use
>> is private. SparkSession.sqlContext is also private.
>>
>> am i missing something?
>>
>> a non-gradual switch is not very realistic in any significant codebase,
>> and i do not want to create SparkSession and SQLContext independendly (both
>> from same SparkContext) since that can only lead to confusion and
>> inconsistent settings.
>>
>
>


Re: Spark Website

2016-07-13 Thread Reynold Xin
Thanks for reporting. This is due to
https://issues.apache.org/jira/servicedesk/agent/INFRA/issue/INFRA-12055



On Wed, Jul 13, 2016 at 11:52 AM, Pradeep Gollakota 
wrote:

> Worked for me if I go to https://spark.apache.org/site/ but not
> https://spark.apache.org
>
> On Wed, Jul 13, 2016 at 11:48 AM, Maurin Lenglart 
> wrote:
>
>> Same here
>>
>>
>>
>> *From: *Benjamin Kim 
>> *Date: *Wednesday, July 13, 2016 at 11:47 AM
>> *To: *manish ranjan 
>> *Cc: *user 
>> *Subject: *Re: Spark Website
>>
>>
>>
>> It takes me to the directories instead of the webpage.
>>
>>
>>
>> On Jul 13, 2016, at 11:45 AM, manish ranjan 
>> wrote:
>>
>>
>>
>> working for me. What do you mean 'as supposed to'?
>>
>>
>> ~Manish
>>
>>
>>
>> On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim 
>> wrote:
>>
>> Has anyone noticed that the spark.apache.org is not working as supposed
>> to?
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>
>>
>>
>
>


Re: ml and mllib persistence

2016-07-12 Thread Reynold Xin
Also, Java serialization isn't great for cross-platform compatibility.
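
For models that do support it, the built-in writers avoid both problems
(a small sketch; the path and the model type are only examples):

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}

// assuming `training` is a DataFrame with "features" and "label" columns
val model = new LogisticRegression().fit(training)

model.write.overwrite().save("/models/lr")                 // hypothetical location
val restored = LogisticRegressionModel.load("/models/lr")  // same format on any platform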

On Tuesday, July 12, 2016, aka.fe2s  wrote:

> Okay, I think I found an answer on my question. Some models (for instance
> org.apache.spark.mllib.recommendation.MatrixFactorizationModel) hold RDDs,
> so just serializing these objects will not work.
>
> --
> Oleksiy Dyagilev
>
> On Tue, Jul 12, 2016 at 5:40 PM, aka.fe2s  > wrote:
>
>> What is the reason Spark has an individual implementations of read/write
>> routines for every model in mllib and ml? (Saveable and MLWritable trait
>> impls)
>>
>> Wouldn't a generic implementation via Java serialization mechanism work?
>> I would like to use it to store the models to a custom storage.
>>
>> --
>> Oleksiy
>>
>
>


Re: Logical Plan

2016-06-30 Thread Reynold Xin
Which version are you using here? If the underlying files change,
technically we should go through optimization again.

Perhaps the real "fix" is to figure out why logical plan creation is so
slow for 700 columns.


On Thu, Jun 30, 2016 at 1:58 PM, Darshan Singh 
wrote:

> Is there a way I can use same Logical plan for a query. Everything will be
> same except underlying file will be different.
>
> Issue is that my query has around 700 columns and Generating logical plan
> takes 20 seconds and it happens every 2 minutes but every time underlying
> file is different.
>
> I do not know these files in advance so I cant create the table on
> directory level. These files are created and then used in the final query.
>
> Thanks
>


[ANNOUNCE] Announcing Spark 1.6.2

2016-06-27 Thread Reynold Xin
We are happy to announce the availability of Spark 1.6.2! This maintenance
release includes fixes across several areas of Spark. You can find the list
of changes here: https://s.apache.org/spark-1.6.2

And download the release here: http://spark.apache.org/downloads.html


Re: Thanks For a Job Well Done !!!

2016-06-18 Thread Reynold Xin
Thanks for the kind words, Krishna! Please keep the feedback coming.

On Saturday, June 18, 2016, Krishna Sankar  wrote:

> Hi all,
>Just wanted to thank all for the dataset API - most of the times we see
> only bugs in these lists ;o).
>
>- Putting some context, this weekend I was updating the SQL chapters
>of my book - it had all the ugliness of SchemaRDD,
>registerTempTable, take(10).foreach(println)
>and take(30).foreach(e=>println("%15s | %9.2f |".format(e(0),e(1 ;o)
>- I remember Hossein Falaki chiding me about the ugly println
>   statements !
>   - Took me a little while to grok the dataset, sparksession,
>   
> spark.read.option("header","true").option("inferSchema","true").csv(...) et
>   al.
>  - I am a big R fan and know the language pretty decent - so the
>  constructs are familiar
>   - Once I got it ( I am sure still there are more mysteries to
>uncover ...) it was just beautiful - well done folks !!!
>- One sees the contrast a lot better while teaching or writing books,
>because one has to think thru the old, the new and the transitional arc
>   - I even remember the good old days when we were discussing whether
>   Spark would get the dataframes like R at one of Paco's sessions !
>   - And now, it looks very decent for data wrangling.
>
> Cheers & keep up the good work
> 
> P.S: My next chapter is the MLlib - need to convert to ml. Should be
> interesting ... I am a glutton for punishment - of the Spark kind, of
> course !
>


Re: Spark 2.0 Release Date

2016-06-07 Thread Reynold Xin
It'd be great to cut an RC as soon as possible. Looking at the
blocker/critical issue list, majority of them are API audits. I think
people will get back to those once Spark Summit is over, and then we should
see some good progress towards an RC.

On Tue, Jun 7, 2016 at 6:20 AM, Jacek Laskowski  wrote:

> Finally, the PMC voice on the subject. Thanks a lot, Sean!
>
> p.s. Given how much time it takes to ship 2.0 (with so many cool
> features already backed in!) I'd vote for releasing a few more RCs
> before 2.0 hits the shelves. I hope 2.0 is not Java 9 or Jigsaw ;-)
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Jun 7, 2016 at 3:06 PM, Sean Owen  wrote:
> > I don't believe the intent was to get it out before Spark Summit or
> > something. That shouldn't drive the schedule anyway. But now that
> > there's a 2.0.0 preview available, people who are eager to experiment
> > or test on it can do so now.
> >
> > That probably reduces urgency to get it out the door in order to
> > deliver new functionality. I guessed the 2.0.0 release would be mid
> > June and now I'd guess early July. But, nobody's discussed it per se.
> >
> > In theory only fixes, tests and docs are being merged, so the JIRA
> > count should be going down. It has, slowly. Right now there are 72
> > open issues for 2.0.0, of which 20 are blockers. Most of those are
> > simple "audit x" or "document x" tasks or umbrellas, but, they do
> > represent things that have to get done before a release, and that to
> > me looks like a few more weeks of finishing, pushing, and sweeping
> > under carpets.
> >
> >
> > On Tue, Jun 7, 2016 at 1:45 PM, Jacek Laskowski  wrote:
> >> On Tue, Jun 7, 2016 at 1:25 PM, Arun Patel 
> wrote:
> >>> Do we have any further updates on release date?
> >>
> >> Nope :( And it's even more quiet than I could have thought. I was so
> >> certain that today's the date. Looks like Spark Summit has "consumed"
> >> all the people behind 2.0...Can't believe no one (from the
> >> PMC/committers) even mean to shoot a date :( Patrick's gone. Reynold's
> >> busy. Perhaps Sean?
> >>
> >>> Also, Is there a updated documentation for 2.0 somewhere?
> >>
> >>
> http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/
> ?
> >>
> >> Jacek
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: JDBC Dialect for saving DataFrame into Vertica Table

2016-05-26 Thread Reynold Xin
It's probably a good idea to have the Vertica dialect too, since it doesn't
seem like it'd be too difficult to maintain. It is not going to be as
performant as the native Vertica data source, but is going to be much
lighter weight.
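
A bare-bones sketch of what registering such a dialect looks like; the type
mapping shown is illustrative only and would need to follow Vertica's actual
types:

import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

object VerticaDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:vertica")

  // map Catalyst types to type definitions Vertica understands (illustrative only)
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(65000)", Types.VARCHAR))
    case _          => None   // fall back to the defaults
  }
}

JdbcDialects.registerDialect(VerticaDialect)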


On Thu, May 26, 2016 at 3:09 PM, Mohammed Guller 
wrote:

> Vertica also provides a Spark connector. It was not GA the last time I
> looked at it, but available on the Vertica community site. Have you tried
> using the Vertica Spark connector instead of the JDBC driver?
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Aaron Ilovici [mailto:ailov...@wayfair.com]
> *Sent:* Thursday, May 26, 2016 8:08 AM
> *To:* user@spark.apache.org; d...@spark.apache.org
> *Subject:* JDBC Dialect for saving DataFrame into Vertica Table
>
>
>
> I am attempting to write a DataFrame of Rows to Vertica via
> DataFrameWriter's jdbc function in the following manner:
>
>
>
> dataframe.write().mode(SaveMode.Append).jdbc(url, table, properties);
>
>
>
> This works when there are no NULL values in any of the Rows in my
> DataFrame. However, when there are rows, I get the following error:
>
>
>
> ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 24)
>
> java.sql.SQLFeatureNotSupportedException: [Vertica][JDBC](10220) Driver
> not capable.
>
> at com.vertica.exceptions.ExceptionConverter.toSQLException(Unknown
> Source)
>
> at
> com.vertica.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown
> Source)
>
> at com.vertica.jdbc.common.SPreparedStatement.setNull(Unknown Source)
>
>
>
> This appears to be Spark's attempt to set a null value in a
> PreparedStatement, but Vertica does not understand the type upon executing
> the transaction. I see in JdbcDialects.scala that there are dialects for
> MySQL, Postgres, DB2, MsSQLServer, Derby, and Oracle.
>
>
>
> 1 - Would writing a dialect for Vertica eleviate the issue, by setting a
> 'NULL' in a type that Vertica would understand?
>
> 2 - What would be the best way to do this without a Spark patch? Scala,
> Java, make a jar and call 'JdbcDialects.registerDialect(VerticaDialect)'
> once created?
>
> 3 - Where would one find the proper mapping between Spark DataTypes and
> Vertica DataTypes? I don't see 'NULL' handling for any of the dialects,
> only the base case 'case _ => None' - is None mapped to the proper NULL
> type elsewhere?
>
>
>
> My environment: Spark 1.6, Vertica Driver 7.2.2, Java 1.7
>
>
>
> I would be happy to create a Jira and submit a pull request with the
> VerticaDialect once I figure this out.
>
>
>
> Thank you for any insight on this,
>
>
>
> *AARON ILOVICI*
> Software Engineer
>
> Marketing Engineering
>
> *WAYFAIR*
> 4 Copley Place
> Boston, MA 02116
>
> (617) 532-6100 x1231
> ailov...@wayfair.com
>
>
>


Re: feedback on dataset api explode

2016-05-25 Thread Reynold Xin
Created JIRA ticket: https://issues.apache.org/jira/browse/SPARK-15533

@Koert - Please keep API feedback coming. One thing - in the future, can
you send API feedback to the dev@ list instead of user@?
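
The replacements being suggested in this thread look roughly like the
following sketch; the column and field names are assumptions:

import org.apache.spark.sql.functions.explode
import spark.implicits._

// untyped: the column-expression version, with an explicit output column name
df.select($"id", explode($"words").as("word"))

// typed: plain flatMap on the Dataset (assuming each record has a words: Seq[String])
ds.flatMap(record => record.words)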



On Wed, May 25, 2016 at 1:05 PM, Cheng Lian <l...@databricks.com> wrote:

> Agree, since they can be easily replaced by .flatMap (to do explosion) and
> .select (to rename output columns)
>
> Cheng
>
>
> On 5/25/16 12:30 PM, Reynold Xin wrote:
>
> Based on this discussion I'm thinking we should deprecate the two explode
> functions.
>
> On Wednesday, May 25, 2016, Koert Kuipers < <ko...@tresata.com>
> ko...@tresata.com> wrote:
>
>> wenchen,
>> that definition of explode seems identical to flatMap, so you dont need
>> it either?
>>
>> michael,
>> i didn't know about the column expression version of explode, that makes
>> sense. i will experiment with that instead.
>>
>> On Wed, May 25, 2016 at 3:03 PM, Wenchen Fan <wenc...@databricks.com>
>> wrote:
>>
>>> I think we only need this version:  `def explode[B : Encoder](f: A
>>> => TraversableOnce[B]): Dataset[B]`
>>>
>>> For untyped one, `df.select(explode($"arrayCol").as("item"))` should be
>>> the best choice.
>>>
>>> On Wed, May 25, 2016 at 11:55 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> These APIs predate Datasets / encoders, so that is why they are Row
>>>> instead of objects.  We should probably rethink that.
>>>>
>>>> Honestly, I usually end up using the column expression version of
>>>> explode now that it exists (i.e. explode($"arrayCol").as("Item")).  It
>>>> would be great to understand more why you are using these instead.
>>>>
>>>> On Wed, May 25, 2016 at 8:49 AM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> we currently have 2 explode definitions in Dataset:
>>>>>
>>>>>  def explode[A <: Product : TypeTag](input: Column*)(f: Row =>
>>>>> TraversableOnce[A]): DataFrame
>>>>>
>>>>>  def explode[A, B : TypeTag](inputColumn: String, outputColumn:
>>>>> String)(f: A => TraversableOnce[B]): DataFrame
>>>>>
>>>>> 1) the separation of the functions into their own argument lists is
>>>>> nice, but unfortunately scala's type inference doesn't handle this well,
>>>>> meaning that the generic types always have to be explicitly provided. i
>>>>> assume this was done to allow the "input" to be a varargs in the first
>>>>> method, and then kept the same in the second for reasons of symmetry.
>>>>>
>>>>> 2) i am surprised the first definition returns a DataFrame. this seems
>>>>> to suggest DataFrame usage (so DataFrame to DataFrame), but there is no 
>>>>> way
>>>>> to specify the output column names, which limits its usability for
>>>>> DataFrames. i frequently end up using the first definition for DataFrames
>>>>> anyhow because of the need to return more than 1 column (and the data has
>>>>> columns unknown at compile time that i need to carry along making flatMap
>>>>> on Dataset clumsy/unusable), but relying on the output columns being 
>>>>> called
>>>>> _1 and _2 and renaming then afterwards seems like an anti-pattern.
>>>>>
>>>>> 3) using Row objects isn't very pretty. why not f: A =>
>>>>> TraversableOnce[B] or something like that for the first definition? how
>>>>> about:
>>>>>  def explode[A: TypeTag, B: TypeTag](input: Seq[Column], output:
>>>>> Seq[Column])(f: A => TraversableOnce[B]): DataFrame
>>>>>
>>>>> best,
>>>>> koert
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Pros and Cons

2016-05-25 Thread Reynold Xin
On Wed, May 25, 2016 at 9:52 AM, Jörn Franke  wrote:

> Spark is more for machine learning working iteravely over the whole same
> dataset in memory. Additionally it has streaming and graph processing
> capabilities that can be used together.
>

Hi Jörn,

The first part is actually not true. Spark can handle data far greater than
the aggregate memory available on a cluster. The more recent versions
(1.3+) of Spark have external operations for almost all built-in operators,
and while things may not be perfect, those external operators are becoming
more and more robust with each version of Spark.


Re: feedback on dataset api explode

2016-05-25 Thread Reynold Xin
Based on this discussion I'm thinking we should deprecate the two explode
functions.

On Wednesday, May 25, 2016, Koert Kuipers  wrote:

> wenchen,
> that definition of explode seems identical to flatMap, so you dont need it
> either?
>
> michael,
> i didn't know about the column expression version of explode, that makes
> sense. i will experiment with that instead.
>
> On Wed, May 25, 2016 at 3:03 PM, Wenchen Fan  > wrote:
>
>> I think we only need this version:  `def explode[B : Encoder](f: A
>> => TraversableOnce[B]): Dataset[B]`
>>
>> For untyped one, `df.select(explode($"arrayCol").as("item"))` should be
>> the best choice.
>>
>> On Wed, May 25, 2016 at 11:55 AM, Michael Armbrust <
>> mich...@databricks.com
>> > wrote:
>>
>>> These APIs predate Datasets / encoders, so that is why they are Row
>>> instead of objects.  We should probably rethink that.
>>>
>>> Honestly, I usually end up using the column expression version of
>>> explode now that it exists (i.e. explode($"arrayCol").as("Item")).  It
>>> would be great to understand more why you are using these instead.
>>>
>>> On Wed, May 25, 2016 at 8:49 AM, Koert Kuipers >> > wrote:
>>>
 we currently have 2 explode definitions in Dataset:

  def explode[A <: Product : TypeTag](input: Column*)(f: Row =>
 TraversableOnce[A]): DataFrame

  def explode[A, B : TypeTag](inputColumn: String, outputColumn:
 String)(f: A => TraversableOnce[B]): DataFrame

 1) the separation of the functions into their own argument lists is
 nice, but unfortunately scala's type inference doesn't handle this well,
 meaning that the generic types always have to be explicitly provided. i
 assume this was done to allow the "input" to be a varargs in the first
 method, and then kept the same in the second for reasons of symmetry.

 2) i am surprised the first definition returns a DataFrame. this seems
 to suggest DataFrame usage (so DataFrame to DataFrame), but there is no way
 to specify the output column names, which limits its usability for
 DataFrames. i frequently end up using the first definition for DataFrames
 anyhow because of the need to return more than 1 column (and the data has
 columns unknown at compile time that i need to carry along making flatMap
 on Dataset clumsy/unusable), but relying on the output columns being called
 _1 and _2 and renaming then afterwards seems like an anti-pattern.

 3) using Row objects isn't very pretty. why not f: A =>
 TraversableOnce[B] or something like that for the first definition? how
 about:
  def explode[A: TypeTag, B: TypeTag](input: Seq[Column], output:
 Seq[Column])(f: A => TraversableOnce[B]): DataFrame

 best,
 koert

>>>
>>>
>>
>


Re: Spark 2.0 - SQL Subqueries.

2016-05-21 Thread Reynold Xin
https://issues.apache.org/jira/browse/SPARK-15078 was just a bunch of test
harness code and added no new functionality. To reduce confusion, I just
backported it into branch-2.0 so SPARK-15078 is now in 2.0 too.

Can you paste a query you were testing?


On Sat, May 21, 2016 at 10:49 AM, Kamalesh Nair 
wrote:

> Hi,
>
> From the Spark 2.0 Release webinar what I understood is, the newer version
> have significantly expanded the SQL capabilities of Spark, with the
> introduction of a new ANSI SQL parser and support for Subqueries.  It also
> says, Spark 2.0 can run all the 99 TPC-DS queries, which require many of
> the
> SQL:2003 features.
>
> I was testing out Spark 2.0 (apache/branch-2.0 preview) Technical Preview
> provided by Databricks and observed that the Spark SQL Subqueries are now
> supported in Where Clause which is great but it is not supported in Select
> Clause itself. Below listed is the error message displayed.
>
> "Error in SQL statement: AnalysisException: Predicate sub-queries can only
> be used in a Filter"
>
> Is it work in progress planned for the final release or a miss ?
>
> Also, I could find a related JIRA # SPARK-15078 being resolved in Spark
> 2.1.0 version. If this is what address this issue then kindly let me know
> the tentative release dates for Spark 2.1.0. Instead of Spark 2.0 in mid
> June would it be Spark 2.1.0 that would be released during the same time ?
>
> Thanks in advanace for your prompt reply !
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-SQL-Subqueries-tp26993.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
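
For readers following along, a hedged sketch of the two query shapes under
discussion (table and column names are made up; spark is a 2.0-era SparkSession,
substitute sqlContext.sql on the preview build if needed):

  // Predicate subquery in the WHERE clause: the form reported as working in the
  // 2.0 preview.
  spark.sql(
    "SELECT * FROM orders WHERE customer_id IN (SELECT id FROM vip_customers)")

  // The same predicate subquery used outside a filter, here in the SELECT list,
  // is the kind of expression that raises the quoted error
  // "Predicate sub-queries can only be used in a Filter".
  spark.sql(
    "SELECT order_id, customer_id IN (SELECT id FROM vip_customers) AS is_vip FROM orders")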


Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-24 Thread Reynold Xin
Usually no - but sortByKey does because it needs the range boundary to be
built in order to have the RDD. It is a long standing problem that's
unfortunately very difficult to solve without breaking the RDD API.

In DataFrame/Dataset we don't have this issue though.


On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao 
wrote:

> Hi,
>
> I have a streaming program with the block as below [ref:
> https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
> ]
>
> 1  val lines = messages.map(_._2)
> 2  val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))
>
> 3  val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
> 3a   .map { case (topic, count) => (count, topic) }
> 3b   .transform(_.sortByKey(false))
>
> 4a topCounts60.foreachRDD( rdd => {
> 4b   val topList = rdd.take( 10 )
>    })
>
> This batch is triggering 2 jobs: one at line 3b (sortByKey)
> and the other at 4b (rdd.take). I agree that there is a Job triggered
> on line 4b as take() is an action on RDD, whereas on line 3b sortByKey is
> just a transformation function which as per the docs is lazily evaluated. But I
> see that this line uses a RangePartitioner, and RangePartitioner on
> initialization invokes a method called sketch() that invokes collect(),
> triggering a Job.
>
> My question: Is it expected that sortByKey will invoke a Job? If
> yes, why is sortByKey listed as a transformation and not an action? Are there
> any other functions like this that invoke a Job, though they are
> transformations and not actions?
>
> I am on Spark 1.6
>
> Thanking You
>
> -
> Praveen Devarao
> Spark Technology Centre
> IBM India Software Labs
>
> -
> "Courage doesn't always roar. Sometimes courage is the quiet voice at the
> end of the day saying I will try again"
>
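
A minimal sketch to observe the behaviour Reynold describes, runnable in a
spark-shell (sizes are arbitrary):

  // sortByKey is lazy in the sense that it only returns a new RDD, but building
  // its RangePartitioner samples the input (sketch() followed by a collect()),
  // which already shows up as a job in the UI before any explicit action runs.
  val pairs = sc.parallelize(1 to 1000000).map(i => (scala.util.Random.nextInt(), i))
  val sorted = pairs.sortByKey(false)   // check the UI: a sampling job has likely run
  sorted.take(10)                       // the explicit action triggers the remaining job(s)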


Re: How Spark handles dead machines during a job.

2016-04-09 Thread Reynold Xin
The driver has the data and wouldn't need to rerun.

On Friday, April 8, 2016, Sung Hwan Chung  wrote:

> Hello,
>
> Say, that I'm doing a simple rdd.map followed by collect. Say, also, that
> one of the executors finish all of its tasks, but there are still other
> executors running.
>
> If the machine that hosted the finished executor gets terminated, does the
> master still have the results from the finished tasks (and thus doesn't
> restart those finished tasks)?
>
> Or does the master require that all the executors be alive during the
> entire map-collect cycle?
>
> Thanks!
>


Re: Executor shutdown hooks?

2016-04-06 Thread Reynold Xin
On Wed, Apr 6, 2016 at 4:39 PM, Sung Hwan Chung 
wrote:

> My option so far seems to be using JVM's shutdown hook, but I was
> wondering if Spark itself had an API for tasks.
>

Spark would be using that under the hood anyway, so you might as well just
use the jvm shutdown hook directly.
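
A hedged sketch of the JVM-level approach suggested here; nothing below is a Spark
API, and the object and field names are made up. The hook is registered at most
once per executor JVM, the first time a task runs there:

  object ExecutorCleanup {
    @volatile private var registered = false
    def ensureRegistered(): Unit = synchronized {
      if (!registered) {
        sys.addShutdownHook {
          // placeholder cleanup: close connections, flush buffers, etc.
          println("executor JVM shutting down")
        }
        registered = true
      }
    }
  }

  // Called from task code so the hook ends up on the executors:
  sc.parallelize(1 to 100, 4).foreachPartition { iter =>
    ExecutorCleanup.ensureRegistered()
    iter.foreach(_ => ())   // placeholder work
  }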


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Reynold Xin
+1

This is a no brainer IMO.


On Tue, Apr 5, 2016 at 7:32 PM, Joseph Bradley 
wrote:

> +1  By the way, the JIRA for tracking (Scala) API parity is:
> https://issues.apache.org/jira/browse/SPARK-4591
>
> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia 
> wrote:
>
>> This sounds good to me as well. The one thing we should pay attention to
>> is how we update the docs so that people know to start with the spark.ml
>> classes. Right now the docs list spark.mllib first and also seem more
>> comprehensive in that area than in spark.ml, so maybe people naturally
>> move towards that.
>>
>> Matei
>>
>> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
>>
>> Yes, DB (cc'ed) is working on porting the local linear algebra library
>> over (SPARK-13944). There are also frequent pattern mining algorithms we
>> need to port over in order to reach feature parity. -Xiangrui
>>
>> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> Overall this sounds good to me. One question I have is that in
>>> addition to the ML algorithms we have a number of linear algebra
>>> (various distributed matrices) and statistical methods in the
>>> spark.mllib package. Is the plan to port or move these to the spark.ml
>>> namespace in the 2.x series ?
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
>>> > FWIW, all of that sounds like a good plan to me. Developing one API is
>>> > certainly better than two.
>>> >
>>> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng 
>>> wrote:
>>> >> Hi all,
>>> >>
>>> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
>>> built
>>> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
>>> API has
>>> >> been developed under the spark.ml package, while the old RDD-based
>>> API has
>>> >> been developed in parallel under the spark.mllib package. While it was
>>> >> easier to implement and experiment with new APIs under a new package,
>>> it
>>> >> became harder and harder to maintain as both packages grew bigger and
>>> >> bigger. And new users are often confused by having two sets of APIs
>>> with
>>> >> overlapped functions.
>>> >>
>>> >> We started to recommend the DataFrame-based API over the RDD-based
>>> API in
>>> >> Spark 1.5 for its versatility and flexibility, and we saw the
>>> development
>>> >> and the usage gradually shifting to the DataFrame-based API. Just
>>> counting
>>> >> the lines of Scala code, from 1.5 to the current master we added
>>> ~1
>>> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So,
>>> to
>>> >> gather more resources on the development of the DataFrame-based API
>>> and to
>>> >> help users migrate over sooner, I want to propose switching RDD-based
>>> MLlib
>>> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
>>> >>
>>> >> * We do not accept new features in the RDD-based spark.mllib package,
>>> unless
>>> >> they block implementing new features in the DataFrame-based spark.ml
>>> >> package.
>>> >> * We still accept bug fixes in the RDD-based API.
>>> >> * We will add more features to the DataFrame-based API in the 2.x
>>> series to
>>> >> reach feature parity with the RDD-based API.
>>> >> * Once we reach feature parity (possibly in Spark 2.2), we will
>>> deprecate
>>> >> the RDD-based API.
>>> >> * We will remove the RDD-based API from the main Spark repo in Spark
>>> 3.0.
>>> >>
>>> >> Though the RDD-based API is already in de facto maintenance mode, this
>>> >> announcement will make it clear and hence important to both MLlib
>>> developers
>>> >> and users. So we’d greatly appreciate your feedback!
>>> >>
>>> >> (As a side note, people sometimes use “Spark ML” to refer to the
>>> >> DataFrame-based API or even the entire MLlib component. This also
>>> causes
>>> >> confusion. To be clear, “Spark ML” is not an official name and there
>>> are no
>>> >> plans to rename MLlib to “Spark ML” at this time.)
>>> >>
>>> >> Best,
>>> >> Xiangrui
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>
>>
>
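
For context, a minimal sketch of the DataFrame-based spark.ml API that the proposal
steers users toward (1.6-era API; data and column names are made up):

  import org.apache.spark.ml.Pipeline
  import org.apache.spark.ml.classification.LogisticRegression
  import org.apache.spark.ml.feature.VectorAssembler

  val training = sqlContext.createDataFrame(Seq(
    (0.0, 1.0, 0.1),
    (1.0, 2.0, 1.1),
    (0.0, 0.5, -0.4)
  )).toDF("label", "f1", "f2")

  val assembler = new VectorAssembler()
    .setInputCols(Array("f1", "f2"))
    .setOutputCol("features")
  val lr = new LogisticRegression().setMaxIter(10)

  // Pipelines over DataFrames, rather than RDDs of LabeledPoint, are the unit of
  // composition in spark.ml.
  val model = new Pipeline().setStages(Array(assembler, lr)).fit(training)
  model.transform(training).select("label", "prediction").show()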


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Reynold Xin
It's spark.local.dir.


On Fri, Apr 1, 2016 at 3:37 PM, Yong Zhang <java8...@hotmail.com> wrote:

> Is there a configuration in the Spark of location of "shuffle spilling"? I
> didn't recall ever see that one. Can you share it out?
>
> It will be good for a test writing to RAM Disk if that configuration is
> available.
>
> Thanks
>
> Yong
>
> --
> From: r...@databricks.com
> Date: Fri, 1 Apr 2016 15:32:23 -0700
> Subject: Re: Eliminating shuffle write and spill disk IO reads/writes in
> Spark
> To: slavi...@gmail.com
> CC: mri...@gmail.com; d...@spark.apache.org; user@spark.apache.org
>
>
> Michael - I'm not sure if you actually read my email, but spill has
> nothing to do with the shuffle files on disk. It was for the partitioning
> (i.e. sorting) process. If that flag is off, Spark will just run out of
> memory when data doesn't fit in memory.
>
>
> On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch <slavi...@gmail.com>
> wrote:
>
> RAMdisk is a fine interim step, but there are a lot of layers eliminated by
> keeping things in memory unless there is a need for spillover. At one time
> there was support for turning off spilling. That was eliminated. Why?
>
>
> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>
> I think Reynold's suggestion of using ram disk would be a good way to
> test if these are the bottlenecks or something else is.
> For most practical purposes, pointing local dir to ramdisk should
> effectively give you 'similar' performance as shuffling from memory.
>
> Are there concerns with taking that approach to test ? (I dont see
> any, but I am not sure if I missed something).
>
>
> Regards,
> Mridul
>
>
>
>
> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch <slavi...@gmail.com>
> wrote:
> > I totally disagree that it’s not a problem.
> >
> > - Network fetch throughput on 40G Ethernet exceeds the throughput of NVME
> > drives.
> > - What Spark is depending on is Linux’s IO cache as an effective buffer
> > pool. This is fine for small jobs but not for jobs with datasets in the
> > TB/node range.
> > - On larger jobs flushing the cache causes Linux to block.
> > - On a modern 56-hyperthread 2-socket host the latency caused by multiple
> > executors writing out to disk increases greatly.
> >
> > I thought the whole point of Spark was in-memory computing?  It’s in fact
> > in-memory for some things, but it uses spark.local.dir as a buffer pool for
> > others.
> >
> > Hence, the performance of  Spark is gated by the performance of
> > spark.local.dir, even on large memory systems.
> >
> > "Currently it is not possible to not write shuffle files to disk.”
> >
> > What changes >would< make it possible?
> >
> > The only one that seems possible is to clone the shuffle service and
> make it
> > in-memory.
> >
> >
> >
> >
> >
> > On Apr 1, 2016, at 4:57 PM, Reynold Xin <r...@databricks.com> wrote:
> >
> > spark.shuffle.spill actually has nothing to do with whether we write
> shuffle
> > files to disk. Currently it is not possible to not write shuffle files to
> > disk, and typically it is not a problem because the network fetch
> throughput
> > is lower than what disks can sustain. In most cases, especially with
> SSDs,
> > there is little difference between putting all of those in memory and on
> > disk.
> >
> > However, it is becoming more common to run Spark on a small number of beefy
> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
> improving
> > performance for those. Meantime, you can setup local ramdisks on each
> node
> > for shuffle writes.
> >
> >
> >
> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch <slavi...@gmail.com>
> > wrote:
> >>
> >> Hello;
> >>
> >> I’m working on spark with very large memory systems (2TB+) and notice
> that
> >> Spark spills to disk in shuffle.  Is there a way to force spark to stay
> in
> >> memory when doing shuffle operations?   The goal is to keep the shuffle
> data
> >> either in the heap or in off-heap memory (in 1.6.x) and never touch the
> IO
> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
> >>
> >> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
> >> Tungsten sort in 1.5.x
> >>
> >> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but
> this
> >> is ignored by the tungsten-sort shuffle manager; its optimized shuffles
> will
> >> continue to spill to disk when necessary.”
> >>
> >> If this is impossible via configuration changes what code changes would
> be
> >> needed to accomplish this?
> >>
> >>
> >>
> >>
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> >
>
> --
> Michael Slavitch
> 62 Renfrew Ave.
> Ottawa Ontario
> K1S 1Z5
>
>
>
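
A hedged sketch of the ramdisk workaround mentioned above. The mount point is
illustrative and has to exist on every worker (for example a tmpfs mounted by the
admin); note that on YARN the node manager's local directories take precedence over
this setting:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("shuffle-on-ramdisk-sketch")
    // Shuffle and spill files go under spark.local.dir; pointing it at a tmpfs
    // mount keeps them in memory without any code change.
    .set("spark.local.dir", "/mnt/spark-ramdisk")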


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Reynold Xin
Michael - I'm not sure if you actually read my email, but spill has nothing
to do with the shuffle files on disk. It was for the partitioning (i.e.
sorting) process. If that flag is off, Spark will just run out of memory
when data doesn't fit in memory.


On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch <slavi...@gmail.com> wrote:

> RAMdisk is a fine interim step, but there are a lot of layers eliminated by
> keeping things in memory unless there is a need for spillover. At one time
> there was support for turning off spilling. That was eliminated. Why?
>
>
> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>
>> I think Reynold's suggestion of using ram disk would be a good way to
>> test if these are the bottlenecks or something else is.
>> For most practical purposes, pointing local dir to ramdisk should
>> effectively give you 'similar' performance as shuffling from memory.
>>
>> Are there concerns with taking that approach to test ? (I dont see
>> any, but I am not sure if I missed something).
>>
>>
>> Regards,
>> Mridul
>>
>>
>>
>>
>> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch <slavi...@gmail.com>
>> wrote:
>> > I totally disagree that it’s not a problem.
>> >
>> > - Network fetch throughput on 40G Ethernet exceeds the throughput of
>> NVME
>> > drives.
>> > - What Spark is depending on is Linux’s IO cache as an effective buffer
>> > pool. This is fine for small jobs but not for jobs with datasets in the
>> > TB/node range.
>> > - On larger jobs flushing the cache causes Linux to block.
>> > - On a modern 56-hyperthread 2-socket host the latency caused by
>> > multiple executors writing out to disk increases greatly.
>> >
>> > I thought the whole point of Spark was in-memory computing?  It’s in
>> > fact in-memory for some things, but it uses spark.local.dir as a buffer
>> > pool for others.
>> >
>> > Hence, the performance of  Spark is gated by the performance of
>> > spark.local.dir, even on large memory systems.
>> >
>> > "Currently it is not possible to not write shuffle files to disk.”
>> >
>> > What changes >would< make it possible?
>> >
>> > The only one that seems possible is to clone the shuffle service and
>> make it
>> > in-memory.
>> >
>> >
>> >
>> >
>> >
>> > On Apr 1, 2016, at 4:57 PM, Reynold Xin <r...@databricks.com> wrote:
>> >
>> > spark.shuffle.spill actually has nothing to do with whether we write
>> shuffle
>> > files to disk. Currently it is not possible to not write shuffle files
>> to
>> > disk, and typically it is not a problem because the network fetch
>> throughput
>> > is lower than what disks can sustain. In most cases, especially with
>> SSDs,
>> > there is little difference between putting all of those in memory and on
>> > disk.
>> >
>> > However, it is becoming more common to run Spark on a small number of
>> beefy
>> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
>> improving
>> > performance for those. Meantime, you can setup local ramdisks on each
>> node
>> > for shuffle writes.
>> >
>> >
>> >
>> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch <slavi...@gmail.com>
>> > wrote:
>> >>
>> >> Hello;
>> >>
>> >> I’m working on spark with very large memory systems (2TB+) and notice
>> that
>> >> Spark spills to disk in shuffle.  Is there a way to force spark to
>> stay in
>> >> memory when doing shuffle operations?   The goal is to keep the
>> shuffle data
>> >> either in the heap or in off-heap memory (in 1.6.x) and never touch
>> the IO
>> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
>> >>
>> >> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
>> >> Tungsten sort in 1.5.x
>> >>
>> >> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but
>> this
>> >> is ignored by the tungsten-sort shuffle manager; its optimized
>> shuffles will
>> >> continue to spill to disk when necessary.”
>> >>
>> >> If this is impossible via configuration changes what code changes
>> would be
>> >> needed to accomplish this?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>> >
>>
> --
> Michael Slavitch
> 62 Renfrew Ave.
> Ottawa Ontario
> K1S 1Z5
>


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Reynold Xin
spark.shuffle.spill actually has nothing to do with whether we write
shuffle files to disk. Currently it is not possible to not write shuffle
files to disk, and typically it is not a problem because the network fetch
throughput is lower than what disks can sustain. In most cases, especially
with SSDs, there is little difference between putting all of those in
memory and on disk.

However, it is becoming more common to run Spark on a small number of beefy
nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
improving performance for those. Meantime, you can setup local ramdisks on
each node for shuffle writes.



On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
wrote:

> Hello;
>
> I’m working on spark with very large memory systems (2TB+) and notice that
> Spark spills to disk in shuffle.  Is there a way to force spark to stay in
> memory when doing shuffle operations?   The goal is to keep the shuffle
> data either in the heap or in off-heap memory (in 1.6.x) and never touch
> the IO subsystem.  I am willing to have the job fail if it runs out of RAM.
>
> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
> Tungsten sort in 1.5.x
>
> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this
> is ignored by the tungsten-sort shuffle manager; its optimized shuffles
> will continue to spill to disk when necessary.”
>
> If this is impossible via configuration changes what code changes would be
> needed to accomplish this?
>
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: df.dtypes -> pyspark.sql.types

2016-03-20 Thread Reynold Xin
We probably should have the alias. Is this still a problem on master
branch?

On Wed, Mar 16, 2016 at 9:40 AM, Ruslan Dautkhanov 
wrote:

> Running following:
>
> #fix schema for gaid which should not be Double
>> from pyspark.sql.types import *
>> customSchema = StructType()
>> for (col,typ) in tsp_orig.dtypes:
>> if col=='Agility_GAID':
>> typ='string'
>> customSchema.add(col,typ,True)
>
>
> Getting
>
>   ValueError: Could not parse datatype: bigint
>
>
> Looks like pyspark.sql.types doesn't know anything about bigint..
> Should it be aliased to LongType in pyspark.sql.types?
>
> Thanks
>
>
> On Wed, Mar 16, 2016 at 10:18 AM, Ruslan Dautkhanov 
> wrote:
>
>> Hello,
>>
>> Looking at
>>
>> https://spark.apache.org/docs/1.5.1/api/python/_modules/pyspark/sql/types.html
>>
>> and can't wrap my head around how to convert string data types names to
>> actual
>> pyspark.sql.types data types?
>>
>> Does pyspark.sql.types has an interface to return StringType() for
>> "string",
>> IntegerType() for "integer" etc? If it doesn't exist it would be great to
>> have such a
>> mapping function.
>>
>> Thank you.
>>
>>
>> ps. I have a data frame, and use its dtypes to loop through all columns
>> to fix a few
>> columns' data types as a workaround for SPARK-13866.
>>
>>
>> --
>> Ruslan Dautkhanov
>>
>
>


Re: [discuss] making SparkEnv private in Spark 2.0

2016-03-19 Thread Reynold Xin
On Wed, Mar 16, 2016 at 3:29 PM, Mridul Muralidharan 
wrote:

> b) Shuffle manager (to get shuffle reader)
>

What's the use case for shuffle manager/reader? This seems like using super
internal APIs in applications.


[discuss] making SparkEnv private in Spark 2.0

2016-03-19 Thread Reynold Xin
Any objections? Please articulate your use case. SparkEnv is a weird one
because it was documented as "private" but not marked as so in class
visibility.

 * NOTE: This is not intended for external use. This is exposed for Shark
 * and may be made private in a future release.


I do see Hive using it to get the config variable. That can probably be
propagated through other means.


Re: Spark Scheduler creating Straggler Node

2016-03-08 Thread Reynold Xin
You just want to be able to replicate hot cached blocks right?

On Tuesday, March 8, 2016, Prabhu Joseph  wrote:

> Hi All,
>
> When a Spark Job is running, one of the Spark Executors on Node A
> has some partitions cached. Later, for some other stage, the Scheduler tries to
> assign a task to Node A to process a cached partition (PROCESS_LOCAL). But
> meanwhile Node A is occupied with some other
> tasks and is busy. The Scheduler waits for the spark.locality.wait interval,
> times out, and tries to find some other node B which is NODE_LOCAL. The
> executor on Node B will try to get the cached partition from Node A, which
> adds network IO to the node and also some extra CPU for I/O. Eventually,
> every node will have a task that is waiting to fetch some cached partition
> from node A, and so the spark job / cluster is basically blocked on a single
> node.
>
> Spark JIRA is created https://issues.apache.org/jira/browse/SPARK-13718
>
> Beginning from Spark 1.2, Spark introduced External Shuffle Service to
> enable executors fetch shuffle files from an external service instead of
> from each other which will offload the load on Spark Executors.
>
> We want to check whether a similar kind of external service is
> implemented for transferring cached partitions to other executors.
>
>
> Thanks, Prabhu Joseph
>
>
>
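
A hedged sketch of one knob that already exists in this direction: caching with a
replicated storage level, so that more than one executor holds the hot blocks. This
is not the external cache-serving service proposed above, just a partial mitigation
available today:

  import org.apache.spark.storage.StorageLevel

  val cached = sc.parallelize(1 to 1000000)
    .map(_ * 2)
    .persist(StorageLevel.MEMORY_ONLY_2)   // the "_2" levels keep two replicas of each block
  cached.count()                           // materialize the cache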


Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Reynold Xin
Thanks. Once you create the jira just reply to this email with the link.

On Wednesday, March 2, 2016, Ewan Leith  wrote:

> Thanks, I'll create the JIRA for it. Happy to help contribute to a patch if 
> we can, not sure if my own scala skills will be up to it but perhaps one of 
> my colleagues' will :)
>
> Ewan
>
> I don't think that exists right now, but it's definitely a good option to
> have. I myself have run into this issue a few times.
>
> Can you create a JIRA ticket so we can track it? Would be even better if
> you are interested in working on a patch! Thanks.
>
>
> On Wed, Mar 2, 2016 at 11:51 AM, Ewan Leith  > wrote:
>
>> Hi Reynold, yes that would be perfect for our use case.
>>
>> I assume it doesn't exist though, otherwise I really need to go re-read the 
>> docs!
>>
>> Thanks to both of you for replying by the way, I know you must be hugely 
>> busy.
>>
>> Ewan
>>
>> Are you looking for a "relaxed" mode that simply returns nulls for fields
>> that don't exist or have an incompatible schema?
>>
>>
>> On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith > > wrote:
>>
>>> Thanks Michael, it's not a great example really, as the data I'm working 
>>> with has some source files that do fit the schema, and some that don't (out 
>>> of millions that do work, perhaps 10 might not).
>>>
>>> In an ideal world for us the select would probably return the valid records 
>>> only.
>>>
>>> We're trying out the new dataset APIs to see if we can do some 
>>> pre-filtering that way.
>>>
>>> Thanks,
>>> Ewan
>>>
>>> -dev +user
>>>
>>> StructType(StructField(data,ArrayType(StructType(StructField(
 *stuff,ArrayType(*StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true), StructField(othertype,
 ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>>
>>>
>>> Its not a great error message, but as the schema above shows, stuff is
>>> an array, not a struct.  So, you need to pick a particular element (using
>>> []) before you can pull out a specific field.  It would be easier to see
>>> this if you ran sqlContext.read.json(s1Rdd).printSchema(), which gives
>>> you a tree view.  Try the following.
>>>
>>>
>>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")
>>>
>>> On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith >> > wrote:
>>>
 When you create a dataframe using the *sqlContext.read.schema()* API,
 if you pass in a schema that’s compatible with some of the records, but
 incompatible with others, it seems you can’t do a .select on the
 problematic columns, instead you get an AnalysisException error.



 I know loading the wrong data isn’t good behaviour, but if you’re
 reading data from (for example) JSON files, there’s going to be malformed
 files along the way. I think it would be nice to handle this error in a
 nicer way, though I don’t know the best way to approach it.



 Before I raise a JIRA ticket about it, would people consider this to be
 a bug or expected behaviour?



 I’ve attached a couple of sample JSON files and the steps below to
 reproduce it, by taking the inferred schema from the simple1.json file, and
 applying it to a union of simple1.json and simple2.json. You can visually
 see the data has been parsed as I think you’d want if you do a .select on
 the parent column and print out the output, but when you do a select on the
 problem column you instead get an exception.



 *scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x =>
 x._2)*

 s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map
 at :27



 *scala> val s1schema = sqlContext.read.json(s1Rdd).schema*

 s1schema: org.apache.spark.sql.types.StructType =
 StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
 StructField(name,StringType,true)),true),true),
 StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
 StructField(id,LongType,true)),true),true)),true),true)),true),true))



 *scala>
 sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)*

 [WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don
 Joeh]),null], [null,WrappedArray([ACME,2])]))]

 [WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])],
 [WrappedArray([2,null]),null]))]


Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Reynold Xin
I don't think that exists right now, but it's definitely a good option to
have. I myself have run into this issue a few times.

Can you create a JIRA ticket so we can track it? Would be even better if
you are interested in working on a patch! Thanks.


On Wed, Mar 2, 2016 at 11:51 AM, Ewan Leith 
wrote:

> Hi Reynold, yes that would be perfect for our use case.
>
> I assume it doesn't exist though, otherwise I really need to go re-read the 
> docs!
>
> Thanks to both of you for replying by the way, I know you must be hugely busy.
>
> Ewan
>
> Are you looking for a "relaxed" mode that simply returns nulls for fields
> that don't exist or have an incompatible schema?
>
>
> On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith 
> wrote:
>
>> Thanks Michael, it's not a great example really, as the data I'm working 
>> with has some source files that do fit the schema, and some that don't (out 
>> of millions that do work, perhaps 10 might not).
>>
>> In an ideal world for us the select would probably return the valid records 
>> only.
>>
>> We're trying out the new dataset APIs to see if we can do some pre-filtering 
>> that way.
>>
>> Thanks,
>> Ewan
>>
>> -dev +user
>>
>> StructType(StructField(data,ArrayType(StructType(StructField(
>>> *stuff,ArrayType(*StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>>> StructField(name,StringType,true)),true),true), StructField(othertype,
>>> ArrayType(StructType(StructField(company,StringType,true),
>>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>
>>
>> Its not a great error message, but as the schema above shows, stuff is
>> an array, not a struct.  So, you need to pick a particular element (using
>> []) before you can pull out a specific field.  It would be easier to see
>> this if you ran sqlContext.read.json(s1Rdd).printSchema(), which gives
>> you a tree view.  Try the following.
>>
>>
>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")
>>
>> On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
>> wrote:
>>
>>> When you create a dataframe using the *sqlContext.read.schema()* API,
>>> if you pass in a schema that’s compatible with some of the records, but
>>> incompatible with others, it seems you can’t do a .select on the
>>> problematic columns, instead you get an AnalysisException error.
>>>
>>>
>>>
>>> I know loading the wrong data isn’t good behaviour, but if you’re
>>> reading data from (for example) JSON files, there’s going to be malformed
>>> files along the way. I think it would be nice to handle this error in a
>>> nicer way, though I don’t know the best way to approach it.
>>>
>>>
>>>
>>> Before I raise a JIRA ticket about it, would people consider this to be
>>> a bug or expected behaviour?
>>>
>>>
>>>
>>> I’ve attached a couple of sample JSON files and the steps below to
>>> reproduce it, by taking the inferred schema from the simple1.json file, and
>>> applying it to a union of simple1.json and simple2.json. You can visually
>>> see the data has been parsed as I think you’d want if you do a .select on
>>> the parent column and print out the output, but when you do a select on the
>>> problem column you instead get an exception.
>>>
>>>
>>>
>>> *scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x =>
>>> x._2)*
>>>
>>> s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map
>>> at :27
>>>
>>>
>>>
>>> *scala> val s1schema = sqlContext.read.json(s1Rdd).schema*
>>>
>>> s1schema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>>> StructField(name,StringType,true)),true),true),
>>> StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
>>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>>
>>>
>>>
>>> *scala>
>>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)*
>>>
>>> [WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don
>>> Joeh]),null], [null,WrappedArray([ACME,2])]))]
>>>
>>> [WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])],
>>> [WrappedArray([2,null]),null]))]
>>>
>>>
>>>
>>> *scala>
>>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")*
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve
>>> 'data.stuff[onetype]' due to data type mismatch: argument 2 requires
>>> integral type, however, 'onetype' is of string type.;
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>>>
>>> at
>>> 

Re: Selecting column in dataframe created with incompatible schema causes AnalysisException

2016-03-02 Thread Reynold Xin
Are you looking for a "relaxed" mode that simply returns nulls for fields that
don't exist or have an incompatible schema?


On Wed, Mar 2, 2016 at 11:12 AM, Ewan Leith 
wrote:

> Thanks Michael, it's not a great example really, as the data I'm working with 
> has some source files that do fit the schema, and some that don't (out of 
> millions that do work, perhaps 10 might not).
>
> In an ideal world for us the select would probably return the valid records 
> only.
>
> We're trying out the new dataset APIs to see if we can do some pre-filtering 
> that way.
>
> Thanks,
> Ewan
>
> -dev +user
>
> StructType(StructField(data,ArrayType(StructType(StructField(
>> *stuff,ArrayType(*StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>> StructField(name,StringType,true)),true),true), StructField(othertype,
>> ArrayType(StructType(StructField(company,StringType,true),
>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>
>
> Its not a great error message, but as the schema above shows, stuff is an
> array, not a struct.  So, you need to pick a particular element (using [])
> before you can pull out a specific field.  It would be easier to see this
> if you ran sqlContext.read.json(s1Rdd).printSchema(), which gives you a
> tree view.  Try the following.
>
>
> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff[0].onetype")
>
> On Wed, Mar 2, 2016 at 1:44 AM, Ewan Leith 
> wrote:
>
>> When you create a dataframe using the *sqlContext.read.schema()* API, if
>> you pass in a schema that’s compatible with some of the records, but
>> incompatible with others, it seems you can’t do a .select on the
>> problematic columns, instead you get an AnalysisException error.
>>
>>
>>
>> I know loading the wrong data isn’t good behaviour, but if you’re reading
>> data from (for example) JSON files, there’s going to be malformed files
>> along the way. I think it would be nice to handle this error in a nicer
>> way, though I don’t know the best way to approach it.
>>
>>
>>
>> Before I raise a JIRA ticket about it, would people consider this to be a
>> bug or expected behaviour?
>>
>>
>>
>> I’ve attached a couple of sample JSON files and the steps below to
>> reproduce it, by taking the inferred schema from the simple1.json file, and
>> applying it to a union of simple1.json and simple2.json. You can visually
>> see the data has been parsed as I think you’d want if you do a .select on
>> the parent column and print out the output, but when you do a select on the
>> problem column you instead get an exception.
>>
>>
>>
>> *scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)*
>>
>> s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at
>> :27
>>
>>
>>
>> *scala> val s1schema = sqlContext.read.json(s1Rdd).schema*
>>
>> s1schema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true),
>> StructField(name,StringType,true)),true),true),
>> StructField(othertype,ArrayType(StructType(StructField(company,StringType,true),
>> StructField(id,LongType,true)),true),true)),true),true)),true),true))
>>
>>
>>
>> *scala>
>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)*
>>
>> [WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don
>> Joeh]),null], [null,WrappedArray([ACME,2])]))]
>>
>> [WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])],
>> [WrappedArray([2,null]),null]))]
>>
>>
>>
>> *scala>
>> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")*
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve
>> 'data.stuff[onetype]' due to data type mismatch: argument 2 requires
>> integral type, however, 'onetype' is of string type.;
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>>
>>
>>
>> (The full exception is attached too).
>>
>>
>>
>> What do people think, is this a bug?
>>
>>
>>
>> Thanks,
>>
>> Ewan
>>
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>
>
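
A hedged sketch of the element-then-field access Michael describes, written with the
Column API instead of the string path (s1schema and s2Rdd are the values from the
example being discussed; this does not give the "relaxed" null-for-bad-records
behaviour, it only spells out the data.stuff[0].onetype suggestion):

  import sqlContext.implicits._

  val df = sqlContext.read.schema(s1schema).json(s2Rdd)
  // Index into the array first, then pull the field: the Column-API spelling of
  // "data.stuff[0].onetype".
  df.select($"data.stuff".getItem(0).getField("onetype")).show(false)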


Re: [Proposal] Enabling time series analysis on spark metrics

2016-03-01 Thread Reynold Xin
Is the suggestion just to use a different config (and maybe fallback to
appid) in order to publish metrics? Seems reasonable.


On Tue, Mar 1, 2016 at 8:17 AM, Karan Kumar 
wrote:

> +dev mailing list
>
> Time series analysis on metrics becomes quite useful when running spark
> jobs using a workflow manager like oozie.
>
> Would love to take this up if the community thinks its worthwhile.
>
> On Tue, Feb 23, 2016 at 2:59 PM, Karan Kumar 
> wrote:
>
>> HI
>>
>> Spark at the moment uses application ID to report metrics. I was thinking
>> that if we can create an option to export metrics on a user-controlled key.
>> This will allow us to do time series analysis on counters by dumping these
>> counters in a DB such as graphite.
>>
>> One of the approaches I had in mind was allowing a user to set a property
>> via the spark client. If that property is set, use the property value to
>> report metrics else use the current implementation
>> of
>> reporting metrics on appid.
>>
>> Thoughts?
>>
>> --
>> Thanks
>> Karan
>>
>
>
>
> --
> Thanks
> Karan
>
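
For what it is worth, later Spark releases added a configuration in exactly this
direction; a hedged sketch (verify the property name against your version's
monitoring documentation, and the Graphite sink itself is still configured through
metrics.properties):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("daily-ingest")
    // A user-controlled prefix for metric keys instead of the per-run application
    // id, so Graphite sees a stable series across runs:
    .set("spark.metrics.namespace", "daily_ingest")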


Re: Is spark.driver.maxResultSize used correctly ?

2016-03-01 Thread Reynold Xin
How big of a deal is this though? If I am reading your email correctly,
either way this job will fail. You simply want it to fail earlier in the
executor side, rather than collecting it and fail on the driver side?

On Sunday, February 28, 2016, Jeff Zhang <zjf...@gmail.com> wrote:

> data skew might be possible, but not the common case. I think we should
> design for the common case, for the skew case, we may can set some
> parameter of fraction to allow user to tune it.
>
> On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin <r...@databricks.com
> <javascript:_e(%7B%7D,'cvml','r...@databricks.com');>> wrote:
>
>> But sometimes you might have skew and almost all the result data are in
>> one or a few tasks though.
>>
>>
>> On Friday, February 26, 2016, Jeff Zhang <zjf...@gmail.com
>> <javascript:_e(%7B%7D,'cvml','zjf...@gmail.com');>> wrote:
>>
>>>
>>> My job get this exception very easily even when I set large value of
>>> spark.driver.maxResultSize. After checking the spark code, I found
>>> spark.driver.maxResultSize is also used on the Executor side to decide whether
>>> a DirectTaskResult or an IndirectTaskResult is sent. This doesn't make sense to me.
>>> Using  spark.driver.maxResultSize / taskNum might be more proper. Because
>>> if  spark.driver.maxResultSize is 1g and we have 10 tasks each has 200m
>>> output. Then even the output of each task is less than
>>>  spark.driver.maxResultSize so DirectTaskResult will be sent to driver, but
>>> the total result size is 2g which will cause exception in driver side.
>>>
>>>
>>> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
>>> LogisticRegression.scala:283, took 33.796379 s
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Total size of serialized results of 1 tasks (1085.0
>>> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Is spark.driver.maxResultSize used correctly ?

2016-02-27 Thread Reynold Xin
But sometimes you might have skew and almost all the result data are in one
or a few tasks though.

On Friday, February 26, 2016, Jeff Zhang  wrote:

>
> My job get this exception very easily even when I set large value of
> spark.driver.maxResultSize. After checking the spark code, I found
> spark.driver.maxResultSize is also used on the Executor side to decide whether
> a DirectTaskResult or an IndirectTaskResult is sent. This doesn't make sense to me.
> Using  spark.driver.maxResultSize / taskNum might be more proper. Because
> if  spark.driver.maxResultSize is 1g and we have 10 tasks each has 200m
> output. Then even the output of each task is less than
>  spark.driver.maxResultSize so DirectTaskResult will be sent to driver, but
> the total result size is 2g which will cause exception in driver side.
>
>
> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
> LogisticRegression.scala:283, took 33.796379 s
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Total size of serialized results of 1 tasks (1085.0
> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: DirectFileOutputCommiter

2016-02-26 Thread Reynold Xin
It could lose data in speculation mode, or if any job fails.

On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman  wrote:

> Takeshi, do you know the reason why they wanted to remove this committer in
> SPARK-10063?
> the jira has no info inside
> as far as I understand the direct committer can't be used when either of
> two is true
> 1. speculation mode
> 2. append mode (i.e. not creating a new version of data but appending to
> existing data)
>
> On 26 February 2016 at 08:24, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Great work!
>> What is the concrete performance gain of the committer on s3?
>> I'd like to know.
>>
>> I think there is no direct committer for files because these kinds of
>> committers have risks
>> of losing data (See: SPARK-10063).
>> Until this is resolved, ISTM files cannot support direct commits.
>>
>> thanks,
>>
>>
>>
>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>>
>>> yes, should be this one
>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>
>>> then need to set it in spark-defaults.conf :
>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>
>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>> > The header of DirectOutputCommitter.scala says Databricks.
>>> > Did you get it from Databricks ?
>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>>> >>
>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>> not included?
>>> >> we added it in our fork,
>>> under 
>>> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>> insert operations in HiveContext, since the Committer is called by hive
>>> (means uses dependencies in hive package)
>>> >> we made some hack to fix this, you can take a look:
>>> >>
>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>> >>
>>> >> may bring some ideas to other spark contributors to find a better way
>>> to use s3.
>>> >>
>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>>> >>>
>>> >>> Hi,
>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>> alikes
>>> >>> especially when working with s3?
>>> >>> I know that there is one impl in spark distro for parquet format,
>>> but not
>>> >>> for files -  why?
>>> >>>
>>> >>> Imho, it can bring huge performance boost.
>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>> stage
>>> >>> when all parts are copied one-by-one to destination dir from
>>> _temporary,
>>> >>> which is bottleneck when number of partitions is high.
>>> >>>
>>> >>> Also, wanted to know if there are some problems when using
>>> >>> DirectFileOutputCommitter?
>>> >>> If writing one partition directly will fail in the middle is spark
>>> will
>>> >>> notice this and will fail job(say after all retries)?
>>> >>>
>>> >>> thanks in advance
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>> >>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >>>
>>> >>> -
>>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>>> >>>
>>> >>
>>> >
>>> >
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


Spark Summit (San Francisco, June 6-8) call for presentation due in less than week

2016-02-24 Thread Reynold Xin
Just want to send a reminder in case people don't know about it. If you are
working on (or with, using) Spark, consider submitting your work to Spark
Summit, coming up in June in San Francisco.

https://spark-summit.org/2016/call-for-presentations/

Cheers.


[discuss] dropping Hadoop 2.2 and 2.3 support in Spark 2.0?

2016-01-13 Thread Reynold Xin
We've dropped Hadoop 1.x support in Spark 2.0.

There is also a proposal to drop Hadoop 2.2 and 2.3, i.e. the minimal
Hadoop version we support would be Hadoop 2.4. The main advantage is then
we'd be able to focus our Jenkins resources (and the associated maintenance
of Jenkins) to create builds for Hadoop 2.6/2.7. It is my understanding
that all Hadoop vendors have moved away from 2.2/2.3, but there might be
some users that are on these older versions.

What do you think about this idea?


Re: XML column not supported in Database

2016-01-11 Thread Reynold Xin
Can you file a JIRA ticket? Thanks.

The URL is issues.apache.org/jira/browse/SPARK

On Mon, Jan 11, 2016 at 1:44 AM, Gaini Rajeshwar <
raja.rajeshwar2...@gmail.com> wrote:

> Hi All,
>
> I am using PostgreSQL database. I am using the following jdbc call to
> access a customer table (*customer_id int, event text, country text,
> content xml)* in my database.
>
> *val dataframe1 = sqlContext.load("jdbc", Map("url" ->
> "jdbc:postgresql://localhost/customerlogs?user=postgres=postgres",
> "dbtable" -> "customer"))*
>
> When i run above command in spark-shell i receive the following error.
>
> *java.sql.SQLException: Unsupported type *
> * at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$getCatalystType(JDBCRDD.scala:103)*
> * at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$1.apply(JDBCRDD.scala:140)*
> * at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$1.apply(JDBCRDD.scala:140)*
> * at scala.Option.getOrElse(Option.scala:120)*
> * at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:139)*
> * at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91)*
> * at
> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)*
> * at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)*
> * at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)*
> * at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1153)*
> * at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:25)*
> * at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:30)*
> * at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:32)*
> * at $iwC$$iwC$$iwC$$iwC$$iwC.(:34)*
> * at $iwC$$iwC$$iwC$$iwC.(:36)*
> * at $iwC$$iwC$$iwC.(:38)*
> * at $iwC$$iwC.(:40)*
> * at $iwC.(:42)*
> * at (:44)*
> * at .(:48)*
> * at .()*
> * at .(:7)*
> * at .()*
> * at $print()*
> * at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
> * at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)*
> * at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
> * at java.lang.reflect.Method.invoke(Method.java:497)*
> * at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)*
> * at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)*
> * at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)*
> * at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)*
> * at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)*
> * at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)*
> * at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)*
> * at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)*
> * at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)*
> * at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)*
> * at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)*
> * at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)*
> * at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)*
> * at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)*
> * at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)*
> * at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)*
> * at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)*
> * at org.apache.spark.repl.Main$.main(Main.scala:31)*
> * at org.apache.spark.repl.Main.main(Main.scala)*
> * at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
> * at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)*
> * at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
> * at java.lang.reflect.Method.invoke(Method.java:497)*
> * at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)*
> * at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)*
> * at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)*
> * at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)*
> * at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)*
>
> Is the xml column type not supported yet in Spark? Is there any way to fix
> this issue?
>
> Thanks,
> Rajeshwar Gaini.
>


[discuss] dropping Python 2.6 support

2016-01-04 Thread Reynold Xin
Does anybody here care about us dropping support for Python 2.6 in Spark
2.0?

Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json
parsing) when compared with Python 2.7. Some libraries that Spark depend on
stopped supporting 2.6. We can still convince the library maintainers to
support 2.6, but it will be extra work. I'm curious if anybody still uses
Python 2.6 to run Spark.

Thanks.


Re: Please add us to the Powered by Spark page

2015-11-24 Thread Reynold Xin
I just updated the page to say "email dev" instead of "email user".


On Tue, Nov 24, 2015 at 1:16 AM, Sean Owen  wrote:

> Not sure who generally handles that, but I just made the edit.
>
> On Mon, Nov 23, 2015 at 6:26 PM, Sujit Pal  wrote:
> > Sorry to be a nag, I realize folks with edit rights on the Powered by
> Spark
> > page are very busy people, but its been 10 days since my original
> request,
> > was wondering if maybe it just fell through the cracks. If I should
> submit
> > via some other channel that will make sure it is looked at (or better
> yet, a
> > self service option), please let me know and I will do so.
> >
> > Here is the information again.
> >
> > Organization Name: Elsevier Labs
> > URL: http://labs.elsevier.com
> > Spark components: Spark Core, Spark SQL, MLLib, GraphX.
> > Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content
> as a
> > Service, Content and Event Analytics, Content/Event based Predictive
> Models
> > and Big Data Processing. We use Scala and Python over Databricks
> Notebooks
> > for most of our work.
> >
> > Thanks very much,
> > Sujit
> >
> > On Fri, Nov 13, 2015 at 9:21 AM, Sujit Pal 
> wrote:
> >>
> >> Hello,
> >>
> >> We have been using Spark at Elsevier Labs for a while now. Would love to
> >> be added to the “Powered By Spark” page.
> >>
> >> Organization Name: Elsevier Labs
> >> URL: http://labs.elsevier.com
> >> Spark components: Spark Core, Spark SQL, MLLib, GraphX.
> >> Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content
> as
> >> a Service, Content and Event Analytics, Content/Event based Predictive
> >> Models and Big Data Processing. We use Scala and Python over Databricks
> >> Notebooks for most of our work.
> >>
> >> Thanks very much,
> >>
> >> Sujit Pal
> >> Technical Research Director
> >> Elsevier Labs
> >> sujit@elsevier.com
> >>
> >>
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: orc read issue n spark

2015-11-18 Thread Reynold Xin
What do you mean by starts delay scheduling? Are you saying it is no longer
doing local reads?

If that's the case you can increase the spark.locality.wait timeout.

On Wednesday, November 18, 2015, Renu Yadav  wrote:

> Hi ,
> I am using spark 1.4.1 and saving orc file using
> df.write.format("orc").save("outputlocation")
>
> outputloation size 440GB
>
> and while reading df.read.format("orc").load("outputlocation").count
>
>
> it has 2618 partitions.
> the count operation runs fine up until 2500 but starts delay scheduling after
> that, which results in slow performance.
>
> *If anyone has any idea on this.Please do reply as I need this  very
> urgent*
>
> Thanks in advance
>
>
> Regards,
> Renu Yadav
>
>
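
A hedged sketch of the locality knob referred to above (the value is illustrative;
the default wait is a few seconds, and raising it trades scheduling delay for
locality):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // How long the scheduler waits before giving up on a preferred location and
    // dropping to the next locality level; per-level variants also exist
    // (spark.locality.wait.process / .node / .rack).
    .set("spark.locality.wait", "30s")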
>


Re: Hive on Spark Vs Spark SQL

2015-11-15 Thread Reynold Xin
It's a completely different path.


On Sun, Nov 15, 2015 at 10:37 PM, kiran lonikar  wrote:

> I would like to know if Hive on Spark uses or shares the execution code
> with Spark SQL or DataFrames?
>
> More specifically, does Hive on Spark benefit from the changes made to
> Spark SQL, project Tungsten? Or is it completely different execution path
> where it creates its own plan and executes on RDD?
>
> -Kiran
>
>


Re: Hive on Spark Vs Spark SQL

2015-11-15 Thread Reynold Xin
No it does not -- although it'd benefit from some of the work to make
shuffle more robust.


On Sun, Nov 15, 2015 at 10:45 PM, kiran lonikar <loni...@gmail.com> wrote:

> So does not benefit from Project Tungsten right?
>
>
> On Mon, Nov 16, 2015 at 12:07 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> It's a completely different path.
>>
>>
>> On Sun, Nov 15, 2015 at 10:37 PM, kiran lonikar <loni...@gmail.com>
>> wrote:
>>
>>> I would like to know if Hive on Spark uses or shares the execution code
>>> with Spark SQL or DataFrames?
>>>
>>> More specifically, does Hive on Spark benefit from the changes made to
>>> Spark SQL, project Tungsten? Or is it completely different execution path
>>> where it creates its own plan and executes on RDD?
>>>
>>> -Kiran
>>>
>>>
>>
>


[ANNOUNCE] Announcing Spark 1.5.2

2015-11-10 Thread Reynold Xin
Hi All,

Spark 1.5.2 is a maintenance release containing stability fixes. This
release is based on the branch-1.5 maintenance branch of Spark. We
*strongly recommend* all 1.5.x users to upgrade to this release.

The full list of bug fixes is here: http://s.apache.org/spark-1.5.2

http://spark.apache.org/releases/spark-release-1-5-2.html


Re: Looking for the method executors uses to write to HDFS

2015-11-06 Thread Reynold Xin
Are you looking for this?

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L69


On Wed, Nov 4, 2015 at 5:11 AM, Tóth Zoltán  wrote:

> Hi,
>
> I'd like to write a parquet file from the driver. I could use the HDFS API
> but I am worried that it won't work on a secure cluster. I assume that the
> method the executors use to write to HDFS takes care of managing Hadoop
> security. However, I can't find the place where HDFS write happens in the
> spark source.
>
> Please help me:
> 1.How to write parquet from the driver using the Spark API?
> 2. If this wouldn't possible, where can I find the method executors use to
> write to HDFS?
>
> Thanks,
> Zoltan
>
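
A hedged sketch of doing this through the DataFrame writer (1.5-era API) rather than
the raw HDFS client, so the Hadoop and security configuration Spark already carries
is reused; writing from the driver this way still launches a small job, and the path
and column names are illustrative:

  import sqlContext.implicits._

  val smallDf = Seq((1, "a"), (2, "b")).toDF("id", "value")
  smallDf.coalesce(1).write.parquet("hdfs:///tmp/driver-written-parquet")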
>


Re: Codegen In Shuffle

2015-11-04 Thread Reynold Xin
GenerateUnsafeProjection -- projects any internal row data structure
directly into bytes (UnsafeRow).


On Wed, Nov 4, 2015 at 12:21 AM, 牛兆捷  wrote:

> Dear all:
>
> The Tungsten project has mentioned that they are applying code generation
> to speed up the conversion of data from the in-memory binary format to the
> wire protocol for shuffle.
>
> Where can I find the related implementation in the Spark code base?
>
> --
> *Regards,*
> *Zhaojie*
>
>


Please reply if you use Mesos fine grained mode

2015-11-03 Thread Reynold Xin
If you are using Spark with Mesos fine grained mode, can you please respond
to this email explaining why you use it over the coarse grained mode?

Thanks.


Re: Please reply if you use Mesos fine grained mode

2015-11-03 Thread Reynold Xin
Soren,

If I understand how Mesos works correctly, even the fine grained mode keeps
the JVMs around?


On Tue, Nov 3, 2015 at 4:22 PM, Soren Macbeth <so...@yieldbot.com> wrote:

> We use fine-grained mode. Coarse-grained mode keeps JVMs around, which
> often leads to OOMs, which in turn kill the entire executor, causing entire
> stages to be retried. In fine-grained mode, only the task fails and
> subsequently gets retried without taking out an entire stage or worse.
>
> On Tue, Nov 3, 2015 at 3:54 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> If you are using Spark with Mesos fine grained mode, can you please
>> respond to this email explaining why you use it over the coarse grained
>> mode?
>>
>> Thanks.
>>
>>
>


Re: If you use Spark 1.5 and disabled Tungsten mode ...

2015-11-01 Thread Reynold Xin
cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at
>>> org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:362)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:139)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:37)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:425)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:422)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:294)
>>> at org.apache.spark.sql.execution.TungstenSort.org
>>> $apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:131)
>>> at
>>> org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
>>> at
>>> org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
>>> at
>>> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:59)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> 2015-10-14 21:00 GMT+02:00 Reynold Xin <r...@databricks.com>:
>>>
>>>> Can you reply to this email and provide us with reasons why you disable
>>>> it?
>>>>
>>>> Thanks.
>>>>
>>>>
>>>
>


Re: [SQL] Memory leak with spark streaming and spark sql in spark 1.5.1

2015-10-14 Thread Reynold Xin
+dev list

On Wed, Oct 14, 2015 at 1:07 AM, Terry Hoo  wrote:

> All,
>
> Has anyone hit a memory leak issue with Spark Streaming and Spark SQL in
> Spark 1.5.1? I can see the memory increasing all the time when running
> this simple sample:
>
> val sc = new SparkContext(conf)
> val sqlContext = new HiveContext(sc)
> import sqlContext.implicits._
> val ssc = new StreamingContext(sc, Seconds(1))
> val s1 = ssc.socketTextStream("localhost", ).map(x =>
> (x,1)).reduceByKey((x : Int, y : Int) => x + y)
> s1.print
> s1.foreachRDD(rdd => {
>   rdd.foreach(_ => Unit)
>   sqlContext.createDataFrame(rdd).registerTempTable("A")
>   sqlContext.sql("""select * from A""").show(1)
> })
>
> After dumping the Java heap, I can see there are about 22K entries
> in SQLListener._stageIdToStageMetrics after 2 hours of running (other maps in
> this SQLListener have about 1K entries). Is this a leak in SQLListener?
>
> Thanks!
> Terry
>


If you use Spark 1.5 and disabled Tungsten mode ...

2015-10-14 Thread Reynold Xin
Can you reply to this email and provide us with reasons why you disable it?

Thanks.
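
For context, a hedged sketch of the setting this refers to (the flag name is
assumed from the Spark 1.5 documentation):

    import org.apache.spark.SparkConf

    // Sketch only: disabling Tungsten execution in Spark 1.5.
    val conf = new SparkConf().set("spark.sql.tungsten.enabled", "false")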


[ANNOUNCE] Announcing Spark 1.5.1

2015-10-01 Thread Reynold Xin
Hi All,

Spark 1.5.1 is a maintenance release containing stability fixes. This
release is based on the branch-1.5 maintenance branch of Spark. We
*strongly recommend* that all 1.5.0 users upgrade to this release.

The full list of bug fixes is here: http://s.apache.org/spark-1.5.1

http://spark.apache.org/releases/spark-release-1-5-1.html


(note: it can take a few hours for everything to be propagated, so you
might get 404 on some download links, but everything should be in maven
central already)


Re: Null Value in DecimalType column of DataFrame

2015-09-21 Thread Reynold Xin
+dev list

Hi Dirceu,

The answer to whether throwing an exception or returning null is better
depends on your use case. If you are debugging and want to find bugs in
your program, you might prefer throwing an exception. However, if you are
running on a large real-world dataset (i.e. the data is dirty) and your query
can take a while (e.g. 30 mins), you might prefer the system to just
assign null values to the dirty data that would otherwise lead to runtime
exceptions, because otherwise you could spend days just cleaning your data.

Postgres throws exceptions here, but I think that's mainly because it is
used for OLTP, where queries are short-running. Most other
analytic databases, I believe, just return null. The best we can do is to
provide a config option to indicate the desired behavior for exception handling.
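
As a minimal sketch of the behavior in question (Spark 1.5-era DataFrame API
assumed; sqlContext in scope, e.g. in spark-shell, is an assumption), a value
that does not fit the target precision and scale comes back as null instead of
raising:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DecimalType

    val df = sqlContext.createDataFrame(Seq(Tuple1("10.5"), Tuple1("0.1"))).toDF("s")
    df.select(
      col("s").cast(DecimalType(10, 10)).as("d10_10"),  // 10.5 cannot fit precision 10 / scale 10 -> null
      col("s").cast(DecimalType(12, 10)).as("d12_10")   // precision - scale >= 2, so 10.5 is preserved
    ).show()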


On Fri, Sep 18, 2015 at 8:15 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Yin, I got that part.
> I just think that instead of returning null, throwing an exception would
> be better. In the exception message we can explain that the DecimalType
> used can't fit the number that is being converted, due to the precision and
> scale values used to create it.
> It would be easier for the user to find the reason why that error is
> happening, instead of receiving a NullPointerException in another part of
> his code. We could also document the DecimalType classes better to
> explain this behavior. What do you think?
>
>
>
>
> 2015-09-17 18:52 GMT-03:00 Yin Huai :
>
>> As I mentioned before, the range of values of DecimalType(10, 10) is [0,
>> 1). If you have a value 10.5 and you want to cast it to DecimalType(10,
>> 10), I do not think there is any better return value than null.
>> Looks like DecimalType(10, 10) is not the right type for your use case. You
>> need a decimal type that has precision - scale >= 2.
>>
>> On Tue, Sep 15, 2015 at 6:39 AM, Dirceu Semighini Filho <
>> dirceu.semigh...@gmail.com> wrote:
>>
>>>
>>> Hi Yin, I posted here because I think it's a bug.
>>> So, it will return null and I can get a NullPointerException, as I was
>>> getting. Is this really the expected behavior? I've never seen something
>>> return null in the other Scala tools that I've used.
>>>
>>> Regards,
>>>
>>>
>>> 2015-09-14 18:54 GMT-03:00 Yin Huai :
>>>
 btw, move it to user list.

 On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai  wrote:

> A scale of 10 means that there are 10 digits to the right of the
> decimal point. If you also have precision 10, the range of your data will
> be [0, 1), and casting "10.5" to DecimalType(10, 10) will return null,
> which is expected.
>
> On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi all,
>> I'm moving from Spark 1.4 to 1.5, and one of my tests is failing.
>> It seems that there were some changes in
>> org.apache.spark.sql.types.DecimalType.
>>
>> This ugly code is a little sample to reproduce the error; don't use
>> it in your project.
>>
>> test("spark test") {
>>   val file = 
>> context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f => 
>> Row.fromSeq({
>> val values = f.split(",")
>> 
>> Seq(values.head.toString.toInt,values.tail.head.toString.toInt,BigDecimal(values.tail.tail.head),
>> values.tail.tail.tail.head)}))
>>
>>   val structType = StructType(Seq(StructField("id", IntegerType, false),
>> StructField("int2", IntegerType, false), StructField("double",
>>
>>  DecimalType(10,10), false),
>>
>>
>> StructField("str2", StringType, false)))
>>
>>   val df = context.sqlContext.createDataFrame(file,structType)
>>   df.first
>> }
>>
>> The content of the file is:
>>
>> 1,5,10.5,va
>> 2,1,0.1,vb
>> 3,8,10.0,vc
>>
>> The problem resides in DecimalType: before 1.5 the scale wasn't
>> required. Now, when using DecimalType(12, 10) it works fine, but when
>> using DecimalType(10, 10) the decimal value
>> 10.5 becomes null, while 0.1 works.
>>
>> Is there anybody working with DecimalType for 1.5.1?
>>
>> Regards,
>> Dirceu
>>
>>
>

>>>
>>>
>>
>


Re: in joins, does one side stream?

2015-09-20 Thread Reynold Xin
We do - but I don't think it is feasible to duplicate every single
algorithm in DF and in RDD.

The only way for this to work is to make one underlying implementation work
for both. Right now DataFrame knows how to serialize individual elements
well and can manage memory that way -- the RDD API doesn't give us enough
information for that.

https://issues.apache.org/jira/browse/SPARK-




On Sun, Sep 20, 2015 at 11:48 AM, Koert Kuipers <ko...@tresata.com> wrote:

> sorry that was a typo. i meant to say:
>
> why do we have these features (broadcast join and sort-merge join) in
> DataFrame but not in RDD?
>
> they don't seem specific to structured data analysis to me.
>
> thanks! koert
>
> On Sun, Sep 20, 2015 at 2:46 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> why dont we want these (broadcast join and sort-merge join) in DataFrame
>> but not in RDD?
>>
>> they dont seem specific to structured data analysis to me.
>>
>> On Sun, Sep 20, 2015 at 2:41 AM, Rishitesh Mishra <
>> rishi80.mis...@gmail.com> wrote:
>>
>>> Got it..thnx Reynold..
>>> On 20 Sep 2015 07:08, "Reynold Xin" <r...@databricks.com> wrote:
>>>
>>>> The RDDs themselves are not materialized, but the implementations can
>>>> materialize.
>>>>
>>>> E.g. in cogroup (which is used by RDD.join), it materializes all the
>>>> data during grouping.
>>>>
>>>> In SQL/DataFrame join, depending on the join:
>>>>
>>>> 1. For broadcast join, only the smaller side is materialized in memory
>>>> as a hash table.
>>>>
>>>> 2. For sort-merge join, both sides are sorted & streamed through --
>>>> however, one of the sides needs to buffer all the rows having the same join
>>>> key in order to perform the join.
>>>>
>>>>
>>>>
>>>> On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra <
>>>> rishi80.mis...@gmail.com> wrote:
>>>>
>>>>> Hi Reynold,
>>>>> Can you please elaborate on this. I thought RDD also opens only an
>>>>> iterator. Does it get materialized for joins?
>>>>>
>>>>> Rishi
>>>>>
>>>>> On Saturday, September 19, 2015, Reynold Xin <r...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
>>>>>> streams.
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers <ko...@tresata.com>
>>>>>> wrote:
>>>>>>
>>>>>>> in scalding we join with the smaller side on the left, since the
>>>>>>> smaller side will get buffered while the bigger side streams through the
>>>>>>> join.
>>>>>>>
>>>>>>> looking at CoGroupedRDD i do not get the impression such a
>>>>>>> distinction is made. it seems both sides are put into a map that can
>>>>>>> spill
>>>>>>> to disk. is this correct?
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>
>


Re: in joins, does one side stream?

2015-09-19 Thread Reynold Xin
The RDDs themselves are not materialized, but the implementations can
materialize.

E.g. in cogroup (which is used by RDD.join), it materializes all the data
during grouping.

In SQL/DataFrame join, depending on the join:

1. For broadcast join, only the smaller side is materialized in memory as a
hash table.

2. For sort-merge join, both sides are sorted & streamed through --
however, one of the sides needs to buffer all the rows having the same join
key in order to perform the join.
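
A short sketch of the two DataFrame cases above (Spark 1.5-era API assumed;
largeDF and smallDF are hypothetical):

    import org.apache.spark.sql.functions.broadcast

    // Broadcast join: only smallDF is materialized as a hash table; largeDF streams through.
    val joined = largeDF.join(broadcast(smallDF), largeDF("key") === smallDF("key"))

    // Without the hint, the planner may pick sort-merge join: both sides are sorted and
    // streamed, buffering only the rows that share the current join key.

In the RDD cogroup path there is no such asymmetry, which is why both sides can
end up buffered.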



On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra <rishi80.mis...@gmail.com
> wrote:

> Hi Reynold,
> Can you please elaborate on this. I thought RDD also opens only an
> iterator. Does it get materialized for joins?
>
> Rishi
>
> On Saturday, September 19, 2015, Reynold Xin <r...@databricks.com> wrote:
>
>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
>> streams.
>>
>>
>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> in scalding we join with the smaller side on the left, since the smaller
>>> side will get buffered while the bigger side streams through the join.
>>>
>>> looking at CoGroupedRDD i do not get the impression such a distinction is
>>> made. it seems both sides are put into a map that can spill to disk. is
>>> this correct?
>>>
>>> thanks
>>>
>>
>>


Re: in joins, does one side stream?

2015-09-18 Thread Reynold Xin
Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
streams.


On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers  wrote:

> in scalding we join with the smaller side on the left, since the smaller
> side will get buffered while the bigger side streams through the join.
>
> looking at CoGroupedRDD i do not get the impression such a distinction is
> made. it seems both sides are put into a map that can spill to disk. is
> this correct?
>
> thanks
>


Re: How to avoid shuffle errors for a large join ?

2015-09-16 Thread Reynold Xin
Only SQL and DataFrame for now.

We are thinking about how to apply that to a more general distributed
collection based API, but it's not in 1.5.
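
A hedged sketch of the tuning mentioned in the quoted thread below (Spark
1.4/1.5-era configuration names; the values are illustrative assumptions, not
recommendations):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.memoryFraction", "0.4")   // raise the shuffle memory fraction from its default
      .set("spark.sql.shuffle.partitions", "2000")  // more partitions -> smaller per-task shuffle blocks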

On Sat, Sep 5, 2015 at 11:56 AM, Gurvinder Singh <gurvinder.si...@uninett.no
> wrote:

> On 09/05/2015 11:22 AM, Reynold Xin wrote:
> > Try increasing the shuffle memory fraction (by default it is only 16%).
> > Again, if you run Spark 1.5, this will probably run a lot faster,
> > especially if you increase the shuffle memory fraction ...
> Hi Reynold,
>
> Does 1.5 have better join/cogroup performance for the RDD case too, or
> only for SQL?
>
> - Gurvinder
> >
> > On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak <tom...@gmail.com
> > <mailto:tom...@gmail.com>> wrote:
> >
> > While it works with sort-merge-join, it takes about 12h to finish
> > (with 1 shuffle partitions). My hunch is that the reason for
> > that is this:
> >
> > INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB
> > to disk (62 times so far)
> >
> > (and lots more where this comes from).
> >
> > On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com
> > <mailto:r...@databricks.com>> wrote:
> >
> > Can you try 1.5? This should work much, much better in 1.5 out
> > of the box.
> >
> > For 1.4, I think you'd want to turn on sort-merge-join, which is
> > off by default. However, the sort-merge join in 1.4 can still
> > trigger a lot of garbage, making it slower. SMJ performance is
> > probably 5x - 1000x better in 1.5 for your case.
> >
> >
> > On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak
> > <tom...@gmail.com <mailto:tom...@gmail.com>> wrote:
> >
> > I'm getting errors like "Removing executor with no recent
> > heartbeats" & "Missing an output location for shuffle"
> > for a large Spark SQL join (1bn rows/2.5TB joined with
> > 1bn rows/30GB), and I'm not sure how to configure the job to
> > avoid them.
> >
> > The initial stage completes fine with some 30k tasks on a
> > cluster with 70 machines/10TB memory, generating about 6.5TB
> > of shuffle writes, but then the shuffle stage first waits
> > 30min in the scheduling phase according to the UI, and then
> > dies with the mentioned errors.
> >
> > I can see in the GC logs that the executors reach their
> > memory limits (32g per executor, 2 workers per machine) and
> > can't allocate any more stuff in the heap. Fwiw, the top 10
> > in the memory use histogram are:
> >
> > num     #instances       #bytes  class name
> > ----------------------------------------------------------------------
> >   1:     249139595  11958700560  scala.collection.immutable.HashMap$HashMap1
> >   2:     251085327   8034730464  scala.Tuple2
> >   3:     243694737   5848673688  java.lang.Float
> >   4:     231198778   5548770672  java.lang.Integer
> >   5:      72191585   4298521576  [Lscala.collection.immutable.HashMap;
> >   6:      72191582   2310130624  scala.collection.immutable.HashMap$HashTrieMap
> >   7:      74114058   1778737392  java.lang.Long
> >   8:       6059103    779203840  [Ljava.lang.Object;
> >   9:       5461096    174755072  scala.collection.mutable.ArrayBuffer
> >  10:         34749     70122104  [B
> >
> > Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
> >
> > spark.core.connection.ack.wait.timeout 600
> > spark.executor.heartbeatInterval   60s
> > spark.executor.memory  32g
> > spark.mesos.coarse false
> > spark.network.timeout  600s
> > spark.shuffle.blockTransferService netty
> > spark.shuffle.consolidateFiles true
> > spark.shuffle.file.buffer  1m
> > spark.shuffle.io.maxRetries6
> > spark.shuffle.manager  sort
> >
> > The join is currently configured with
> > spark.sql.shuffle.partitions=1000 but that doesn't seem to
> > help. Would increasing the partitions help? Is there a
> > formula to determine an approximate partition count
> > for a join?
> > Any help with this job would be appreciated!
> >
> > cheers,
> > Tom
> >
> >
> >
> >
>
>


Re: Perf impact of BlockManager byte[] copies

2015-09-10 Thread Reynold Xin
This is one problem I'd like to address soon - providing a binary block
management interface for shuffle (and maybe other things) that avoids
serialization/copying.


On Fri, Feb 27, 2015 at 3:39 PM, Paul Wais  wrote:

> Dear List,
>
> I'm investigating some problems related to native code integration
> with Spark, and while picking through BlockManager I noticed that data
> (de)serialization currently issues lots of array copies.
> Specifically:
>
> - Deserialization: BlockManager marshals all deserialized bytes
> through a spark.util.ByteBufferInputStream, which necessitates
> copying data into an intermediate temporary byte[].  The temporary
> byte[] might be reused between deserialization of T instances, but
> nevertheless the bytes must be copied (and likely in a Java loop).
>
> - Serialization: BlockManager buffers all serialized bytes into a
> java.io.ByteArrayOutputStream, which maintains an internal byte[]
> buffer and grows/re-copies the buffer like a vector as the buffer
> fills.  BlockManager then retrieves the internal byte[] buffer, wraps
> it in a ByteBuffer, and sends it off to be stored (e.g. in
> MemoryStore, DiskStore, Tachyon, etc).
>
> When an individual T is somewhat large (e.g. a feature vector, an
> image, etc), or blocks are megabytes in size, these copies become
> expensive (especially for large written blocks), right?  Does anybody
> have any measurements of /how/ expensive they are?  If not, is there
> serialization benchmark code (e.g. for KryoSerializer) that might be
> helpful here?
>
>
> As part of my investigation, I've found that one might be able to
> sidestep these issues by extending Spark's SerializerInstance API to
> offer I/O on ByteBuffers (in addition to {Input,Output}Streams).  An
> extension including a ByteBuffer API would furthermore have many
> benefits for native code.  A major downside of this API addition is
> that it wouldn't interoperate (nontrivially) with compression, so
> shuffles wouldn't benefit.  Nevertheless, BlockManager could probably
> deduce when use of this ByteBuffer API is possible and leverage it.
>
> Cheers,
> -Paul
>
>
>

