Re: Databricks Spark Parallelism and Shuffle Partitions

2021-02-04 Thread Subhash Sriram
Hi Erica,

On your cluster details, you can click on "Advanced", and then set those
parameters in the "Spark" tab. Hope that helps.
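
For reference, the Spark config box takes whitespace-separated key/value pairs,
one per line; the values below are only examples:

spark.sql.shuffle.partitions 200
spark.default.parallelism 200

You can also change the shuffle partition count from a notebook at runtime with
spark.conf.set("spark.sql.shuffle.partitions", "200"); spark.default.parallelism
has to go in the cluster config, since it is read when the context starts.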

Thanks,
Subhash

On Thu, Feb 4, 2021 at 5:27 PM Erica Lin 
wrote:

> Hello!
>
> Is there a way to set spark.sql.shuffle.partitions
> and spark.default.parallelism in Databricks? I checked the event log and
> can't find those parameters in the log either. Is it something that
> Databricks sets automatically?
>
> Thanks,
> Erica
>


Re: Run SQL on files directly

2018-12-08 Thread Subhash Sriram
Hi David,

I’m not sure if that is possible, but why not just read the CSV file using the 
Scala API, specifying those options, and then query it using SQL by creating a 
temp view?
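
For example, something along these lines (a rough sketch; the option values are
placeholders):

val df = spark.read
  .option("sep", "|")
  .option("header", "true")
  .option("encoding", "UTF-8")
  .csv("/my/path/myfile.csv")

df.createOrReplaceTempView("my_csv")
spark.sql("select * from my_csv").show()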

Thanks,
Subhash 

Sent from my iPhone

> On Dec 8, 2018, at 12:39 PM, David Markovitz 
>  wrote:
> 
> Hi
> Spark SQL supports direct querying on files (here), e.g. –
>  
> select * from csv.`/my/path/myfile.csv`
> 
>  
> Does anybody know if it is possible to pass options (sep, header, encoding, 
> etc.) with this syntax?
>  
> Thanks
>  
>  
> Best regards,
>  
> David (דודו) Markovitz
> Technology Solutions Professional, Data Platform
> Microsoft Israel
>  
> Mobile: +972-525-834-304
> Office: +972-747-119-274
>  
> 
>  


JdbcRDD - schema always resolved as nullable=true

2018-08-15 Thread Subhash Sriram
Hi Spark Users,

We do a lot of processing in Spark using data that is in MS SQL server.
Today, I created a DataFrame against a table in SQL Server using the
following:

val dfSql=spark.read.jdbc(connectionString, table, props)

I noticed that every column in the DataFrame showed as *nullable=true*, even
though many of them are required.

I went hunting in the code, and I found that in JDBCRDD, when it resolves
the schema of a table, it passes in *alwaysNullable=true* to JdbcUtils,
which forces all columns to resolve as nullable.

https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala#L62

I don't see a way to change that functionality. Is this by design, or could
it be a bug?
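
In the meantime, a possible workaround sketch (untested; "id" below is just an
illustrative column name) is to rebuild the DataFrame with an adjusted schema:

import org.apache.spark.sql.types.StructType

val desiredSchema = StructType(dfSql.schema.map { f =>
  if (f.name == "id") f.copy(nullable = false) else f  // mark required columns
})
val dfStrict = spark.createDataFrame(dfSql.rdd, desiredSchema)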

Thanks!
Subhash


Re: Spark DF to Hive table with both Partition and Bucketing not working

2018-06-19 Thread Subhash Sriram
Hi Umar,

Could it be that spark.sql.sources.bucketing.enabled is not set to true? 
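
You can check it and, if needed, set it from the session, e.g.:

spark.conf.get("spark.sql.sources.bucketing.enabled")
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")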

Thanks,
Subhash

Sent from my iPhone

> On Jun 19, 2018, at 11:41 PM, umargeek  wrote:
> 
> Hi Folks,
> 
> I am trying to save a Spark data frame after reading from an ORC file, adding
> two new columns, and finally saving it to a Hive table with both the
> partitioning and bucketing features.
> 
> Using Spark 2.3 (as both the partitioning and bucketing features are available
> in this version).
> 
> Looking for advise.
> 
> Code Snippet:
> 
> df_orc_data =
> spark.read.format("orc").option("delimiter","|").option("header",
> "true").option("inferschema", "true").load(filtered_path)
> df_fil_ts_data = df_orc_data.withColumn("START_TS",
> lit(process_time).cast("timestamp"))
> daily = (datetime.datetime.utcnow().strftime('%Y-%m-%d'))
> df_filtered_data =
> df_fil_ts_data.withColumn("DAYPART",lit(daily).cast("string"))
> hour = (datetime.datetime.utcnow().strftime('%H'))
> df_filtered = df_filtered_data.withColumn("HRS",lit(hour).cast("string"))
> (df_filtered.write.partitionBy("DAYPART").bucketBy(24,"HRS").sortBy("HRS").mode("append").orc('/user/umar/netflow_filtered').saveAsTable("default.DDOS_NETFLOW_FILTERED"))
> 
> Error:
> "'save' does not support bucketing right now;"
> 
> 
> 
> Thanks,
> Umar
> 
> 
> 
> 




Re: how can I run spark job in my environment which is a single Ubuntu host with no hadoop installed

2018-06-17 Thread Subhash Sriram
Hi Raymond,

If you set your master to local[*] instead of yarn-client, it should run on 
your local machine.
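
For example, based on your command below (a sketch; it looks like your program
also takes the master as its first argument, so that value may need the same
change, depending on how GetRevenuePerOrder reads it):

spark-submit --master "local[*]" --class retail_db.GetRevenuePerOrder \
  --conf spark.ui.port=12678 spark2practice_2.11-0.1.jar \
  "local[*]" /public/retail_db/order_items /home/rxie/output/revenueperorder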

Thanks,
Subhash 

Sent from my iPhone

> On Jun 17, 2018, at 2:32 PM, Raymond Xie  wrote:
> 
> Hello,
> 
> I am wondering how I can run a Spark job in my environment, which is a single 
> Ubuntu host with no Hadoop installed. If I run my job like below, I end up 
> with an infinite loop at the end. Thank you very much.
> 
> rxie@ubuntu:~/data$ spark-submit --class retail_db.GetRevenuePerOrder --conf 
> spark.ui.port=12678 spark2practice_2.11-0.1.jar yarn-client 
> /public/retail_db/order_items /home/rxie/output/revenueperorder
> 2018-06-17 11:19:36 WARN  Utils:66 - Your hostname, ubuntu resolves to a 
> loopback address: 127.0.1.1; using 192.168.112.141 instead (on interface 
> ens33)
> 2018-06-17 11:19:36 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind 
> to another address
> 2018-06-17 11:19:37 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> 2018-06-17 11:19:38 INFO  SparkContext:54 - Running Spark version 2.3.1
> 2018-06-17 11:19:38 WARN  SparkConf:66 - spark.master yarn-client is 
> deprecated in Spark 2.0+, please instead use "yarn" with specified deploy 
> mode.
> 2018-06-17 11:19:38 INFO  SparkContext:54 - Submitted application: Get 
> Revenue Per Order
> 2018-06-17 11:19:38 INFO  SecurityManager:54 - Changing view acls to: rxie
> 2018-06-17 11:19:38 INFO  SecurityManager:54 - Changing modify acls to: rxie
> 2018-06-17 11:19:38 INFO  SecurityManager:54 - Changing view acls groups to:
> 2018-06-17 11:19:38 INFO  SecurityManager:54 - Changing modify acls groups to:
> 2018-06-17 11:19:38 INFO  SecurityManager:54 - SecurityManager: 
> authentication disabled; ui acls disabled; users  with view permissions: 
> Set(rxie); groups with view permissions: Set(); users  with modify 
> permissions: Set(rxie); groups with modify permissions: Set()
> 2018-06-17 11:19:39 INFO  Utils:54 - Successfully started service 
> 'sparkDriver' on port 44709.
> 2018-06-17 11:19:39 INFO  SparkEnv:54 - Registering MapOutputTracker
> 2018-06-17 11:19:39 INFO  SparkEnv:54 - Registering BlockManagerMaster
> 2018-06-17 11:19:39 INFO  BlockManagerMasterEndpoint:54 - Using 
> org.apache.spark.storage.DefaultTopologyMapper for getting topology 
> information
> 2018-06-17 11:19:39 INFO  BlockManagerMasterEndpoint:54 - 
> BlockManagerMasterEndpoint up
> 2018-06-17 11:19:39 INFO  DiskBlockManager:54 - Created local directory at 
> /tmp/blockmgr-69a8a12d-0881-4454-96ab-6a45d5c58bfe
> 2018-06-17 11:19:39 INFO  MemoryStore:54 - MemoryStore started with capacity 
> 413.9 MB
> 2018-06-17 11:19:39 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
> 2018-06-17 11:19:40 INFO  log:192 - Logging initialized @7035ms
> 2018-06-17 11:19:40 INFO  Server:346 - jetty-9.3.z-SNAPSHOT
> 2018-06-17 11:19:40 INFO  Server:414 - Started @7383ms
> 2018-06-17 11:19:40 INFO  AbstractConnector:278 - Started 
> ServerConnector@51ad75c2{HTTP/1.1,[http/1.1]}{0.0.0.0:12678}
> 2018-06-17 11:19:40 INFO  Utils:54 - Successfully started service 'SparkUI' 
> on port 12678.
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@50b8ae8d{/jobs,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@60afd40d{/jobs/json,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@28a2a3e7{/jobs/job,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@10b3df93{/jobs/job/json,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@ea27e34{/stages,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@33a2499c{/stages/json,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@e72dba7{/stages/stage,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@3c321bdb{/stages/stage/json,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@24855019{/stages/pool,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@3abd581e{/stages/pool/json,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@4d4d8fcf{/storage,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@610db97e{/storage/json,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContextHandler@6f0628de{/storage/rdd,null,AVAILABLE,@Spark}
> 2018-06-17 11:19:40 INFO  ContextHandler:781 - Started 
> o.s.j.s.ServletContex

Re: Spark & S3 - Introducing random values into key names

2018-03-08 Thread Subhash Sriram
Thanks, Vadim! That helps and makes sense. I don't think we have so many keys 
that we need to worry about it. If we do, I think I would go with an approach 
similar to what you suggested.
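
For reference, a very rough, driver-side sketch of that kind of approach (the
bucket name and paths are placeholders, and a real implementation would
distribute the copies the way Vadim's custom DistCp does):

import java.util.UUID
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = spark.sparkContext.hadoopConfiguration
val srcFs = FileSystem.get(new java.net.URI("hdfs:///"), conf)
val parquetFiles = srcFs.listStatus(new Path("/staging/run1")).map(_.getPath)

val listing = parquetFiles.map { src =>
  // the randomness goes at the *front* of the key
  val prefix = UUID.randomUUID().toString.take(8)
  val dst = new Path(s"s3a://my-bucket/$prefix/${src.getName}")
  FileUtil.copy(srcFs, src, dst.getFileSystem(conf), dst, false, conf)
  dst.toString
}
// persist `listing` somewhere small (e.g. a single file on S3) so readers know
// which keys to load later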

Thanks again,
Subhash 

Sent from my iPhone

> On Mar 8, 2018, at 11:56 AM, Vadim Semenov  wrote:
> 
> You need to put the randomness at the beginning of the key; if you put it 
> anywhere other than the beginning, you're not guaranteed to get good 
> performance.
> 
> The way we achieved this is by writing to HDFS first, and then having a 
> custom DistCp implemented using Spark that copies parquet files using random 
> keys,
> and then saves the list of resulting keys to S3, and when we want to use 
> those parquet files, we just need to load the listing file, and then take 
> keys from it and pass them into the loader.
> 
> You only need to do this when you have way too many files, if the number of 
> keys you operate is reasonably small (let's say, in thousands), you won't get 
> any benefits.
> 
> Also, S3 buckets have internal optimizations, and over time a bucket adjusts to 
> the workload, i.e. some additional underlying partitions get added, some 
> splits happen, etc.
> If you want to have good performance from start, you would need to use 
> randomization, yes.
> Or alternatively, you can contact AWS and tell them about the naming schema 
> that you're going to have (but it must be set in stone), and then they can 
> try to pre-optimize the bucket for you.
> 
>> On Thu, Mar 8, 2018 at 11:42 AM, Subhash Sriram  
>> wrote:
>> Hey Spark user community,
>> 
>> I am writing Parquet files from Spark to S3 using S3a. I was reading this 
>> article about improving S3 bucket performance, specifically about how it can 
>> help to introduce randomness to your key names so that data is written to 
>> different partitions.
>> 
>> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>> 
>> Is there a straightforward way to accomplish this randomness in Spark via 
>> the DataSet API? The only thing that I could think of would be to actually 
>> split the large set into multiple sets (based on row boundaries), and then 
>> write each one with the random key name.
>> 
>> Is there an easier way that I am missing?
>> 
>> Thanks in advance!
>> Subhash
>> 
>> 
> 


Spark & S3 - Introducing random values into key names

2018-03-08 Thread Subhash Sriram
Hey Spark user community,

I am writing Parquet files from Spark to S3 using S3a. I was reading this
article about improving S3 bucket performance, specifically about how it
can help to introduce randomness to your key names so that data is written
to different partitions.

https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/

Is there a straightforward way to accomplish this randomness in Spark via
the DataSet API? The only thing that I could think of would be to actually
split the large set into multiple sets (based on row boundaries), and then
write each one with the random key name.

Is there an easier way that I am missing?

Thanks in advance!
Subhash


Spark JDBC bulk insert

2018-02-01 Thread Subhash Sriram
Hey everyone,

I have a use case where I will be processing data in Spark and then writing
it back to MS SQL Server.

Is it possible to use bulk insert functionality and/or batch the writes
back to SQL?

I am using the DataFrame API to write the rows:

sqlContext.write.jdbc(...)
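
(For what it's worth, if the plain JDBC writer turns out to be enough, its batch
size can be tuned via the "batchsize" option; a sketch, where df, jdbcUrl, and
the table name are placeholders:)

val props = new java.util.Properties()
props.setProperty("batchsize", "10000")
df.write.mode("append").jdbc(jdbcUrl, "dbo.my_table", props)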

Thanks in advance for any ideas or assistance!
Subhash


Re: is there a way to create new column with timeuuid using raw spark sql ?

2018-02-01 Thread Subhash Sriram
If you have the temp view name (table, for example), couldn't you do
something like this?

val dfWithColumn = spark.sql("select *, <your timeuuid udf>() as new_column from
table")

Thanks,
Subhash

On Thu, Feb 1, 2018 at 11:18 AM, kant kodali  wrote:

> Hi,
>
> Are you talking about df.withColumn()? If so, that's not what I meant. I
> meant creating a new column using raw SQL. In other words, say I don't have a
> dataframe; I only have the view name from df.createOrReplaceView("table"),
> so I can do things like "select * from table". In a similar fashion, I want
> to see how I can create a new column using raw SQL. I am looking
> at this reference https://docs.databricks.com/spark/latest/
> spark-sql/index.html and I am not seeing a way.
>
> Thanks!
>
> On Thu, Feb 1, 2018 at 4:01 AM, Jean Georges Perrin  wrote:
>
>> Sure, use withColumn()...
>>
>> jg
>>
>>
>> > On Feb 1, 2018, at 05:50, kant kodali  wrote:
>> >
>> > Hi All,
>> >
>> > Is there any way to create a new timeuuid column of an existing
>> dataframe using raw sql? you can assume that there is a timeuuid udf
>> function if that helps.
>> >
>> > Thanks!
>>
>>
>


Re: Writing data in HDFS high available cluster

2018-01-18 Thread Subhash Sriram
Hi Soheil,

We have a high availability cluster as well, but I never have to specify the 
active master when writing, only the cluster name. It works regardless of which 
node is the active master.
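
In other words, a write can just target the nameservice ID (sketch; "mycluster"
stands for whatever nameservice is configured in hdfs-site.xml):

df.write.parquet("hdfs://mycluster/path/to/output")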

Hope that helps.

Thanks,
Subhash 

Sent from my iPhone

> On Jan 18, 2018, at 5:49 AM, Soheil Pourbafrani  wrote:
> 
> I have an HDFS high-availability cluster with two namenodes, one active and 
> one standby. When I want to write data to HDFS, I use the active namenode's 
> address. My question is: what happens if the active namenode fails while Spark 
> is writing data? Is there any way to set both the active and standby namenodes 
> in Spark for writing data?




Re: Why do I see five attempts on my Spark application

2017-12-13 Thread Subhash Sriram
There are some more properties specifically for YARN here:

http://spark.apache.org/docs/latest/running-on-yarn.html
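
For example, the per-application cap is spark.yarn.maxAppAttempts (the
cluster-wide YARN cap is yarn.resourcemanager.am.max-attempts):

spark-submit --conf spark.yarn.maxAppAttempts=1 ...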

Thanks,
Subhash

On Wed, Dec 13, 2017 at 2:32 PM, Subhash Sriram 
wrote:

> http://spark.apache.org/docs/latest/configuration.html
>
> On Wed, Dec 13, 2017 at 2:31 PM, Toy  wrote:
>
>> Hi,
>>
>> Can you point me to the config for that please?
>>
>> On Wed, 13 Dec 2017 at 14:23 Marcelo Vanzin  wrote:
>>
>>> On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
>>> > I'm wondering why I am seeing 5 attempts for my Spark application?
>>> Does Spark application restart itself?
>>>
>>> It restarts itself if it fails (up to a limit that can be configured
>>> either per Spark application or globally in YARN).
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>


Re: Why do I see five attempts on my Spark application

2017-12-13 Thread Subhash Sriram
http://spark.apache.org/docs/latest/configuration.html

On Wed, Dec 13, 2017 at 2:31 PM, Toy  wrote:

> Hi,
>
> Can you point me to the config for that please?
>
> On Wed, 13 Dec 2017 at 14:23 Marcelo Vanzin  wrote:
>
>> On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
>> > I'm wondering why I am seeing 5 attempts for my Spark application? Does
>> Spark application restart itself?
>>
>> It restarts itself if it fails (up to a limit that can be configured
>> either per Spark application or globally in YARN).
>>
>>
>> --
>> Marcelo
>>
>


Re: Json to csv

2017-12-12 Thread Subhash Sriram
I was curious about this too, and found this. You may find it helpful:

http://www.tegdesign.com/converting-a-nested-json-document-to-csv-using-scala-hadoop-and-apache-spark/
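
The general pattern is to explode the arrays and select the nested fields before
writing out CSV. A rough sketch (the column names are made up, since they depend
on your JSON layout):

import org.apache.spark.sql.functions.{col, explode}

val df = spark.read.json("/path/to/input.json")
val flat = df
  .withColumn("item", explode(col("items")))                       // unwrap an array column
  .select(col("meta.id").as("id"), col("item.name"), col("item.value"))
flat.write.option("header", "true").csv("/path/to/output_csv")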

Thanks,
Subhash 

Sent from my iPhone

> On Dec 12, 2017, at 1:44 AM, Prabha K  wrote:
> 
> Any help on converting JSON to CSV or flattening the JSON file? The JSON file 
> has one struct and multiple arrays.
> Thanks 
> Pk 
> 
> Sent from my iPhone
> 


Re: Structured Stream in Spark

2017-10-25 Thread Subhash Sriram
No problem! Take a look at this:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
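
In short, it's an option on the streaming query itself, e.g. (sketch; the sink,
paths, and DataFrame name are placeholders):

val query = parsedDf.writeStream
  .format("parquet")
  .option("path", "hdfs:///output/my_query")
  .option("checkpointLocation", "hdfs:///checkpoints/my_query")
  .start()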

Thanks,
Subhash

On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi Sriram,
>
> Thanks. This is what I was looking for.
>
> one question, where do we need to specify the checkpoint directory in case
> of structured streaming?
>
> Thanks,
> Asmath
>
> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram 
> wrote:
>
>> Hi Asmath,
>>
>> Here is an example of using structured streaming to read from Kafka:
>>
>> https://github.com/apache/spark/blob/master/examples/src/
>> main/scala/org/apache/spark/examples/sql/streaming/Structu
>> redKafkaWordCount.scala
>>
>> In terms of parsing the JSON, there is a from_json function that you can
>> use. The following might help:
>>
>> https://databricks.com/blog/2017/02/23/working-complex-data-
>> formats-structured-streaming-apache-spark-2-1.html
>>
>> I hope this helps.
>>
>> Thanks,
>> Subhash
>>
>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Could anyone provide suggestions on how to parse JSON data from Kafka
>>> and load it back into Hive?
>>>
>>> I have read about structured streaming but didn't find any examples. Is
>>> there a best practice for how to read it and parse it with structured
>>> streaming for this use case?
>>>
>>> Thanks,
>>> Asmath
>>>
>>
>>
>


Re: Structured Stream in Spark

2017-10-25 Thread Subhash Sriram
Hi Asmath,

Here is an example of using structured streaming to read from Kafka:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala

In terms of parsing the JSON, there is a from_json function that you can
use. The following might help:

https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
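
A minimal sketch of that combination (the servers, topic, and schema below are
placeholders):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schema = new StructType().add("id", LongType).add("name", StringType)

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "my_topic")
  .load()

val parsed = raw
  .selectExpr("CAST(value AS STRING) AS json")   // Kafka value bytes -> JSON string
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")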

I hope this helps.

Thanks,
Subhash

On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> Could anyone provide suggestions on how to parse JSON data from Kafka and
> load it back into Hive?
>
> I have read about structured streaming but didn't find any examples. Is
> there a best practice for how to read it and parse it with structured
> streaming for this use case?
>
> Thanks,
> Asmath
>


Re: Create dataframe from RDBMS table using JDBC

2017-04-26 Thread Subhash Sriram
Hi Devender,

I have always gone with the 2nd approach, mostly so I don't have to chain a bunch 
of .option() calls together. You should be able to use either.

Thanks,
Subhash

Sent from my iPhone

> On Apr 26, 2017, at 3:26 AM, Devender Yadav  
> wrote:
> 
> Hi All,
> 
> 
> I am using Spak 1.6.2
> 
> 
> Which is suitable way to create dataframe from RDBMS table. 
> 
> DataFrame df = 
> sqlContext.read().format("jdbc").options(options).load();
> 
> or 
> 
> DataFrame df = sqlContext.read().jdbc(url, table, properties);
> 
> 
> Regards,
> Devender
> 
> 
> 
> 
> 
> 
> 


Re: Concurrent DataFrame.saveAsTable into non-existant tables fails the second job despite Mode.APPEND

2017-04-20 Thread Subhash Sriram
Would it be an option to just write the results of each job into separate 
tables and then run a UNION on all of them at the end into a final target 
table? Just thinking of an alternative!
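
Something along these lines (sketch; the table names are placeholders):

val combined = spark.table("results_job1")
  .union(spark.table("results_job2"))
combined.write.mode("append").saveAsTable("final_target")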

Thanks,
Subhash

Sent from my iPhone

> On Apr 20, 2017, at 3:48 AM, Rick Moritz  wrote:
> 
> Hi List,
> 
> I'm wondering if the following behaviour should be considered a bug, or 
> whether it "works as designed":
> 
> I'm starting multiple concurrent (FIFO-scheduled) jobs in a single 
> SparkContext, some of which write into the same tables.
> When these tables already exist, it appears as though both jobs [at least 
> believe that they] successfully appended to the table (i.e., both jobs 
> terminate succesfully, but I haven't checked whether the data from both jobs 
> was actually written, or if one job overwrote the other's data, despite 
> Mode.APPEND). If the table does not exist, both jobs will attempt to create 
> the table, but whichever job's turn comes second (or later) will then fail with 
> an AlreadyExistsException (org.apache.spark.sql.AnalysisException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: AlreadyExistsException).
> 
> I think the issue here is that neither job registers the table with the 
> metastore until it actually starts writing to it, but each determines early on 
> that it will need to create it. The slower job then obviously fails to create 
> the table and, instead of falling back to appending the data to the existing 
> table, crashes out.
> 
> I would consider this a bit of a bug, but I'd like to make sure that it isn't 
> merely a case of me doing something stupid elsewhere, or indeed simply an 
> inherent architectural limitation of working with the metastore, before going 
> to Jira with this.
> 
> Also, I'm aware that running the jobs strictly sequentially would work around 
> the issue, but that would require reordering jobs before sending them off to 
> Spark, or kill efficiency.
> 
> Thanks for any feedback,
> 
> Rick




Re: Assigning a unique row ID

2017-04-07 Thread Subhash Sriram
Hi,

We use monotonically_increasing_id() as well, but just cache the table first 
like Ankur suggested. With that method, we get the same keys in all derived 
tables. 
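
Roughly (sketch):

import org.apache.spark.sql.functions.monotonically_increasing_id

val withId = df.withColumn("row_id", monotonically_increasing_id()).cache()
withId.count()   // materialize the cache so derived tables all see the same ids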

Thanks,
Subhash

Sent from my iPhone

> On Apr 7, 2017, at 7:32 PM, Everett Anderson  wrote:
> 
> Hi,
> 
> Thanks, but that's using a random UUID. Certainly unlikely to have 
> collisions, but not guaranteed.
> 
> I'd rather prefer something like monotonically_increasing_id or RDD's 
> zipWithUniqueId but with better behavioral characteristics -- so they don't 
> surprise people when 2+ outputs derived from an original table end up not 
> having the same IDs for the same rows, anymore.
> 
> It seems like this would be possible under the covers, but would have the 
> performance penalty of needing to do perhaps a count() and then also a 
> checkpoint.
> 
> I was hoping there's a better way.
> 
> 
>> On Fri, Apr 7, 2017 at 4:24 PM, Tim Smith  wrote:
>> http://stackoverflow.com/questions/37231616/add-a-new-column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator
>> 
>> 
>>> On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson  
>>> wrote:
>>> Hi,
>>> 
>>> What's the best way to assign a truly unique row ID (rather than a hash) to 
>>> a DataFrame/Dataset?
>>> 
>>> I originally thought that functions.monotonically_increasing_id would do 
>>> this, but it seems to have a rather unfortunate property that if you add it 
>>> as a column to table A and then derive tables X, Y, Z and save those, the 
>>> row ID values in X, Y, and Z may end up different. I assume this is because 
>>> it delays the actual computation to the point where each of those tables is 
>>> computed.
>>> 
>> 
>> 
>> 
>> -- 
>> 
>> --
>> Thanks,
>> 
>> Tim
> 


Re: spark-sql use case beginner question

2017-03-09 Thread Subhash Sriram
We have a similar use case. We use the DataFrame API to cache data out of
Hive tables, and then run pretty complex scripts on them. You can register
your Hive UDFs to be used within Spark SQL statements if you want.

Something like this:

sqlContext.sql("CREATE TEMPORARY FUNCTION  as ''")

If you had a table called Prices in the Stocks Hive db, you could do this:

val pricesDf = sqlContext.table("Stocks.Prices")
pricesDf.createOrReplaceTempView("tmp_prices")

Then, you can run whatever SQL you really want on the pricesDf.

sqlContext.sql("select udf_name(), . from tmp_prices")

There are a lot of SQL functions available:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

I hope that helps.

Thanks,
Subhash

On Thu, Mar 9, 2017 at 2:28 AM, nancy henry 
wrote:

> Okay, what is the difference between setting hive.execution.engine=spark
> and running the script through hivecontext.sql?
>
>
>
> On Mar 9, 2017 8:52 AM, "ayan guha"  wrote:
>
>> Hi
>>
>> Subject to your version of Hive & Spark, you may want to set
>> hive.execution.engine=spark as a beeline command-line parameter, assuming you
>> are running your Hive scripts through the beeline command line (which is the
>> suggested practice for security purposes).
>>
>>
>>
>> On Thu, Mar 9, 2017 at 2:09 PM, nancy henry 
>> wrote:
>>
>>>
>>> Hi Team,
>>>
>>> Basically, we have all our data as Hive tables and have been processing it
>>> until now in Hive on MR. Now that we have HiveContext, which can run Hive
>>> queries on Spark, we are making all these complex Hive scripts run using a
>>> hivecontext.sql(sc.textfile(hivescript)) kind of approach, i.e. basically
>>> running Hive queries on Spark without coding anything in Scala yet. Even so,
>>> just making the Hive queries run on Spark already shows a big difference in
>>> run time compared to MR.
>>>
>>> So, since we already have the Hive scripts, should we just make those complex
>>> scripts run using hc.sql, given that hc.sql is able to do it?
>>>
>>> Or is this not best practice? Even though Spark can do it, is it still better
>>> to load all those individual Hive tables in Spark, make RDDs, and write Scala
>>> code to get the same functionality we have in Hive?
>>>
>>> It is becoming difficult for us to choose whether to leave it to hc.sql to do
>>> the work of running the complex scripts too, or whether we have to code in
>>> Scala. Will the manual effort be worth it in terms of performance?
>>>
>>> An example of our sample scripts:
>>> use db;
>>> create tempfunction1 as com.fgh.jkl.TestFunction;
>>>
>>> create destable in hive;
>>> insert overwrite desttable select (big complex transformations and
>>> usage of hive udf)
>>> from table1,table2,table3 join table4 on some complex condition and join
>>> table7 on another complex condition where complex filtering
>>>
>>> So please help: what would be the best approach, and why should I not hand
>>> the entire script to HiveContext to make its own RDDs and run on Spark, if
>>> we are able to do it?
>>>
>>> All the examples I see online only show hc.sql("select * from
>>> table1") and nothing more complex than that.
>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: Spark JDBC reads

2017-03-07 Thread Subhash Sriram
Could you create a view of the table on your JDBC data source and just query 
that from Spark?
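
A related trick, if a database view isn't an option, is to hand spark.read.jdbc a
derived table instead of a table name so the conditional encoding runs on the
database side (sketch, reusing your foobar example; url, props, and columnName
stand in for your actual values):

val pushed = spark.read.jdbc(
  url,
  """(select case when columnName = 'foobar' then 0
               when columnName = 'foobarbaz' then 1
          end as columnName_encoded
      from table) t""",
  props)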

Thanks,
Subhash 

Sent from my iPhone

> On Mar 7, 2017, at 6:37 AM, El-Hassan Wanas  wrote:
> 
> As an example, this is basically what I'm doing:
> 
>  val myDF = originalDataFrame.select(col(columnName).when(col(columnName) 
> === "foobar", 0).when(col(columnName) === "foobarbaz", 1))
> 
> Except there are many more columns and many more conditionals. The generated 
> Spark workflow starts with an SQL query that basically does:
> 
>SELECT columnName, columnName2, etc. from table;
> 
> Then the conditionals/transformations are evaluated on the cluster.
> 
> Is there a way from the DataSet API to force the computation to happen on the 
> SQL data source in this case? Or should I work with JDBCRDD and use 
> createDataFrame on that?
> 
> 
>> On 03/07/2017 02:19 PM, Jörn Franke wrote:
>> Can you provide some source code? I am not sure I understood the problem.
>> If you want to do a preprocessing at the JDBC datasource then you can write 
>> your own data source. Additionally you may want to modify the sql statement 
>> to extract the data in the right format and push some preprocessing to the 
>> database.
>> 
>>> On 7 Mar 2017, at 12:04, El-Hassan Wanas  wrote:
>>> 
>>> Hello,
>>> 
>>> There is, as usual, a big table lying on some JDBC data source. I am doing 
>>> some data processing on that data from Spark; however, in order to speed up 
>>> my analysis, I use reduced encodings and minimize the general size of the 
>>> data before processing.
>>> 
>>> Spark has been doing a great job at generating the proper workflows that do 
>>> that preprocessing for me, but it seems to generate those workflows for 
>>> execution on the Spark Cluster. The issue with that is the large transfer 
>>> cost is still incurred.
>>> 
>>> Is there any way to force Spark to run the preprocessing on the JDBC data 
>>> source and get the prepared output DataFrame instead?
>>> 
>>> Thanks,
>>> 
>>> Wanas
>>> 
>>> 
>>> 
> 
> 
> 




Re: Spark Beginner: Correct approach for use case

2017-03-05 Thread Subhash Sriram
Hi Allan,

Where is the data stored right now? If it's in a relational database, and you 
are using Spark with Hadoop, I feel like it would make sense to import the data 
into HDFS, just because it would be faster to access. You could use Sqoop to do 
that.

In terms of having a long running Spark context, you could look into the Spark 
job server:

https://github.com/spark-jobserver/spark-jobserver/blob/master/README.md

It would allow you to cache all the data in memory and then accept queries via 
REST API calls. You would have to refresh your cache as the data changes of 
course, but it sounds like that is not very often.

In terms of running the queries themselves, I would think you could use Spark 
SQL and the DataFrame/DataSet API, which is built into Spark. You will have to 
think about the best way to partition your data, depending on the queries 
themselves.

Here is a link to the Spark SQL docs:

http://spark.apache.org/docs/latest/sql-programming-guide.html
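
As a very rough sketch of that kind of setup (the path, view name, and customer
id are placeholders):

val data = spark.read.parquet("hdfs:///data/customer_rows")
data.createOrReplaceTempView("customer_data")
spark.sql("CACHE TABLE customer_data")
spark.sql("select * from customer_data where customer_id = 12345").show()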

I hope that helps, and I'm sure other folks will have some helpful advice as 
well.

Thanks,
Subhash 

Sent from my iPhone

> On Mar 5, 2017, at 3:49 PM, Allan Richards  wrote:
> 
> Hi,
> 
> I am looking to use Spark to help execute queries against a reasonably large 
> dataset (1 billion rows). I'm a bit lost with all the different libraries / 
> add-ons to Spark, and am looking for some direction as to what I should look 
> at / what may be helpful.
> 
> A couple of relevant points:
>  - The dataset doesn't change over time. 
>  - There are a small number of applications (or queries I guess, but it's 
> more complicated than a single SQL query) that I want to run against it, but 
> the parameters to those queries will change all the time.
>  - There is a logical grouping of the data per customer, which will generally 
> consist of 1-5000 rows.
> 
> I want each query to run as fast as possible (less than a second or two). So 
> ideally I want to keep all the records in memory, but distributed over the 
> different nodes in the cluster. Does this mean sharing a SparkContext between 
> queries, or is this where HDFS comes in, or is there something else that 
> would be better suited?
> 
> Or is there another overall approach I should look into for executing queries 
> in "real time" against a dataset this size?
> 
> Thanks,
> Allan.


Re: question on transforms for spark 2.0 dataset

2017-03-01 Thread Subhash Sriram
If I am understanding your problem correctly, I think you can just create a
new DataFrame that is a transformation of sample_data by first registering
sample_data as a temp table.

//Register temp table
sample_data.createOrReplaceTempView("sql_sample_data")

//Create new DataSet with transformed values
val transformed = spark.sql("select trim(field1) as field1, trim(field2) as
field2.. from sql_sample_data")

//Test
transformed.show(10)

I hope that helps!
Subhash


On Wed, Mar 1, 2017 at 12:04 PM, Marco Mistroni  wrote:

> Hi I think u need an UDF if u want to transform a column
> Hth
>
> On 1 Mar 2017 4:22 pm, "Bill Schwanitz"  wrote:
>
>> Hi all,
>>
>> I'm fairly new to spark and scala so bear with me.
>>
>> I'm working with a dataset containing a set of columns / fields. The data
>> is stored in hdfs as parquet and is sourced from a postgres box so fields
>> and values are reasonably well formed. We are in the process of trying out
>> a switch from pentaho and various sql databases to pulling data into hdfs
>> and applying transforms / new datasets with processing being done in spark
>> (and other tools, which we are evaluating).
>>
>> A rough version of the code I'm running so far:
>>
>> val sample_data = spark.read.parquet("my_data_input")
>>
>> val example_row = spark.sql("select * from parquet.my_data_input where id
>> = 123").head
>>
>> I want to apply a trim operation on a set of fields; let's call them
>> field1, field2, field3 and field4.
>>
>> What is the best way to go about applying those trims and creating a new
>> dataset? Can I apply the trim to all fields in a single map? Or do I need
>> to apply multiple map functions?
>>
>> When I try the map ( even with a single )
>>
>> scala> val transformed_data = sample_data.map(
>>  |   _.trim(col("field1"))
>>  |   .trim(col("field2"))
>>  |   .trim(col("field3"))
>>  |   .trim(col("field4"))
>>  | )
>>
>> I end up with the following error:
>>
>> :26: error: value trim is not a member of
>> org.apache.spark.sql.Row
>>  _.trim(col("field1"))
>>^
>>
>> Any ideas / guidance would be appreciated!
>>
>