Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Benjamin Du
I don't think coalesce (by repartitioning I assume you mean coalesce) itself 
and deserialising take that much time. To add a little more context, the 
computation of the DataFrame is CPU-intensive rather than data/IO-intensive. I 
purposely keep coalesce after df.count because I want to keep the large number 
of partitions (30k) while computing the DataFrame, so that I get much higher 
parallelism. After the computation, I reduce the number of partitions (to avoid 
having too many small files on HDFS). It typically takes about 5 hours to 
compute the DataFrame (when 30k partitions are used) and write it to disk 
(without doing repartition or coalesce). If I manually write the computed 
DataFrame to disk, read it back, coalesce it and then write it back to disk, it 
also takes about 5 hours. The code that I pasted in this thread takes forever 
to run because the DataFrame is obviously recomputed at df.coalesce, and with a 
parallelism of only 300 partitions it is almost impossible to compute the 
DataFrame in a reasonable amount of time.

I tried various ways, but none of them worked except manually writing the 
DataFrame to disk, reading it back, repartitioning/coalescing it, and then 
writing it back to HDFS.
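
For reference, a minimal sketch of that workaround (the intermediate path and 
compute_df() are placeholders standing in for the filter/withColumn chain shown 
in the snippets below):

tmp_path = f"{output}/tmp_job={mod}"  # hypothetical intermediate HDFS location
output_mod = f"{output}/job={mod}"

# 1. Materialise the expensive result while keeping the full 30k partitions.
compute_df().write.mode("overwrite").parquet(tmp_path)
# 2. Read the materialised data back; nothing can be recomputed from here on.
df = spark.read.parquet(tmp_path)
# 3. Reduce the number of output files and write the final result.
df.coalesce(300).write.mode("overwrite").parquet(output_mod)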

  1.  checkpoint by itself computes the DataFrame twice. (This is a known 
bug of checkpoint.)

output_mod = f"{output}/job={mod}"
spark.read.parquet("/input/hdfs/path") \
.filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.checkpoint() \
.coalesce(300) \
.write.mode("overwrite").parquet(output_mod)


  2.  persist (to disk) + count computes the DataFrame twice.

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
.filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


  3.  persist (to memory) + count computes the DataFrame twice.

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
.filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.persist(StorageLevel.MEMORY_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


  4.  persist (to memory) + checkpoint + coalesce computes the DataFrame twice.

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
.filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.persist(StorageLevel.MEMORY_ONLY) \
.checkpoint() \
.coalesce(300).write.mode("overwrite").parquet(output_mod)


  5.  persist (to memory) + checkpoint (without coalesce) computes the 
DataFrame twice.

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
.filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.persist(StorageLevel.MEMORY_ONLY) \
.checkpoint() \
.write.mode("overwrite").parquet(output_mod)


  6.  cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce 
computes it twice.

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
.filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.cache()
df.count()

Re: [ANNOUNCE] Apache Kyuubi (Incubating) released 1.4.1-incubating

2022-01-30 Thread Vino Yang
Hi,

As you can find in the description on the website[1] of Apache Kyuubi
(Incubating):

"Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines."

[1]: https://kyuubi.apache.org/

Best,
Vino

Bitfox wrote on Mon, 31 Jan 2022 at 14:49:
>
> What’s the difference between Spark and Kyuubi?
>
> Thanks
>
> On Mon, Jan 31, 2022 at 2:45 PM Vino Yang  wrote:
>>
>> Hi all,
>>
>> The Apache Kyuubi (Incubating) community is pleased to announce that
>> Apache Kyuubi (Incubating) 1.4.1-incubating has been released!
>>
>> Apache Kyuubi (Incubating) is a distributed multi-tenant JDBC server for
>> large-scale data processing and analytics, built on top of Apache Spark
>> and designed to support more engines (e.g. Apache Flink).
>>
>> Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
>> for end-users to manipulate large-scale data with pre-programmed and
>> extensible Spark SQL engines.
>>
>> We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
>> and data lakes.
>>
>> This "out-of-the-box" model minimizes the barriers and costs for end-users
>> to use Spark at the client side.
>>
>> At the server-side, Kyuubi server and engine's multi-tenant architecture
>> provides the administrators a way to achieve computing resource isolation,
>> data security, high availability, high client concurrency, etc.
>>
>> The full release notes and download links are available at:
>> Release Notes: https://kyuubi.apache.org/release/1.4.1-incubating.html
>>
>> To learn more about Apache Kyuubi (Incubating), please see
>> https://kyuubi.apache.org/
>>
>> Kyuubi Resources:
>> - Issue: https://github.com/apache/incubator-kyuubi/issues
>> - Mailing list: d...@kyuubi.apache.org
>>
>> We would like to thank all contributors of the Kyuubi community and 
>> Incubating
>> community who made this release possible!
>>
>> Thanks,
>> On behalf of Apache Kyuubi (Incubating) community

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [ANNOUNCE] Apache Kyuubi (Incubating) released 1.4.1-incubating

2022-01-30 Thread Bitfox
What’s the difference between Spark and Kyuubi?

Thanks

On Mon, Jan 31, 2022 at 2:45 PM Vino Yang  wrote:

> Hi all,
>
> The Apache Kyuubi (Incubating) community is pleased to announce that
> Apache Kyuubi (Incubating) 1.4.1-incubating has been released!
>
> Apache Kyuubi (Incubating) is a distributed multi-tenant JDBC server for
> large-scale data processing and analytics, built on top of Apache Spark
> and designed to support more engines (e.g. Apache Flink).
>
> Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
> for end-users to manipulate large-scale data with pre-programmed and
> extensible Spark SQL engines.
>
> We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
> and data lakes.
>
> This "out-of-the-box" model minimizes the barriers and costs for end-users
> to use Spark at the client side.
>
> At the server-side, Kyuubi server and engine's multi-tenant architecture
> provides the administrators a way to achieve computing resource isolation,
> data security, high availability, high client concurrency, etc.
>
> The full release notes and download links are available at:
> Release Notes: https://kyuubi.apache.org/release/1.4.1-incubating.html
>
> To learn more about Apache Kyuubi (Incubating), please see
> https://kyuubi.apache.org/
>
> Kyuubi Resources:
> - Issue: https://github.com/apache/incubator-kyuubi/issues
> - Mailing list: d...@kyuubi.apache.org
>
> We would like to thank all contributors of the Kyuubi community and
> Incubating
> community who made this release possible!
>
> Thanks,
> On behalf of Apache Kyuubi (Incubating) community
>


[ANNOUNCE] Apache Kyuubi (Incubating) released 1.4.1-incubating

2022-01-30 Thread Vino Yang
Hi all,

The Apache Kyuubi (Incubating) community is pleased to announce that
Apache Kyuubi (Incubating) 1.4.1-incubating has been released!

Apache Kyuubi (Incubating) is a distributed multi-tenant JDBC server for
large-scale data processing and analytics, built on top of Apache Spark
and designed to support more engines (e.g. Apache Flink).

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and data lakes.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.4.1-incubating.html

To learn more about Apache Kyuubi (Incubating), please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/incubator-kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community and
Incubating
community who made this release possible!

Thanks,
On behalf of Apache Kyuubi (Incubating) community


Unsubscribe

2022-01-30 Thread Yogitha Ramanathan



Re: Migration to Spark 3.2

2022-01-30 Thread Aurélien Mazoyer
Hi Stephen,

Thank you for your answer. Yes, I changed the scope to "provided" but got
the same error :-( FYI, I am getting this error while running tests.

Regards,

Aurelien

On Thu, 27 Jan 2022 at 23:57, Stephen Coy  wrote:

> Hi Aurélien,
>
> Your Jackson versions look fine.
>
> What happens if you change the scope of your Jackson dependencies to
> “provided”?
>
> This should result in your application using the versions provided by
> Spark and avoid this potential collision.
>
> Cheers,
>
> Steve C
>
> On 27 Jan 2022, at 9:48 pm, Aurélien Mazoyer 
> wrote:
>
> Hi Stephen,
>
> Thank you for your answer!
> Here it is; it seems that the Jackson dependencies are correct, no?
>
> Thanks,
>
> [INFO] com.krrier:spark-lib-full:jar:0.0.1-SNAPSHOT
> [INFO] +- com.krrier:backend:jar:0.0.1-SNAPSHOT:compile
> [INFO] |  \- com.krrier:data:jar:0.0.1-SNAPSHOT:compile
> [INFO] +- com.krrier:plugin-api:jar:0.0.1-SNAPSHOT:compile
> [INFO] +- com.opencsv:opencsv:jar:4.2:compile
> [INFO] |  +- org.apache.commons:commons-lang3:jar:3.9:compile
> [INFO] |  +- org.apache.commons:commons-text:jar:1.3:compile
> [INFO] |  +- commons-beanutils:commons-beanutils:jar:1.9.3:compile
> [INFO] |  |  \- commons-logging:commons-logging:jar:1.2:compile
> [INFO] |  \- org.apache.commons:commons-collections4:jar:4.1:compile
> [INFO] +- org.apache.solr:solr-solrj:jar:7.4.0:compile
> [INFO] |  +- org.apache.commons:commons-math3:jar:3.6.1:compile
> [INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.5.3:compile
> [INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.4.6:compile
> [INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.5.3:compile
> [INFO] |  +- org.apache.zookeeper:zookeeper:jar:3.4.11:compile
> [INFO] |  +- org.codehaus.woodstox:stax2-api:jar:3.1.4:compile
> [INFO] |  +- org.codehaus.woodstox:woodstox-core-asl:jar:4.4.1:compile
> [INFO] |  \- org.noggit:noggit:jar:0.8:compile
> [INFO] +- com.databricks:spark-xml_2.12:jar:0.5.0:compile
> [INFO] +- org.apache.tika:tika-parsers:jar:1.24:compile
> [INFO] |  +- org.apache.tika:tika-core:jar:1.24:compile
> [INFO] |  +- org.glassfish.jaxb:jaxb-runtime:jar:2.3.2:compile
> [INFO] |  |  +- jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2:compile
> [INFO] |  |  +- org.glassfish.jaxb:txw2:jar:2.3.2:compile
> [INFO] |  |  +- com.sun.istack:istack-commons-runtime:jar:3.0.8:compile
> [INFO] |  |  +- org.jvnet.staxex:stax-ex:jar:1.8.1:compile
> [INFO] |  |  \- com.sun.xml.fastinfoset:FastInfoset:jar:1.2.16:compile
> [INFO] |  +- com.sun.activation:jakarta.activation:jar:1.2.1:compile
> [INFO] |  +- xerces:xercesImpl:jar:2.12.0:compile
> [INFO] |  |  \- xml-apis:xml-apis:jar:1.4.01:compile
> [INFO] |  +- javax.annotation:javax.annotation-api:jar:1.3.2:compile
> [INFO] |  +- org.gagravarr:vorbis-java-tika:jar:0.8:compile
> [INFO] |  +- org.tallison:jmatio:jar:1.5:compile
> [INFO] |  +- org.apache.james:apache-mime4j-core:jar:0.8.3:compile
> [INFO] |  +- org.apache.james:apache-mime4j-dom:jar:0.8.3:compile
> [INFO] |  +- org.tukaani:xz:jar:1.8:compile
> [INFO] |  +- com.epam:parso:jar:2.0.11:compile
> [INFO] |  +- org.brotli:dec:jar:0.1.2:compile
> [INFO] |  +- commons-codec:commons-codec:jar:1.13:compile
> [INFO] |  +- org.apache.pdfbox:pdfbox:jar:2.0.19:compile
> [INFO] |  |  \- org.apache.pdfbox:fontbox:jar:2.0.19:compile
> [INFO] |  +- org.apache.pdfbox:pdfbox-tools:jar:2.0.19:compile
> [INFO] |  +- org.apache.pdfbox:preflight:jar:2.0.19:compile
> [INFO] |  |  \- org.apache.pdfbox:xmpbox:jar:2.0.19:compile
> [INFO] |  +- org.apache.pdfbox:jempbox:jar:1.8.16:compile
> [INFO] |  +- org.bouncycastle:bcmail-jdk15on:jar:1.64:compile
> [INFO] |  |  \- org.bouncycastle:bcpkix-jdk15on:jar:1.64:compile
> [INFO] |  +- org.bouncycastle:bcprov-jdk15on:jar:1.64:compile
> [INFO] |  +- org.apache.poi:poi:jar:4.1.2:compile
> [INFO] |  |  \- com.zaxxer:SparseBitSet:jar:1.2:compile
> [INFO] |  +- org.apache.poi:poi-scratchpad:jar:4.1.2:compile
> [INFO] |  +- com.healthmarketscience.jackcess:jackcess:jar:3.0.1:compile
> [INFO] |  +-
> com.healthmarketscience.jackcess:jackcess-encrypt:jar:3.0.0:compile
> [INFO] |  +- org.ccil.cowan.tagsoup:tagsoup:jar:1.2.1:compile
> [INFO] |  +- org.ow2.asm:asm:jar:7.3.1:compile
> [INFO] |  +- com.googlecode.mp4parser:isoparser:jar:1.1.22:compile
> [INFO] |  +- org.tallison:metadata-extractor:jar:2.13.0:compile
> [INFO] |  |  \- org.tallison.xmp:xmpcore-shaded:jar:6.1.10:compile
> [INFO] |  | \- com.adobe.xmp:xmpcore:jar:6.1.10:compile
> [INFO] |  +- de.l3s.boilerpipe:boilerpipe:jar:1.1.0:compile
> [INFO] |  +- com.rometools:rome:jar:1.12.2:compile
> [INFO] |  |  \- com.rometools:rome-utils:jar:1.12.2:compile
> [INFO] |  +- org.gagravarr:vorbis-java-core:jar:0.8:compile
> [INFO] |  +-
> com.googlecode.juniversalchardet:juniversalchardet:jar:1.0.3:compile
> [INFO] |  +- org.codelibs:jhighlight:jar:1.0.3:compile
> [INFO] |  +- com.pff:java-libpst:jar:0.9.3:compile
> [INFO] |  +- com.github.junrar:junrar:jar:4.0.0:compile
> [INFO] |  +- 

RE: why the pyspark RDD API is so slow?

2022-01-30 Thread Theodore J Griesenbrock
Is there any particular code sample you can suggest reviewing, based on your tips?

> On Jan 30, 2022, at 06:16, Sebastian Piu  wrote:
> 
> 
> It's because all data needs to be pickled back and forth between Java and a 
> spun-up Python worker, so there is more overhead than if you stay fully in 
> scala. 
> 
> Your python code might make this worse too, for example if not yielding from 
> operations
> 
> You can look at using UDFs and Arrow, or try to stay as much as possible on 
> DataFrame operations only.
> 
>> On Sun, 30 Jan 2022, 10:11 Bitfox,  wrote:
>> Hello list,
>> 
>> I did a comparison for pyspark RDD, scala RDD, pyspark dataframe and a pure 
>> scala program. The result shows the pyspark RDD is too slow.
>> 
>> For the operations and dataset please see:
>> https://blog.cloudcache.net/computing-performance-comparison-for-words-statistics/
>> 
>> The result table is below.
>> Can you give suggestions on how to optimize the RDD operation?
>> 
>> Thanks a lot.
>> 
>> 
>> program            time
>> scala program      49s
>> pyspark dataframe  56s
>> scala RDD          1m31s
>> pyspark RDD        7m15s



Re: how can I remove the warning message

2022-01-30 Thread Sean Owen
This one you can ignore. It's from the JVM so you might be able to disable
it by configuring the right JVM logger as well, but it also tells you right
in the message how to turn it off!

But this is saying that some reflective operations are discouraged in Java
9+. They still work and Spark needs them, but they cause a warning now. You
can however ignore it.
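
If you do want to silence it, one approach people use (an assumption on my part,
not something from this thread, so test it on your setup) is to open the module
that is being accessed reflectively, which makes the access legal and stops the
JVM from warning:

# Driver JVM options must be in place before the driver JVM starts, so in
# client mode these usually belong in spark-defaults.conf or on the
# spark-submit command line rather than in SparkSession.builder.
from pyspark.sql import SparkSession

opts = "--add-opens=java.base/java.nio=ALL-UNNAMED"
spark = (
    SparkSession.builder
    .config("spark.executor.extraJavaOptions", opts)  # executors start later, so this takes effect
    .config("spark.driver.extraJavaOptions", opts)    # effective mainly in cluster mode / via conf files
    .getOrCreate()
)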

On Sun, Jan 30, 2022 at 2:56 AM Gourav Sengupta 
wrote:

> Hi,
>
> I have often found that keeping the warnings in the logs is extremely useful;
> they are just logs, and they provide a lot of insight during upgrades, external
> package loading, deprecation, debugging, etc.
>
> Do you have any particular reason to disable the warnings in a submitted
> job?
>
> I used to disable warnings in spark-shell  using the
> Logger.getLogger("akka").setLevel(Level.OFF) in case I have not completely
> forgotten. Other details are mentioned here:
> https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html
>
>
>
> Regards,
> Gourav Sengupta
>
> On Fri, Jan 28, 2022 at 11:14 AM  wrote:
>
>> When I submitted the job from scala client, I got the warning messages:
>>
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
>> (file:/opt/spark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor
>> java.nio.DirectByteBuffer(long,int)
>> WARNING: Please consider reporting this to the maintainers of
>> org.apache.spark.unsafe.Platform
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>> reflective access operations
>> WARNING: All illegal access operations will be denied in a future
>> release
>>
>> How can I just remove those messages?
>>
>> spark: 3.2.0
>> scala: 2.13.7
>>
>> Thank you.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: unsubscribe

2022-01-30 Thread Bitfox
The signature in your mail already shows the info:

To unsubscribe e-mail: user-unsubscr...@spark.apache.org



On Sun, Jan 30, 2022 at 8:50 PM Lucas Schroeder Rossi 
wrote:

> unsubscribe
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


unsubscribe

2022-01-30 Thread Lucas Schroeder Rossi
unsubscribe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: why the pyspark RDD API is so slow?

2022-01-30 Thread Sebastian Piu
It's because all data needs to be pickled back and forth between Java and a
spun-up Python worker, so there is more overhead than if you stay fully
in Scala.

Your python code might make this worse too, for example if not yielding
from operations

You can look at using UDFs and Arrow, or try to stay as much as possible
on DataFrame operations only.
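
A minimal sketch of the Arrow/pandas UDF route (the function and data here are
made up purely for illustration):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col

spark = SparkSession.builder.getOrCreate()

@pandas_udf("long")
def add_one(s: pd.Series) -> pd.Series:
    # Batches are exchanged with the Python worker via Arrow instead of
    # pickling individual rows, which is where much of the RDD overhead goes.
    return s + 1

df = spark.range(1_000_000)
df.select(add_one(col("id")).alias("id_plus_one")).show(5)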

On Sun, 30 Jan 2022, 10:11 Bitfox,  wrote:

> Hello list,
>
> I did a comparison for pyspark RDD, scala RDD, pyspark dataframe and a
> pure scala program. The result shows the pyspark RDD is too slow.
>
> For the operations and dataset please see:
>
> https://blog.cloudcache.net/computing-performance-comparison-for-words-statistics/
>
> The result table is below.
> Can you give suggestions on how to optimize the RDD operation?
>
> Thanks a lot.
>
>
> program            time
> scala program      49s
> pyspark dataframe  56s
> scala RDD          1m31s
> pyspark RDD        7m15s
>


why the pyspark RDD API is so slow?

2022-01-30 Thread Bitfox
Hello list,

I did a comparison for pyspark RDD, scala RDD, pyspark dataframe and a pure
scala program. The result shows the pyspark RDD is too slow.

For the operations and dataset please see:
https://blog.cloudcache.net/computing-performance-comparison-for-words-statistics/

The result table is below.
Can you give suggestions on how to optimize the RDD operation?

Thanks a lot.


program            time
scala program      49s
pyspark dataframe  56s
scala RDD          1m31s
pyspark RDD        7m15s


Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

2022-01-30 Thread Gourav Sengupta
Hi,

Can you please try to see if you can increase the number of cores per task,
and therefore give each task more memory per executor?
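
A sketch of that idea (values are placeholders; with executor cores fixed,
raising spark.task.cpus lowers the number of concurrent tasks per executor, so
each task sees a larger share of the executor's memory):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.cores", "4")     # illustrative value
    .config("spark.task.cpus", "2")          # at most 4 / 2 = 2 concurrent tasks per executor
    .config("spark.executor.memory", "15g")  # figure taken from the thread below
    .getOrCreate()
)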

I do not understand what the XML is, what data is in it, and what problem you
are trying to solve by writing UDFs to parse XML. So maybe we are not actually
solving the problem, just addressing the immediate issue.


Regards,
Gourav Sengupta

On Wed, Jan 26, 2022 at 4:07 PM Sean Owen  wrote:

> Really depends on what your UDF is doing. You could read 2GB of XML into
> much more than that as a DOM representation in memory.
> Remember 15GB of executor memory is shared across tasks.
> You need to get a handle on what memory your code is using to begin with
> to start to reason about whether that's enough, first.
>
> On Wed, Jan 26, 2022 at 10:03 AM Abhimanyu Kumar Singh <
> abhimanyu.kr.sing...@gmail.com> wrote:
>
>> Thanks for your quick response.
>>
>> For some reasons I can't use spark-xml (schema related issue).
>>
>> I've tried reducing number of tasks per executor by increasing the number
>> of executors, but it still throws same error.
>>
>> I can't understand why does even 15gb of executor memory is not
>> sufficient to parse just 2gb XML file.
>> How can I check the max amount of JVM memory utilised for each task?
>>
>> Do I need to tweak some other configurations for increasing JVM memory
>> rather than spark.executor.memory?
>>
>> On Wed, Jan 26, 2022, 9:23 PM Sean Owen  wrote:
>>
>>> Executor memory used shows data that is cached, not the VM usage. You're
>>> running out of memory somewhere, likely in your UDF, which probably parses
>>> massive XML docs as a DOM first or something. Use more memory, fewer tasks
>>> per executor, or consider using spark-xml if you are really just parsing
>>> pieces of it. It'll be more efficient.
>>>
>>> On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
>>> abhimanyu.kr.sing...@gmail.com> wrote:
>>>
 I'm doing some complex operations inside spark UDF (parsing huge XML).

 Dataframe:
 | value |
 | Content of XML File 1 |
 | Content of XML File 2 |
 | Content of XML File N |

 val df = Dataframe.select(UDF_to_parse_xml(value))

 UDF looks something like:

 val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
 val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct

 Parsing requires creation and de-duplication of arrays from the XML
 containing
 around 0.1 million elements (consisting of MyClass(Strings, Maps,
 Integers,  )).

 In the Spark UI, "executor memory used" is barely 60-70 MB. But Spark
 processing still fails with an ExecutorLostFailure error for XMLs of size
 around 2 GB.
 When I increase the executor size (say 15GB to 25 GB) it works fine.
 One partition can contain only
 one XML file (with max size 2GB) and 1 task/executor runs in parallel.

 My question is: which memory is being used by the UDF for storing arrays,
 maps or sets while parsing?
 And how can I configure it?

 Should I increase spark.memory.offHeap.size,
 spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?

 Thanks a lot,
 Abhimanyu

 PS: I know I shouldn't use UDF this way, but I don't have any other
 alternative here.
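
For completeness, a minimal sketch of the spark-xml route Sean mentions (rowTag
and the path are placeholders, and the package has to be supplied, e.g. via
--packages com.databricks:spark-xml_2.12:<version>):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "record")       # hypothetical element that delimits one row
    .load("/path/to/xml/files")       # placeholder path
)
df.printSchema()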


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Gourav Sengupta
Hi,

without getting into suppositions, the best option is to look into the
SPARK UI SQL section.

It is the most wonderful tool to explain what is happening, and why. In
SPARK 3.x they have made the UI even better, with different sets of
granularity and detail.

On another note, you might want to read the difference between repartition
and coalesce before making any kind of assumptions.
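
For what it's worth, a small sketch of that difference (numbers are arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1_000_000).repartition(1000)

# coalesce(n) is a narrow transformation: it merges existing partitions without
# a shuffle, and it can be collapsed into the upstream stage, reducing that
# stage's parallelism. repartition(n) always shuffles, so upstream stages keep
# their own parallelism.
print(df.coalesce(300).rdd.getNumPartitions())     # 300
print(df.repartition(300).rdd.getNumPartitions())  # 300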


Regards,
Gourav Sengupta

On Sun, Jan 30, 2022 at 8:52 AM Sebastian Piu 
wrote:

> It's probably the repartitioning and deserialising the df that you are
> seeing take time. Try doing this
>
> 1. Add another count after your current one and compare times
> 2. Move coalesce before persist
>
>
>
> You should see
>
> On Sun, 30 Jan 2022, 08:37 Benjamin Du,  wrote:
>
>> I have some PySpark code like below. Basically, I persist a DataFrame
>> (which is time-consuming to compute) to disk, call the method
>> DataFrame.count to trigger the caching/persist immediately, and then I
>> coalesce the DataFrame to reduce the number of partitions (the original
>> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
>> execution time of job stages and the execution plan, it seems to me that
>> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
>> this happens?
>>
>> df = spark.read.parquet("/input/hdfs/path") \
>> .filter(...) \
>> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
>> .persist(StorageLevel.DISK_ONLY)
>> df.count()
>> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>>
>>
>> BTW, it works well if I manually write the DataFrame to HDFS, read it
>> back, coalesce it and write it back to HDFS.
>> Originally post at
>> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
>> 
>>
>> Best,
>>
>> 
>>
>> Ben Du
>>
>> Personal Blog  | GitHub
>>  | Bitbucket 
>> | Docker Hub 
>>
>


Re: How to delete the record

2022-01-30 Thread Gourav Sengupta
Hi,

I think it will be useful to understand the problem before solving the
problem.

Can I please ask what this table is? Is it a fact (event store) kind of a
table, or a dimension (master data) kind of table? And what are the
downstream consumptions of this table?

Besides that what is the unique identifier for a record in this table? For
example, some master data tables have unique identifiers as phone numbers,
which can get reallocated to other individuals over a period of time.

Is there any other information that you can provide on this
table, its contents, usage, etc?

There is a third option, which is akin to the second option that Mich was
mentioning, and that is basically a database transaction log, which gets
very large, very expensive to store and query over a period of time. Are
you creating a database transaction log?


Thanks and Regards,
Gourav Sengupta


On Thu, Jan 27, 2022 at 9:03 PM ayan guha  wrote:

> Btw, 2 options Mitch explained are not mutually exclusive. Option 2 can
> and should be implemented over a delta lake table anyway. Especially if you
> need to do hard deletes eventually (eg for regulatory needs)
>
>
>
> On Fri, 28 Jan 2022 at 6:50 am, Sid Kal  wrote:
>
>> Thanks Mich and Sean for your time
>>
>> On Fri, 28 Jan 2022, 00:53 Mich Talebzadeh, 
>> wrote:
>>
>>> Yes I believe so.
>>>
>>> Check this article of mine dated early 2019 but will have some relevance
>>> to what I am implying.
>>>
>>>
>>> https://www.linkedin.com/pulse/real-time-data-streaming-big-typical-use-cases-talebzadeh-ph-d-/
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 27 Jan 2022 at 18:46, Sid Kal  wrote:
>>>
 Okay sounds good.

 So,  below two options would help me to capture CDC changes:

 1) Delta lake
 2) Maintaining snapshot of records with some indicators and timestamp.

 Correct me if I'm wrong

 Thanks,
 Sid

 On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, 
 wrote:

> There are two ways of doing it.
>
>
>1. Through the snapshots offered by Delta Lake, meaning an immutable
>snapshot of the state of the table at a given version, for example the
>state of a Delta table at a given version.
>2. Creating your own versioning. Taking your example, you define the
>target storage with two added columns, namely op_type INT
>(1-insert, 2-update, 3-delete) and op_time TIMESTAMP <ingestion_time>.
>Your example record will be
>
>
> id   op_type  op_time
> 1    1        <timestamp of insert>
> 1    3        <timestamp of delete>
>
>
>df = rdd.toDF(). \
> withColumnRenamed("_1", "ID"). \
> withColumnRenamed("_2", "CLUSTERED"). \
> withColumnRenamed("_3", "SCATTERED"). \
> withColumnRenamed("_4", "RANDOMISED"). \
> withColumnRenamed("_5", "RANDOM_STRING"). \
> withColumnRenamed("_6", "SMALL_VC"). \
> withColumnRenamed("_7", "PADDING"). \
> withColumn("op_type", lit(1)). \
> withColumn("op_time", current_timestamp())
>
> Then  you can look at all records that were created and subsequently
> deleted and at what time
>
>
> SELECT ID, op_time FROM my_table WHERE op_type IN (1, 3)
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Jan 2022 at 17:54, Sid Kal  wrote:
>
>> Hi Sean,
>>
>> So you mean if I use those file formats it will do the work of CDC
>> automatically or I would have to handle it via code ?
>>
>> Hi Mich,
>>
>> Not sure if I understood you. Let me try to explain my scenario.
>> Suppose there is a Id "1" which is inserted 

Re: how can I remove the warning message

2022-01-30 Thread Gourav Sengupta
Hi,

I have often found that keeping the warnings in the logs is extremely useful;
they are just logs, and they provide a lot of insight during upgrades, external
package loading, deprecation, debugging, etc.

Do you have any particular reason to disable the warnings in a submitted
job?

I used to disable warnings in spark-shell  using the
Logger.getLogger("akka").setLevel(Level.OFF) in case I have not completely
forgotten. Other details are mentioned here:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html



Regards,
Gourav Sengupta

On Fri, Jan 28, 2022 at 11:14 AM  wrote:

> When I submitted the job from scala client, I got the warning messages:
>
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor
> java.nio.DirectByteBuffer(long,int)
> WARNING: Please consider reporting this to the maintainers of
> org.apache.spark.unsafe.Platform
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future
> release
>
> How can I just remove those messages?
>
> spark: 3.2.0
> scala: 2.13.7
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Deepak Sharma
coalesce returns a new dataset.
That will cause the recomputation.

Thanks
Deepak

On Sun, 30 Jan 2022 at 14:06, Benjamin Du  wrote:

> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(...) \
> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
> .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> BTW, it works well if I manually write the DataFrame to HDFS, read it
> back, coalesce it and write it back to HDFS.
> Originally post at
> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
> 
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog  | GitHub
>  | Bitbucket 
> | Docker Hub 
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Sebastian Piu
It's probably the repartitioning and deserialising the df that you are
seeing take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see
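
For reference, a sketch of option 2 applied to the snippet from the original
post (illustrative only; the elided filter and my_pandas_udf come from that
post):

from pyspark import StorageLevel

df = (
    spark.read.parquet("/input/hdfs/path")
    .filter(...)                                      # same filters as in the original post
    .withColumn("new_col", my_pandas_udf("col0", "col1"))
    .coalesce(300)                                    # coalesce moved ahead of persist
    .persist(StorageLevel.DISK_ONLY)
)
df.count()                                            # materialises the already-coalesced data
df.write.mode("overwrite").parquet(output_mod)

The trade-off is that coalescing before persist also caps the parallelism of
the expensive computation itself, which is the concern Benjamin raises in his
reply at the top of this digest.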

On Sun, 30 Jan 2022, 08:37 Benjamin Du,  wrote:

> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(...) \
> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
> .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> BTW, it works well if I manually write the DataFrame to HDFS, read it
> back, coalesce it and write it back to HDFS.
> Originally post at
> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
> 
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog  | GitHub
>  | Bitbucket 
> | Docker Hub 
>


Re: Kafka to spark streaming

2022-01-30 Thread Gourav Sengupta
Hi Amit,

before answering your question, I am just trying to understand it.

I am not exactly clear how do the Akka application, Kafka and SPARK
Streaming application sit together, and what are you exactly trying to
achieve?

Can you please elaborate?

Regards,
Gourav


On Fri, Jan 28, 2022 at 10:14 PM Amit Sharma  wrote:

> Hello everyone, we have spark streaming application. We send request to
> stream through Akka actor using Kafka topic. We wait for response as it is
> real time. Just want a suggestion is there any better option like Livy
> where we can send and receive request to spark streaming.
>
>
> Thanks
> Amit
>


A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Benjamin Du
I have some PySpark code like below. Basically, I persist a DataFrame (which is 
time-consuming to compute) to disk, call the method DataFrame.count to trigger 
the caching/persist immediately, and then I coalesce the DataFrame to reduce 
the number of partitions (the original DataFrame has 30,000 partitions) and 
output it to HDFS. Based on the execution time of job stages and the execution 
plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does 
anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
.filter(...) \
.withColumn("new_col", my_pandas_udf("col0", "col1")) \
.persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally post at 
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,



Ben Du

Personal Blog | GitHub | 
Bitbucket | Docker 
Hub