What is DataFilters, and why is the filter isnotnull(joinKey) applied twice when joining?

2023-01-31 Thread Nitin Siwach
PySpark version: 3.1.3

*Question 1:* What is DataFilters in the Spark physical plan? How is it
different from PushedFilters?
*Question 2:* When joining two datasets, why is the isnotnull filter applied
twice on the join key column? In the physical plan it appears once as a
PushedFilter and is then applied again explicitly right after. Why is that?


code:

import os
import pandas as pd, numpy as np
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

save_loc = "gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/"

# Two dataframes with a nullable join key 'a' (about 5% nulls)
df1 = spark.createDataFrame(pd.DataFrame({'a': np.random.choice([1, 2, None], size=1000, p=[0.47, 0.48, 0.05]),
                                          'b': np.random.random(1000)}))

df2 = spark.createDataFrame(pd.DataFrame({'a': np.random.choice([1, 2, None], size=1000, p=[0.47, 0.48, 0.05]),
                                          'b': np.random.random(1000)}))

# Write to parquet and read back so the join runs against FileScan sources
df1.write.parquet(os.path.join(save_loc, "dfl_key_int"))
df2.write.parquet(os.path.join(save_loc, "dfr_key_int"))

dfl_int = spark.read.parquet(os.path.join(save_loc, "dfl_key_int"))
dfr_int = spark.read.parquet(os.path.join(save_loc, "dfr_key_int"))

dfl_int.join(dfr_int, on='a', how='inner').explain()



output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#23L, b#24, b#28]
   +- BroadcastHashJoin [a#23L], [a#27L], Inner, BuildRight, false
      :- Filter isnotnull(a#23L)
      :  +- FileScan parquet [a#23L,b#24] Batched: true, DataFilters: [isnotnull(a#23L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfl_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         +- Filter isnotnull(a#27L)
            +- FileScan parquet [a#27L,b#28] Batched: true, DataFilters: [isnotnull(a#27L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfr_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct
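
For reference, the formatted explain mode breaks out DataFilters,
PushedFilters and ReadSchema per scan node, which is easier to read than the
single-line plan above. A minimal sketch, reusing the dfl_int and dfr_int
defined above (the comparison filter is just an illustration, not part of the
original question):

# Formatted mode prints each operator with its attributes on separate lines
# (available in Spark 3.x).
dfl_int.join(dfr_int, on='a', how='inner').explain(mode="formatted")

# For comparison, an explicit not-null predicate on the join key should show
# the same Filter-on-top-of-FileScan pattern with IsNotNull(a) in PushedFilters.
dfl_int.filter("a is not null").explain(mode="formatted")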



-- 
Regards,
Nitin


Fwd: [Spark Standalone Mode] How to read from kerberised HDFS in spark standalone mode

2023-01-31 Thread Wei Yan
Glad to hear that!
I hope it can help anyone else facing the same problem.

-- Forwarded message -
From: Bansal, Jaimita 
Date: Wed, Feb 1, 2023, 03:15
Subject: RE: [Spark Standalone Mode] How to read from kerberised HDFS in
spark standalone mode
To: Wei Yan 
Cc: Chittajallu, Rajiv ,
abner.espin...@ny.email.gs.com 


Hey Wei,



This worked!  Thank you so much.



Thanks,

Jaimita



*From:* Wei Yan 
*Sent:* Thursday, January 19, 2023 7:08 PM
*To:* Bansal, Jaimita [Engineering] 
*Subject:* Re: [Spark Standalone Mode] How to read from kerberised HDFS in
spark standalone mode



Hi!

You can use a Delegation Token.

In Spark standalone mode, the simple way to use a Delegation Token is to set
an environment variable on every node, including the master and worker nodes,
whose value is the path of the Delegation Token file.

You should renew this file at a fixed interval:

hdfs fetchdt -renewer hive /opt/spark/conf/delegation.token
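
If a ready-made scheduler is not available, a minimal sketch of such a
renewal loop in Python follows. The environment variable name
(HADOOP_TOKEN_FILE_LOCATION), the renewal interval, and the use of a loop
instead of cron are assumptions; it simply re-runs the command above:

# Sketch only: periodically refresh the delegation token file that every node
# points to via an environment variable (commonly HADOOP_TOKEN_FILE_LOCATION,
# e.g. exported in spark-env.sh). Path, renewer and interval are assumptions.
import subprocess
import time

TOKEN_PATH = "/opt/spark/conf/delegation.token"  # same path as in the command above
RENEW_INTERVAL_SECONDS = 6 * 60 * 60             # keep well below the token lifetime

while True:
    # Re-fetch the token as a kinit'ed user; this overwrites the file that
    # the Spark processes read through the environment variable.
    subprocess.run(["hdfs", "fetchdt", "-renewer", "hive", TOKEN_PATH], check=True)
    time.sleep(RENEW_INTERVAL_SECONDS)

A plain cron job running the same hdfs fetchdt command works just as well.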



Bansal, Jaimita wrote on Fri, Jan 20, 2023 at 07:46:

Hi Spark Team,



We are facing an issue when trying to read from HDFS via Spark running in a
standalone cluster. The problem is that the executor node is not able to
authenticate: it uses auth:SIMPLE even though we have set up auth as
Kerberos. Could you please help in resolving this?



Caused by: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
        at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:778) ~[hadoop-common-3.1.1.7.1.7.1000-141.jar:na]

18:57:44.726 [main] DEBUG o.a.spark.deploy.SparkHadoopUtil - creating UGI for user: 
18:57:45.045 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop login
18:57:45.046 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop login commit
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - using kerberos user: @GS.COM
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - Using user: "@GS.COM" with name @GS.COM
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - User entry: " @GS.COM"
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - UGI loginUser:@GS.COM (auth:KERBEROS)
18:57:45.056 [main] DEBUG o.a.h.security.UserGroupInformation - PrivilegedAction as: (auth:SIMPLE) from:org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
18:57:45.078 [TGT Renewer for @GS.COM] DEBUG o.a.h.security.UserGroupInformation - Current time is 1674068265078
18:57:45.079 [TGT Renewer for @GS.COM] DEBUG o.a.h.security.UserGroupInformation - Next refresh is 1674136785000
18:57:45.092 [main] INFO  org.apache.spark.SecurityManager - Changing view acls to: root,
18:57:45.092 [main] INFO  org.apache.spark.SecurityManager - Changing modify acls to: root,
18:57:45.093 [main] INFO  org.apache.spark.SecurityManager - Changing view acls groups to:
18:57:45.093 [main] INFO  org.apache.spark.SecurityManager - Changing modify acls groups to:



Thanks,

Jaimita



*Vice President, Data Lake Engineering*

*Goldman Sachs*






[Spark/dplyr] How come Spark is caching tables read through a JDBC connection from Oracle, even when memory = FALSE is chosen?

2023-01-31 Thread Joris Billen
This question is related to using Spark with dplyr (via sparklyr).
We load a lot of data from Oracle into dataframes through a JDBC connection:

dfX <- spark_read_jdbc(spConn, "myconnection",
  options = list(
    url = urlDEVdb,
    driver = "oracle.jdbc.OracleDriver",
    user = dbt_schema,
    password = dbt_password,
    dbtable = pQuery,
    memory = FALSE  # don't cache the whole (big) table
  ))

Then we run a lot of SQL statements and use sdf_register to register the
results. Eventually we want to write the final result to a database.

Although we have set memory = FALSE, we see all these tables get cached. I
notice that counts are triggered (I think this happens before a table is
cached) and a collect is triggered. We also see that registering the tables
with sdf_register seems to trigger a collect action (it almost looks like
these are cached as well). This leads to a lot of actions (often on the
dataframes resulting from the same pipeline), which takes a long time.

Questions for people using dplyr + Spark:
1) Is it possible that memory = FALSE is ignored when reading through JDBC?
2) Can someone confirm that there is a lot of automatic caching happening
(and hence a lot of counts and a lot of actions)? A small check sketch
follows below.
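
Not a direct answer, but one way to verify from the Spark side whether the
registered tables really end up cached is the session catalog. A minimal
sketch, shown in PySpark purely for illustration (the table names are
whatever was passed to sdf_register; treat the details as assumptions about
your setup):

# Sketch only: list every table visible in the current Spark session and
# whether its plan is actually marked as cached.
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

for t in spark.catalog.listTables():
    print(t.name, spark.catalog.isCached(t.name))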


Thanks for input!

