Parallel read parquet file, write to postgresql

2018-12-03 Thread James Starks
Reading the Spark doc
(https://spark.apache.org/docs/latest/sql-data-sources-parquet.html), I don't see
how to read parquet files in parallel with SparkSession. Would
--num-executors just work, or do any additional parameters need to be added to
the SparkSession as well?

Also, if I want to write data to the database in parallel, are the options
'numPartitions' and 'batchsize' enough to improve write performance? For
example,

 mydf.write.format("jdbc").
 option("driver", "org.postgresql.Driver").
 option("url", url).
 option("dbtable", table_name).
 option("user", username).
 option("password", password).
 option("numPartitions", N).
 option("batchsize", M).
 save()

From the Spark website
(https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#jdbc-to-other-databases),
I only find these two parameters that would have an impact on db write
performance.
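
For what it's worth, my current understanding (just a sketch, assuming Spark 2.x;
url, table_name, username, password, N and M are the placeholders from above) is
that the parquet read side is already parallel, one task per file split, and that
the write parallelism is bounded by the number of partitions of the DataFrame,
with 'numPartitions' acting as an upper bound:

 // parquet is read in parallel automatically, one task per file split
 val mydf = spark.read.parquet("hdfs://path/to/parquet")

 mydf
   .repartition(N)                          // N concurrent JDBC connections on write
   .write
   .format("jdbc")
   .option("driver", "org.postgresql.Driver")
   .option("url", url)
   .option("dbtable", table_name)
   .option("user", username)
   .option("password", password)
   .option("numPartitions", N.toString)     // caps the number of write partitions
   .option("batchsize", M.toString)         // rows per JDBC batch insert
   .mode("append")
   .save()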

I appreciate any suggestions.

Re: Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-03 Thread James Starks
By following your advice to use flatMap, I can now convert the result from
RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]. Basically I just perform
flatMap at the end, before converting the RDD back to a DF (i.e.
SparkSession.createDataFrame(rddRecordsOfMyCaseClass)). For instance,

df.map { ... }.filter{ ... }.flatMap { records => records.flatMap { record => 
Seq(record) } }

Not smart code, but it works for my case.
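
A slightly simpler equivalent that avoids the nested flatMap (just a sketch,
assuming rddOfIterables is the RDD[Iterable[MyCaseClass]] produced by the
map/filter steps above) would be:

 // each Iterable is flattened element by element, no intermediate Seq needed
 val flat: org.apache.spark.rdd.RDD[MyCaseClass] = rddOfIterables.flatMap(rs => rs)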

Thanks for the advice!

‐‐‐ Original Message ‐‐‐
On Saturday, December 1, 2018 12:17 PM, Chris Teoh  wrote:

> Hi James,
>
> Try flatMap (_.toList). See below example:-
>
> scala> case class MyClass(i:Int)
> defined class MyClass
>
> scala> val r = 1 to 100
> r: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 
> 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 
> 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
>
> scala> val r2 = 101 to 200
> r2: scala.collection.immutable.Range.Inclusive = Range(101, 102, 103, 104, 
> 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 
> 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 
> 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 
> 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 
> 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 
> 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 
> 195, 196, 197, 198, 199, 200)
>
> scala> val c1 = r.map(MyClass(_)).toIterable
> c1: Iterable[MyClass] = Vector(MyClass(1), MyClass(2), MyClass(3), 
> MyClass(4), MyClass(5), MyClass(6), MyClass(7), MyClass(8), MyClass(9), 
> MyClass(10), MyClass(11), MyClass(12), MyClass(13), MyClass(14), MyClass(15), 
> MyClass(16), MyClass(17), MyClass(18), MyClass(19), MyClass(20), MyClass(21), 
> MyClass(22), MyClass(23), MyClass(24), MyClass(25), MyClass(26), MyClass(27), 
> MyClass(28), MyClass(29), MyClass(30), MyClass(31), MyClass(32), MyClass(33), 
> MyClass(34), MyClass(35), MyClass(36), MyClass(37), MyClass(38), MyClass(39), 
> MyClass(40), MyClass(41), MyClass(42), MyClass(43), MyClass(44), MyClass(45), 
> MyClass(46), MyClass(47), MyClass(48), MyClass(49), MyClass(50), MyClass(51), 
> MyClass(52), MyClass(53), MyClass(54), MyClass(55), MyClass(56), MyClass(57), 
> MyClass(58), MyClass(59), MyClass(...
>
> scala> val c2 = r2.map(MyClass(_)).toIterable
> c2: Iterable[MyClass] = Vector(MyClass(101), MyClass(102), MyClass(103), 
> MyClass(104), MyClass(105), MyClass(106), MyClass(107), MyClass(108), 
> MyClass(109), MyClass(110), MyClass(111), MyClass(112), MyClass(113), 
> MyClass(114), MyClass(115), MyClass(116), MyClass(117), MyClass(118), 
> MyClass(119), MyClass(120), MyClass(121), MyClass(122), MyClass(123), 
> MyClass(124), MyClass(125), MyClass(126), MyClass(127), MyClass(128), 
> MyClass(129), MyClass(130), MyClass(131), MyClass(132), MyClass(133), 
> MyClass(134), MyClass(135), MyClass(136), MyClass(137), MyClass(138), 
> MyClass(139), MyClass(140), MyClass(141), MyClass(142), MyClass(143), 
> MyClass(144), MyClass(145), MyClass(146), MyClass(147), MyClass(148), 
> MyClass(149), MyClass(150), MyClass(151), MyClass(152), MyClass(153), 
> MyClass(154), MyClass(15...
> scala> val rddIt = sc.parallelize(Seq(c1,c2))
> rddIt: org.apache.spark.rdd.RDD[Iterable[MyClass]] = ParallelCollectionRDD[2] 
> at parallelize at <console>:28
>
> scala> rddIt.flatMap(_.toList)
> res4: org.apache.spark.rdd.RDD[MyClass] = MapPartitionsRDD[3] at flatMap at 
> <console>:26
>
> res4 is what you're looking for.
>
> On Sat, 1 Dec 2018 at 21:09, Chris Teoh  wrote:
>
>> Do you have the full code example?
>>
>> I think this would be similar to the mapPartitions code flow, something like 
>> flatMap( _ =>  _.toList )
>>
>> I haven't yet tested this out but this is how I'd first try.
>>
>> On Sat, 1 Dec 2018 at 01:02, James Starks  
>> wrote:
>>
>>> When processing data, I create an instance of RDD[Iterable[MyCaseClass]] 
>>> and I want to convert it to RDD[MyCaseClass] so that it can be further 
>>> converted to dataset or dataframe with toDS() function. But I encounter a 
>>> problem that SparkContext can not be instantiated within SparkSession.map 
>>> function because it already exists, even with allowMultipleContexts set to 
>>> true.
>>>
>>> val sc = new SparkConf()
>>> sc.set("spark.driver.allowMultipleContexts", "true")
>>> new SparkContext(sc).parallelize(seq)
>>>
>>> How can I fix this?
>>>
>>> Thanks.
>>
>> --
>> Chris
>
> --
> Chris

Re: Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend

2018-11-30 Thread James Starks
Shadowing it with

object MyObject {
  // the method lives in the companion object, so the closure only captures the
  // method call, not a non-serializable enclosing instance
  def mymethod(param: MyParam) = actual_function(param)
}

class MyObject {
  import MyObject._
  session.map { ... =>
    mymethod(...)
  }
}

does the job.
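
For completeness, Koert's lazy suggestion below should also work as far as I
understand; roughly like this (a sketch only, where Client is a hypothetical
stand-in for the non-serializable sttp backend, not the real API):

 // placeholder for the non-serializable dependency
 class Client { def get(url: String): String = ??? }

 object ClientHolder {
   // initialized lazily in each executor JVM, so it never has to be serialized
   lazy val client = new Client
 }

 // inside the job: df.map(row => ClientHolder.client.get(row.url))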

Thanks for the advice!

‐‐‐ Original Message ‐‐‐
On Friday, November 30, 2018 9:26 AM,  wrote:

> If it’s just a couple of classes and they are actually suitable for 
> serializing and you have the source code then you can shadow them in your own 
> project with the serializable interface added. Your shadowed classes should 
> be on the classpath before the library’s versions which should lead to spark 
> being able to use the serializable versions.
>
> That’s very much a last resort though!
>
> Chris
>
> On 30 Nov 2018, at 05:08, Koert Kuipers  wrote:
>
>> if you only use it in the executors sometimes using lazy works
>>
>> On Thu, Nov 29, 2018 at 9:45 AM James Starks 
>>  wrote:
>>
>>> This is not a problem directly caused by Spark, but it's related; thus asking
>>> here. I use spark to read data from parquet and process some http calls
>>> with sttp (https://github.com/softwaremill/sttp). However, spark throws
>>>
>>> Caused by: java.io.NotSerializableException: 
>>> com.softwaremill.sttp.FollowRedirectsBackend
>>>
>>> It's understood why such an exception is thrown, because
>>> FollowRedirectsBackend is not serializable. So I would like to know, in such
>>> a case, are there any ways to get around this problem without modifying or
>>> recompiling the original code?
>>>
>>> Thanks

Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]

2018-11-30 Thread James Starks
When processing data, I create an instance of RDD[Iterable[MyCaseClass]] and I 
want to convert it to RDD[MyCaseClass] so that it can be further converted to 
dataset or dataframe with toDS() function. But I encounter a problem that 
SparkContext can not be instantiated within SparkSession.map function because 
it already exists, even with allowMultipleContexts set to true.

val sc = new SparkConf()
sc.set("spark.driver.allowMultipleContexts", "true")
new SparkContext(sc).parallelize(seq)

How can I fix this?

Thanks.

Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend

2018-11-29 Thread James Starks
This is not a problem directly caused by Spark, but it's related; thus asking
here. I use spark to read data from parquet and process some http calls with
sttp (https://github.com/softwaremill/sttp). However, spark throws

Caused by: java.io.NotSerializableException:
com.softwaremill.sttp.FollowRedirectsBackend

It's understood why such an exception is thrown, because FollowRedirectsBackend
is not serializable. So I would like to know, in such a case, are there any ways
to get around this problem without modifying or recompiling the original code?

Thanks

Re: Spark job's driver program consumes too much memory

2018-09-07 Thread James Starks
Yes, I think I was confused, because originally my thought was that if the
executor only requires 10g, then the driver ideally should not need to consume
more than 10g, or at least not more than 20g. But this is not the case. My
configuration sets --driver-memory to 25g and --executor-memory to 10g. And my
program basically only uses `filter`, `map`, and `write.mode().parquet`, as below
(main logic)

 val df = spark.read.format("jdbc")...option("dbtable", "select * from
 mytable where fieldX <> ''")...load() /* sql returns around 8MM records. */
 df.createOrReplaceTempView("newtable")
 val newdf = spark.sql("select field1, ..., fieldN from newtable" /* around
 50 fields */).as[MyCaseClass].filter {...}.map { ... }.filter { ... }
 newdf.write.mode(...).parquet(...)

So I don't understand why the driver program needs such huge memory. And I don't
find related docs explaining this, either on the spark website or through google
(perhaps I missed it by using the wrong keyword). Any places that may contain a
pointer to this?
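
One thing I plan to try is partitioning the jdbc read itself, so the rows are
spread over several tasks instead of a single one (just a sketch; the partition
column and bounds are assumptions about my table, and url/username/password are
as before):

 val df = spark.read.format("jdbc")
   .option("url", url)
   .option("dbtable", "(select * from mytable where fieldX <> '') t")
   .option("user", username)
   .option("password", password)
   .option("partitionColumn", "id")   // assumption: a numeric id column exists
   .option("lowerBound", "1")
   .option("upperBound", "8000000")   // roughly the 8MM rows mentioned above
   .option("numPartitions", "8")
   .load()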

I appreciate your help.


‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:46 PM, Apostolos N. Papadopoulos  
wrote:

> You are putting it all together and this does not make sense. Writing data
> to HDFS does not require that all data be transferred back to the
> driver and THEN saved to HDFS.
>
> This would be a disaster and it would never scale. I suggest checking
> the documentation more carefully because I believe you are a bit confused.
>
> regards,
>
> Apostolos
>
> On 07/09/2018 05:39 μμ, James Starks wrote:
>
> > Is df.write.mode(...).parquet("hdfs://..") also an action function? Checking the
> > doc shows that my spark job doesn't use those action functions. But the save
> > function looks like it resembles the
> > df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") call that my
> > spark job uses. Therefore I am thinking maybe that's the reason why my
> > spark job driver consumes such an amount of memory.
> > https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions
> > My spark job's driver program consumes too much memory, so I want to
> > prevent that by writing data to hdfs on the executor side, instead of
> > waiting for that data to be sent back to the driver program (and then written to
> > hdfs). This is because our worker servers have a bigger memory size than the
> > one that runs the driver program. If I can write data to hdfs at the executor, then
> > the driver memory for my spark job can be reduced.
> > Otherwise does Spark support streaming read from database (i.e. spark 
> > streaming + spark sql)?
> > Thanks for your reply.
> > ‐‐‐ Original Message ‐‐‐
> > On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos papad...@csd.auth.gr 
> > wrote:
> >
> > > Dear James,
> > >
> > > -   check the Spark documentation to see the actions that return a lot of
> > > data back to the driver. One of these actions is collect(). However,
> > > take(x) is an action, also reduce() is an action.
> > > Before executing collect() find out what is the size of your RDD/DF.
> > >
> > > -   I cannot understand the phrase "hdfs directly from the executor". You
> > > can specify an hdfs file as your input and also you can use hdfs to
> > > store your output.
> > > regards,
> > > Apostolos
> > > On 07/09/2018 05:04 μμ, James Starks wrote:
> > >
> > >
> > > > I have a Spark job that reads data from a database. By increasing the submit
> > > > parameter '--driver-memory 25g' the job can work without a problem
> > > > locally, but not in the prod env because the prod master does not have enough
> > > > capacity.
> > > > So I have a few questions:
> > > > -  What functions, such as collect(), would cause the data to be sent
> > > > back to the driver program?
> > > >   My job so far merely uses `as`, `filter`, `map`, and `filter`.
> > > >
> > > > -   Is it possible to write data (in parquet format for instance) to
> > > > hdfs directly from the executor? If so, how can I do that (any code
> > > > snippet, doc for reference, or what keyword to search for, since I
> > > > can't find it by e.g. `spark direct executor hdfs write`)?
> > > >
> > > >
> > > > Thanks
> > > > --
> > >
> > > Apostolos N. Papadopoulos, Associate Professor
> > > Department of Informatics
> > > Aristotle University of Thessaloniki
> > > Thessaloniki, GREECE
> > > tel: ++0030312310991918
> > > email: papad...@csd.auth.gr
> > > twitter: @papadopoulos_ap
> > > web: http://datalab.csd.auth.gr/~apostol
> > >
>
> --
>
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol




Re: Spark job's driver program consumes too much memory

2018-09-07 Thread James Starks


Is df.write.mode(...).parquet("hdfs://..") also an action function? Checking the
doc shows that my spark job doesn't use those action functions. But the save
function looks like it resembles the
df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") call that my
spark job uses. Therefore I am thinking maybe that's the reason why my spark
job driver consumes such an amount of memory.

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions

My spark job's driver program consumes too much memory, so I want to prevent
that by writing data to hdfs on the executor side, instead of waiting for that
data to be sent back to the driver program (and then written to hdfs). This is
because our worker servers have a bigger memory size than the one that runs the
driver program. If I can write data to hdfs at the executor, then the driver
memory for my spark job can be reduced.

Otherwise, does Spark support streaming reads from a database (i.e. spark
streaming + spark sql)?
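
For reference, the shape I have in mind is roughly this (just a sketch;
url/username/password and the output path are placeholders), where the parquet
files are written by the executor tasks directly and nothing is collected back
to the driver:

 import spark.implicits._

 val df = spark.read.format("jdbc")
   .option("url", url)
   .option("dbtable", "mytable")
   .option("user", username)
   .option("password", password)
   .load()

 df.filter($"fieldX" =!= "")
   .write
   .mode("overwrite")
   .parquet("hdfs://namenode/path/to/output")   // executed by the executor tasks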

Thanks for your reply.



‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos  
wrote:

> Dear James,
>
> -   check the Spark documentation to see the actions that return a lot of
> data back to the driver. One of these actions is collect(). However,
> take(x) is an action, also reduce() is an action.
>
> Before executing collect() find out what is the size of your RDD/DF.
>
> -   I cannot understand the phrase "hdfs directly from the executor". You
> can specify an hdfs file as your input and also you can use hdfs to
> store your output.
>
> regards,
>
> Apostolos
>
> On 07/09/2018 05:04 μμ, James Starks wrote:
>
>
> > I have a Spark job that reads data from a database. By increasing the submit
> > parameter '--driver-memory 25g' the job can work without a problem
> > locally, but not in the prod env because the prod master does not have enough
> > capacity.
> > So I have a few questions:
> > -  What functions, such as collect(), would cause the data to be sent
> > back to the driver program?
> >   My job so far merely uses `as`, `filter`, `map`, and `filter`.
> >
> > -   Is it possible to write data (in parquet format for instance) to
> > hdfs directly from the executor? If so, how can I do that (any code snippet,
> > doc for reference, or what keyword to search for, since I can't find it by e.g.
> > `spark direct executor hdfs write`)?
> >
> >
> > Thanks
>
> --
>
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol
>
>




Spark job's driver program consumes too much memory

2018-09-07 Thread James Starks
I have a Spark job that reads data from a database. By increasing the submit
parameter '--driver-memory 25g' the job can work without a problem locally, but
not in the prod env because the prod master does not have enough capacity.

So I have a few questions:

-  What functions, such as collect(), would cause the data to be sent back to
the driver program?
  My job so far merely uses `as`, `filter`, `map`, and `filter`.

- Is it possible to write data (in parquet format for instance) to hdfs
directly from the executor? If so, how can I do that (any code snippet, doc for
reference, or what keyword to search for, since I can't find it by e.g. `spark
direct executor hdfs write`)?

Thanks

Re: [External Sender] How to debug Spark job

2018-09-07 Thread James Starks
Got the root cause eventually: it throws java.lang.OutOfMemoryError: Java
heap space. Increasing --driver-memory temporarily fixes the problem. Thanks.
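
For anyone who hits the same thing, the check Femi suggests below would look
roughly like this (a sketch only; newdf and destPath are from my original mail):

 // force computation before the parquet write to see where the job actually stalls
 newdf.cache()
 println(s"row count: ${newdf.count()}")   // if this hangs, the problem is before the write
 newdf.show(20, truncate = false)          // spot-check a few rows
 newdf.write.mode("overwrite").parquet(destPath)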

‐‐‐ Original Message ‐‐‐
On 7 September 2018 12:32 PM, Femi Anthony  
wrote:

> One way I would go about this would be to try running a new_df.show(numcols, 
> truncate=False) on a few columns before you try writing to parquet to force 
> computation of newdf and see whether the hanging is occurring at that point 
> or during the write. You may also try doing a newdf.count() as well.
>
> Femi
>
> On Fri, Sep 7, 2018 at 5:48 AM James Starks  
> wrote:
>
>> I have a Spark job that reads from a postgresql (v9.5) table, and writes the
>> result to parquet. The code flow is not complicated, basically
>>
>> case class MyCaseClass(field1: String, field2: String)
>> val df = spark.read.format("jdbc")...load()
>> df.createOrReplaceTempView(...)
>> val newdf = spark.sql("select field1, field2 from
>> mytable").as[MyCaseClass].map { row =>
>>   val fieldX = ... // extract something from field2
>>   (field1, fieldX)
>> }.filter { ... /* filter out field 3 that's not valid */ }
>> newdf.write.mode(...).parquet(destPath)
>>
>> This job worked correctly without a problem. But it doesn't look to be working
>> ok (the job looks like it hangs) when adding more fields. The refactored job
>> looks as below
>> ...
>> val newdf = spark.sql("select field1, field2, ... fieldN from
>> mytable").as[MyCaseClassWithMoreFields].map { row =>
>> ...
>> NewCaseClassWithMoreFields(...) // all fields plus fieldX
>> }.filter { ... }
>> newdf.write.mode(...).parquet(destPath)
>>
>> Basically what the job does is extract some info from one of the fields in the
>> db table, append that newly extracted field to the original row, and then
>> dump the whole new table to parquet.
>>
>> new field + (original field1 + ... + original fieldN)
>> ...
>> ...
>>
>> Records loaded by spark sql into the spark job (before the refactoring) are
>> around 8MM; this remains the same, but when the refactored spark job runs, it
>> looks like it hangs there without progress. The only output on the console is
>> (there is no crash, no exceptions thrown)
>>
>> WARN  HeartbeatReceiver:66 - Removing executor driver with no recent 
>> heartbeats: 137128 ms exceeds timeout 12 ms
>>
>> Memory in top command looks like
>>
>> VIRT     RES     SHR    %CPU   %MEM
>> 15.866g  8.001g  41.4m  740.3  25.6
>>
>> The command used to submit the spark job is
>>
>> spark-submit --class ... --master local[*] --driver-memory 10g 
>> --executor-memory 10g ... --files ... --driver-class-path ...  ...
>>
>> How can I debug or check which part of my code might cause the problem (so I 
>> can improve it)?
>>
>> Thanks
>

How to debug Spark job

2018-09-07 Thread James Starks
I have a Spark job that reads from a postgresql (v9.5) table, and writes the
result to parquet. The code flow is not complicated, basically

case class MyCaseClass(field1: String, field2: String)
val df = spark.read.format("jdbc")...load()
df.createOrReplaceTempView(...)
val newdf = spark.sql("select field1, field2 from
mytable").as[MyCaseClass].map { row =>
  val fieldX = ... // extract something from field2
  (field1, fieldX)
}.filter { ... /* filter out field 3 that's not valid */ }
newdf.write.mode(...).parquet(destPath)

This job worked correctly without a problem. But it doesn't look to be working ok
(the job looks like it hangs) when adding more fields. The refactored job looks
as below
...
val newdf = spark.sql("select field1, field2, ... fieldN from
mytable").as[MyCaseClassWithMoreFields].map { row =>
...
NewCaseClassWithMoreFields(...) // all fields plus fieldX
}.filter { ... }
newdf.write.mode(...).parquet(destPath)

Basically what the job does is extract some info from one of the fields in the db
table, append that newly extracted field to the original row, and then dump
the whole new table to parquet.

new field + (original field1 + ... + original fieldN)
...
...

Records loaded by spark sql into the spark job (before the refactoring) are around
8MM; this remains the same, but when the refactored spark job runs, it looks like
it hangs there without progress. The only output on the console is (there is no
crash, no exceptions thrown)

WARN  HeartbeatReceiver:66 - Removing executor driver with no recent 
heartbeats: 137128 ms exceeds timeout 12 ms

Memory in top command looks like

VIRT     RES     SHR    %CPU   %MEM
15.866g  8.001g  41.4m  740.3  25.6

The command used to submit the spark job is

spark-submit --class ... --master local[*] --driver-memory 10g 
--executor-memory 10g ... --files ... --driver-class-path ...  ...

How can I debug or check which part of my code might cause the problem (so I 
can improve it)?

Thanks

Re: Pass config file through spark-submit

2018-08-17 Thread James Starks
I accidentally got it working, though I don't thoroughly understand why (so far
as I know, it's configured so that the executor can refer to the conf file after
it is copied to the executors' working dir). Basically it's a combination of the
parameters --conf, --files, and --driver-class-path, rather than any single
parameter.

spark-submit --class pkg.to.MyApp --master local[*] --conf
"spark.executor.extraClassPath=-Dconfig.file=<conf-file-name>" --files
<path/to/conf-file> --driver-class-path "<absolute/path/to/conf-dir>"

--conf requires passing the conf file name, e.g. myfile.conf, as part of the
spark executor class path directive.

--files passes the conf file path relative to the context root, e.g. when
executing under the application dir, which contains folders such as conf, logs,
work and so on. The conf file, i.e. myfile.conf, is located under the conf folder.

--driver-class-path points to the conf directory with an absolute path.
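
With those in place, the driver code can read the values with the Typesafe Config
API as usual; a minimal sketch (the my.app.db keys follow the example in my
original mail below):

 import com.typesafe.config.ConfigFactory

 // -Dconfig.file / classpath resolution is handled by the spark-submit flags above
 val conf   = ConfigFactory.load()
 val dbHost = conf.getString("my.app.db.host")
 val dbPort = conf.getInt("my.app.db.port")
 val dbUser = conf.getString("my.app.db.user")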


‐‐‐ Original Message ‐‐‐
On August 17, 2018 3:00 AM, yujhe.li  wrote:

> So can you read the file on executor side?
> I think the file passed by --files my.app.conf would be added under
> classpath, and you can use it directly.
>




Pass config file through spark-submit

2018-08-16 Thread James Starks
I have a config file that uses the Typesafe Config library, located on the local
file system, and I want to submit that file through spark-submit so that the
spark program can read customized parameters. For instance,

my.app {
  db {
host = domain.cc
port = 1234
db = dbname
user = myuser
passwd = mypass
  }
}

Spark submit code looks like

spark-submit --class "my.app.Sample" --master local[*] --conf 
"spark.executor.extraJavaOptions=-Dconfig.file=/path/to/conf/myapp.conf" 
/path/to/my-app.jar

But the program can not read the parameters such as db, user, host, and so on 
in my conf file.

Passing with --files /path/to/myapp.conf doesn't work either.

What is the correct way to submit that kind of conf file so that my spark job 
can read customized parameters from there?

Thanks

Data source jdbc does not support streamed reading

2018-08-08 Thread James Starks
Now my spark job can perform sql operations against a database table. Next I want
to combine that with a streaming context, so I am switching to the readStream()
function. But after job submission, spark throws

Exception in thread "main" java.lang.UnsupportedOperationException: Data 
source jdbc does not support streamed reading

So it looks like for sparkSession.readStream.format("jdbc")..., jdbc doesn't
support streaming.

val sparkSession = SparkSession.builder().appName("my-test").getOrCreate()
import sparkSession.implicits._
val df = sparkSession.readStream.format("jdbc")...load()
// other operations against df

Checking the example - 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

Also, searching on the internet, I don't see any examples close to my needs. Are
there any pointers or docs that talk about this, or a code snippet that
illustrates such a use case?
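
In the meantime, the only workaround I can think of is plain periodic batch reads
instead of a real streaming source (just a sketch; url/username/password are
placeholders, and the watermark column and poll interval are assumptions):

 // poll the table with ordinary batch reads; not structured streaming, just a loop
 while (true) {
   val batch = sparkSession.read.format("jdbc")
     .option("url", url)
     .option("dbtable", "mytable")
     .option("user", username)
     .option("password", password)
     .load()
   // ... filter on e.g. an updated_at column and process only the new rows ...
   Thread.sleep(60 * 1000)   // wait a minute before the next poll
 }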

Thanks

Re: Newbie question on how to extract column value

2018-08-07 Thread James Starks
Because of some legacy issues I can't immediately upgrade the spark version. But
I tried filtering the data before loading it into spark, based on the suggestion:

 val df = sparkSession.read.format("jdbc").option(...).option("dbtable",
 "(select .. from ... where url <> '') table_name").load()
 df.createOrReplaceTempView("new_table")

Then performing the custom operation does the trick.

sparkSession.sql("select id, url from new_table").as[(String, String)].map 
{ case (id, url) =>
   val derived_data = ... // operation on url
   (id, derived_data)
}.show()
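
Following the udf suggestion, I suppose the same thing could also stay in
DataFrame land; a rough sketch (isValid and pathOf are my own functions,
returning Boolean and String respectively):

 import org.apache.spark.sql.functions.udf
 import sparkSession.implicits._

 val isValidUdf = udf((url: String) => isValid(url))
 val pathOfUdf  = udf((url: String) => pathOf(url))

 sparkSession.sql("select id, url from new_table")
   .filter(isValidUdf($"url"))
   .select($"id", pathOfUdf($"url").as("derived_data"))
   .show()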

Thanks for the advice, it's really helpful!

‐‐‐ Original Message ‐‐‐
On August 7, 2018 5:33 PM, Gourav Sengupta  wrote:

> Hi James,
>
> It is always advisable to use the latest SPARK version. That said, can you
> please give dataframes and udf a try if possible. I think that would be
> a much more scalable way to address the issue.
>
> Also, if possible, it is always advisable to use the filter option before
> fetching the data to Spark.
>
> Thanks and Regards,
> Gourav
>
> On Tue, Aug 7, 2018 at 4:09 PM, James Starks  
> wrote:
>
>> I am very new to Spark. I just successfully set up Spark SQL connecting to a
>> postgresql database, and am able to display a table with the code
>>
>> sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()
>>
>> Now I want to perform filter and map function on col_b value. In plain scala 
>> it would be something like
>>
>> Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter
>> { case (_, url) => isValid(url) }.map { case (id, url) => (id, pathOf(url))
>> }
>>
>> where filter will remove invalid url, and then map (id, url) to (id, path of 
>> url).
>>
>> However, when applying this concept to spark sql with code snippet
>>
>> sparkSession.sql("...").filter(isValid($"url"))
>>
>> The compiler complains about a type mismatch because $"url" is of ColumnName
>> type. How can I extract the column value, i.e. http://..., for the column url
>> in order to perform the filter function?
>>
>> Thanks
>>
>> Java 1.8.0
>> Scala 2.11.8
>> Spark 2.1.0

Newbie question on how to extract column value

2018-08-07 Thread James Starks
I am very new to Spark. I just successfully set up Spark SQL connecting to a
postgresql database, and am able to display a table with the code

sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()

Now I want to perform filter and map function on col_b value. In plain scala it 
would be something like

Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter {
case (_, url) => isValid(url) }.map { case (id, url) => (id, pathOf(url)) }

where filter will remove invalid url, and then map (id, url) to (id, path of 
url).

However, when applying this concept to spark sql with code snippet

sparkSession.sql("...").filter(isValid($"url"))

The compiler complains about a type mismatch because $"url" is of ColumnName type.
How can I extract the column value, i.e. http://..., for the column url in order
to perform the filter function?

Thanks

Java 1.8.0
Scala 2.11.8
Spark 2.1.0