bucket joins on multiple data frames.

2021-09-08 Thread Adrian Stern
Sorry if this has been answered, but I had a question about bucketed joins
that I can't seem to find the answer to online.


   - I have a bunch of pyspark data frames (let's call them df1, df2,
   ...df10). I need to join them all together using the same key.
   - joined = df1.join(df2, "key", "full")
  - joined = joined.join(df3, "key", "full")
  - joined = joined.join(df4, "key", "full")
  - ...
   - I saw that bucketed joins can help in this situation, but when I try
   it, I only get a bucketed join on the first join, and then I have to
   re-create a bucketed table from the joined result after each join,
   otherwise I don't get a bucketed join. Re-creating the joined table
   only slows things down, so I don't see any performance gain.
   - Doesn't work: (pseudo code)
 - df1.write-bucketed() ; t1 = spark.table("df1")
 - df2.write-bucketed() ; t2 = spark.table("df2")
 - df3.write-bucketed() ; t3 = spark.table("df3")
 - joined = t1.join(t2, "key", "full")
 - joined = joined.join(t3, "key", "full")
   - Works but is slow: (pseudo code; a concrete PySpark sketch follows below)
 - df1.write-bucketed() ; t1 = spark.table("df1")
 - df2.write-bucketed() ; t2 = spark.table("df2")
 - df3.write-bucketed() ; t3 = spark.table("df3")
 - joined = t1.join(t2, "key", "full")
 - joined.write-bucketed() ; joined = spark.table("joined")
 - joined = joined.join(t3, "key", "full")
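
To make the pseudo code concrete, here is a minimal, untested PySpark sketch
of the pattern I mean (df1..df3 are the data frames above; the bucket count
of 16, the table names, and the single join column "key" are just
illustrative):

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()

  # Write each input as a bucketed, sorted table on the join key,
  # using the same bucket count and column for every table.
  for name, df in [("df1", df1), ("df2", df2), ("df3", df3)]:
      (df.write
         .bucketBy(16, "key")
         .sortBy("key")
         .mode("overwrite")
         .saveAsTable(name))

  t1 = spark.table("df1")
  t2 = spark.table("df2")
  t3 = spark.table("df3")

  # Chain the full outer joins on the bucketed tables.
  joined = t1.join(t2, "key", "full").join(t3, "key", "full")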


I'm wondering if there is a way to get performance gains here, either by
using bucketing or some other way.
Also, I'm curious: if this isn't what bucketed joins are for, what are they
actually for?

Thanks
Adrian




Is BindingParquetOutputCommitter still used?

2021-09-08 Thread Vladimir Prus
Hi,

per https://spark.apache.org/docs/latest/cloud-integration.html, when using
S3 storage one is advised to set these options:

  spark.sql.sources.commitProtocolClass =
    org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
  spark.sql.parquet.output.committer.class =
    org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter


However, looking at the code and trying simple tests suggests that
BindingParquetOutputCommitter is not used at all. Specifically, I used this
code:

  import org.apache.log4j.{Level, Logger}
  import org.apache.spark.sql.SparkSession

  // Turn up logging for the cloud committers so we can see which one runs.
  Logger.getLogger("org.apache.spark.internal.io.cloud").setLevel(Level.TRACE)
  Logger.getLogger("org.apache.hadoop.mapreduce.lib.output").setLevel(Level.DEBUG)

  val spark = SparkSession.builder().master("local[*]")
    // Committer settings from the cloud-integration docs.
    .config("spark.sql.sources.outputCommitterClass",
      "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .config("spark.sql.parquet.output.committer.class",
      "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .config("spark.sql.sources.commitProtocolClass",
      "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    // Enable the S3A magic committer.
    .config("fs.s3a.committer.magic.enabled", "true")
    .config("fs.s3.committer.magic.enabled", "true")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.hadoop.fs.s3.committer.name", "magic")
    .getOrCreate()

  import spark.implicits._

  // Write a trivial DataFrame to S3 and watch the committer logs.
  val df = Seq("foo", "bar").toDF("s")
  df.write.mode("overwrite").parquet("s3:///2021-09-07-parquet")

I observe that the magic committer is used, and I get trace log messages from
PathOutputCommitProtocol, but not from BindingParquetOutputCommitter.
If I remove the configuration options that set BindingParquetOutputCommitter,
I still see the magic committer used.
The spark.sql.parquet.output.committer.class option is only used in
ParquetFileFormat, where it is copied to
spark.sql.sources.outputCommitterClass,
and that option, in turn, is only used by SQLHadoopMapReduceCommitProtocol
- which we don't use here.

So, it sounds like setting spark.sql.parquet.output.committer.class to
org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter is no
longer necessary?
Or is there some code path where it matters?
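
If that reading is right, then the only settings that should actually matter
for this test are the commit protocol and the S3A committer options; as a
sketch (not a verified recommendation), the reduced configuration would be
roughly:

  spark.sql.sources.commitProtocolClass =
    org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
  spark.hadoop.fs.s3a.committer.name = magic
  spark.hadoop.fs.s3a.committer.magic.enabled = true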


-- 
Vladimir Prus
http://vladimirprus.com


Fwd: issue in Apache Spark install

2021-09-08 Thread Mukhtar Ali
Dear all,

I am a learning member of https://learning.oreilly.com and have a problem
installing Apache Spark. I tried both CMD and a Jupyter notebook and get the
same issue: "Exception: Java gateway process exited before sending its port
number". Please help me resolve this issue; the Jupyter output is in the
attachment.


In CMD
C:\Users\User>pyspark
Python 3.8.8 (default, Apr 13 2021, 15:08:03) [MSC v.1916 64 bit (AMD64)] :: Anaconda, Inc. on win32

Warning:
This Python interpreter is in a conda environment, but the environment has
not been activated.  Libraries may fail to load.  To activate this
environment
please see https://conda.io/activation

Type "help", "copyright", "credits" or "license" for more information.
Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:54)
        at org.apache.spark.internal.config.package$.<init>(package.scala:1095)
        at org.apache.spark.internal.config.package$.<clinit>(package.scala)
        at org.apache.spark.deploy.SparkSubmitArguments.$anonfun$loadEnvironmentArguments$3(SparkSubmitArguments.scala:157)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:157)
        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:115)
        at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
private java.nio.DirectByteBuffer(long,int) accessible: module java.base
does not "opens java.nio" to unnamed module @71e9ddb4
        at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
        at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
        at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:188)
        at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:181)
        at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
        ... 13 more
Traceback (most recent call last):
  File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\shell.py", line 35, in <module>
    SparkContext._ensure_initialized()  # type: ignore
  File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\context.py", line 331, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "C:\Spark\spark-3.1.2-bin-hadoop2.7\python\pyspark\java_gateway.py", line 108, in launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number


SparkTest - Jupyter Notebook.pdf
Description: Adobe PDF document
