[Spark SQL] Couldn't save dataframe with null columns to S3.

2018-11-05 Thread ehbhaskar
I have a spark job that writes data to S3 as below.
source_data_df_to_write.select(target_columns_list) \
    .write.partitionBy(target_partition_cols_list) \
    .format("ORC") \
    .save(self.table_location_prefix + self.target_table, mode="append")

My dataframe can sometimes have null values in its columns. Writing a dataframe
with null attributes fails the job with an IllegalArgumentException, as shown below.

Caused by: java.lang.IllegalArgumentException: Error: type expected at the
position 14 of
'double:string:null:string:string:string:double:bigint:null:null:null:null:string:null:string:null:null:null:null:string:string:string:null:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:null:null:null:null:null:null:null:null:null:null:null:null:null:null:null:null:null:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string'
but 'null' is found.


A sample dataframe looks like this:

columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5, partition_col1, partition_col2"
source_data_df_to_write = self.session.sql(
    "SELECT %s FROM TEMP_VIEW" % columns_with_default)

So, is there a way to make the Spark job write a dataframe with NULL attributes
to S3?
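
For what it's worth, a minimal sketch of one possible workaround (not something
from this thread): give each NULL literal an explicit type with CAST so the
column is not inferred as NullType, which is what produces the 'null' entries in
the ORC type string. The STRING/DOUBLE types below are placeholders for whatever
the target table actually expects.

# Sketch: cast the NULL literals so Spark infers a concrete type per column
# instead of NullType. The chosen types here are assumptions.
columns_with_default = (
    "col1, CAST(NULL AS STRING) AS col2, col2, col4, "
    "CAST(NULL AS DOUBLE) AS col5, partition_col1, partition_col2"
)
source_data_df_to_write = self.session.sql(
    "SELECT %s FROM TEMP_VIEW" % columns_with_default)

source_data_df_to_write.select(target_columns_list) \
    .write.partitionBy(target_partition_cols_list) \
    .format("ORC") \
    .save(self.table_location_prefix + self.target_table, mode="append")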



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Equivalent of emptyDataFrame in StructuredStreaming

2018-11-05 Thread Jungtaek Lim
Could you explain what you're trying to do? There should be no batch when there
is no data in the stream, so it would end up being a no-op even if it were possible.

- Jungtaek Lim (HeartSaVioR)

On Tue, Nov 6, 2018 at 8:29 AM, Arun Manivannan wrote:

> Hi,
>
> I would like to create a "zero" value for a Structured Streaming Dataframe
> and unfortunately, I couldn't find any leads.  With Spark batch, I can do a
> "emptyDataFrame" or "createDataFrame" with "emptyRDD" but with
> StructuredStreaming, I am lost.
>
> If I use the "emptyDataFrame" as the zero value, I wouldn't be able to
> join them with any other DataFrames in the program because Spark doesn't
> allow you to mix batch and stream data frames. (isStreaming=false for the
> Batch ones).
>
> Any clue is greatly appreciated. Here are the alternatives that I have at
> the moment.
>
> *1. Reading from an empty file *
> *Disadvantages : poll is expensive because it involves IO and it's error
> prone in the sense that someone might accidentally update the file.*
>
> val emptyErrorStream = (spark: SparkSession) => {
>   spark
> .readStream
> .format("csv")
> .schema(DataErrorSchema)
> 
> .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
> .as[DataError]
> }
>
> *2. Use MemoryStream*
>
> *Disadvantages: MemoryStream itself is not recommended for production use 
> because of the ability to mutate it but I am converting it to DS immediately. 
> So, I am leaning towards this at the moment. *
>
>
> val emptyErrorStream = (spark:SparkSession) => {
>   implicit val sqlC = spark.sqlContext
>   MemoryStream[DataError].toDS()
> }
>
> Cheers,
> Arun
>


Equivalent of emptyDataFrame in StructuredStreaming

2018-11-05 Thread Arun Manivannan
Hi,

I would like to create a "zero" value for a Structured Streaming DataFrame and,
unfortunately, I couldn't find any leads. With Spark batch, I can use
"emptyDataFrame" or "createDataFrame" with "emptyRDD", but with Structured
Streaming I am lost.

If I use the "emptyDataFrame" as the zero value, I wouldn't be able to join it
with any other DataFrames in the program, because Spark doesn't allow you to mix
batch and streaming DataFrames (isStreaming=false for the batch ones).

Any clue is greatly appreciated. Here are the alternatives that I have at
the moment.

1. Reading from an empty file
Disadvantages: polling is expensive because it involves I/O, and it's error-prone
in the sense that someone might accidentally update the file.

val emptyErrorStream = (spark: SparkSession) => {
  spark
    .readStream
    .format("csv")
    .schema(DataErrorSchema)
    .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
    .as[DataError]
}

2. Use MemoryStream

Disadvantages: MemoryStream itself is not recommended for production use because
it can be mutated, but I am converting it to a Dataset immediately, so I am
leaning towards this at the moment.


val emptyErrorStream = (spark: SparkSession) => {
  implicit val sqlC = spark.sqlContext
  MemoryStream[DataError].toDS()
}

Cheers,
Arun


Re: Shuffle write explosion

2018-11-05 Thread Dillon Dukek
What is your function in mapToPair doing?

-Dillon

On Mon, Nov 5, 2018 at 1:41 PM Taylor Cox wrote:

> At first glance, I wonder if your tables are partitioned? There may not be
> enough parallelism happening. You can also pass in the number of partitions
> and/or a custom partitioner to help Spark “guess” how to organize the
> shuffle.
>
>
>
> Have you seen any of these docs?
>
>
> https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
>
> https://spark.apache.org/docs/latest/tuning.html
>
>
>
> Taylor
>
>
>
>
>
> *From:* Yichen Zhou 
> *Sent:* Sunday, November 4, 2018 11:42 PM
> *To:* user@spark.apache.org
> *Subject:* Shuffle write explosion
>
>
>
> Hi All,
>
>
>
> When running a spark job, I have 100MB+ input and get more than 700GB
> shuffle write, which is really weird. And this job finally end up with the
> OOM error. Does anybody know why this happened?
>
> [image: Screen Shot 2018-11-05 at 15.20.35.png]
>
> My code is like:
>
> JavaPairRDD inputRDD = sc.sequenceFile(inputPath, Text.class,
> Text.class);
>
>
>  
> inputRDD.repartition(partitionNum).mapToPair(...).saveAsNewAPIHadoopDataset(job.getConfiguration())
> ;
>
>
> Environment:
>
> *CPU 32 core; Memory 256G; Storage 7.5G CentOS 7.5*
>
> *java version "1.8.0_162"*
>
> *Spark 2.1.2*
>
>
> Any help is greatly appreciated.
>
>
>
> Regards,
>
> Yichen
>


Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

2018-11-05 Thread Bhaskar Ebbur
Posted on the mailing list too.

My process generates at most 150 files. As I said, it takes more time (to move
files from the temp folder to the target path) for a table with many partitions
compared to a table with fewer partitions. I'm not sure what the reason behind
such behavior is.

I tried writing files directly to S3 and then adding the partitions to the Hive
table, but the Spark job doesn't save the dataframe with null values; I get an
IllegalArgumentException stating that 'null' was found where a type was expected.
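
For reference, a minimal sketch of that file-level approach, under the
assumptions that the Hive table is already defined over the S3 location, that the
NULL literals in the SELECT have been cast to concrete types to avoid the error
above, and that the path and table name below are placeholders:

# Write partitioned ORC files directly to the table location on S3,
# then let Hive discover the new partitions afterwards.
source_data_df_to_write \
    .write \
    .partitionBy("partition_col1", "partition_col2") \
    .format("orc") \
    .mode("append") \
    .save("s3://my-bucket/warehouse/target_table")  # placeholder location

# Register the newly written partitions with the Hive metastore.
self.session.sql("MSCK REPAIR TABLE my_db.target_table")  # placeholder table name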


On Mon, Nov 5, 2018 at 2:41 AM Jörn Franke  wrote:

> Can you share it with the mailing list?
>
> I believe it would be more efficient to work in Spark just at the file
> level (without using Hive) and at the end let Hive discover the new files
> via MSCK repair.
> It could be that your process generates a lot of small files and this is
> very inefficient on Hadoop (try to have larger partitions at least 128M
> size)
>
> Am 05.11.2018 um 08:58 schrieb Bhaskar Ebbur :
>
> Here's code with correct data frame.
>
> self.session = SparkSession \
> .builder \
> .appName(self.app_name) \
> .config("spark.dynamicAllocation.enabled", "false") \
> .config("hive.exec.dynamic.partition.mode", "nonstrict") \
> .config("mapreduce.fileoutputcommitter.algorithm.version",
> "2") \
> .config("hive.load.dynamic.partitions.thread", "10") \
> .config("hive.mv.files.thread", "30") \
> .config("fs.trash.interval", "0") \
> .enableHiveSupport()
>
> columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5,
> partition_col1, partition_col2"
> source_data_df_to_write = self.session.sql(
>  "SELECT %s FROM TEMP_VIEW" % (columns_with_default))
>
> source_data_df_to_write\
> .coalesce(50)\
> .createOrReplaceTempView("TEMP_VIEW")
>
> table_name_abs = "%s.%s" % (self.database, self.target_table)
> self.session.sql(
> "INSERT OVERWRITE TABLE %s "
> "PARTITION (%s) "
> "SELECT %s FROM TEMP_VIEW" % (
> table_name_abs, "partition_col1, partition_col2",
> columns_with_default))
>
>
>
>
> On Sun, Nov 4, 2018 at 11:30 PM Bhaskar Ebbur  wrote:
>
>> Here's some sample code.
>>
>> self.session = SparkSession \
>> .builder \
>> .appName(self.app_name) \
>> .config("spark.dynamicAllocation.enabled", "false") \
>> .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>> .config("mapreduce.fileoutputcommitter.algorithm.version",
>> "2") \
>> .config("hive.load.dynamic.partitions.thread", "10") \
>> .config("hive.mv.files.thread", "30") \
>> .config("fs.trash.interval", "0") \
>> .enableHiveSupport()
>>
>> columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5,
>> partition_col1, partition_col2"
>> source_data_df_to_write = self.session.sql(
>>  "SELECT %s, %s, %s as %s, %s as %s FROM TEMP_VIEW" %
>> (columns_with_default))
>>  source_data_df_to_write\
>>  .coalesce(50)\
>>  .createOrReplaceTempView("TEMP_VIEW")
>>
>> table_name_abs = "%s.%s" % (self.database, self.target_table)
>> self.session.sql(
>> "INSERT OVERWRITE TABLE %s "
>> "PARTITION (%s) "
>> "SELECT %s FROM TEMP_VIEW" % (
>> table_name_abs, "partition_col1, partition_col2",
>> columns_with_default))
>>
>>
>> On Sun, Nov 4, 2018 at 11:08 PM Jörn Franke  wrote:
>>
>>> Can you share some relevant source code?
>>>
>>>
>>> > Am 05.11.2018 um 07:58 schrieb ehbhaskar :
>>> >
>>> > I have a pyspark job that inserts data into hive partitioned table
>>> using
>>> > `Insert Overwrite` statement.
>>> >
>>> > Spark job loads data quickly (in 15 mins) to temp directory
>>> (~/.hive-***) in
>>> > S3. But, it's very slow in moving data from temp directory to the
>>> target
>>> > path, it takes more than 40 mins to move data from temp to target path.
>>> >
>>> > I set the option mapreduce.fileoutputcommitter.algorithm.version=2
>>> (default
>>> > is 1) but still I see no change.
>>> >
>>> > *Are there any ways to improve the performance of hive INSERT OVERWRITE
>>> > query from spark?*
>>> >
>>> > Also, I noticed that this behavior is even worse (i.e. job takes even
>>> more
>>> > time) with hive table that has too many existing partitions. i.e. The
>>> data
>>> > loads relatively fast into table that have less existing partitions.
>>> >
>>> > *Some additional details:*
>>> > * Table is a dynamic partitioned table.
>>> > * Spark version - 2.3.0
>>> > * Hive version - 2.3.2-amzn-2
>>> > * Hadoop version - 2.8.3-amzn-0
>>> >
>>> > PS: Other config options I have tried that didn't have much effect on
>>> the
>>> > job performance.
>>> > * "hive.load.dynamic.partitions.thread - "10"
>>> > * "hive.mv.files.thread" - "30"
>>> > * "fs.trash.interval" - "0".
>>> >
>>> >
>>> >
>>> > --
>>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>> >
>>> > 

Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

2018-11-05 Thread ehbhaskar
Here's the code with the correct data frame.

self.session = SparkSession \
    .builder \
    .appName(self.app_name) \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("hive.load.dynamic.partitions.thread", "10") \
    .config("hive.mv.files.thread", "30") \
    .config("fs.trash.interval", "0") \
    .enableHiveSupport() \
    .getOrCreate()

columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5, partition_col1, partition_col2"
source_data_df_to_write = self.session.sql(
    "SELECT %s FROM TEMP_VIEW" % columns_with_default)



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: How to avoid long-running jobs blocking short-running jobs

2018-11-05 Thread Taylor Cox
Hi Conner,

What is preventing you from using a cluster model?
I wonder if docker containers could help you here?
A quick internet search yielded Mist: https://github.com/Hydrospheredata/mist 
Could be useful?

Taylor

-Original Message-
From: conner  
Sent: Saturday, November 3, 2018 2:04 AM
To: user@spark.apache.org
Subject: How to avoid long-running jobs blocking short-running jobs

Hi,

I use a Spark cluster to run ETL jobs and analysis computations on the data
after the ETL stage. The ETL jobs can keep running for several hours, but the
analysis computations are short-running jobs that finish in a few seconds. The
dilemma I'm trapped in is that my application runs in a single JVM and can't be
a cluster application, so there is currently just one Spark context in my
application. But when the ETL jobs are running, they occupy all the resources,
including the worker executors, for so long that they block all my analysis
computation jobs.

My idea is to find a good way to divide the Spark cluster resources in two: one
part for analysis computation jobs, another for ETL jobs. If the part for ETL
jobs is free, I can allocate analysis computation jobs to it as well. So I want
to find a middleware that can support two Spark contexts and can be embedded in
my application. I did some research on the third-party project Spark Job Server.
It can divide Spark resources by launching another JVM to run a Spark context
with specific resources. These operations are invisible to the upper layer, so
it would be a good solution for me. But this project runs in a single JVM and
only supports a REST API; I can't endure transferring the data again over TCP,
which is too slow for me. I want to get a result from the Spark cluster over TCP
and give this result to the view layer to display.

Can anybody give me a good suggestion? I would be very grateful.
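
For what it's worth, a minimal sketch of one option not mentioned above: Spark's
FAIR scheduler with named pools, which lets short jobs run alongside long ETL
jobs inside a single SparkContext. The pool names and the helper functions below
are assumptions, not working code from this thread.

# Enable fair scheduling when building the context/session, e.g.:
#   spark.scheduler.mode=FAIR
#   spark.scheduler.allocation.file=/path/to/fairscheduler.xml  (optional pool weights)

# Tag work with a pool before submitting it; jobs in different pools then
# share executors instead of queueing behind one another.
sc.setLocalProperty("spark.scheduler.pool", "etl")
run_long_etl_job()        # placeholder for the ETL work

sc.setLocalProperty("spark.scheduler.pool", "analysis")
run_short_analysis_job()  # placeholder for the analysis query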





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





RE: Shuffle write explosion

2018-11-05 Thread Taylor Cox
At first glance, I wonder if your tables are partitioned? There may not be 
enough parallelism happening. You can also pass in the number of partitions 
and/or a custom partitioner to help Spark “guess” how to organize the shuffle.
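
For illustration, a rough PySpark sketch of those two knobs (the original job is
in Java, but the idea carries over; the RDD name, input path, and partition count
of 400 are placeholders):

pairs = sc.sequenceFile(input_path)  # assumed (key, value) pair RDD

# 1. Set an explicit partition count for the shuffle.
repartitioned = pairs.repartition(400)

# 2. Or supply a custom partition function so related keys land in the
#    same partition (partitionBy applies it to each key).
partitioned = pairs.partitionBy(400, partitionFunc=lambda k: hash(k))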

Have you seen any of these docs?
https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
https://spark.apache.org/docs/latest/tuning.html

Taylor


From: Yichen Zhou 
Sent: Sunday, November 4, 2018 11:42 PM
To: user@spark.apache.org
Subject: Shuffle write explosion

Hi All,

When running a Spark job, I have 100MB+ of input and get more than 700GB of
shuffle write, which is really weird, and the job finally ends up with an OOM
error. Does anybody know why this happens?
[Screen Shot 2018-11-05 at 15.20.35.png]
My code is like:
JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class);

inputRDD.repartition(partitionNum)
    .mapToPair(...)
    .saveAsNewAPIHadoopDataset(job.getConfiguration());

Environment:
CPU 32 core; Memory 256G; Storage 7.5G
CentOS 7.5
java version "1.8.0_162"
Spark 2.1.2

Any help is greatly appreciated.

Regards,
Yichen


Modifying pyspark sources

2018-11-05 Thread Soheil Pourbafrani
I want to apply a minor modification to PySpark's MLlib, but when checking the
PySpark sources I found that it delegates to the Scala (Java) sources. Is there
any way to make the modification using Python?


Re: mLIb solving linear regression with sparse inputs

2018-11-05 Thread Robineast
Well, I did eventually write this code in Java, and it was very long! See
https://github.com/insidedctm/sparse-linear-regression



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to use the Graphframe PageRank method with dangling edges?

2018-11-05 Thread Alexander Czech
I have a graph that has a couple of dangling edges. I use PySpark and work
with Spark 2.2.0. It looks something like this:

g.vertices.show()
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
g.edges.show()
+---+----+
|src| dst|
+---+----+
|  1|   2|
|  2|   3|
|  3|   4|
|  4|   1|
|  4|null|
+---+----+

Now when I call g.pageRank(resetProbability=0.15, tol=0.01) it obviously fails
because one edge points towards null. What I want is for every PageRank weight
that is distributed towards a dangling edge to be randomly redistributed to a
node in the graph, much like the damping factor or resetProbability in the
pageRank function. Is this possible without rewriting the pageRank method? My
Scala knowledge is zero, and reimplementing it in Python would probably be a
pretty slow solution.
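
One crude workaround, sketched below under the assumption that it is acceptable
for the dangling weight to flow along the source vertex's remaining edges rather
than being spread randomly over the whole graph: drop the edges whose destination
is null before building the graph that PageRank runs on.

from graphframes import GraphFrame

# Keep only edges with a real destination, rebuild the graph, run PageRank.
clean_edges = g.edges.filter("dst IS NOT NULL")
g_clean = GraphFrame(g.vertices, clean_edges)

results = g_clean.pageRank(resetProbability=0.15, tol=0.01)
results.vertices.select("id", "pagerank").show()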

Thanks