Re: How to Spawn Child Thread or Sub-jobs in a Spark Session

2020-12-04 Thread Raghavendra Ganesh
There should not be any need to explicitly parallelize the DF-2 and DF-3
computations. Spark generates the execution plans and decides what can run in
parallel (ideally you should see them running in parallel in the Spark UI).

You should cache DF-1 if possible (either in memory or on disk); otherwise the
computations of DF-2 and DF-3 may each trigger DF-1's computation again.
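
If you do end up driving the two branches explicitly, the usual pattern is to
submit their actions from separate driver threads, since jobs submitted from
different threads of the same SparkSession can run concurrently. A minimal
PySpark sketch (proc_1/proc_2/proc_3 and the output paths are placeholders, not
anything from your pipeline):

from concurrent.futures import ThreadPoolExecutor

df_1 = proc_1(spark)     # placeholder for the Proc-1 pipeline that produces DF-1
df_1.persist()           # cache DF-1 so both branches reuse it instead of recomputing it
df_1.count()             # materialize the cache once

def run_proc_2():
    # placeholder: Proc-2 transformations plus an action (write/collect/count)
    proc_2(df_1).write.parquet("/tmp/df_2_out")

def run_proc_3():
    proc_3(df_1).write.parquet("/tmp/df_3_out")

# Actions submitted from separate driver threads become separate Spark jobs that
# can run concurrently, subject to the scheduler mode and executor resources.
with ThreadPoolExecutor(max_workers=2) as pool:
    for f in [pool.submit(run_proc_2), pool.submit(run_proc_3)]:
        f.result()       # surfaces any exception raised in either branch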

--
Raghavendra


On Sat, Dec 5, 2020 at 12:31 AM Artemis User  wrote:

> We have a Spark job that produces a result data frame, say DF-1, at the
> end of the pipeline (i.e. Proc-1).  From DF-1, we need to create two or
> more data frames, say DF-2 and DF-3, via additional SQL or ML processes,
> i.e. Proc-2 and Proc-3.  Ideally, we would like to perform Proc-2 and
> Proc-3 in parallel, since Proc-2 and Proc-3 can be executed
> independently, with DF-1 immutable and DF-2 and DF-3 mutually
> exclusive.
>
> Does Spark have built-in APIs to support spawning sub-jobs in a
> single session?  If multi-threading is needed, what are the common best
> practices in this case?
>
> Thanks in advance for your help!
>
> -- ND
>
>


Re: Typed dataset from Avro generated classes?

2020-12-04 Thread Nads
Same problem here.  A Google search shows a few related Jira tickets in
"Resolved" state, but I am getting the same error in Spark 3.0.1.  I'm
pasting my `spark-shell` output below:

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> val linkageBean = Encoders.bean(classOf[MyAvroGeneratedClass])
java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:142)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$inferDataType$1(JavaTypeInference.scala:150)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:148)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$inferDataType$1(JavaTypeInference.scala:150)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:148)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$inferDataType$1(JavaTypeInference.scala:150)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:148)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:67)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:68)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:154)
  ... 49 elided







RE: Spark UI Storage Memory

2020-12-04 Thread Jack Yang
unsubscribe


Re: Spark UI Storage Memory

2020-12-04 Thread Amit Sharma
Is there a memory leak in Spark 2.3.3, as mentioned in the Jira below?
https://issues.apache.org/jira/browse/SPARK-29055

Please let me know how to resolve it.

Thanks
Amit

On Fri, Dec 4, 2020 at 1:55 PM Amit Sharma  wrote:

> Can someone help me with this, please?
>
>
> Thanks
> Amit
>
> On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma  wrote:
>
>> Hi, I have a Spark Streaming job. When I check the Executors tab,
>> there is a Storage Memory column that displays used memory / total memory.
>> What is "used memory": memory currently in use, or memory used so far? How
>> would I know how much memory is unused at a given point in time?
>>
>>
>> Thanks
>> Amit
>>
>


How to Spawn Child Thread or Sub-jobs in a Spark Session

2020-12-04 Thread Artemis User
We have a Spark job that produces a result data frame, say DF-1, at the
end of the pipeline (i.e. Proc-1).  From DF-1, we need to create two or
more data frames, say DF-2 and DF-3, via additional SQL or ML processes,
i.e. Proc-2 and Proc-3.  Ideally, we would like to perform Proc-2 and
Proc-3 in parallel, since Proc-2 and Proc-3 can be executed
independently, with DF-1 immutable and DF-2 and DF-3 mutually exclusive.


Does Spark have built-in APIs to support spawning sub-jobs in a
single session?  If multi-threading is needed, what are the common best
practices in this case?


Thanks in advance for your help!

-- ND





Re: Spark UI Storage Memory

2020-12-04 Thread Amit Sharma
Can someone help me with this, please?


Thanks
Amit

On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma  wrote:

> Hi, I have a Spark Streaming job. When I check the Executors tab,
> there is a Storage Memory column that displays used memory / total memory.
> What is "used memory": memory currently in use, or memory used so far? How
> would I know how much memory is unused at a given point in time?
>
>
> Thanks
> Amit
>
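
As far as I understand, the Storage Memory column shows the memory currently
held by cached/stored blocks on each executor out of the total storage memory
available, so "used" can drop again when blocks are evicted or unpersisted; it
is not a running total. The same figures are exposed through the monitoring
REST API, so one way to check how much is unused at a given moment is to poll
it. A minimal sketch, assuming the driver UI is on the default localhost:4040
and the requests library is available:

import requests

base = "http://localhost:4040/api/v1"   # adjust host/port for your deployment
app_id = requests.get(base + "/applications").json()[0]["id"]

for ex in requests.get(base + "/applications/" + app_id + "/executors").json():
    used, total = ex["memoryUsed"], ex["maxMemory"]   # should match the Storage Memory column
    print(ex["id"], "used:", used, "total:", total, "unused:", total - used)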


Spark thrift server ldap

2020-12-04 Thread mickymiek
Hi everyone

We're using the Spark Thrift Server with Spark 3.0.1.

We're using it to query Hive over JDBC with LDAP authentication, and it seems
that the LdapAuthenticationProviderImpl.java shipped with the Spark Thrift
Server is badly outdated
(https://github.com/apache/spark/blob/v3.0.1/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/LdapAuthenticationProviderImpl.java);
we traced it back to the 1.2 release of Hive
(https://github.com/apache/hive/blob/branch-1.2/service/src/java/org/apache/hive/service/auth/LdapAuthenticationProviderImpl.java).

This causes some issues because it lacks some features: for instance, because
of the Hive version the Thrift Server bundles, we can't use options like
"hive.server2.authentication.ldap.guidKey", which we really need in our case.

Is there a good reason the Hive part of the Thrift Server isn't up to date?
If not, we would be happy to contribute on this.

Thanks a lot







Broadcast size increases with subsequent iterations

2020-12-04 Thread Kalin Stoyanov
Hi all,

I have an iterative algorithm in Spark that uses each iteration's output as
the input for the following one, but the size of the data does not change. I am
using localCheckpoint to cut the data's lineage (and also to facilitate some
computations that reuse DataFrames). However, it runs slower and slower as time
goes on, and when I looked at the logs it turned out each job is broadcasting
larger and larger amounts of data. I can't figure out why this is happening or
how to stop it: with the actual data size remaining constant, the only thing I
can imagine growing is the lineage information, but that is cut by the
checkpoint...

Here's an abridged version of the script:
# in class 1
while self.t < self.ttarget:
    # calls the advance() method below
    newSnapshot, timePassed = self.integrator.advance(self.cluster)
    self.cluster = newSnapshot
    self.t += timePassed

    if self.dt_out and self.next_out <= self.t:
        # saves the dataframe to disk - Job #3 - does 1 broadcast
        self.snapshot()
        self.next_out += self.dt_out

# in class 2 - integrator
def advance(self, df_clust):
    # Job #1 - does one broadcast
    df_clust = df_clust.localCheckpoint().repartition(self.nparts, "id")
    # Job #2 - does two broadcasts
    df_F = self.calc_F(df_clust).localCheckpoint()
    df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)

    df_clust = df_r.join(df_v, "id")

    return (df_clust, self.dt)

When I checked the logs, as expected they fall into a repeating pattern of
the 3 jobs (I'm saving to disk on every iteration so it's simpler), and the
jobs look identical across iterations. However, the size of ALL broadcasts is
increasing over time - for example:

[image: broadcast_1.png]
[image: broadcast_2.png]

I'd really appreciate any insight into what's causing this.

Regards,
Kalin
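
One thing that might be worth ruling out (a sketch only, not verified against
this workload) is whether a reliable checkpoint behaves differently from
localCheckpoint here, since it materializes the data to the checkpoint
directory and rebuilds the DataFrame from those files rather than from locally
cached partitions. The checkpoint directory path below is a placeholder:

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

def advance(self, df_clust):
    # df.checkpoint() is eager by default and truncates the plan like
    # localCheckpoint(), but is backed by files in the checkpoint directory
    # instead of executor storage
    df_clust = df_clust.checkpoint().repartition(self.nparts, "id")
    df_F = self.calc_F(df_clust).checkpoint()
    df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)
    return (df_r.join(df_v, "id"), self.dt)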


Re: In windows 10, accessing Hive from PySpark with PyCharm throws error

2020-12-04 Thread Mich Talebzadeh
OK, with PyCharm itself I am getting this error:

pyspark.sql.utils.AnalysisException: java.lang.RuntimeException: Error
while running command to get file permissions : java.io.IOException: (null)
entry in command string: null ls -F
C:\Users\admin\PycharmProjects\pythonProject\hive-scratchdir

I gather the "null ls" is because it cannot find winutils.exe?


Now, if I run the command manually with winutils.exe:



D:\temp\spark\bin\winutils.exe ls -F
C:\Users\admin\PycharmProjects\pythonProject\hive-scratchdir

drwxrwxrwx|1|w7\admin|w7\None|0|Nov|29|2020|C:\Users\admin\PycharmProjects\pythonProject\hive-scratchdir

it works

thanks
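
For the PyCharm run, a possible workaround (a sketch only, assuming
winutils.exe lives under D:\temp\spark\bin as in the manual test above) is to
set HADOOP_HOME and PATH at the top of main.py, before the SparkSession (and
hence the JVM) is created:

import os

# Assumption: winutils.exe is in D:\temp\spark\bin; adjust to your actual location.
os.environ["HADOOP_HOME"] = r"D:\temp\spark"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]

# The variables must be set before the JVM is launched, i.e. before getOrCreate().
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("App1")
         .enableHiveSupport()
         .getOrCreate())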

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 4 Dec 2020 at 04:50, Artemis User  wrote:

> You don't have to include all your config and log messages.  The error
> message would suffice.  The java.lang.UnsatisfiedLinkError exception
> indicates that the JVM can't find some OS-specific libraries (commonly
> referred to as native libraries).  On Windows, these would be DLL files.
> Look into your Hadoop installation and you will find the
> $HADOOP_HOME/lib/native directory.  All the OS-specific library files are
> there (on Windows, this lib path may be different).  So add this path to
> your PATH environment variable in your command shell before running
> spark-submit again.
>
> -- ND
> On 12/3/20 6:28 PM, Mich Talebzadeh wrote:
>
> This is becoming a serious pain.
>
> Using PowerShell, I am running spark-submit as follows:
>
> PS C:\Users\admin> spark-submit.cmd
> C:\Users\admin\PycharmProjects\pythonProject\main.py
>
> WARNING: An illegal reflective access operation has occurred
>
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (
> file:/D:/temp/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
> java.nio.DirectByteBuffer(long,int)
>
> WARNING: Please consider reporting this to the maintainers of
> org.apache.spark.unsafe.Platform
>
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
>
> WARNING: All illegal access operations will be denied in a future release
>
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
>
> 20/12/03 23:13:59 INFO SparkContext: Running Spark version 3.0.1
>
> 20/12/03 23:13:59 INFO ResourceUtils:
> ==
>
> 20/12/03 23:13:59 INFO ResourceUtils: Resources for spark.driver:
>
>
> 20/12/03 23:13:59 INFO ResourceUtils:
> ==
>
> 20/12/03 23:13:59 INFO SparkContext: Submitted application: App1
>
> 20/12/03 23:13:59 INFO SecurityManager: Changing view acls to: admin
>
> 20/12/03 23:13:59 INFO SecurityManager: Changing modify acls to: admin
>
> 20/12/03 23:13:59 INFO SecurityManager: Changing view acls groups to:
>
> 20/12/03 23:13:59 INFO SecurityManager: Changing modify acls groups to:
>
> 20/12/03 23:13:59 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users  with view permissions: Set(admin);
> groups with view permissions: Set(); users  with modify permissions:
> Set(admin); groups with modify permissions: Set()
>
> 20/12/03 23:14:00 INFO Utils: Successfully started service 'sparkDriver'
> on port 62327.
>
> 20/12/03 23:14:00 INFO SparkEnv: Registering MapOutputTracker
>
> 20/12/03 23:14:00 INFO SparkEnv: Registering BlockManagerMaster
>
> 20/12/03 23:14:01 INFO BlockManagerMasterEndpoint: Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
>
> 20/12/03 23:14:01 INFO BlockManagerMasterEndpoint:
> BlockManagerMasterEndpoint up
>
> 20/12/03 23:14:01 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
>
> 20/12/03 23:14:01 INFO DiskBlockManager: Created local directory at
> C:\Users\admin\AppData\Local\Temp\blockmgr-30e2019a-af60-44da-86e7-8a162d1e29da
>
> 20/12/03 23:14:01 INFO MemoryStore: MemoryStore started with capacity
> 434.4 MiB
>
> 20/12/03 23:14:01 INFO SparkEnv: Registering OutputCommitCoordinator
>
> 20/12/03 23:14:01 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
>
> 20/12/03 23:14:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
> http://w7:4040
>
> 20/12/03 23:14:01 INFO Executor: Starting executor ID driver on host w7
>
> 20/12/03 23:14:01 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62373.
>
> 20/12/03 23:14:01 INFO NettyBlockTransferService: Server created on
> w7:62373
>
> 20/12/03 23:14:01 INFO BlockManager: Using
> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
> policy
>
> 20/12/03 23:14:01 INFO BlockManagerMaster: Registering