What is DataFilters, and why is the filter isnotnull(joinKey) applied twice when joining?
PySpark version: 3.1.3

*Question 1:* What is DataFilters in the Spark physical plan? How is it different from PushedFilters?

*Question 2:* When joining two datasets, why is the filter isnotnull applied twice on the join key column? In the physical plan it appears once as a PushedFilter and is then applied again explicitly right after it. Why is that so?

code:

import os
import pandas as pd, numpy as np
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

save_loc = "gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/"
df1 = spark.createDataFrame(pd.DataFrame({'a': np.random.choice([1, 2, None], size=1000, p=[0.47, 0.48, 0.05]),
                                          'b': np.random.random(1000)}))
df2 = spark.createDataFrame(pd.DataFrame({'a': np.random.choice([1, 2, None], size=1000, p=[0.47, 0.48, 0.05]),
                                          'b': np.random.random(1000)}))

df1.write.parquet(os.path.join(save_loc, "dfl_key_int"))
df2.write.parquet(os.path.join(save_loc, "dfr_key_int"))

dfl_int = spark.read.parquet(os.path.join(save_loc, "dfl_key_int"))
dfr_int = spark.read.parquet(os.path.join(save_loc, "dfr_key_int"))

dfl_int.join(dfr_int, on='a', how='inner').explain()

output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#23L, b#24, b#28]
   +- BroadcastHashJoin [a#23L], [a#27L], Inner, BuildRight, false
      :- Filter isnotnull(a#23L)
      :  +- FileScan parquet [a#23L,b#24] Batched: true, DataFilters: [isnotnull(a#23L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfl_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:double>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         +- Filter isnotnull(a#27L)
            +- FileScan parquet [a#27L,b#28] Batched: true, DataFilters: [isnotnull(a#27L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfr_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:double>

--
Regards,
Nitin
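A quick way to look at the scan in isolation (not part of the run above) is to apply the same null filter without any join; on the same data this should show matching DataFilters/PushedFilters entries on the FileScan plus an explicit Filter node above it:

from pyspark.sql import functions as F

# Same dfl_int dataframe as above; "formatted" mode splits the scan details out per node.
dfl_int.filter(F.col("a").isNotNull()).explain(mode="formatted")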
Fwd: [Spark Standalone Mode] How to read from kerberised HDFS in spark standalone mode
Glad to hear that! And hope it can help any other guys facing the same problem.

-- Forwarded message --
From: Bansal, Jaimita
Date: Wed, Feb 1, 2023, 03:15
Subject: RE: [Spark Standalone Mode] How to read from kerberised HDFS in spark standalone mode
To: Wei Yan
Cc: Chittajallu, Rajiv, abner.espin...@ny.email.gs.com

Hey Wei,

This worked! Thank you so much.

Thanks,
Jaimita

*From:* Wei Yan
*Sent:* Thursday, January 19, 2023 7:08 PM
*To:* Bansal, Jaimita [Engineering]
*Subject:* Re: [Spark Standalone Mode] How to read from kerberised HDFS in spark standalone mode

Hi!

You can use a Delegation Token. In Spark standalone mode, the simple way to use the Delegation Token is to set an environment variable on every node, master nodes and worker nodes alike, whose content is the path of the Delegation Token file. You should renew this file at a fixed time interval:

hdfs fetchdt -renewer hive /opt/spark/conf/delegation.token

Bansal, Jaimita wrote on Fri, Jan 20, 2023, 07:46:

Hi Spark Team,

We are facing an issue when trying to read from HDFS via Spark running in a standalone cluster. The issue comes from the executor node not being able to authenticate: it is using auth:SIMPLE when we have actually set up auth as Kerberos. Could you please help in resolving this?

Caused by: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:778) ~[hadoop-common-3.1.1.7.1.7.1000-141.jar:na]

18:57:44.726 [main] DEBUG o.a.spark.deploy.SparkHadoopUtil - creating UGI for user:
18:57:45.045 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop login
18:57:45.046 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop login commit
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - using kerberos user: @GS.COM
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - Using user: "@GS.COM" with name @GS.COM
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - User entry: "@GS.COM"
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - UGI loginUser:@GS.COM (auth:KERBEROS)
18:57:45.056 [main] DEBUG o.a.h.security.UserGroupInformation - PrivilegedAction as: (auth:SIMPLE) from:org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
18:57:45.078 [TGT Renewer for @GS.COM] DEBUG o.a.h.security.UserGroupInformation - Current time is 1674068265078
18:57:45.079 [TGT Renewer for @GS.COM] DEBUG o.a.h.security.UserGroupInformation - Next refresh is 1674136785000
18:57:45.092 [main] INFO org.apache.spark.SecurityManager - Changing view acls to: root,
18:57:45.092 [main] INFO org.apache.spark.SecurityManager - Changing modify acls to: root,
18:57:45.093 [main] INFO org.apache.spark.SecurityManager - Changing view acls groups to:
18:57:45.093 [main] INFO org.apache.spark.SecurityManager - Changing modify acls groups to:

Thanks,
Jaimita
*Vice President, Data Lake Engineering*
*Goldman Sachs*
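Wei's reply above does not name the environment variable. A minimal PySpark sketch of the idea, under the assumptions that the variable meant is Hadoop's HADOOP_TOKEN_FILE_LOCATION and that the token file has already been fetched (and is periodically renewed) at /opt/spark/conf/delegation.token on every node:

import os
from pyspark.sql import SparkSession

TOKEN_PATH = "/opt/spark/conf/delegation.token"   # path from the fetchdt example above

# Driver side: set before the JVM starts so Hadoop's UserGroupInformation picks it up.
# (Assumption: HADOOP_TOKEN_FILE_LOCATION is the variable the reply refers to.)
os.environ["HADOOP_TOKEN_FILE_LOCATION"] = TOKEN_PATH

spark = (
    SparkSession.builder
    .master("spark://standalone-master:7077")  # hypothetical master URL
    # Executor side: spark.executorEnv.* exports the variable to executor processes.
    .config("spark.executorEnv.HADOOP_TOKEN_FILE_LOCATION", TOKEN_PATH)
    .getOrCreate()
)

# Hypothetical HDFS path, just to exercise an authenticated read.
spark.read.text("hdfs://namenode:8020/tmp/sample.txt").show(5)

Exporting the variable in each node's spark-env.sh (as the reply suggests) achieves the same thing without the spark.executorEnv setting.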
[Spark/sparklyr] Why is Spark caching tables read through a JDBC connection from Oracle, even when memory = FALSE is chosen?
This question is related to using Spark with sparklyr (dplyr). We load a lot of data from Oracle into dataframes through a JDBC connection:

dfX <- spark_read_jdbc(spConn, "myconnection",
                       options = list(
                         url = urlDEVdb,
                         driver = "oracle.jdbc.OracleDriver",
                         user = dbt_schema,
                         password = dbt_password,
                         dbtable = pQuery,
                         memory = FALSE  # don't cache the whole (big) table
                       ))

Then we run a lot of SQL statements and use sdf_register to register the results. Eventually we want to write the final result to a database.

Although we have set memory = FALSE, we see all these tables get cached. I notice that counts are triggered (I think this happens before a table is cached) and a collect is triggered. We also think we see that when tables are registered with sdf_register, a collect action is triggered (it almost looks like these are also cached). This leads to a lot of actions (often on dataframes resulting from the same pipeline), which takes a long time.

Questions to people using sparklyr + Spark:
1) Is it possible that this memory = FALSE is ignored when reading through JDBC?
2) Can someone confirm that there is a lot of automatic caching happening (and hence a lot of counts and a lot of actions)?

Thanks for input!
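One way to confirm whether Spark itself considers a registered table cached (question 2) is to ask the catalog from within the same Spark application; the Storage tab of the Spark UI shows the same information. A minimal PySpark sketch, assuming it runs inside the application that registered the tables and that "my_registered_table" is a hypothetical placeholder name:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# List every table/temp view visible to this application and whether it is cached.
for t in spark.catalog.listTables():
    print(t.name, spark.catalog.isCached(t.name))

# Or check a single registered name directly (hypothetical placeholder name).
print(spark.catalog.isCached("my_registered_table"))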