Understanding Spark execution plans

2020-08-05 Thread Daniel Stojanov
Hi,

When an execution plan is printed, it lists the tree of operations that will
be performed when the job runs. The steps have somewhat cryptic names such as
BroadcastHashJoin, Project, Filter, etc., which do not appear to map directly
to the functions that are called on a DataFrame or RDD.

1) Is there a place where each of these steps is documented?
2) Is there documentation, outside of Spark's source code, that describes the
mapping between operations on Spark DataFrames or RDDs and the resulting
physical execution plan, at least in enough detail to understand the physical
execution steps and to predict which steps a particular operation will produce?
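
For example, as far as I can tell the only way to see the mapping is by
experiment. A minimal sketch (Scala, assuming a `spark` session such as the one
spark-shell provides; the data is made up) shows filter()/where() surfacing as
Filter, select() as Project, and a join against a small input as
BroadcastHashJoin plus a BroadcastExchange:

import org.apache.spark.sql.functions.col

val small = spark.range(100).withColumnRenamed("id", "k")           // small enough to broadcast
val large = spark.range(1000000).withColumn("k", col("id") % 100)

large.join(small, "k")      // -> BroadcastHashJoin (+ BroadcastExchange)
  .where(col("id") > 10)    // -> Filter
  .select("id", "k")        // -> Project
  .explain()                // prints the physical plan

The node names seem to match the physical operator classes in Spark's source
(FilterExec, ProjectExec, BroadcastHashJoinExec, and so on), but I have not
found user-facing documentation for them.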

Regards,


Multi insert with join in Spark SQL

2020-08-05 Thread moqi
Hi,

I am trying to migrate Hive SQL to Spark SQL. When I execute a multi-insert-
with-join statement, Spark SQL scans the same table multiple times, while Hive
SQL scans it only once. In the actual production environment this table is
relatively large, which makes Spark SQL run longer than Hive SQL.

Can someone help me optimize the multi-insert-with-join statement so that
Spark SQL scans the table only once?

The environment I use is Spark 2.4.5. In the following simple code, I will
demonstrate the different execution plans of Spark SQL and Hive SQL.


--- SQL start

create table if not exists join_psn
(
id int,
name string,
cid int
) ;  
 insert overwrite table join_psn 
 select 1,'john',2 
 union all 
 select 2,'tom',2 
 union all 
 select 3,'jackson',1 ;


create table if not exists join_country_partition 
(
id int,
cname string,
loc string
) 
partitioned by (dt string);  
 insert overwrite table join_country_partition partition (dt='20200801') 
 select 1,'USA','America'
 union all
 select 2,'UK','European'
 union all
 select 3,'CN','Asia'
 union all
 select 4,'FR','European'
 union all
 select 5,'JP','Asia';

create table if not exists join_result1 
(
id int,
name string,
cname string
) ;
create table if not exists join_result2 
(
id int,
name string,
cname string
) ;


-- On Spark SQL: the different predicates cause multiple scans of the same table
-- On Hive SQL: multi-table inserts minimize the number of data scans required;
-- Hive can insert data into multiple tables by scanning the input data just once
-- and applying different query operators to it.
 from (
select * from join_country_partition where dt='20200801'
 ) c
 join join_psn p
 on c.id=p.cid 
 insert overwrite table join_result1 
 select c.id,name,cname where c.id < 5
 insert overwrite table join_result2 
 select c.id,name,cname where name != 'FR';

-- Spark SQL Plan
Union
:- Execute InsertIntoHiveTable InsertIntoHiveTable `default`.`join_result1`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [id, name,
cname]
:  +- *(2) Project [id#273, name#278, cname#274]
: +- *(2) BroadcastHashJoin [id#273], [cid#279], Inner, BuildLeft
::- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0,
int, false] as bigint)))
::  +- *(1) Filter (isnotnull(id#273) && (id#273 < 5))
:: +- Scan hive default.join_country_partition [id#273,
cname#274], HiveTableRelation `default`.`join_country_partition`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#273, cname#274,
loc#275], [dt#276], [isnotnull(dt#276), (dt#276 = 20200801)]
:+- *(2) Filter ((cid#279 < 5) && isnotnull(cid#279))
:   +- Scan hive default.join_psn [name#278, cid#279],
HiveTableRelation `default`.`join_psn`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#277, name#278,
cid#279]
+- Execute InsertIntoHiveTable InsertIntoHiveTable `default`.`join_result2`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [id, name,
cname]
   +- *(4) Project [id#280, name#285, cname#281]
  +- *(4) BroadcastHashJoin [id#280], [cid#286], Inner, BuildLeft
 :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0,
int, false] as bigint)))
 :  +- *(3) Filter isnotnull(id#280)
 : +- Scan hive default.join_country_partition [id#280,
cname#281], HiveTableRelation `default`.`join_country_partition`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#280, cname#281,
loc#282], [dt#283], [isnotnull(dt#283), (dt#283 = 20200801)]
 +- *(4) Filter ((isnotnull(name#285) && NOT (name#285 = FR)) &&
isnotnull(cid#286))
+- Scan hive default.join_psn [name#285, cid#286],
HiveTableRelation `default`.`join_psn`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#284, name#285,
cid#286]
Time taken: 0.393 seconds, Fetched 1 row(s)

-- Hive SQL Plan
STAGE DEPENDENCIES:
  Stage-7 is a root stage
  Stage-6 depends on stages: Stage-7
  Stage-0 depends on stages: Stage-6
  Stage-3 depends on stages: Stage-0
  Stage-1 depends on stages: Stage-6
  Stage-4 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-7
Map Reduce Local Work
  Alias -> Map Local Tables:
p
  Fetch Operator
limit: -1
  Alias -> Map Local Operator Tree:
p
  TableScan
alias: p
Statistics: Num rows: 0 Data size: 29 Basic stats: PARTIAL
Column stats: NONE
HashTable Sink Operator
  condition expressions:
0 {_col0} {_col1}
1 {name}
  keys:
0 _col0 (type: int)
1 cid (type: int)

  Stage: Stage-6
Map Reduce
  Map Operator Tree:
  TableScan
alias: join_country_partition
Statistics: Num rows: 0 Data size: 62 Basic stats: PARTIAL
Column stats: NONE
Select Operator
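
One workaround I am considering (a sketch only, using the DataFrame API instead
of the multi-insert SQL, and not yet verified against the real tables) is to
materialize the joined result once and write it to both targets, at the cost of
losing the per-insert predicate pushdown:

// Build the join once; cache() lets the second write reuse the result
// instead of rescanning join_country_partition and join_psn.
val c = spark.table("join_country_partition").where("dt = '20200801'")
val p = spark.table("join_psn")

val joined = c.join(p, c("id") === p("cid"))
  .select(c("id"), p("name"), c("cname"))
  .cache()

// The first write materializes the cache; the second reads from it.
joined.where("id < 5").write.mode("overwrite").insertInto("join_result1")
joined.where("name != 'FR'").write.mode("overwrite").insertInto("join_result2")

joined.unpersist()

The same idea could be expressed in SQL by creating a temporary view of the
join and running CACHE TABLE on it before the two inserts, but I am not sure
this is the best approach.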
  

S3 read/write from PySpark

2020-08-05 Thread Daniel Stojanov
Hi,

I am trying to read/write files to S3 from PySpark. The procedure I have used
is to download Spark and start PySpark with the hadoop-aws, guava, and
aws-java-sdk-bundle packages, with the versions specified explicitly by looking
up the exact dependency versions on Maven (letting the dependencies be
auto-determined does not work). This procedure works for Spark 3.0.0 with
Hadoop 2.7, but not for Hadoop 3.2. I get this exception:
java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider


Can somebody point to a procedure for doing this using Spark bundled with
Hadoop 3.2?

Regards,





To replicate:

Download Spark 3.0.0 with support for Hadoop 3.2.

Launch spark with:

./pyspark --packages
org.apache.hadoop:hadoop-common:3.3.0,org.apache.hadoop:hadoop-client:3.3.0,org.apache.hadoop:hadoop-aws:3.3.0,com.amazonaws:aws-java-sdk-bundle:1.11.563,com.google.guava:guava:27.1-jre

Then run the following Python.


# Set these 4 parameters as appropriate.
aws_bucket = ""
aws_filename = ""
aws_access_key = ""
aws_secret_key = ""

spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_key)

df = spark.read.option("header", "true").csv(f"s3a://{aws_bucket}/{aws_filename}")




Leads to this error message:




Traceback (most recent call last):
  File "", line 1, in 
  File
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py",
line 535, in csv
return
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
  File
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/utils.py",
line 131, in deco
return f(*a, **kw)
  File
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.csv.
: java.io.IOException: From option fs.s3a.aws.credentials.provider
java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider not found
at
org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:645)
at
org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:668)
at
org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:619)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:636)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:390)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at
org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
at
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at 
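
My suspicion (unconfirmed): org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider
is a Hadoop 2.7-era class name, and newer hadoop-aws releases ship
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider instead, so explicitly
configuring the old name produces exactly this ClassNotFoundException. A sketch
of the same configuration against Hadoop 3.x (shown in Scala; the PySpark
hadoopConfiguration().set(...) calls above would change only in the provider
string, or the fs.s3a.aws.credentials.provider setting could simply be dropped
to fall back to the defaults):

// Assumed sketch for Hadoop 3.x; the bucket, file name and keys are placeholders.
val awsBucket = ""
val awsFilename = ""
val awsAccessKey = ""
val awsSecretKey = ""

val conf = spark.sparkContext.hadoopConfiguration
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")   // Hadoop 3.x class name
conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
conf.set("fs.s3a.access.key", awsAccessKey)
conf.set("fs.s3a.secret.key", awsSecretKey)

val df = spark.read.option("header", "true").csv(s"s3a://$awsBucket/$awsFilename")

In general the hadoop-aws artifact should match the Hadoop version Spark was
built against and be paired with the aws-java-sdk-bundle version it declares as
a dependency; mixing versions (e.g. Hadoop 3.3.0 jars on a build for Hadoop 3.2)
is another common source of classpath errors.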

Re: Tab delimited csv import and empty columns

2020-08-05 Thread Stephen Coy
Hi Sean, German and others,

Setting the “nullValue” option (for parsing CSV at least) seems to be an 
exercise in futility.

When parsing the file, 
com.univocity.parsers.common.input.AbstractCharInputReader#getString contains 
the following logic:


String out;
if (len <= 0) {
   out = nullValue;
} else {
   out = new String(buffer, pos, len);
}

resulting in the nullValue being assigned to the column value if it has zero 
length, such as with an empty String.

Later, org.apache.spark.sql.catalyst.csv.UnivocityParser#nullSafeDatum is 
called on the column value:


if (datum == options.nullValue || datum == null) {
  if (!nullable) {
throw new RuntimeException(s"null value found but field $name is not 
nullable.")
  }
  null
} else {
  converter.apply(datum)
}

Therefore, the empty String is first converted to the nullValue, and then 
matched against the nullValue and, bingo, we get the literal null.

For now, the “.na.fill(“”)” addition to the code is doing the right thing for 
me.
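
Concretely, the workaround attaches straight to the read (a Scala sketch of the 
same idea; `path` stands in for the real input location):

// Tab-delimited read; na.fill("") turns the nulls produced by the
// nullValue round-trip back into literal empty strings (string columns only).
val catalogData = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .csv(path)
  .na.fill("")
  .cache()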

Thanks for all the help.


Steve C


On 1 Aug 2020, at 1:40 am, Sean Owen <sro...@gmail.com> wrote:

Try setting nullValue to anything besides the empty string. Because its default 
is the empty string, empty strings become null by default.

On Fri, Jul 31, 2020 at 3:20 AM Stephen Coy <s...@infomedia.com.au.invalid> wrote:
That does not work.

This is Spark 3.0 by the way.

I have been looking at the Spark unit tests and there do not seem to be any that 
load a CSV text file and verify that an empty string maps to an empty string, 
which I think is supposed to be the default behaviour because the “nullValue” 
option defaults to "".

Thanks anyway

Steve C

On 30 Jul 2020, at 10:01 pm, German Schiavon Matteo <gschiavonsp...@gmail.com> wrote:

Hey,

I understand that the empty values in your CSV are ""; if so, try this option:

.option("emptyValue", "\"\"")

Hope it helps

On Thu, 30 Jul 2020 at 08:49, Stephen Coy <s...@infomedia.com.au.invalid> wrote:
Hi there,

I’m trying to import a tab delimited file with:

Dataset<Row> catalogData = sparkSession
  .read()
  .option("sep", "\t")
  .option("header", "true")
  .csv(args[0])
  .cache();

This works great, except for the fact that any column that is empty is given 
the value null, when I need these values to be literal empty strings.

Is there any option combination that will achieve this?

Thanks,

Steve C






Re: Comments conventions in Spark distribution official examples

2020-08-05 Thread Sean Owen
These only matter to our documentation, which includes the source of these
examples inline in the docs. For brevity, the examples don't need to show all
the imports that are otherwise necessary for the source file. If you are using
the example directly, you can ignore them just as the compiler does with any
other comment; they have no effect on Spark functionality.

On Wed, Aug 5, 2020 at 7:26 PM Fuad Efendi  wrote:
>
> Hello,
>
>
> I am trying to work out what such comments are needed for and cannot find anything 
> about them on the Internet; maybe they are for some documentation tool? Both the 
> Java and Scala examples have them around import statements and in the code: 
> “$example on” and “$example off”.
>
> package org.apache.spark.examples.sql
>
> // $example on:programmatic_schema$
> import org.apache.spark.sql.Row
> // $example off:programmatic_schema$
> // $example on:init_session$
> import org.apache.spark.sql.SparkSession
> // $example off:init_session$
> // $example on:programmatic_schema$
> // $example on:data_types$
> import org.apache.spark.sql.types._
> // $example off:data_types$
> // $example off:programmatic_schema$
>
> object SparkSQLExample {
>
>   // $example on:create_ds$
>   case class Person(name: String, age: Long)
>   // $example off:create_ds$
> ...
>
>
>
> Thanks,
>
>
>
>
> --
>
> Fuad Efendi
>
> (416) 993-2060
>
> http://www.tokenizer.ca
>
> Machine Learning, Data Mining, Recommender Systems, Search
>




Comments conventions in Spark distribution official examples

2020-08-05 Thread Fuad Efendi
Hello,


I am trying to work out what such comments are needed for and cannot find anything
about them on the Internet; maybe they are for some documentation tool? Both the
Java and Scala examples have them around import statements and in the code:
“$example on” and “$example off”.

package org.apache.spark.examples.sql

// $example on:programmatic_schema$
import org.apache.spark.sql.Row
// $example off:programmatic_schema$
// $example on:init_session$
import org.apache.spark.sql.SparkSession
// $example off:init_session$
// $example on:programmatic_schema$
// $example on:data_types$
import org.apache.spark.sql.types._
// $example off:data_types$
// $example off:programmatic_schema$

object SparkSQLExample {

  // $example on:create_ds$
  case class Person(name: String, age: Long)
  // $example off:create_ds$
...



Thanks,




--

Fuad Efendi

(416) 993-2060

http://www.tokenizer.ca
Machine Learning, Data Mining, Recommender Systems, Search


Async API to save RDDs?

2020-08-05 Thread Antonin Delpeuch (lists)
Hi,

The RDD API provides async variants of a few RDD methods, which let the
user execute the corresponding jobs asynchronously. This makes it
possible to cancel the jobs for instance:
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/AsyncRDDActions.html

There do not seem to be async versions of the save methods such as
`saveAsTextFile`:
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#saveAsTextFile-java.lang.String-

Is there another way to start such jobs and get a handle on them (such
as the job id)? Specifically, I would like to be able to stop save jobs
on user request.
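
One workaround I am aware of (a sketch only; `sc` stands for the SparkContext,
`rdd` for the RDD to save, and the output path is made up) is to run the
blocking save on a separate thread and tag it with a job group, which then
serves as the handle for cancellation:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val groupId = s"save-${System.currentTimeMillis()}"

// Job-group properties are thread-local, so set them on the thread that
// actually triggers the save.
val handle: Future[Unit] = Future {
  sc.setJobGroup(groupId, "cancellable saveAsTextFile", interruptOnCancel = true)
  rdd.saveAsTextFile("hdfs:///tmp/output")
}

// Later, on user request:
// sc.cancelJobGroup(groupId)

The Future gives completion/failure handling, and cancelJobGroup cancels any
jobs started under that group. But I am wondering whether there is a more
direct way to get a handle such as the job id, as with the async actions.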

Thank you,
Antonin




file importing / hibernate

2020-08-05 Thread nt
1. I need to import CSV files with some entity-resolution logic; Spark could help
me process the rows in parallel. Do you think this is a good approach?

2. I have a fairly complex database structure and would like to use something like
Hibernate to resolve and save the data, but it seems like everybody uses plain
JDBC. Is that the recommended solution?
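
To make question 2 concrete, the "plain JDBC" route I keep seeing is Spark's
built-in JDBC writer rather than hand-rolled connections (a sketch with
placeholder connection details, paths and table names):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")        // placeholders
props.setProperty("password", "dbpass")

val df = spark.read.option("header", "true").csv("/data/incoming/*.csv")

// ...entity-resolution transformations on df would go here...

df.write
  .mode("append")
  .jdbc("jdbc:postgresql://dbhost:5432/mydb", "target_table", props)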



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Renaming a DataFrame column makes Spark lose partitioning information

2020-08-05 Thread Antoine Wendlinger
Well that's great! Thank you very much :)


Antoine

On Tue, Aug 4, 2020 at 11:22 PM Terry Kim  wrote:

> This is fixed in Spark 3.0 by https://github.com/apache/spark/pull/26943:
>
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
>
> Seq((1, 2))
>   .toDF("a", "b")
>   .repartition($"b")
>   .withColumnRenamed("b", "c")
>   .repartition($"c")
>   .explain()
>
> // Exiting paste mode, now interpreting.
>
> == Physical Plan ==
> *(1) Project [a#7, b#8 AS c#11]
> +- Exchange hashpartitioning(b#8, 200), false, [id=#12]
>+- LocalTableScan [a#7, b#8]
>
> Thanks,
> Terry
>
> On Tue, Aug 4, 2020 at 6:26 AM Antoine Wendlinger <
> awendlin...@mytraffic.fr> wrote:
>
>> Hi,
>>
>> When renaming a DataFrame column, it looks like Spark is forgetting the
>> partition information:
>>
>> Seq((1, 2))
>>   .toDF("a", "b")
>>   .repartition($"b")
>>   .withColumnRenamed("b", "c")
>>   .repartition($"c")
>>   .explain()
>>
>> Gives the following plan:
>>
>> == Physical Plan ==
>> Exchange hashpartitioning(c#40, 10)
>> +- *(1) Project [a#36, b#37 AS c#40]
>>+- Exchange hashpartitioning(b#37, 10)
>>   +- LocalTableScan [a#36, b#37]
>>
>> As you can see, two shuffles are done, but the second one is unnecessary.
>> Is there a reason for this behavior that I'm not aware of? Is there a way to
>> work around it (other than not renaming my columns)?
>>
>> I'm using Spark 2.4.3.
>>
>>
>> Thanks for your help,
>>
>> Antoine
>>
>