Re: [s3a] Spark is not reading s3 object content

2024-05-23 Thread Mich Talebzadeh
Could be a number of reasons

First, test reading the file with the AWS CLI (note the CLI uses the s3://
scheme and needs the custom endpoint; adjust the scheme to match your Ceph
endpoint):

aws s3 cp s3://input/testfile.csv . --endpoint-url http://192.168.52.63:8000
cat testfile.csv


Try this code to diagnose the problem:

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = None
try:
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("S3ReadTest") \
        .config("spark.jars.packages",
                "org.apache.hadoop:hadoop-aws:3.3.6") \
        .config("spark.hadoop.fs.s3a.access.key", "R*6") \
        .config("spark.hadoop.fs.s3a.secret.key", "1***e") \
        .config("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl",
                "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()

    # Read the CSV file from S3
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", " ") \
        .csv("s3a://input/testfile.csv")  # the delimiter above is a single space

    # Show the data
    df.show(n=1)

except AnalysisException as e:
    print(f"AnalysisException: {e}")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Stop the Spark session if it was created
    if spark is not None:
        spark.stop()
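
As an extra sanity check, a minimal sketch that reads the same object as
plain text with the same session, independent of the CSV reader and its
options, to confirm the bytes are actually reachable over S3A:

# Sanity check: read the object as unparsed text lines
raw = spark.read.text("s3a://input/testfile.csv")
print("raw line count:", raw.count())
raw.show(truncate=False)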

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Thu, 23 May 2024 at 20:14, Amin Mosayyebzadeh 
wrote:

> I am trying to read an s3 object from a local S3 storage (Ceph based)
> using Spark 3.5.1. I see it can access the bucket and list the files (I
> have verified it on Ceph side by checking its logs), even returning the
> correct size of the object. But the content is not read.
>
> The object url is:
> s3a://input/testfile.csv (I have also tested a nested bucket:
> s3a://test1/test2/test3/testfile.csv)
>
>
> Object's content:
>
> =
> name int1 int2
> first 1 2
> second 3 4
> =
>
>
> Here is the config I have set so far:
>
> ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
> ("spark.hadoop.fs.s3a.access.key", "R*6")
> ("spark.hadoop.fs.s3a.secret.key", "1***e")
> ("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
> ("spark.hadoop.fs.s3a.path.style.access", "true")
> ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>
>
> The output of my following PySpark application:
> df = spark.read \
> .option("header", "true") \
> .schema(schema) \
> .csv("s3a://input/testfile.csv", sep=' ')
>
> df.show(n=1)
> ==
> 24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
> 24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
> 24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
> 24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
> 24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
> 24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
> 24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
> 24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
> 24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
> 24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
> +----+----+----+
> |name|int1|int2|
> +----+----+----+
> +----+----+----+
> 24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
> 24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
> =
>
> Am I missing something here?
>
> P.S. I see OP_IS_DIRECTORY is set to 1. Is that a correct behavior?
>
>
> Thanks in advance!
>
>


BUG :: UI Spark

2024-05-23 Thread Prem Sahoo
Hello Team,
In the Spark DAG UI we have the Stages tab. Once you click on a stage you
can view its tasks.

Each task row has a column "Shuffle Write Size/Records". That column
prints wrong data when the data comes from cache/persist: it typically
shows the wrong record count even though the data size is correct,
e.g. 3.2G / 7400, which is wrong.

Please advise.


[s3a] Spark is not reading s3 object content

2024-05-23 Thread Amin Mosayyebzadeh
 I am trying to read an s3 object from a local S3 storage (Ceph based)
using Spark 3.5.1. I see it can access the bucket and list the files (I
have verified it on Ceph side by checking its logs), even returning the
correct size of the object. But the content is not read.

The object url is:
s3a://input/testfile.csv (I have also tested a nested bucket:
s3a://test1/test2/test3/testfile.csv)


Object's content:

=
name int1 int2
first 1 2
second 3 4
=


Here is the config I have set so far:

("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6")
("spark.hadoop.fs.s3a.access.key", "R*6")
("spark.hadoop.fs.s3a.secret.key", "1***e")
("spark.hadoop.fs.s3a.endpoint", "192.168.52.63:8000")
("spark.hadoop.fs.s3a.path.style.access", "true")
("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")


The output of my following PySpark application:
df = spark.read \
.option("header", "true") \
.schema(schema) \
.csv("s3a://input/testfile.csv", sep=' ')

df.show(n=1)
==
24/05/20 02:35:00 INFO MetricsSystemImpl: s3a-file-system metrics system started
24/05/20 02:35:01 INFO MetadataLogFileIndex: Reading streaming file log from s3a://input/testfile.csv/_spark_metadata
24/05/20 02:35:01 INFO FileStreamSinkLog: BatchIds found from listing:
24/05/20 02:35:03 INFO FileSourceStrategy: Pushed Filters:
24/05/20 02:35:03 INFO FileSourceStrategy: Post-Scan Filters:
24/05/20 02:35:03 INFO CodeGenerator: Code generated in 176.139675 ms
24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 496.6 KiB, free 4.1 GiB)
24/05/20 02:35:03 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 54.4 KiB, free 4.1 GiB)
24/05/20 02:35:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:38197 (size: 54.4 KiB, free: 4.1 GiB)
24/05/20 02:35:03 INFO SparkContext: Created broadcast 0 from showString at NativeMethodAccessorImpl.java:0
24/05/20 02:35:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
+----+----+----+
|name|int1|int2|
+----+----+----+
+----+----+----+
24/05/20 02:35:04 INFO SparkContext: Invoking stop() from shutdown hook
24/05/20 02:35:04 INFO SparkContext: SparkContext is stopping with exitCode 0
=

Am I missing something here?

P.S. I see OP_IS_DIRECTORY is set to 1. Is that a correct behavior?
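
One way to check how S3A classifies the path is to query the Hadoop
FileSystem API through the running session; a diagnostic sketch (it uses
Spark's internal JVM gateway, so treat it as illustrative only):

# Diagnostic: ask S3A whether the path is seen as a file or a directory
jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
path = jvm.org.apache.hadoop.fs.Path("s3a://input/testfile.csv")
fs = path.getFileSystem(hadoop_conf)
status = fs.getFileStatus(path)
print("isDirectory:", status.isDirectory(), "length:", status.getLen())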


Thanks in advance!


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
You are right.
Another question on migration: is there a way to get the microbatch id
during the microbatch dataset `transform` operation, like in the RDD transform?
I am attempting to implement the following pseudo functionality with
structured streaming. In this approach, recordCategoriesMetadata is fetched,
and RDD metrics such as RDD size are emitted using the microbatch id in the
transform operation.
```code
val rddQueue = new mutable.Queue[RDD[Int]]()
// source components
val sources = Seq.empty[String]
val consolidatedDstream = sources
  .map(source => {
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.transform((rdd, ts) => {
      // emit metrics of microbatch ts: rdd size etc.

      val recordCategories = rdd.map(..).collect();
      val recordCategoriesMetadata = ...
      rdd
        .map(r => {
          val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
          (source, customRecord)
        })
    })
  })
  .reduceLeft(_ union _)

consolidatedDstream
  .foreachRDD((rdd, ts) => {
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of given source
    pipes.foreach(pipe => {
      val pipeSource = null; // get from pipe variable
      val psRDD = rdd
        .filter {
          case (source, sourceRDD) => source.equals(pipeSource)
        }
      // apply pipe transformation and sink

    })
  })
```

In structured streaming, it can look like -

```code
val consolidatedDstream = sources
  .map(source => {
    val inputStream = ... (for each source)
    inputStream
  })
  .reduceLeft(_ union _)

consolidatedDstream
  .writeStream
  .foreachBatch((ds, ts) => {
    val newDS = ds.transform(internalDS => {
      // emit metrics of microbatch ts: rdd size etc.

      val recordCategories = internalDS.map(..).collect();
      val recordCategoriesMetadata = ...
      internalDS
        .map(r => {
          val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
          (source, customRecord)
        })
    })(... )
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of given source
    pipes.foreach(pipe => {
      val pipeSource = null; // get from pipe variable
      val psRDD = newDS
        .filter {
          case (source, sourceDS) => source.equals(pipeSource)
        }
      // apply pipe transformation and sink

    })
  })
```
The above is just pseudo code and I am still not sure if it works. Let me
know your suggestions, if any. Thanks.
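
For context, the microbatch id is what foreachBatch exposes as its second
argument; a minimal PySpark sketch of that hook (the streaming DataFrame
`input_df` and the checkpoint path below are placeholders):

```code
def process_batch(batch_df, batch_id):
    # batch_id is the microbatch id; per-batch metrics can be emitted here
    print(f"microbatch {batch_id}: {batch_df.count()} rows")
    # per-source transformations and sinks for this microbatch go here

query = (input_df.writeStream
         .foreachBatch(process_batch)
         .option("checkpointLocation", "/tmp/checkpoints/consolidated")
         .start())
```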

On Wed, May 22, 2024 at 8:34 AM Tathagata Das 
wrote:

> The right way to associated microbatches when committing to external
> storage is to use the microbatch id that you can get in foreachBatch. That
> microbatch id guarantees that the data produced in the batch is the always
> the same no matter any recomputations (assuming all processing logic is
> deterministic). So you can commit the batch id + batch data together. And
> then async commit the batch id + offsets.
>
> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
> wrote:
>
>> Thanks Das, Mtich.
>>
>> Mitch,
>> We process data from Kafka and write it to S3 in Parquet format using
>> Dstreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to an external storage at the end of
>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>
>> Das,
>> Thanks for sharing the details. I will look into them.
>> Unfortunately, the listeners process is async and can't guarantee happens
>> before association with microbatch to commit offsets to external storage.
>> But still they will work. Is there a way to access lastProgress in
>> foreachBatch ?
>>
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a microbatch in
>>> Structured Streaming, you have to look at the
>>> StreamingQuery.lastProgress or use the QueryProgressListener
>>> .
>>> Both of these approaches gives you access to the SourceProgress
>>> 
>>> which gives Kafka offsets as a JSON string.
>>>
>>> Hope this helps!
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 OK to understand better your current model relies on streaming data
 input through Kafka topic, Spark does some ETL and you send to a sink, a
 database for file storage like HDFS etc?

 Your current architecture relies on Direct Streams (DStream) and RDDs
 and you want to move to Spark sStructured Streaming based on dataframes and
 datasets?

 You have not specified your sink

 With regard to your question?

 "Is there an equivalent of Dstream HasOffsetRanges in structure
 streaming to get the microbatch end offsets to the checkpoint in our
 external checkpoint store ?"

 There is not a direct equivalent of DStream HasOffsetRanges in Spark
 Structured Streaming. However, Structured Streaming provides mechanisms to
 achieve similar functionality:

 HTH

 Mich Talebzadeh,
 Technologist | Architect | Data Engineer  | 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
With regard to this sentence


*Offset Tracking with Structured Streaming: While storing offsets in an
external store was necessary with DStreams, SSS handles this
automatically through checkpointing. The checkpoints include the offsets
processed by each micro-batch. However, you can still access the most
recent offsets through the lastProgress (or recentProgress) property of your
StreamingQuery object for monitoring purposes, if you need them*

In essence, with SSS and checkpointing in place, you can rely on the
automatic offset management provided by the framework,
*eliminating the need for the custom offset storage you had with DStreams.*
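
For monitoring, a small sketch of pulling the latest offsets out of
lastProgress (field names follow the StreamingQueryProgress JSON; `query` is
assumed to be the handle returned by start()):

progress = query.lastProgress  # most recent StreamingQueryProgress as a dict
if progress:
    for src in progress["sources"]:
        print(src["description"], "start:", src["startOffset"], "end:", src["endOffset"])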

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Wed, 22 May 2024 at 19:49, Mich Talebzadeh 
wrote:

> Hi  Anil,
>
> Ok let us put the complete picture here
>
>* Current DStreams Setup:*
>
>- Data Source: Kafka
>- Processing Engine: Spark DStreams
>- Data Transformation with Spark
>- Sink: S3
>- Data Format: Parquet
>- Exactly-Once Delivery (Attempted): You're attempting exactly-once
>delivery by recording micro-batch offsets in an external storage using
>foreachRDD at the end of each micro-batch. This allows you to potentially
>restart the job from the last processed offset in case of failures?
>- Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
>offer limited built-in support for exactly-once delivery guarantees.
>
>
> *Moving to Spark Structured Streaming: (SSS)*
>
> All stays the same. except below
>
>
>- Exactly-Once Delivery which is guaranteed by SSS
>- Checkpointing: Enable checkpointing by setting the
>checkpointLocation option in  writeStream. Spark will periodically
>checkpoint the state of streaming query, including offsets, to a designated
>location (e.g., HDFS, cloud storage or SSD).
>- Offset Tracking with Structured Streaming:: While storing offsets in
>an external storage with DStreams was necessary, SSS handles this
>automatically through checkpointing. The checkpoints include the offsets
>processed by each micro-batch. However, you can still access the most
>recent offsets using the offset() method on your StreamingQuery object for
>monitoring purposes that is if you need it
>
> Have a look at this article of mine  about  structured streaming  and
> checkpointing
>
> Processing Change Data Capture with Spark Structured Streaming
> 
>
> In your case briefly
>
> def *store_offsets_to_checkpoint*(df, batchId):
> if(len(df.take(1))) > 0:
>  df. persist()
>  # Extract offsets from the DataFrame (assuming a column named
> 'offset')
>  offset_rows = df.select(col('offset')).rdd.collect()
>  # Create OffsetRange objects from extracted offsets
>  offsets = [OffsetRange(partition=row.partition,
> fromOffset=row.offset, toOffset=row.offset + 1) # Assuming 'partition'
> and 'offset' columns
> for row in offset_rows]
>  # Logic to store offsets in your external checkpoint store)
>   ..
>   df.unpersist()
> else:
>   print("DataFrame is empty")
>
> # Define your Structured Streaming application with Kafka source and sink
>
>"""
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> *store_offsets_to_checkpoint* function
> foreachBatch(*store_offsets_to_checkpoint*) expects 2
> parameters, first: micro-batch as DataFrame or Dataset and second: unique
> id for each batch
>Using foreachBatch, we write each micro batch to storage
> defined in our custom logic
> """
>
> streaming = spark.readStream \
>.format("kafka") \ .
> option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "topic_name") \
>   .load()
>
> # Custom sink function to store offsets in checkpoint
> streaming = streaming.writeStream \
>  . format("memory")  \
>  *.option("checkpointLocation", "/path/to/checkpoint/store") \ *
>   .foreachBatch(*store_offsets_to_checkpoint*) \
>   .start()
>
> HTH
>
>
> 
>
> 

Remote File change detection in S3 when spark queries are running and parquet files in S3 changes

2024-05-22 Thread Raghvendra Yadav
Hello,
 We are hoping someone can help us understand the spark behavior
for scenarios listed below.

Q1. *Will running Spark queries fail when an S3 Parquet object changes
underneath, with S3A remote file change detection enabled? Is it 100% reliable?*
Our understanding is that S3A has a feature for remote file change
detection using ETag, implemented in the S3AInputStream class.
This feature caches the ETag per S3AInputStream Instance and uses it to
detect file changes even if the stream is reopened. When running a Spark
query that uses FSDataInputStream, will it reliably detect changes in the
file on S3?

*Q2. Does Spark work on a single instance of S3AInputStream for a Parquet
file, or can it open multiple S3AInputStream instances for some queries?*
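
For reference, the behaviour asked about in Q1 is governed by the S3A
change-detection settings; a sketch of setting them explicitly from Spark
(property names as documented for Hadoop 3.x S3A, defaults may vary by
version):

from pyspark.sql import SparkSession

# Compare by ETag and fail server-side if the object changes under an open stream
spark = (SparkSession.builder
         .appName("S3AChangeDetection")
         .config("spark.hadoop.fs.s3a.change.detection.source", "etag")
         .config("spark.hadoop.fs.s3a.change.detection.mode", "server")
         .getOrCreate())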



Thanks
Raghav


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
Hi  Anil,

Ok let us put the complete picture here

   * Current DStreams Setup:*

   - Data Source: Kafka
   - Processing Engine: Spark DStreams
   - Data Transformation with Spark
   - Sink: S3
   - Data Format: Parquet
   - Exactly-Once Delivery (Attempted): You're attempting exactly-once
   delivery by recording micro-batch offsets in an external storage using
   foreachRDD at the end of each micro-batch. This allows you to potentially
   restart the job from the last processed offset in case of failures?
   - Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
   offer limited built-in support for exactly-once delivery guarantees.


*Moving to Spark Structured Streaming: (SSS)*

All stays the same, except for the points below:


   - Exactly-Once Delivery which is guaranteed by SSS
   - Checkpointing: Enable checkpointing by setting the checkpointLocation
   option in  writeStream. Spark will periodically checkpoint the state of
   streaming query, including offsets, to a designated location (e.g., HDFS,
   cloud storage or SSD).
   - Offset Tracking with Structured Streaming: While storing offsets in
   an external store was necessary with DStreams, SSS handles this
   automatically through checkpointing. The checkpoints include the offsets
   processed by each micro-batch. However, you can still access the most
   recent offsets through the lastProgress (or recentProgress) property of
   your StreamingQuery object for monitoring purposes, if you need them

Have a look at this article of mine  about  structured streaming  and
checkpointing

Processing Change Data Capture with Spark Structured Streaming


In your case briefly

def store_offsets_to_checkpoint(df, batchId):
    if len(df.take(1)) > 0:
        df.persist()
        # Extract offsets from the DataFrame (assuming a column named 'offset')
        offset_rows = df.select(col('offset')).rdd.collect()
        # Create OffsetRange objects from extracted offsets
        # (assuming 'partition' and 'offset' columns)
        offsets = [OffsetRange(partition=row.partition,
                               fromOffset=row.offset, toOffset=row.offset + 1)
                   for row in offset_rows]
        # Logic to store offsets in your external checkpoint store
        ..
        df.unpersist()
    else:
        print("DataFrame is empty")

# Define your Structured Streaming application with Kafka source and sink

"""
"foreach" performs custom write logic on each row and
"foreachBatch" performs custom write logic on each micro-batch through the
store_offsets_to_checkpoint function.
foreachBatch(store_offsets_to_checkpoint) expects 2 parameters,
first: the micro-batch as a DataFrame or Dataset and second: a unique
id for each batch.
Using foreachBatch, we write each micro batch to storage
defined in our custom logic.
"""

streaming = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Custom sink function to store offsets in checkpoint
streaming = streaming.writeStream \
    .format("memory") \
    .option("checkpointLocation", "/path/to/checkpoint/store") \
    .foreachBatch(store_offsets_to_checkpoint) \
    .start()

HTH



Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Wed, 22 May 2024 at 16:27, Anil Dasari  wrote:

> Thanks Das, Mtich.
>
> Mitch,
> We process data from Kafka and write it to S3 in Parquet format using
> Dstreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to an external storage at the end of
> each micro-batch in foreachRDD, which is then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listeners process is async and can't guarantee happens
> before association with microbatch to commit offsets to external storage.
> But still they will work. Is there a way to access lastProgress in
> foreachBatch ?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
> wrote:
>
>> If you want to find what 

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
The right way to associate microbatches when committing to external
storage is to use the microbatch id that you can get in foreachBatch. That
microbatch id guarantees that the data produced in the batch is always
the same no matter the recomputations (assuming all processing logic is
deterministic). So you can commit the batch id + batch data together, and
then asynchronously commit the batch id + offsets.
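
A minimal foreachBatch sketch along these lines (the output path, the
offset store and `stream_df` are placeholders, not a prescribed
implementation):

from pyspark.sql import functions as F

def commit_batch(batch_df, batch_id):
    # 1) commit batch id + batch data together; writing into a batch_id
    #    partition keeps the write idempotent if the batch is recomputed
    (batch_df
        .withColumn("batch_id", F.lit(batch_id))
        .write.mode("overwrite")
        .parquet(f"s3a://my-bucket/output/batch_id={batch_id}"))
    # 2) then record batch_id -> offsets in the external store,
    #    which can be done asynchronously

query = (stream_df.writeStream
         .foreachBatch(commit_batch)
         .option("checkpointLocation", "s3a://my-bucket/checkpoints/demo")
         .start())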

On Wed, May 22, 2024 at 11:27 AM Anil Dasari  wrote:

> Thanks Das, Mtich.
>
> Mitch,
> We process data from Kafka and write it to S3 in Parquet format using
> Dstreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to an external storage at the end of
> each micro-batch in foreachRDD, which is then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listeners process is async and can't guarantee happens
> before association with microbatch to commit offsets to external storage.
> But still they will work. Is there a way to access lastProgress in
> foreachBatch ?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
> wrote:
>
>> If you want to find what offset ranges are present in a microbatch in
>> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
>> use the QueryProgressListener
>> .
>> Both of these approaches gives you access to the SourceProgress
>> 
>> which gives Kafka offsets as a JSON string.
>>
>> Hope this helps!
>>
>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK to understand better your current model relies on streaming data
>>> input through Kafka topic, Spark does some ETL and you send to a sink, a
>>> database for file storage like HDFS etc?
>>>
>>> Your current architecture relies on Direct Streams (DStream) and RDDs
>>> and you want to move to Spark sStructured Streaming based on dataframes and
>>> datasets?
>>>
>>> You have not specified your sink
>>>
>>> With regard to your question?
>>>
>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>> streaming to get the microbatch end offsets to the checkpoint in our
>>> external checkpoint store ?"
>>>
>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>> Structured Streaming. However, Structured Streaming provides mechanisms to
>>> achieve similar functionality:
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>  wrote:
>>>
 Hello,

 what options are you considering yourself?

 On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
 adas...@guidewire.com> wrote:


 Hello,

 We are on Spark 3.x and using Spark dstream + kafka and planning to use
 structured streaming + Kafka.
 Is there an equivalent of Dstream HasOffsetRanges in structure
 streaming to get the microbatch end offsets to the checkpoint in our
 external checkpoint store ? Thanks in advance.

 Regards




Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
Thanks Das, Mich.

Mich,
We process data from Kafka and write it to S3 in Parquet format using
Dstreams. To ensure exactly-once delivery and prevent data loss, our
process records micro-batch offsets to an external storage at the end of
each micro-batch in foreachRDD, which is then used when the job restarts.

Das,
Thanks for sharing the details. I will look into them.
Unfortunately, the listener process is async and can't guarantee a
happens-before association with the microbatch when committing offsets to
external storage. But they will still work. Is there a way to access
lastProgress in foreachBatch ?


On Wed, May 22, 2024 at 7:35 AM Tathagata Das 
wrote:

> If you want to find what offset ranges are present in a microbatch in
> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
> use the QueryProgressListener
> .
> Both of these approaches gives you access to the SourceProgress
> 
> which gives Kafka offsets as a JSON string.
>
> Hope this helps!
>
> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK to understand better your current model relies on streaming data input
>> through Kafka topic, Spark does some ETL and you send to a sink, a
>> database for file storage like HDFS etc?
>>
>> Your current architecture relies on Direct Streams (DStream) and RDDs and
>> you want to move to Spark sStructured Streaming based on dataframes and
>> datasets?
>>
>> You have not specified your sink
>>
>> With regard to your question?
>>
>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>> streaming to get the microbatch end offsets to the checkpoint in our
>> external checkpoint store ?"
>>
>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>> Structured Streaming. However, Structured Streaming provides mechanisms to
>> achieve similar functionality:
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>> 
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von
>> Braun
>> 
>> )".
>>
>>
>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>  wrote:
>>
>>> Hello,
>>>
>>> what options are you considering yourself?
>>>
>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>> adas...@guidewire.com> wrote:
>>>
>>>
>>> Hello,
>>>
>>> We are on Spark 3.x and using Spark dstream + kafka and planning to use
>>> structured streaming + Kafka.
>>> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
>>> to get the microbatch end offsets to the checkpoint in our external
>>> checkpoint store ? Thanks in advance.
>>>
>>> Regards
>>>
>>>


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
If you want to find what offset ranges are present in a microbatch in
Structured Streaming, you have to look at StreamingQuery.lastProgress or
use the QueryProgressListener.
Both of these approaches give you access to the SourceProgress,
which gives Kafka offsets as a JSON string.

Hope this helps!

On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh 
wrote:

> OK to understand better your current model relies on streaming data input
> through Kafka topic, Spark does some ETL and you send to a sink, a
> database for file storage like HDFS etc?
>
> Your current architecture relies on Direct Streams (DStream) and RDDs and
> you want to move to Spark sStructured Streaming based on dataframes and
> datasets?
>
> You have not specified your sink
>
> With regard to your question?
>
> "Is there an equivalent of Dstream HasOffsetRanges in structure streaming
> to get the microbatch end offsets to the checkpoint in our external
> checkpoint store ?"
>
> There is not a direct equivalent of DStream HasOffsetRanges in Spark
> Structured Streaming. However, Structured Streaming provides mechanisms to
> achieve similar functionality:
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>  wrote:
>
>> Hello,
>>
>> what options are you considering yourself?
>>
>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>> adas...@guidewire.com> wrote:
>>
>>
>> Hello,
>>
>> We are on Spark 3.x and using Spark dstream + kafka and planning to use
>> structured streaming + Kafka.
>> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
>> to get the microbatch end offsets to the checkpoint in our external
>> checkpoint store ? Thanks in advance.
>>
>> Regards
>>
>>


[ANNOUNCE] Apache Celeborn 0.4.1 available

2024-05-22 Thread Nicholas Jiang
Hi all,

Apache Celeborn community is glad to announce the
new release of Apache Celeborn 0.4.1.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

- https://github.com/apache/celeborn/releases/tag/v0.4.1

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.4.1


Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Regards,
Nicholas Jiang
On behalf of the Apache Celeborn community

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Mich Talebzadeh
OK, to understand better: your current model relies on streaming data input
through a Kafka topic, Spark does some ETL, and you send the results to a
sink, a database or file storage like HDFS etc.?

Your current architecture relies on Direct Streams (DStream) and RDDs and
you want to move to Spark Structured Streaming based on dataframes and
datasets?

You have not specified your sink

With regard to your question?

"Is there an equivalent of Dstream HasOffsetRanges in structure streaming
to get the microbatch end offsets to the checkpoint in our external
checkpoint store ?"

There is not a direct equivalent of DStream HasOffsetRanges in Spark
Structured Streaming. However, Structured Streaming provides mechanisms to
achieve similar functionality:

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
 wrote:

> Hello,
>
> what options are you considering yourself?
>
> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
> adas...@guidewire.com> wrote:
>
>
> Hello,
>
> We are on Spark 3.x and using Spark dstream + kafka and planning to use
> structured streaming + Kafka.
> Is there an equivalent of Dstream HasOffsetRanges in structure streaming
> to get the microbatch end offsets to the checkpoint in our external
> checkpoint store ? Thanks in advance.
>
> Regards
>
>


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread ashok34...@yahoo.com.INVALID
 Hello,
what options are you considering yourself?
On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari 
 wrote:  
 
 Hello,

We are on Spark 3.x and using Spark dstream + kafka and planning to use 
structured streaming + Kafka. Is there an equivalent of Dstream HasOffsetRanges 
in structure streaming to get the microbatch end offsets to the checkpoint in 
our external checkpoint store ? Thanks in advance. 
Regards
  

Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Anil Dasari
Hello,

We are on Spark 3.x and using Spark dstream + kafka and planning to use
structured streaming + Kafka.
Is there an equivalent of Dstream HasOffsetRanges in structure streaming to
get the microbatch end offsets to the checkpoint in our external checkpoint
store ? Thanks in advance.

Regards


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
I am looking for a writer/committer optimization which can make the Spark
write faster.
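
For the write path itself, the usual optimization for S3-compatible stores
such as MinIO is one of the S3A committers; a sketch of the relevant
settings, assuming Hadoop 3.x and the spark-hadoop-cloud module on the
classpath (verify against your Spark/Hadoop versions):

from pyspark.sql import SparkSession

# Sketch: enable the S3A "magic" committer for Parquet output
spark = (SparkSession.builder
         .appName("DualWrite")
         .config("spark.hadoop.fs.s3a.committer.name", "magic")
         .config("spark.sql.sources.commitProtocolClass",
                 "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
         .config("spark.sql.parquet.output.committer.class",
                 "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
         .getOrCreate())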

On Tue, May 21, 2024 at 9:15 PM eab...@163.com  wrote:

> Hi,
> I think you should write to HDFS then copy file (parquet or orc) from
> HDFS to MinIO.
>
> --
> eabour
>
>
> *From:* Prem Sahoo 
> *Date:* 2024-05-22 00:38
> *To:* Vibhor Gupta ; user
> 
> *Subject:* Re: EXT: Dual Write to HDFS and MinIO in faster way
>
>
> On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
>
>> Hello Vibhor,
>> Thanks for the suggestion .
>> I am looking for some other alternatives where I can use the same
>> dataframe can be written to two destinations without re execution and cache
>> or persist .
>>
>> Can some one help me in scenario 2 ?
>> How to make spark write to MinIO faster ?
>> Sent from my iPhone
>>
>> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
>> wrote:
>>
>> 
>>
>> Hi Prem,
>>
>>
>>
>> You can try to write to HDFS then read from HDFS and write to MinIO.
>>
>>
>>
>> This will prevent duplicate transformation.
>>
>>
>>
>> You can also try persisting the dataframe using the DISK_ONLY level.
>>
>>
>>
>> Regards,
>>
>> Vibhor
>>
>> *From: *Prem Sahoo 
>> *Date: *Tuesday, 21 May 2024 at 8:16 AM
>> *To: *Spark dev list 
>> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>>
>> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>>
>> Hello Team,
>>
>> I am planning to write to two datasource at the same time .
>>
>>
>>
>> Scenario:-
>>
>>
>>
>> Writing the same dataframe to HDFS and MinIO without re-executing the
>> transformations and no cache(). Then how can we make it faster ?
>>
>>
>>
>> Read the parquet file and do a few transformations and write to HDFS and
>> MinIO.
>>
>>
>>
>> here in both write spark needs execute the transformation again. Do we
>> know how we can avoid re-execution of transformation  without
>> cache()/persist ?
>>
>>
>>
>> Scenario2 :-
>>
>> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>>
>> Do we have any way to make writing this faster ?
>>
>>
>>
>> I don't want to do repartition and write as repartition will have
>> overhead of shuffling .
>>
>>
>>
>> Please provide some inputs.
>>
>>
>>
>>
>>
>>


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread eab...@163.com
Hi,
I think you should write to HDFS then copy file (parquet or orc) from HDFS 
to MinIO.



eabour
 
From: Prem Sahoo
Date: 2024-05-22 00:38
To: Vibhor Gupta; user
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
Hello Vibhor,
Thanks for the suggestion .
I am looking for some other alternatives where I can use the same dataframe can 
be written to two destinations without re execution and cache or persist .

Can some one help me in scenario 2 ?
How to make spark write to MinIO faster ?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta  wrote:

 
Hi Prem,
 
You can try to write to HDFS then read from HDFS and write to MinIO.
 
This will prevent duplicate transformation.
 
You can also try persisting the dataframe using the DISK_ONLY level.
 
Regards,
Vibhor
From: Prem Sahoo 
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list 
Subject: EXT: Dual Write to HDFS and MinIO in faster way
EXTERNAL: Report suspicious emails to Email Abuse.
Hello Team,
I am planning to write to two datasource at the same time . 
 
Scenario:-
 
Writing the same dataframe to HDFS and MinIO without re-executing the 
transformations and no cache(). Then how can we make it faster ?
 
Read the parquet file and do a few transformations and write to HDFS and MinIO.
 
here in both write spark needs execute the transformation again. Do we know how 
we can avoid re-execution of transformation  without cache()/persist ?
 
Scenario2 :-
I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
Do we have any way to make writing this faster ?
 
I don't want to do repartition and write as repartition will have overhead of 
shuffling .
 
Please provide some inputs. 
 
 


Re: A handy tool called spark-column-analyser

2024-05-21 Thread ashok34...@yahoo.com.INVALID
 Great work. Very handy for identifying problems
thanks
On Tuesday 21 May 2024 at 18:12:15 BST, Mich Talebzadeh 
 wrote:  
 
 A colleague kindly pointed out about giving an example of output which will
be added to README

Doing analysis for column Postcode

Json formatted output

{
    "Postcode": {
        "exists": true,
        "num_rows": 93348,
        "data_type": "string",
        "null_count": 21921,
        "null_percentage": 23.48,
        "distinct_count": 38726,
        "distinct_percentage": 41.49
    }
}
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
Von Braun)".


On Tue, 21 May 2024 at 16:21, Mich Talebzadeh  wrote:


I just wanted to share a tool I built called spark-column-analyzer. It's a 
Python package that helps you dig into your Spark DataFrames with ease. 

Ever spend ages figuring out what's going on in your columns? Like, how many 
null values are there, or how many unique entries? Built with data preparation 
for Generative AI in mind, it aids in data imputation and augmentation – key 
steps for creating realistic synthetic data.

Basics
   
   - Effortless Column Analysis: It calculates all the important stats you need 
for each column, like null counts, distinct values, percentages, and more. No 
more manual counting or head scratching!
   - Simple to Use: Just toss in your DataFrame and call the analyze_column 
function. Bam! Insights galore.
   - Makes Data Cleaning easier: Knowing your data's quality helps you clean it 
up way faster. This package helps you figure out where the missing values are 
hiding and how much variety you've got in each column.
   - Detecting skewed columns
   - Open Source and Friendly: Feel free to tinker, suggest improvements, or 
even contribute some code yourself! We love collaboration in the Spark 
community.

Installation: 


Using pip from the link: https://pypi.org/project/spark-column-analyzer/

pip install spark-column-analyzer

Also you can clone the project from gitHub

git clone https://github.com/michTalebzadeh/spark_column_analyzer.git

The details are in the attached RENAME file
Let me know what you think! Feedback is always welcome.

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
Von Braun)".

  

Re: A handy tool called spark-column-analyser

2024-05-21 Thread Mich Talebzadeh
A colleague kindly pointed out about giving an example of output which will
be added to README

Doing analysis for column Postcode

Json formatted output

{
"Postcode": {
"exists": true,
"num_rows": 93348,
"data_type": "string",
"null_count": 21921,
"null_percentage": 23.48,
"distinct_count": 38726,
"distinct_percentage": 41.49
}
}

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Tue, 21 May 2024 at 16:21, Mich Talebzadeh 
wrote:

> I just wanted to share a tool I built called *spark-column-analyzer*.
> It's a Python package that helps you dig into your Spark DataFrames with
> ease.
>
> Ever spend ages figuring out what's going on in your columns? Like, how
> many null values are there, or how many unique entries? Built with data
> preparation for Generative AI in mind, it aids in data imputation and
> augmentation – key steps for creating realistic synthetic data.
>
> *Basics*
>
>- *Effortless Column Analysis:* It calculates all the important stats
>you need for each column, like null counts, distinct values, percentages,
>and more. No more manual counting or head scratching!
>- *Simple to Use:* Just toss in your DataFrame and call the
>analyze_column function. Bam! Insights galore.
>- *Makes Data Cleaning easier:* Knowing your data's quality helps you
>clean it up way faster. This package helps you figure out where the missing
>values are hiding and how much variety you've got in each column.
>- *Detecting skewed columns*
>- *Open Source and Friendly:* Feel free to tinker, suggest
>improvements, or even contribute some code yourself! We love collaboration
>in the Spark community.
>
> *Installation:*
>
> Using pip from the link: https://pypi.org/project/spark-column-analyzer/
>
>
> *pip install spark-column-analyzer*
> Also you can clone the project from gitHub
>
>
> *git clone https://github.com/michTalebzadeh/spark_column_analyzer.git
> *
>
> The details are in the attached RENAME file
>
> Let me know what you think! Feedback is always welcome.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>


Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:

> Hello Vibhor,
> Thanks for the suggestion .
> I am looking for some other alternatives where I can use the same
> dataframe can be written to two destinations without re execution and cache
> or persist .
>
> Can some one help me in scenario 2 ?
> How to make spark write to MinIO faster ?
> Sent from my iPhone
>
> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
> wrote:
>
> 
>
> Hi Prem,
>
>
>
> You can try to write to HDFS then read from HDFS and write to MinIO.
>
>
>
> This will prevent duplicate transformation.
>
>
>
> You can also try persisting the dataframe using the DISK_ONLY level.
>
>
>
> Regards,
>
> Vibhor
>
> *From: *Prem Sahoo 
> *Date: *Tuesday, 21 May 2024 at 8:16 AM
> *To: *Spark dev list 
> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>
> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>
> Hello Team,
>
> I am planning to write to two datasource at the same time .
>
>
>
> Scenario:-
>
>
>
> Writing the same dataframe to HDFS and MinIO without re-executing the
> transformations and no cache(). Then how can we make it faster ?
>
>
>
> Read the parquet file and do a few transformations and write to HDFS and
> MinIO.
>
>
>
> here in both write spark needs execute the transformation again. Do we
> know how we can avoid re-execution of transformation  without
> cache()/persist ?
>
>
>
> Scenario2 :-
>
> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>
> Do we have any way to make writing this faster ?
>
>
>
> I don't want to do repartition and write as repartition will have overhead
> of shuffling .
>
>
>
> Please provide some inputs.
>
>
>
>
>
>


A handy tool called spark-column-analyser

2024-05-21 Thread Mich Talebzadeh
I just wanted to share a tool I built called *spark-column-analyzer*. It's
a Python package that helps you dig into your Spark DataFrames with ease.

Ever spend ages figuring out what's going on in your columns? Like, how
many null values are there, or how many unique entries? Built with data
preparation for Generative AI in mind, it aids in data imputation and
augmentation – key steps for creating realistic synthetic data.

*Basics*

   - *Effortless Column Analysis:* It calculates all the important stats
   you need for each column, like null counts, distinct values, percentages,
   and more. No more manual counting or head scratching!
   - *Simple to Use:* Just toss in your DataFrame and call the
   analyze_column function. Bam! Insights galore.
   - *Makes Data Cleaning easier:* Knowing your data's quality helps you
   clean it up way faster. This package helps you figure out where the missing
   values are hiding and how much variety you've got in each column.
   - *Detecting skewed columns*
   - *Open Source and Friendly:* Feel free to tinker, suggest improvements,
   or even contribute some code yourself! We love collaboration in the Spark
   community.

*Installation:*

Using pip from the link: https://pypi.org/project/spark-column-analyzer/


*pip install spark-column-analyzer*
Also you can clone the project from gitHub


*git clone https://github.com/michTalebzadeh/spark_column_analyzer.git
*

The details are in the attached README file
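
A minimal usage sketch based on the description above (the import path and
argument names are assumptions; the package README is the authority on the
exact API):

# Assumed import path -- adjust to the package's actual module layout
from spark_column_analyzer import analyze_column

# df is an existing Spark DataFrame containing a "Postcode" column
result = analyze_column(df, "Postcode")  # per-column stats as in the JSON example
print(result)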

Let me know what you think! Feedback is always welcome.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


README.md
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: pyspark dataframe join with two different data type

2024-05-17 Thread Karthick Nk
Hi All,

I have tried to get the same result with PySpark and with a SQL query by
creating a tempView. I am able to achieve it with the SQL query, whereas I
need to do it in the PySpark code itself. Could you help with this?

incoming_data = [["a"], ["b"], ["d"]]
column_names = ["column1"]
df = spark.createDataFrame(incoming_data, column_names)

view_data_df = spark.createDataFrame([(["a", "b", "c"], 1), (['f'], 2)],
['data_col', 'id'])

df.createOrReplaceTempView(f"id_table")
view_data_df.createOrReplaceTempView(f"view_data")

*%sql*
*select * from view_data*
*where exists (select 1 from id_table where array_contains(data_col,
column1))*

*Result:*

*data_col id*
*["a","b","c"] 1*

I need this equivalent SQL query with pyspark code to achieve the result.

One solution I have tried is below, but there I am doing explode and then
distinct again. I need to perform the action without doing this, since it
will impact performance for huge data.
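
For reference, the same EXISTS semantics can be expressed without
explode/distinct by using a left semi join with the array_contains
predicate (a sketch using the DataFrames defined above):

from pyspark.sql import functions as F

# A left semi join keeps each row of view_data_df at most once when any
# element of data_col matches a column1 value, mirroring the EXISTS subquery
result_df = view_data_df.join(
    df,
    F.expr("array_contains(data_col, column1)"),
    "left_semi",
)
result_df.show()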

Thanks,


solutions

On Thu, May 16, 2024 at 8:33 AM Karthick Nk  wrote:

> Thanks Mich,
>
> I have tried this solution, but i want all the columns from the dataframe
> df_1, if i explode the df_1 i am getting only data column. But the
> resultant should get the all the column from the df_1 with distinct result
> like below.
>
> Results in
>
> *df:*
> +---+
> |column1|
> +---+
> |  a|
> |  b|
> |  d|
> +---+
>
> *df_1:*
> +-+
> | id| data| field
> +-+
> |1 | [a, b, c]| ['company']
> | 3| [b, c, d]| ['hello']
> | 4| [e, f, s]| ['hello']
> +-+
>
> *Result:*
> ++
> |id| data| field
> ++
> |1| ['company']
> | 3|  ['helo']|
> ++
>
> Explanation: id with 1, 3 why -> because a or b present in both records 1
> and 3 so returning distinct result from the join.
>
>
> Here I would like to get the result like above, even if I get the
> duplicate element in the column data, I need to get the distinct data
> with respect to id field, But when I try to use array_contain, it will
> return duplicate result since data column has multiple occurrence.
>
> If you need more clarification, please let me know.
>
> Thanks,
>
>
>
>
>
>
> On Tue, May 14, 2024 at 6:12 PM Mich Talebzadeh 
> wrote:
>
>> You can use a combination of explode and distinct before joining.
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import explode
>>
>> # Create a SparkSession
>> spark = SparkSession.builder \
>> .appName("JoinExample") \
>> .getOrCreate()
>>
>> sc = spark.sparkContext
>> # Set the log level to ERROR to reduce verbosity
>> sc.setLogLevel("ERROR")
>>
>> # Create DataFrame df
>> data = [
>> ["a"],
>> ["b"],
>> ["d"],
>> ]
>> column_names = ["column1"]
>> df = spark.createDataFrame(data, column_names)
>> print("df:")
>> df.show()
>>
>> # Create DataFrame df_1
>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>> print("df_1:")
>> df_1.show()
>>
>> # Explode the array column in df_1
>> exploded_df_1 = df_1.select(explode("data").alias("data"))
>>
>> # Join with df using the exploded column
>> final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)
>>
>> # Distinct to ensure only unique rows are returned from df_1
>> final_df = final_df.select("data").distinct()
>>
>> print("Result:")
>> final_df.show()
>>
>>
>> Results in
>>
>> df:
>> +---+
>> |column1|
>> +---+
>> |  a|
>> |  b|
>> |  d|
>> +---+
>>
>> df_1:
>> +-+
>> | data|
>> +-+
>> |[a, b, c]|
>> |   []|
>> +-+
>>
>> Result:
>> ++
>> |data|
>> ++
>> |   a|
>> |   b|
>> ++
>>
>> HTH
>>
>> Mich Talebzadeh,
>>
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Tue, 14 May 2024 at 13:19, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> Could anyone have any idea or suggestion of any alternate way to achieve
>>> this scenario?
>>>
>>> Thanks.
>>>
>>> On Sat, May 11, 2024 at 6:55 AM Damien Hawes 
>>> wrote:
>>>
 Right now, with the structure of your data, it isn't possible.

 The rows aren't duplicates of each other. "a" and "b" both exist in the
 array. So Spark is correctly performing the join. It looks like you need to
 find another way to model this data to get what you want to achieve.

 Are the values of "a" and "b" related to each other in any way?

 - Damien

 Op vr 10 mei 2024 18:08 schreef Karthick Nk :

> Hi Mich,
>

Request for Assistance: Adding User Authentication to Apache Spark Application

2024-05-16 Thread NIKHIL RAJ SHRIVASTAVA
Dear Team,

I hope this email finds you well. My name is Nikhil Raj, and I am currently
working with Apache Spark for one of my projects , where through the help
of a parquet file we are creating an external table in Spark.

I am reaching out to seek assistance regarding user authentication for our
Apache Spark application. Currently, we can connect to the application
using only the host and port information. However, for security reasons, we
would like to implement user authentication to control access and ensure
data integrity.

After reviewing the available documentation and resources, I found that
adding user authentication to our Spark setup requires additional
configurations or plugins. However, I'm facing challenges in understanding
the exact steps or best practices to implement this.

Could you please provide guidance or point me towards relevant
documentation/resources that detail how to integrate user authentication
into Apache Spark?  Additionally, if there are any recommended practices or
considerations for ensuring the security of our Spark setup, we would
greatly appreciate your insights on that as well.

Your assistance in this matter would be invaluable to us, as we aim to
enhance the security of our Spark application and safeguard our data
effectively.

Thank you very much for your time and consideration. I look forward to
hearing from you and your suggestions.

Warm regards,

NIKHIL RAJ
Developer
Estuate Software Pvt. Ltd.
Thanks & Regards


Re: pyspark dataframe join with two different data type

2024-05-15 Thread Karthick Nk
Thanks Mich,

I have tried this solution, but i want all the columns from the dataframe
df_1, if i explode the df_1 i am getting only data column. But the
resultant should get the all the column from the df_1 with distinct result
like below.

Results in

*df:*
+---+
|column1|
+---+
|  a|
|  b|
|  d|
+---+

*df_1:*
+---+---------+-----------+
| id|     data|      field|
+---+---------+-----------+
|  1|[a, b, c]|['company']|
|  3|[b, c, d]|  ['hello']|
|  4|[e, f, s]|  ['hello']|
+---+---------+-----------+

*Result:*
+---+---------+-----------+
| id|     data|      field|
+---+---------+-----------+
|  1|[a, b, c]|['company']|
|  3|[b, c, d]|  ['hello']|
+---+---------+-----------+

Explanation: why ids 1 and 3 -> because a or b is present in both records 1
and 3, so the join should return a distinct result per matching id.


Here I would like to get the result like the above: even if there are
duplicate elements in the data column, I need to get distinct rows with
respect to the id field. But when I try to use array_contains, it returns
duplicate results since the data column has multiple matching occurrences.

If you need more clarification, please let me know.

Thanks,






On Tue, May 14, 2024 at 6:12 PM Mich Talebzadeh 
wrote:

> You can use a combination of explode and distinct before joining.
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode
>
> # Create a SparkSession
> spark = SparkSession.builder \
> .appName("JoinExample") \
> .getOrCreate()
>
> sc = spark.sparkContext
> # Set the log level to ERROR to reduce verbosity
> sc.setLogLevel("ERROR")
>
> # Create DataFrame df
> data = [
> ["a"],
> ["b"],
> ["d"],
> ]
> column_names = ["column1"]
> df = spark.createDataFrame(data, column_names)
> print("df:")
> df.show()
>
> # Create DataFrame df_1
> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
> print("df_1:")
> df_1.show()
>
> # Explode the array column in df_1
> exploded_df_1 = df_1.select(explode("data").alias("data"))
>
> # Join with df using the exploded column
> final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)
>
> # Distinct to ensure only unique rows are returned from df_1
> final_df = final_df.select("data").distinct()
>
> print("Result:")
> final_df.show()
>
>
> Results in
>
> df:
> +---+
> |column1|
> +---+
> |  a|
> |  b|
> |  d|
> +---+
>
> df_1:
> +-+
> | data|
> +-+
> |[a, b, c]|
> |   []|
> +-+
>
> Result:
> ++
> |data|
> ++
> |   a|
> |   b|
> ++
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrimeLondon
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 14 May 2024 at 13:19, Karthick Nk  wrote:
>
>> Hi All,
>>
>> Could anyone have any idea or suggestion of any alternate way to achieve
>> this scenario?
>>
>> Thanks.
>>
>> On Sat, May 11, 2024 at 6:55 AM Damien Hawes 
>> wrote:
>>
>>> Right now, with the structure of your data, it isn't possible.
>>>
>>> The rows aren't duplicates of each other. "a" and "b" both exist in the
>>> array. So Spark is correctly performing the join. It looks like you need to
>>> find another way to model this data to get what you want to achieve.
>>>
>>> Are the values of "a" and "b" related to each other in any way?
>>>
>>> - Damien
>>>
>>> Op vr 10 mei 2024 18:08 schreef Karthick Nk :
>>>
 Hi Mich,

 Thanks for the solution, But I am getting duplicate result by using
 array_contains. I have explained the scenario below, could you help me on
 that, how we can achieve i have tried different way bu i could able to
 achieve.

 For example

 data = [
 ["a"],
 ["b"],
 ["d"],
 ]
 column_names = ["column1"]
 df = spark.createDataFrame(data, column_names)
 df.display()

 [image: image.png]

 df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
 df_1.display()
 [image: image.png]


 final_df = df_1.join(df, expr("array_contains(data, column1)"))
 final_df.display()

 Resul:
 [image: image.png]

 But i need the result like below:

 [image: image.png]

 Why because

 In the df_1 i have only two records, in that first records onlly i have
 matching value.
 But both records from the df i.e *a, b* are present in the first
 records itself, it is returning two records as resultant, but my
 expectation is to return only one records means if any of the records from
 the df is present in the df_1 it should return only one records from the
 df_1.


How to provide a Zstd "training mode" dictionary object

2024-05-15 Thread Saha, Daniel
Hi,

I understand that Zstd compression can optionally be provided a dictionary 
object to improve performance. See “training mode” here 
https://facebook.github.io/zstd/

Does Spark surface a way to provide this dictionary object when writing/reading 
data? What about for intermediate shuffle results?
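
For context, outside Spark the "training mode" workflow with the python
zstandard package looks roughly like this (toy samples and an illustrative
dictionary size only; real training needs a representative corpus). What I am
asking is whether Spark exposes an equivalent hook for its Zstd codec:

import zstandard as zstd

# train a dictionary on many small, similar samples (toy data, API shape only)
samples = [f"user-{i},event-{i % 7},payload-{'x' * (i % 50)}".encode()
           for i in range(2000)]
dict_data = zstd.train_dictionary(4096, samples)

cctx = zstd.ZstdCompressor(dict_data=dict_data)
dctx = zstd.ZstdDecompressor(dict_data=dict_data)

compressed = cctx.compress(samples[0])
assert dctx.decompress(compressed) == samples[0]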

Thanks,
Daniel


Query Regarding UDF Support in Spark Connect with Kubernetes as Cluster Manager

2024-05-15 Thread Nagatomi Yasukazu
Hi Spark Community,

I have a question regarding the support for User-Defined Functions (UDFs)
in Spark Connect, specifically when using Kubernetes as the Cluster Manager.

According to the Spark documentation, UDFs are supported by default for the
shell and in standalone applications with additional setup requirements.

However, it is not clear whether this support extends to scenarios where
Kubernetes is used as the Cluster Manager.

cf.
https://spark.apache.org/docs/latest/spark-connect-overview.html#:~:text=User%2DDefined%20Functions%20(UDFs)%20are%20supported%2C%20by%20default%20for%20the%20shell%20and%20in%20standalone%20applications%20with%20additional%20set%2Dup%20requirements
.
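
For reference, this is the kind of UDF usage I have in mind over a Spark
Connect session (the sc:// endpoint below is a placeholder; whether this works
unchanged when Kubernetes is the cluster manager is exactly my question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# connect to a remote Spark Connect server (placeholder endpoint)
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

@udf(returnType=IntegerType())
def plus_one(x):
    return x + 1

spark.range(5).select(plus_one("id").alias("id_plus_one")).show()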

Could someone please clarify:

1. Are UDFs supported in Spark Connect when using Kubernetes as the Cluster
Manager?

2. If they are supported, are there any specific setup requirements or
limitations we should be aware of?

3. If UDFs are not currently supported with Kubernetes as the Cluster
Manager, are there any plans to include this support in future releases?

Your insights and guidance on this matter would be greatly appreciated.

Thank you in advance for your help!

Best regards,
Yasukazu


Re: pyspark dataframe join with two different data type

2024-05-14 Thread Mich Talebzadeh
You can use a combination of explode and distinct before joining.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

# Create a SparkSession
spark = SparkSession.builder \
.appName("JoinExample") \
.getOrCreate()

sc = spark.sparkContext
# Set the log level to ERROR to reduce verbosity
sc.setLogLevel("ERROR")

# Create DataFrame df
data = [
["a"],
["b"],
["d"],
]
column_names = ["column1"]
df = spark.createDataFrame(data, column_names)
print("df:")
df.show()

# Create DataFrame df_1
df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
print("df_1:")
df_1.show()

# Explode the array column in df_1
exploded_df_1 = df_1.select(explode("data").alias("data"))

# Join with df using the exploded column
final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)

# Distinct to ensure only unique rows are returned from df_1
final_df = final_df.select("data").distinct()

print("Result:")
final_df.show()


Results in

df:
+---+
|column1|
+---+
|  a|
|  b|
|  d|
+---+

df_1:
+-+
| data|
+-+
|[a, b, c]|
|   []|
+-+

Result:
++
|data|
++
|   a|
|   b|
++

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 14 May 2024 at 13:19, Karthick Nk  wrote:

> Hi All,
>
> Could anyone have any idea or suggestion of any alternate way to achieve
> this scenario?
>
> Thanks.
>
> On Sat, May 11, 2024 at 6:55 AM Damien Hawes 
> wrote:
>
>> Right now, with the structure of your data, it isn't possible.
>>
>> The rows aren't duplicates of each other. "a" and "b" both exist in the
>> array. So Spark is correctly performing the join. It looks like you need to
>> find another way to model this data to get what you want to achieve.
>>
>> Are the values of "a" and "b" related to each other in any way?
>>
>> - Damien
>>
>> Op vr 10 mei 2024 18:08 schreef Karthick Nk :
>>
>>> Hi Mich,
>>>
>>> Thanks for the solution, But I am getting duplicate result by using
>>> array_contains. I have explained the scenario below, could you help me on
>>> that, how we can achieve i have tried different way bu i could able to
>>> achieve.
>>>
>>> For example
>>>
>>> data = [
>>> ["a"],
>>> ["b"],
>>> ["d"],
>>> ]
>>> column_names = ["column1"]
>>> df = spark.createDataFrame(data, column_names)
>>> df.display()
>>>
>>> [image: image.png]
>>>
>>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df_1.display()
>>> [image: image.png]
>>>
>>>
>>> final_df = df_1.join(df, expr("array_contains(data, column1)"))
>>> final_df.display()
>>>
>>> Resul:
>>> [image: image.png]
>>>
>>> But i need the result like below:
>>>
>>> [image: image.png]
>>>
>>> Why because
>>>
>>> In the df_1 i have only two records, in that first records onlly i have
>>> matching value.
>>> But both records from the df i.e *a, b* are present in the first
>>> records itself, it is returning two records as resultant, but my
>>> expectation is to return only one records means if any of the records from
>>> the df is present in the df_1 it should return only one records from the
>>> df_1.
>>>
>>> Note:
>>> 1. Here we are able to filter the duplicate records by using distinct of
>>> ID field in the resultant df, bu I am thinking that shouldn't be effective
>>> way, rather i am thinking of updating in array_contains steps itself.
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>

 This is what you want, how to join two DFs with a string column in one
 and an array of strings in the other, keeping only rows where the
 string is present in the array.

 from pyspark.sql import SparkSession
 from pyspark.sql import Row
 from pyspark.sql.functions import expr

 spark = SparkSession.builder.appName("joins").getOrCreate()

 data1 = [Row(combined_id=[1, 2, 3])  # this one has a column
 combined_id as an array of integers
 data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
 single integers

 df1 = spark.createDataFrame(data1)
 df2 = spark.createDataFrame(data2)

 df1.printSchema()
 df2.printSchema()

 # Perform the join with array_contains. It takes two arguments: an
 array and a value. It returns True if the value exists as an element
 within the array, otherwise False.
 joined_df = df1.join(df2, 

Re: pyspark dataframe join with two different data type

2024-05-14 Thread Karthick Nk
Hi All,

Could anyone have any idea or suggestion of any alternate way to achieve
this scenario?

Thanks.

On Sat, May 11, 2024 at 6:55 AM Damien Hawes  wrote:

> Right now, with the structure of your data, it isn't possible.
>
> The rows aren't duplicates of each other. "a" and "b" both exist in the
> array. So Spark is correctly performing the join. It looks like you need to
> find another way to model this data to get what you want to achieve.
>
> Are the values of "a" and "b" related to each other in any way?
>
> - Damien
>
> Op vr 10 mei 2024 18:08 schreef Karthick Nk :
>
>> Hi Mich,
>>
>> Thanks for the solution, But I am getting duplicate result by using
>> array_contains. I have explained the scenario below, could you help me on
>> that, how we can achieve i have tried different way bu i could able to
>> achieve.
>>
>> For example
>>
>> data = [
>> ["a"],
>> ["b"],
>> ["d"],
>> ]
>> column_names = ["column1"]
>> df = spark.createDataFrame(data, column_names)
>> df.display()
>>
>> [image: image.png]
>>
>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>> df_1.display()
>> [image: image.png]
>>
>>
>> final_df = df_1.join(df, expr("array_contains(data, column1)"))
>> final_df.display()
>>
>> Resul:
>> [image: image.png]
>>
>> But i need the result like below:
>>
>> [image: image.png]
>>
>> Why because
>>
>> In the df_1 i have only two records, in that first records onlly i have
>> matching value.
>> But both records from the df i.e *a, b* are present in the first records
>> itself, it is returning two records as resultant, but my expectation is to
>> return only one records means if any of the records from the df is present
>> in the df_1 it should return only one records from the df_1.
>>
>> Note:
>> 1. Here we are able to filter the duplicate records by using distinct of
>> ID field in the resultant df, bu I am thinking that shouldn't be effective
>> way, rather i am thinking of updating in array_contains steps itself.
>>
>> Thanks.
>>
>>
>> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh 
>> wrote:
>>
>>>
>>> This is what you want, how to join two DFs with a string column in one
>>> and an array of strings in the other, keeping only rows where the
>>> string is present in the array.
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import expr
>>>
>>> spark = SparkSession.builder.appName("joins").getOrCreate()
>>>
>>> data1 = [Row(combined_id=[1, 2, 3])  # this one has a column combined_id
>>> as an array of integers
>>> data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
>>> single integers
>>>
>>> df1 = spark.createDataFrame(data1)
>>> df2 = spark.createDataFrame(data2)
>>>
>>> df1.printSchema()
>>> df2.printSchema()
>>>
>>> # Perform the join with array_contains. It takes two arguments: an
>>> array and a value. It returns True if the value exists as an element
>>> within the array, otherwise False.
>>> joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))
>>>
>>> # Show the result
>>> joined_df.show()
>>>
>>> root
>>>  |-- combined_id: array (nullable = true)
>>>  ||-- element: long (containsNull = true)
>>>
>>> root
>>>  |-- mr_id: long (nullable = true)
>>>
>>> +---+-+
>>> |combined_id|mr_id|
>>> +---+-+
>>> |  [1, 2, 3]|2|
>>> |  [4, 5, 6]|5|
>>> +---+-+
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Thu, 29 Feb 2024 at 20:50, Karthick Nk  wrote:
>>>
 Hi All,

 I have two dataframe with below structure, i have to join these two
 dataframe - the scenario is one column is string in one dataframe and in
 other df join column is array of string, so we have to inner join two df
 and get the data if string value is present in any of the array of string
 value in another dataframe,


 df1 = spark.sql("""
 SELECT
 mr.id as mr_id,
 pv.id as pv_id,
 array(mr.id, pv.id) as combined_id
 FROM
 table1 mr
 INNER JOIN table2 pv ON pv.id = Mr.recordid
where
 pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
 or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
 """)

 # df1.display()

 # Your second query
 df2 = spark.sql("""
 SELECT
 

Display a warning in EMR welcome screen

2024-05-11 Thread Abhishek Basu
Hi Team, for Elastic Map Reduce (EMR) clusters, it would be great if there were a
warning message that logging should be handled carefully and that INFO or DEBUG
should be enabled only when required. This logging issue took my whole day, and I
eventually discovered that it was flooding HDFS and preventing the real task from
running.

Thanks Abhishek


Sent from Yahoo Mail for iPhone


Re: [spark-graphframes]: Generating incorrect edges

2024-05-11 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi all,

The issue is solved.
I conducted a lot more testing and built checkers to verify at which size it's 
going wrong.
When checking for specific edges, I could construct successful graphs up to 
261k records.
When verifying all edges created, it breaks somewhere in the 200-250k records.
I didn't bother finding the specific error threshold, as runs take up to 7 
minutes per slice.

I started looking at all underlying assumptions of my code along with my 
supervisor.
We located the problem in the generate_ids() function.
I selected all distinct values to give them an ID and subsequently joined
those results back to the main DataFrame.
I replaced this by generating unique IDs for each value occurrence, hashing
them with 'withColumn' rather than joining them back.
This resolved my issues and ended up being a significant performance boost as
well.

My fixed generate_ids() code:

def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation.

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    df = df.withColumn(MAINTAINER_ID, psf.concat(psf.lit(PREFIX_M), psf.sha2(df.mnt_by, 256)))
    df = df.withColumn(PREFIX_ID, psf.concat(psf.lit(PREFIX_P), psf.sha2(df.prefix, 256)))
    df = df.withColumn(ORIGIN_ID, psf.concat(psf.lit(PREFIX_O), psf.sha2(df.origin, 256)))
    df = df.withColumn(ORGANISATION_ID, psf.concat(psf.lit(PREFIX_ORG), psf.sha2(df.descr, 256)))
    return df
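
As a quick illustration of why this works: the hash of a value is stable
across rows and runs, so no join-back is needed. A toy sketch (the "m_"
prefix and the standalone session here are just placeholders, not my actual
constants):

from pyspark.sql import SparkSession, functions as psf

spark = SparkSession.builder.appName("hash_id_sketch").getOrCreate()

# the same value always hashes to the same ID, so duplicates get identical IDs
toy = spark.createDataFrame([("MNT-A",), ("MNT-B",), ("MNT-A",)], ["mnt_by"])
toy = toy.withColumn("maintainer_id",
                     psf.concat(psf.lit("m_"), psf.sha2(psf.col("mnt_by"), 256)))
toy.show(truncate=False)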

Hope this email finds someone running into a similar issue in the future.

Kind regards,
Jelle




From: Mich Talebzadeh 
Sent: Wednesday, May 1, 2024 11:56 AM
To: Stephen Coy 
Cc: Nijland, J.G.W. (Jelle, Student M-CS) ; 
user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

Hi Steve,

Thanks for your statement. I tend to use uuid myself to avoid collisions. This
built-in function generates random IDs that are highly likely to be unique
across systems. My concerns are about edge cases, so to speak. If the Spark
application runs for a very long time or encounters restarts, the
monotonically_increasing_id() sequence might restart from the beginning. This
could again cause duplicate IDs if other Spark applications are running
concurrently or if data is processed across multiple runs of the same
application.
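
As a minimal sketch of what I mean by using uuid (illustrative only, not a
prescription):

import uuid
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.appName("uuid_ids").getOrCreate()

# a Python UDF wrapping uuid4; Spark SQL also ships a built-in uuid() expression
uuid_udf = F.udf(lambda: str(uuid.uuid4()), T.StringType())

df = spark.range(5) \
          .withColumn("row_uuid", uuid_udf()) \
          .withColumn("row_uuid_sql", F.expr("uuid()"))
df.show(truncate=False)

Either column gives IDs that stay unique regardless of restarts or of other
applications running concurrently.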

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my Linkedin 
profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
 Von 
Braun)".


On Wed, 1 May 2024 at 01:22, Stephen Coy <s...@infomedia.com.au> wrote:
Hi Mich,

I was just reading random questions on the user list when I noticed that you 
said:

On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.


It’s my understanding that the *Spark* `monotonically_increasing_id()` function 
exists for the exact purpose of generating a collision-resistant unique id 
across nodes on different hosts.
We use it extensively for this purpose and have never encountered an issue.

Are we wrong or are you thinking of a different (not Spark) function?

Cheers,

Steve C




This email contains confidential information of and is the copyright of 
Infomedia. It must not be forwarded, amended or disclosed without consent of 
the sender. If you received this message by mistake, please advise the sender 
and delete all copies. Security of transmission on the internet cannot be 
guaranteed, could be infected, intercepted, or corrupted and you should ensure 
you have suitable antivirus protection in place. By sending us your or any 
third party personal details, you consent to (or confirm you have obtained 
consent from such third parties) to Infomedia’s privacy policy. 
http://www.infomedia.com.au/privacy-policy/


unsubscribe

2024-05-10 Thread J UDAY KIRAN
unsubscribe


Re: pyspark dataframe join with two different data type

2024-05-10 Thread Damien Hawes
Right now, with the structure of your data, it isn't possible.

The rows aren't duplicates of each other. "a" and "b" both exist in the
array. So Spark is correctly performing the join. It looks like you need to
find another way to model this data to get what you want to achieve.

Are the values of "a" and "b" related to each other in any way?

- Damien

Op vr 10 mei 2024 18:08 schreef Karthick Nk :

> Hi Mich,
>
> Thanks for the solution, But I am getting duplicate result by using
> array_contains. I have explained the scenario below, could you help me on
> that, how we can achieve i have tried different way bu i could able to
> achieve.
>
> For example
>
> data = [
> ["a"],
> ["b"],
> ["d"],
> ]
> column_names = ["column1"]
> df = spark.createDataFrame(data, column_names)
> df.display()
>
> [image: image.png]
>
> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
> df_1.display()
> [image: image.png]
>
>
> final_df = df_1.join(df, expr("array_contains(data, column1)"))
> final_df.display()
>
> Resul:
> [image: image.png]
>
> But i need the result like below:
>
> [image: image.png]
>
> Why because
>
> In the df_1 i have only two records, in that first records onlly i have
> matching value.
> But both records from the df i.e *a, b* are present in the first records
> itself, it is returning two records as resultant, but my expectation is to
> return only one records means if any of the records from the df is present
> in the df_1 it should return only one records from the df_1.
>
> Note:
> 1. Here we are able to filter the duplicate records by using distinct of
> ID field in the resultant df, bu I am thinking that shouldn't be effective
> way, rather i am thinking of updating in array_contains steps itself.
>
> Thanks.
>
>
> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh 
> wrote:
>
>>
>> This is what you want, how to join two DFs with a string column in one
>> and an array of strings in the other, keeping only rows where the string
>> is present in the array.
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql import Row
>> from pyspark.sql.functions import expr
>>
>> spark = SparkSession.builder.appName("joins").getOrCreate()
>>
>> data1 = [Row(combined_id=[1, 2, 3])  # this one has a column combined_id
>> as an array of integers
>> data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
>> single integers
>>
>> df1 = spark.createDataFrame(data1)
>> df2 = spark.createDataFrame(data2)
>>
>> df1.printSchema()
>> df2.printSchema()
>>
>> # Perform the join with array_contains. It takes two arguments: an array
>> and a value. It returns True if the value exists as an element within
>> the array, otherwise False.
>> joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))
>>
>> # Show the result
>> joined_df.show()
>>
>> root
>>  |-- combined_id: array (nullable = true)
>>  ||-- element: long (containsNull = true)
>>
>> root
>>  |-- mr_id: long (nullable = true)
>>
>> +---+-+
>> |combined_id|mr_id|
>> +---+-+
>> |  [1, 2, 3]|2|
>> |  [4, 5, 6]|5|
>> +---+-+
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Thu, 29 Feb 2024 at 20:50, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> I have two dataframe with below structure, i have to join these two
>>> dataframe - the scenario is one column is string in one dataframe and in
>>> other df join column is array of string, so we have to inner join two df
>>> and get the data if string value is present in any of the array of string
>>> value in another dataframe,
>>>
>>>
>>> df1 = spark.sql("""
>>> SELECT
>>> mr.id as mr_id,
>>> pv.id as pv_id,
>>> array(mr.id, pv.id) as combined_id
>>> FROM
>>> table1 mr
>>> INNER JOIN table2 pv ON pv.id = Mr.recordid
>>>where
>>> pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>> or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
>>> """)
>>>
>>> # df1.display()
>>>
>>> # Your second query
>>> df2 = spark.sql("""
>>> SELECT
>>> id
>>> FROM
>>> table2
>>> WHERE
>>> id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>>
>>> """)
>>>
>>>
>>>
>>> Result data:
>>> 35122806-4cd2-4916-a149-24ea55c2dc36 only, because this records alone is
>>> common between string and array of string value.
>>>
>>> Can you share the sample snippet, how we 

Re: pyspark dataframe join with two different data type

2024-05-10 Thread Karthick Nk
Hi Mich,

Thanks for the solution, but I am getting duplicate results when using
array_contains. I have explained the scenario below; could you help me with
it? I have tried different ways but could not achieve the expected result.

For example

data = [
["a"],
["b"],
["d"],
]
column_names = ["column1"]
df = spark.createDataFrame(data, column_names)
df.display()

[image: image.png]

df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df_1.display()
[image: image.png]


final_df = df_1.join(df, expr("array_contains(data, column1)"))
final_df.display()

Resul:
[image: image.png]

But i need the result like below:

[image: image.png]

Why? Because df_1 has only two records, and only the first record has
matching values. Both values from df, i.e. *a, b*, are present in that first
record, so the join returns two rows. My expectation is to get only one row:
if any of the values from df is present in a df_1 record, that record should
be returned only once.

Note:
1. We are able to filter the duplicate records by applying distinct on the ID
field in the resultant df, but I think that is not an effective way; rather,
I am thinking of handling it in the array_contains step itself.

Thanks.


On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh 
wrote:

>
> This is what you want, how to join two DFs with a string column in one and
> an array of strings in the other, keeping only rows where the string is
> present in the array.
>
> from pyspark.sql import SparkSession
> from pyspark.sql import Row
> from pyspark.sql.functions import expr
>
> spark = SparkSession.builder.appName("joins").getOrCreate()
>
> data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]  # this one
> has a column combined_id as an array of integers
> data2 = [Row(mr_id=2), Row(mr_id=5)] # this one has column mr_id with
> single integers
>
> df1 = spark.createDataFrame(data1)
> df2 = spark.createDataFrame(data2)
>
> df1.printSchema()
> df2.printSchema()
>
> # Perform the join with array_contains. It takes two arguments: an array
> and a value. It returns True if the value exists as an element within the
> array, otherwise False.
> joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))
>
> # Show the result
> joined_df.show()
>
> root
>  |-- combined_id: array (nullable = true)
>  ||-- element: long (containsNull = true)
>
> root
>  |-- mr_id: long (nullable = true)
>
> +---+-+
> |combined_id|mr_id|
> +---+-+
> |  [1, 2, 3]|2|
> |  [4, 5, 6]|5|
> +---+-+
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 29 Feb 2024 at 20:50, Karthick Nk  wrote:
>
>> Hi All,
>>
>> I have two dataframe with below structure, i have to join these two
>> dataframe - the scenario is one column is string in one dataframe and in
>> other df join column is array of string, so we have to inner join two df
>> and get the data if string value is present in any of the array of string
>> value in another dataframe,
>>
>>
>> df1 = spark.sql("""
>> SELECT
>> mr.id as mr_id,
>> pv.id as pv_id,
>> array(mr.id, pv.id) as combined_id
>> FROM
>> table1 mr
>> INNER JOIN table2 pv ON pv.id = Mr.recordid
>>where
>> pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>> or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
>> """)
>>
>> # df1.display()
>>
>> # Your second query
>> df2 = spark.sql("""
>> SELECT
>> id
>> FROM
>> table2
>> WHERE
>> id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>
>> """)
>>
>>
>>
>> Result data:
>> 35122806-4cd2-4916-a149-24ea55c2dc36 only, because this records alone is
>> common between string and array of string value.
>>
>> Can you share the sample snippet, how we can do the join for this two
>> different datatype in the dataframe.
>>
>> if any clarification needed, pls feel free to ask.
>>
>> Thanks
>>
>>


Spark 3.5.x on Java 21?

2024-05-08 Thread Stephen Coy
Hi everyone,

We’re about to upgrade our Spark clusters from Java 11 and Spark 3.2.1 to Spark 
3.5.1.

I know that 3.5.1 is supposed to be fine on Java 17, but will it run OK on Java 
21?

Thanks,

Steve C


This email contains confidential information of and is the copyright of 
Infomedia. It must not be forwarded, amended or disclosed without consent of 
the sender. If you received this message by mistake, please advise the sender 
and delete all copies. Security of transmission on the internet cannot be 
guaranteed, could be infected, intercepted, or corrupted and you should ensure 
you have suitable antivirus protection in place. By sending us your or any 
third party personal details, you consent to (or confirm you have obtained 
consent from such third parties) to Infomedia’s privacy policy. 
http://www.infomedia.com.au/privacy-policy/


Re: [Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming

2024-05-08 Thread Mich Talebzadeh
you may consider

- Increase Watermark Retention: Consider increasing the watermark retention
duration. This allows keeping records for a longer period before dropping
them. However, this might increase processing latency and violate
at-least-once semantics if the watermark lags behind real-time.

OR

- Use a separate stream for dropped records: Create a separate streaming
pipeline to process the dropped records. Try:


   - Filter: Filter out records older than the watermark in the main
   pipeline.  say

   resultC = streamingDataFrame.select(
                 col("parsed_value.rowkey").alias("rowkey"),
                 col("parsed_value.timestamp").alias("timestamp"),
                 col("parsed_value.temperature").alias("temperature"))

   """
   We work out the window and the AVG(temperature) in the window's
   timeframe below. This should return the following DataFrame as a struct:

    root
     |-- window: struct (nullable = false)
     |    |-- start: timestamp (nullable = true)
     |    |-- end: timestamp (nullable = true)
     |-- avg(temperature): double (nullable = true)
   """

   resultM = resultC \
                 .withWatermark("timestamp", "5 minutes") \
                 .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
                 .avg('temperature')

   - Write to Sink: Write the filtered records (the dropped ones) to a
   separate Kafka topic; a minimal sketch follows below.
   - Consume and Store: Consume the dropped-records topic with another
   streaming job and store the records in a Postgres table or S3 using a
   suitable connector library.
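
A minimal sketch of the filter-and-sink idea (the broker, topic and
checkpoint path are placeholders, and comparing event time against the
processing-time clock is only an approximation of "older than the
watermark"):

    from pyspark.sql import functions as F

    # records whose event time is already older than the 5-minute threshold
    late_records = resultC.where(
        F.col("timestamp") < F.current_timestamp() - F.expr("INTERVAL 5 MINUTES"))

    (late_records
        .selectExpr("CAST(rowkey AS STRING) AS key", "to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")         # placeholder
        .option("topic", "dropped_records")                        # placeholder
        .option("checkpointLocation", "/tmp/checkpoints/dropped")  # placeholder
        .start())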


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 8 May 2024 at 05:13, Nandha Kumar  wrote:

> Hi Team,
>We are trying to use *spark structured streaming *for our use
> case. We will be joining 2 streaming sources(from kafka topic) with
> watermarks. As time progresses, the records that are prior to the watermark
> timestamp are removed from the state. For our use case, we want to *store
> these dropped records* in some postgres table or s3.
>
> When searching, we found a similar question
> in
> StackOverflow which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
>


Spark not creating staging dir for insertInto partitioned table

2024-05-07 Thread Sanskar Modi
Hi Folks,

I wanted to check why spark doesn't create staging dir while doing an
insertInto on partitioned tables. I'm running below example code –
```
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

val rdd = sc.parallelize(Seq((1, 5, 1), (2, 1, 2), (4, 4, 3)))
val df = spark.createDataFrame(rdd)
df.write.insertInto("testing_table") // testing table is partitioned on "_1"
```
In this scenario FileOutputCommitter considers table path as output path
and creates temporary folders like
`/testing_table/_temporary/0` and then moves them to the
partition location when the job commit happens.

But in-case if multiple parallel apps are inserting into the same
partition, this can cause race condition issues while deleting the
`_temporary` dir. Ideally for each app there should be a unique staging dir
where the job should write its output.

Is there any specific reason for this? or am i missing something here?
Thanks for your time and assistance regarding this!

Kind regards
Sanskar


[Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming

2024-05-07 Thread Nandha Kumar
Hi Team,
   We are trying to use *spark structured streaming *for our use case.
We will be joining 2 streaming sources(from kafka topic) with watermarks.
As time progresses, the records that are prior to the watermark timestamp
are removed from the state. For our use case, we want to *store these
dropped records* in some postgres table or s3.

When searching, we found a similar question
in
StackOverflow which is unanswered.
*We would like to know how to store these dropped records due to the
watermark.*


unsubscribe

2024-05-07 Thread Wojciech Bombik
unsubscribe


unsubscribe

2024-05-06 Thread Moise
unsubscribe


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Mich Talebzadeh
Hi Kartrick,

Unfortunately Materialised views are not available in Spark as yet. I
raised Jira [SPARK-48117] Spark Materialized Views: Improve Query
Performance and Data Management - ASF JIRA (apache.org)
 as a feature request.

Let me think about another way and revert

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 6 May 2024 at 07:54, Karthick Nk  wrote:

> Thanks Mich,
>
> can you please confirm me is my understanding correct?
>
> First, we have to create the materialized view based on the mapping
> details we have by using multiple tables as source(since we have multiple
> join condition from different tables). From the materialised view we can
> stream the view data into elastic index by using cdc?
>
> Thanks in advance.
>
> On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
> wrote:
>
>> My recommendation! is using materialized views (MVs) created in Hive with
>> Spark Structured Streaming and Change Data Capture (CDC) is a good
>> combination for efficiently streaming view data updates in your scenario.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>>
>>> Hi All,
>>>
>>> Requirements:
>>> I am working on the data flow, which will use the view definition(view
>>> definition already defined in schema), there are multiple tables used in
>>> the view definition. Here we want to stream the view data into elastic
>>> index based on if any of the table(used in the view definition) data got
>>> changed.
>>>
>>>
>>> Current flow:
>>> 1. we are inserting id's from the table(which used in the view
>>> definition) into the common table.
>>> 2. From the common table by using the id, we will be streaming the view
>>> data (by using if any of the incomming id is present in the collective id
>>> of all tables used from view definition) by using spark structured
>>> streaming.
>>>
>>>
>>> Issue:
>>> 1. Here we are facing issue - For each incomming id here we running view
>>> definition(so it will read all the data from all the data) and check if any
>>> of the incomming id is present in the collective id's of view result, Due
>>> to which it is taking more memory in the cluster driver and taking more
>>> time to process.
>>>
>>>
>>> I am epxpecting an alternate solution, if we can avoid full scan of view
>>> definition every time, If you have any alternate deisgn flow how we can
>>> achieve the result, please suggest for the same.
>>>
>>>
>>> Note: Also, it will be helpfull, if you can share the details like
>>> community forum or platform to discuss this kind of deisgn related topics,
>>> it will be more helpfull.
>>>
>>


Re: ********Spark streaming issue to Elastic data**********

2024-05-06 Thread Karthick Nk
Thanks Mich,

Can you please confirm whether my understanding is correct?

First, we have to create the materialized view based on the mapping details
we have, using multiple tables as the source (since we have join conditions
across different tables). From the materialised view we can then stream the
view data into the Elastic index by using CDC?

Thanks in advance.

On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh 
wrote:

> My recommendation! is using materialized views (MVs) created in Hive with
> Spark Structured Streaming and Change Data Capture (CDC) is a good
> combination for efficiently streaming view data updates in your scenario.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:
>
>> Hi All,
>>
>> Requirements:
>> I am working on the data flow, which will use the view definition(view
>> definition already defined in schema), there are multiple tables used in
>> the view definition. Here we want to stream the view data into elastic
>> index based on if any of the table(used in the view definition) data got
>> changed.
>>
>>
>> Current flow:
>> 1. we are inserting id's from the table(which used in the view
>> definition) into the common table.
>> 2. From the common table by using the id, we will be streaming the view
>> data (by using if any of the incomming id is present in the collective id
>> of all tables used from view definition) by using spark structured
>> streaming.
>>
>>
>> Issue:
>> 1. Here we are facing issue - For each incomming id here we running view
>> definition(so it will read all the data from all the data) and check if any
>> of the incomming id is present in the collective id's of view result, Due
>> to which it is taking more memory in the cluster driver and taking more
>> time to process.
>>
>>
>> I am epxpecting an alternate solution, if we can avoid full scan of view
>> definition every time, If you have any alternate deisgn flow how we can
>> achieve the result, please suggest for the same.
>>
>>
>> Note: Also, it will be helpfull, if you can share the details like
>> community forum or platform to discuss this kind of deisgn related topics,
>> it will be more helpfull.
>>
>


unsubscribe

2024-05-04 Thread chen...@birdiexx.com

unsubscribe


unsubscribe

2024-05-03 Thread Bing



 Replied Message 
| From | Wood Super |
| Date | 05/01/2024 07:49 |
| To | user  |
| Subject | unsubscribe |
unsubscribe


Spark Materialized Views: Improve Query Performance and Data Management

2024-05-03 Thread Mich Talebzadeh
Hi,

I have raised a ticket SPARK-48117
 for enhancing Spark
capabilities with Materialised Views (MV). Currently both Hive and
Databricks support this. I have added these potential benefits  to the
ticket

- *Improved Query Performance (especially for Streaming Data):*
Materialized Views can significantly improve query performance,
particularly for use cases involving Spark Structured Streaming. When
dealing with continuous data streams, materialized views can pre-compute
and store frequently accessed aggregations or transformations. Subsequent
queries on the materialized view can retrieve the results much faster
compared to continuously processing the entire streaming data. This is
crucial for real-time analytics where low latency is essential.
- *Enhancing Data Management:* They offer a way to pre-aggregate or transform
data, making complex queries more efficient.
- *Reduced Data Movement*: Materialized Views can be materialized on
specific clusters or storage locations closer to where the data will be
consumed. This minimizes data movement across the network, further
improving query performance and reducing overall processing time.
- *Simplified Workflows:* Developers and analysts can leverage pre-defined
Materialized Views that represent specific business logic or data subsets.
This simplifies data access, reduces development time for queries that rely
on these views, and fosters code reuse.

Please have a look at the ticket and add your comments.

Thanks

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".


Re: Issue with Materialized Views in Spark SQL

2024-05-03 Thread Mich Talebzadeh
Sadly, it seems Apache Spark cannot do anything with materialised views. I
was hoping it could at least read them!

>>> spark.sql("SELECT * FROM test.mv").show()
Traceback (most recent call last):
  File "", line 1, in 
  File "/opt/spark/python/pyspark/sql/session.py", line 1440, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
  File
"/usr/src/Python-3.9.16/venv/venv3.9/lib/python3.9/site-packages/py4j/java_gateway.py",
line 1321, in __call__
return_value = get_return_value(
  File "/opt/spark/python/pyspark/errors/exceptions/captured.py", line 175,
in deco
raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Hive materialized
view is not supported.


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Fri, 3 May 2024 at 11:03, Mich Talebzadeh 
wrote:

> Thanks for the comments I received.
>
> So in summary, Apache Spark itself doesn't directly manage materialized
> views,(MV)  but it can work with them through integration with the
> underlying data storage systems like Hive or through iceberg. I believe
> databricks through unity catalog support MVs as well.
>
> Moreover, there is a case for supporting MVs. However, Spark can utilize
> materialized views even though it doesn't directly manage them.. This came
> about because someone in the Spark user forum enquired about "Spark
> streaming issue to Elastic data*". One option I thought of was that uUsing
> materialized views with Spark Structured Streaming and Change Data Capture
> (CDC) is a potential solution for efficiently streaming view data updates
> in this scenario. .
>
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Fri, 3 May 2024 at 00:54, Mich Talebzadeh 
> wrote:
>
>> An issue I encountered while working with Materialized Views in Spark
>> SQL. It appears that there is an inconsistency between the behavior of
>> Materialized Views in Spark SQL and Hive.
>>
>> When attempting to execute a statement like DROP MATERIALIZED VIEW IF
>> EXISTS test.mv in Spark SQL, I encountered a syntax error indicating
>> that the keyword MATERIALIZED is not recognized. However, the same
>> statement executes successfully in Hive without any errors.
>>
>> pyspark.errors.exceptions.captured.ParseException:
>> [PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos
>> 5)
>>
>> == SQL ==
>> DROP MATERIALIZED VIEW IF EXISTS test.mv
>> -^^^
>>
>> Here are the versions I am using:
>>
>>
>>
>> *Hive: 3.1.1Spark: 3.4*
>> my Spark session:
>>
>> spark = SparkSession.builder \
>>   .appName("test") \
>>   .enableHiveSupport() \
>>   .getOrCreate()
>>
>> Has anyone seen this behaviour or encountered a similar issue or if there
>> are any insights into why this discrepancy exists between Spark SQL and
>> Hive.
>>
>> Thanks
>>
>> Mich Talebzadeh,
>>
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> Disclaimer: The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner Von Braun)".
>>
>


Help needed optimize spark history server performance

2024-05-03 Thread Vikas Tharyani
Dear Spark Community,

I'm writing to seek your expertise in optimizing the performance of our
Spark History Server (SHS) deployed on Amazon EKS. We're encountering
timeouts (HTTP 504) when loading large event logs exceeding 5 GB.

*Our Setup:*

   - Deployment: SHS on EKS with Nginx ingress (idle connection timeout: 60
   seconds)
   - Instance: Memory-optimized with sufficient RAM and CPU
   - Spark Daemon Memory: 30 GB
   - Spark History Server Options:
   - K8s Namespace has a limit of 128Gb
   - The backend S3 has a lifecycle policy to delete objects that are older
   than *7 days*.

sparkHistoryOpts:
"-Dspark.history.fs.logDirectory=s3a:///eks-infra-use1/
-Dspark.history.retainedApplications=1
-Dspark.history.ui.maxApplications=20
-Dspark.history.store.serializer=PROTOBUF
-Dspark.hadoop.fs.s3a.threads.max=25
-Dspark.hadoop.fs.s3a.connection.maximum=650
-Dspark.hadoop.fs.s3a.readahead.range=512K
-Dspark.history.fs.endEventReparseChunkSize=2m
-Dspark.history.store.maxDiskUsage=30g"

*Problem:*

   - SHS times out when loading large event logs (8 GB or more).

*Request:*

We would greatly appreciate any insights or suggestions you may have to
improve the performance of our SHS and prevent these timeouts. Here are
some areas we're particularly interested in exploring:

   - Are there additional configuration options we should consider for
   handling large event logs?
   - Could Nginx configuration adjustments help with timeouts?
   - Are there best practices for optimizing SHS performance on EKS?

We appreciate any assistance you can provide.

Thank you for your time and support.

Sincerely,
-- 

Vikas Tharyani

Associate Manager, DevOps

Nielsen

www.nielsen.com 





Re: ********Spark streaming issue to Elastic data**********

2024-05-03 Thread Mich Talebzadeh
My recommendation: using materialized views (MVs) created in Hive together
with Spark Structured Streaming and Change Data Capture (CDC) is a good
combination for efficiently streaming view data updates in your scenario.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 2 May 2024 at 21:25, Karthick Nk  wrote:

> Hi All,
>
> Requirements:
> I am working on the data flow, which will use the view definition(view
> definition already defined in schema), there are multiple tables used in
> the view definition. Here we want to stream the view data into elastic
> index based on if any of the table(used in the view definition) data got
> changed.
>
>
> Current flow:
> 1. we are inserting id's from the table(which used in the view definition)
> into the common table.
> 2. From the common table by using the id, we will be streaming the view
> data (by using if any of the incomming id is present in the collective id
> of all tables used from view definition) by using spark structured
> streaming.
>
>
> Issue:
> 1. Here we are facing issue - For each incomming id here we running view
> definition(so it will read all the data from all the data) and check if any
> of the incomming id is present in the collective id's of view result, Due
> to which it is taking more memory in the cluster driver and taking more
> time to process.
>
>
> I am epxpecting an alternate solution, if we can avoid full scan of view
> definition every time, If you have any alternate deisgn flow how we can
> achieve the result, please suggest for the same.
>
>
> Note: Also, it will be helpfull, if you can share the details like
> community forum or platform to discuss this kind of deisgn related topics,
> it will be more helpfull.
>


Re: Issue with Materialized Views in Spark SQL

2024-05-03 Thread Mich Talebzadeh
Thanks for the comments I received.

So in summary, Apache Spark itself doesn't directly manage materialized
views,(MV)  but it can work with them through integration with the
underlying data storage systems like Hive or through iceberg. I believe
databricks through unity catalog support MVs as well.

Moreover, there is a case for supporting MVs. However, Spark can utilize
materialized views even though it doesn't directly manage them. This came
about because someone in the Spark user forum enquired about "Spark
streaming issue to Elastic data". One option I thought of was that using
materialized views with Spark Structured Streaming and Change Data Capture
(CDC) is a potential solution for efficiently streaming view data updates
in this scenario.


Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Fri, 3 May 2024 at 00:54, Mich Talebzadeh 
wrote:

> An issue I encountered while working with Materialized Views in Spark SQL.
> It appears that there is an inconsistency between the behavior of
> Materialized Views in Spark SQL and Hive.
>
> When attempting to execute a statement like DROP MATERIALIZED VIEW IF
> EXISTS test.mv in Spark SQL, I encountered a syntax error indicating that
> the keyword MATERIALIZED is not recognized. However, the same statement
> executes successfully in Hive without any errors.
>
> pyspark.errors.exceptions.captured.ParseException:
> [PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos 5)
>
> == SQL ==
> DROP MATERIALIZED VIEW IF EXISTS test.mv
> -^^^
>
> Here are the versions I am using:
>
>
>
> *Hive: 3.1.1Spark: 3.4*
> my Spark session:
>
> spark = SparkSession.builder \
>   .appName("test") \
>   .enableHiveSupport() \
>   .getOrCreate()
>
> Has anyone seen this behaviour or encountered a similar issue or if there
> are any insights into why this discrepancy exists between Spark SQL and
> Hive.
>
> Thanks
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von Braun)".
>


Re: Issue with Materialized Views in Spark SQL

2024-05-02 Thread Jungtaek Lim
(removing dev@ as I don't think this is dev@ related thread but more about
"question")

My understanding is that Apache Spark does not support Materialized View.
That's all. IMHO it's not a proper expectation that all operations in
Apache Hive will be supported in Apache Spark. They are different projects
and Apache Spark does not aim to be 100% compatible with Apache Hive. There
was a time the community tried to provide some sort of compatibility, but
both projects are 10+ years old, and mature enough to have their own
roadmap to drive.

That said, that's not a bug or an issue. You can initiate a feature request
and wish the community to include that into the roadmap.

On Fri, May 3, 2024 at 12:01 PM Mich Talebzadeh 
wrote:

> An issue I encountered while working with Materialized Views in Spark SQL.
> It appears that there is an inconsistency between the behavior of
> Materialized Views in Spark SQL and Hive.
>
> When attempting to execute a statement like DROP MATERIALIZED VIEW IF
> EXISTS test.mv in Spark SQL, I encountered a syntax error indicating that
> the keyword MATERIALIZED is not recognized. However, the same statement
> executes successfully in Hive without any errors.
>
> pyspark.errors.exceptions.captured.ParseException:
> [PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos 5)
>
> == SQL ==
> DROP MATERIALIZED VIEW IF EXISTS test.mv
> -^^^
>
> Here are the versions I am using:
>
>
>
> *Hive: 3.1.1Spark: 3.4*
> my Spark session:
>
> spark = SparkSession.builder \
>   .appName("test") \
>   .enableHiveSupport() \
>   .getOrCreate()
>
> Has anyone seen this behaviour or encountered a similar issue or if there
> are any insights into why this discrepancy exists between Spark SQL and
> Hive.
>
> Thanks
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von Braun)".
>


Re: Issue with Materialized Views in Spark SQL

2024-05-02 Thread Walaa Eldin Moustafa
I do not think the issue is with DROP MATERIALIZED VIEW only, but also with
CREATE MATERIALIZED VIEW, because neither is supported in Spark. I guess
you must have created the view from Hive and are trying to drop it from
Spark and that is why you are running to the issue with DROP first.

There is some work in the Iceberg community to add the support to Spark
through SQL extensions, and Iceberg support for views and
materialization tables. Some recent discussions can be found here [1] along
with a WIP Iceberg-Spark PR.

[1] https://lists.apache.org/thread/rotmqzmwk5jrcsyxhzjhrvcjs5v3yjcc

Thanks,
Walaa.

On Thu, May 2, 2024 at 4:55 PM Mich Talebzadeh 
wrote:

> An issue I encountered while working with Materialized Views in Spark SQL.
> It appears that there is an inconsistency between the behavior of
> Materialized Views in Spark SQL and Hive.
>
> When attempting to execute a statement like DROP MATERIALIZED VIEW IF
> EXISTS test.mv in Spark SQL, I encountered a syntax error indicating that
> the keyword MATERIALIZED is not recognized. However, the same statement
> executes successfully in Hive without any errors.
>
> pyspark.errors.exceptions.captured.ParseException:
> [PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos 5)
>
> == SQL ==
> DROP MATERIALIZED VIEW IF EXISTS test.mv
> -^^^
>
> Here are the versions I am using:
>
>
>
> *Hive: 3.1.1Spark: 3.4*
> my Spark session:
>
> spark = SparkSession.builder \
>   .appName("test") \
>   .enableHiveSupport() \
>   .getOrCreate()
>
> Has anyone seen this behaviour or encountered a similar issue or if there
> are any insights into why this discrepancy exists between Spark SQL and
> Hive.
>
> Thanks
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von Braun)".
>


Issue with Materialized Views in Spark SQL

2024-05-02 Thread Mich Talebzadeh
An issue I encountered while working with Materialized Views in Spark SQL.
It appears that there is an inconsistency between the behavior of
Materialized Views in Spark SQL and Hive.

When attempting to execute a statement like DROP MATERIALIZED VIEW IF
EXISTS test.mv in Spark SQL, I encountered a syntax error indicating that
the keyword MATERIALIZED is not recognized. However, the same statement
executes successfully in Hive without any errors.

pyspark.errors.exceptions.captured.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos 5)

== SQL ==
DROP MATERIALIZED VIEW IF EXISTS test.mv
-^^^

Here are the versions I am using:



*Hive: 3.1.1, Spark: 3.4*
my Spark session:

spark = SparkSession.builder \
  .appName("test") \
  .enableHiveSupport() \
  .getOrCreate()

Has anyone seen this behaviour or encountered a similar issue? Are there any
insights into why this discrepancy exists between Spark SQL and Hive?

Thanks

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".


********Spark streaming issue to Elastic data**********

2024-05-02 Thread Karthick Nk
Hi All,

Requirements:
I am working on a data flow that uses a view definition (the view definition is
already defined in the schema); multiple tables are used in that view definition.
We want to stream the view data into an Elastic index whenever data in any of the
tables used in the view definition changes.


Current flow (a rough sketch follows below):
1. We insert IDs from the tables used in the view definition into a common table.
2. From the common table, using those IDs, we stream the view data with Spark
Structured Streaming, checking whether any incoming ID is present in the
collective IDs of all the tables used in the view definition.
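A rough, hedged sketch of that flow, using a per-batch join against the view
rather than collecting the view's IDs. All names here (the change_log common
table, the vw_customer view, the Elastic index and connector options) are
hypothetical placeholders, not from the original setup:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("view-to-elastic").getOrCreate()

# 1. Stream the incoming IDs from the common table (a streaming table source is assumed)
changed_ids = spark.readStream.table("change_log").select("id")

# 2. For each micro-batch, join the changed IDs against the view instead of
#    collecting the view's IDs to the driver and checking membership there.
def write_batch(batch_df, batch_id):
    view_df = spark.table("vw_customer")        # the view over the source tables
    changed_rows = view_df.join(batch_df, on="id", how="inner")
    (changed_rows.write
        .format("org.elasticsearch.spark.sql")  # elasticsearch-hadoop connector
        .option("es.resource", "customer_index")
        .mode("append")
        .save())

query = (changed_ids.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/tmp/chk/view_to_elastic")
    .start())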


Issue:
1. For each incoming ID we currently run the view definition (so it reads all the
data from all the underlying tables) and check whether any incoming ID is present
in the collective IDs of the view result. Because of this it uses a lot of memory
on the cluster driver and takes a long time to process.


I am looking for an alternative solution that avoids a full scan of the view
definition every time. If you have an alternative design flow that achieves this
result, please suggest it.


Note: It would also be helpful if you could share details of a community forum or
platform where this kind of design topic can be discussed.


unsubscribe

2024-05-01 Thread Nebi Aydin
unsubscribe


unsubscribe

2024-05-01 Thread Atakala Selam
unsubscribe


unsubscribe

2024-05-01 Thread Yoel Benharrous
unsubscribe


Traceback is missing content in pyspark when invoked with UDF

2024-05-01 Thread Indivar Mishra
Hi

*TL;DR:* I have a scenario where I generate a code string on the fly and execute
it. If an error occurs I need the traceback, but for the exec'd code I only get a
partial traceback, i.e. the content of the line which caused the error is missing.

Consider the minimal reproducible code below:
def fun():
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    spark = SparkSession.builder.appName("some_name").getOrCreate()

    columns = ["Seqno", "Name"]
    data = [("1", "john jones"), ("2", "tracey smith"), ("3", "amy sanders")]

    df = spark.createDataFrame(data=data, schema=columns)

    def errror_func(str):
        def internal_error_method():
            raise RuntimeError

        return internal_error_method()

    # Converting function to UDF
    errror_func_udf = udf(lambda z: errror_func(z), StringType())

    df.select(col("Seqno"), errror_func_udf(col("Name")).alias("Name")).show(truncate=False)

fun()


This gives the traceback shown below. (Notice that we also get the content of the
line that caused the error.)

> Traceback (most recent call last):
>
>   File "temp.py", line 28, in 
>
> fun()
>
>   File "temp.py", line 25, in fun
>
> df.select(col("Seqno"),
>> errror_func_udf(col("Name")).alias("Name")).show(truncate=False)
>
>   File
>> "/home/indivar/corridor/code/corridor-platforms/venv/lib/python3.8/site-packages/pyspark/sql/dataframe.py",
>> line 502, in show
>
> print(self._jdf.showString(n, int_truncate, vertical))
>
>   File
>> "/home/indivar/corridor/code/corridor-platforms/venv/lib/python3.8/site-packages/py4j/java_gateway.py",
>> line 1321, in __call__
>
> return_value = get_return_value(
>
>   File
>> "/home/indivar/corridor/code/corridor-platforms/venv/lib/python3.8/site-packages/pyspark/sql/utils.py",
>> line 117, in deco
>
> raise converted from None
>
> pyspark.sql.utils.PythonException:
>
>   An exception was thrown from the Python worker. Please see the stack
>> trace below.
>
> Traceback (most recent call last):
>
>   File "temp.py", line 23, in 
>
> errror_func_udf = udf(lambda z: errror_func(z), StringType())
>
>   File "temp.py", line 20, in errror_func
>
> return internal_error_method()
>
>   File "temp.py", line 18, in internal_error_method
>
> raise RuntimeError
>
> RuntimeError
>
>
>
But now, if I run the same code via exec, I lose the traceback line content,
although the line number is still there:
import linecache

code = """
def fun():
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("some_name").getOrCreate()

columns = ["Seqno", "Name"]
data = [("1", "john jones"), ("2", "tracey smith"), ("3", "amy
sanders")]

df = spark.createDataFrame(data=data, schema=columns)

def errror_func(str):
def internal_error_method():
raise RuntimeError

return internal_error_method()

# Converting function to UDF
errror_func_udf = udf(lambda z: errror_func(z), StringType())

df.select(col("Seqno"),
errror_func_udf(col("Name")).alias("Name")).show(truncate=False)
"""


scope = {}
filename = ""
compiled_code = compile(code, filename, "exec")
if filename not in linecache.cache:
    linecache.cache[filename] = (
        len(scope),
        None,
        code.splitlines(keepends=True),
        filename,
    )
exec(compiled_code, scope, scope)
fun = scope["fun"]

fun()


Traceback of this code is

> Traceback (most recent call last):
>
>   File "temp.py", line 74, in 
>
> fun()
>
>   File "", line 23, in fun
>
>   File
>> "/home/indivar/corridor/code/corridor-platforms/venv/lib/python3.8/site-packages/pyspark/sql/dataframe.py",
>> line 502, in show
>
> print(self._jdf.showString(n, int_truncate, vertical))
>
>   File
>> "/home/indivar/corridor/code/corridor-platforms/venv/lib/python3.8/site-packages/py4j/java_gateway.py",
>> line 1321, in __call__
>
> return_value = get_return_value(
>
>   File
>> "/home/indivar/corridor/code/corridor-platforms/venv/lib/python3.8/site-packages/pyspark/sql/utils.py",
>> line 117, in deco
>
> raise converted from None
>
> pyspark.sql.utils.PythonException:
>
>   An exception was thrown from the Python worker. Please see the stack
>> trace below.
>
> Traceback (most recent call last):
>
>   File "", line 21, in 
>
>   File "", line 18, in errror_func
>
>   File "", line 16, in internal_error_method
>
> RuntimeError
>
>
> As you can see this has missing line content.

Initially I thought this was a Python issue, so I did some reading. Python
internally seems to use the linecache module to get the content of a line. When
doing an exec, Python up to 3.12 had the same issue, which has been fixed in
Python 3.13 [issue ref for details]: Support multi-line error locations in
traceback and other related improvements (PEP-657, 3.11) · Issue #106922 ·
python/cpython (github.com)
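A hedged workaround sketch (my own suggestion, not something from the thread):
write the generated source to a real temporary file and compile against that
path, so any Python process that can read the file (including, in local mode,
Spark's Python worker) resolves traceback lines from disk instead of from an
in-memory linecache entry that only exists in the driver:

import tempfile

def compile_with_real_file(code_str):
    # Persist the generated source so tracebacks can be resolved from disk later
    tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False)
    tmp.write(code_str)
    tmp.flush()
    return compile(code_str, tmp.name, "exec"), tmp.name

compiled_code, source_path = compile_with_real_file(code)
scope = {}
exec(compiled_code, scope, scope)
scope["fun"]()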

Re: [spark-graphframes]: Generating incorrect edges

2024-05-01 Thread Mich Talebzadeh
Hi Steve,

Thanks for your statement. I tend to use uuid myself to avoid collisions. This
built-in function generates random IDs that are highly likely to be unique across
systems. My concerns are about edge cases, so to speak. If the Spark application
runs for a very long time or encounters restarts, the monotonically_increasing_id()
sequence might restart from the beginning. This could again cause duplicate IDs if
other Spark applications are running concurrently or if data is processed across
multiple runs of the same application.
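For example, a minimal sketch of swapping in the built-in uuid() SQL function for
one of the ID columns, following the prefixing pattern from the thread (the
dataframe and column names are assumptions):

import pyspark.sql.functions as psf

# uuid() is a built-in Spark SQL function that returns a random UUID per row
prefix_id = (df.select("prefix").distinct()
               .withColumn("prefix_id", psf.concat(psf.lit("p_"), psf.expr("uuid()"))))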

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 1 May 2024 at 01:22, Stephen Coy  wrote:

> Hi Mich,
>
> I was just reading random questions on the user list when I noticed that
> you said:
>
> On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh 
> wrote:
>
> 1) You are using monotonically_increasing_id(), which is not
> collision-resistant in distributed environments like Spark. Multiple hosts
>can generate the same ID. I suggest switching to UUIDs (e.g.,
> uuid.uuid4()) for guaranteed uniqueness.
>
>
> It’s my understanding that the *Spark* `monotonically_increasing_id()`
> function exists for the exact purpose of generating a collision-resistant
> unique id across nodes on different hosts.
> We use it extensively for this purpose and have never encountered an issue.
>
> Are we wrong or are you thinking of a different (not Spark) function?
>
> Cheers,
>
> Steve C
>
>
>
>
> This email contains confidential information of and is the copyright of
> Infomedia. It must not be forwarded, amended or disclosed without consent
> of the sender. If you received this message by mistake, please advise the
> sender and delete all copies. Security of transmission on the internet
> cannot be guaranteed, could be infected, intercepted, or corrupted and you
> should ensure you have suitable antivirus protection in place. By sending
> us your or any third party personal details, you consent to (or confirm you
> have obtained consent from such third parties) to Infomedia’s privacy
> policy. http://www.infomedia.com.au/privacy-policy/
>


Re: [spark-graphframes]: Generating incorrect edges

2024-04-30 Thread Stephen Coy
Hi Mich,

I was just reading random questions on the user list when I noticed that you 
said:

On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh  wrote:

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.


It’s my understanding that the *Spark* `monotonically_increasing_id()` function 
exists for the exact purpose of generating a collision-resistant unique id 
across nodes on different hosts.
We use it extensively for this purpose and have never encountered an issue.

Are we wrong or are you thinking of a different (not Spark) function?

Cheers,

Steve C




This email contains confidential information of and is the copyright of 
Infomedia. It must not be forwarded, amended or disclosed without consent of 
the sender. If you received this message by mistake, please advise the sender 
and delete all copies. Security of transmission on the internet cannot be 
guaranteed, could be infected, intercepted, or corrupted and you should ensure 
you have suitable antivirus protection in place. By sending us your or any 
third party personal details, you consent to (or confirm you have obtained 
consent from such third parties) to Infomedia’s privacy policy. 
http://www.infomedia.com.au/privacy-policy/


unsubscribe

2024-04-30 Thread Wood Super
unsubscribe


unsubscribe

2024-04-30 Thread junhua . xie
unsubscribe


unsubscribe

2024-04-30 Thread Yoel Benharrous



Re: spark.sql.shuffle.partitions=auto

2024-04-30 Thread Mich Talebzadeh
spark.sql.shuffle.partitions=auto

No, vanilla Apache Spark does not accept "auto" for this setting. That value is
specific to Databricks and their managed Spark offering; it allows Databricks to
automatically determine an optimal number of shuffle partitions for your
workload.
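On vanilla Spark, the closest open-source equivalent I know of is Adaptive Query
Execution, which coalesces shuffle partitions at runtime. A minimal sketch (the
initial partition count is illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("aqe-shuffle-partitions")
    # Let AQE coalesce the effective number of shuffle partitions at runtime
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # spark.sql.shuffle.partitions itself still needs an integer on vanilla Spark
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate())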

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 30 Apr 2024 at 11:51, second_co...@yahoo.com.INVALID
 wrote:

> May i know is
>
> spark.sql.shuffle.partitions=auto
>
> only available on Databricks? what about on vanilla Spark ? When i set
> this, it gives error need to put int.  Any open source library that auto
> find the best partition , block size for dataframe?
>
>
>


Re: Spark on Kubernetes

2024-04-30 Thread Mich Talebzadeh
Hi,
In k8s the driver is responsible for executor creation. The likely cause of your
problem is insufficient memory allocated for executors in the k8s cluster. Even
with dynamic allocation, k8s won't schedule executor pods if there is not enough
free memory to fulfil their resource requests.
My suggestions

   - Increase Executor Memory: Allocate more memory per executor (e.g., 2GB
   or 3GB) to allow for multiple executors within available cluster memory.
   - Adjust Driver Pod Resources: Ensure the driver pod has enough memory
   to run Spark and manage executors.
   - Optimize Resource Management: Explore on-demand allocation or adjusting
   allocation granularity for better resource utilization. For example, look at
   the documentation for executor on-demand allocation (spark.executor.cores=0)
   and spark.dynamicAllocation.minExecutors &
   spark.dynamicAllocation.maxExecutors. A rough sketch of such settings follows
   below.
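As a rough sketch of the kind of settings meant above (all values are illustrative
and need tuning for the actual cluster; on k8s, dynamic allocation also needs
shuffle tracking, since there is no external shuffle service):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("k8s-sizing-example")
    # Give each executor enough memory for the workload plus overhead
    .config("spark.executor.memory", "3g")
    .config("spark.executor.memoryOverhead", "1g")
    .config("spark.executor.cores", "2")
    # Bound dynamic allocation so the scheduler knows how far it may scale
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "4")
    # The driver pod also needs headroom to manage executors
    .config("spark.driver.memory", "2g")
    .getOrCreate())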

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 30 Apr 2024 at 04:29, Tarun raghav 
wrote:

> Respected Sir/Madam,
> I am Tarunraghav. I have a query regarding spark on kubernetes.
>
> We have an eks cluster, within which we have spark installed in the pods.
> We set the executor memory as 1GB and set the executor instances as 2, I
> have also set dynamic allocation as true. So when I try to read a 3 GB CSV
> file or parquet file, it is supposed to increase the number of pods by 2.
> But the number of executor pods is zero.
> I don't know why executor pods aren't being created, even though I set
> executor instance as 2. Please suggest a solution for this.
>
> Thanks & Regards,
> Tarunraghav
>
>


spark.sql.shuffle.partitions=auto

2024-04-30 Thread second_co...@yahoo.com.INVALID
May I know: is spark.sql.shuffle.partitions=auto only available on Databricks?
What about on vanilla Spark? When I set it there, I get an error saying an int is
required. Is there any open source library that automatically finds the best
partition/block size for a dataframe?



Spark on Kubernetes

2024-04-29 Thread Tarun raghav
Respected Sir/Madam,
I am Tarunraghav. I have a query regarding spark on kubernetes.

We have an EKS cluster, within which we have Spark installed in the pods.
We set the executor memory to 1GB and the executor instances to 2, and I have
also set dynamic allocation to true. So when I try to read a 3 GB CSV or parquet
file, it is supposed to increase the number of pods by 2.
But the number of executor pods is zero.
I don't know why executor pods aren't being created, even though I set executor
instances to 2. Please suggest a solution for this.

Thanks & Regards,
Tarunraghav


Re: Python for the kids and now PySpark

2024-04-28 Thread Meena Rajani
Mich, you are right, these days the attention span is getting shorter. Christian
could work on a completely new thing for 3 hours and is proud to explain it. It
is amazing.

Thanks for sharing.



On Sat, Apr 27, 2024 at 9:40 PM Farshid Ashouri 
wrote:

> Mich, this is absolutely amazing.
>
> Thanks for sharing.
>
> On Sat, 27 Apr 2024, 22:26 Mich Talebzadeh, 
> wrote:
>
>> Python for the kids. Slightly off-topic but worthwhile sharing.
>>
>> One of the things that may benefit kids is starting to learn something
>> new. Basically anything that can focus their attention away from games for
>> a few hours. Around 2020, my son Christian (now nearly 15) decided to
>> learn a programming language. So somehow he picked Python to start with.
>> The kids are good when they focus. However, they live in a virtual reality
>> world and they cannot focus for long hours. I let him explore Python on his
>> Windows 10 laptop and download it himself. In the following video Christian
>> explains to his mother what he started to do just before going to bed. BTW,
>> when he says 32M he means 32-bit. I leave it to you to judge :) Now the
>> idea is to start learning PySpark. So I will let him do it himself and
>> learn from his mistakes.  For those who have kids, I would be interested to
>> know their opinion.
>>
>> Cheers
>>
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Python for the kids and now PySpark

2024-04-27 Thread Farshid Ashouri
Mich, this is absolutely amazing.

Thanks for sharing.

On Sat, 27 Apr 2024, 22:26 Mich Talebzadeh, 
wrote:

> Python for the kids. Slightly off-topic but worthwhile sharing.
>
> One of the things that may benefit kids is starting to learn something
> new. Basically anything that can focus their attention away from games for
> a few hours. Around 2020, my son Christian (now nearly 15) decided to
> learn a programming language. So somehow he picked Python to start with.
> The kids are good when they focus. However, they live in a virtual reality
> world and they cannot focus for long hours. I let him explore Python on his
> Windows 10 laptop and download it himself. In the following video Christian
> explains to his mother what he started to do just before going to bed. BTW,
> when he says 32M he means 32-bit. I leave it to you to judge :) Now the
> idea is to start learning PySpark. So I will let him do it himself and
> learn from his mistakes.  For those who have kids, I would be interested to
> know their opinion.
>
> Cheers
>
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


[Release Question]: Estimate on 3.5.2 release?

2024-04-26 Thread Paul Gerver
Hello,

I'm curious if there is an estimate of when 3.5.2 for Spark Core will be released.
There are several bug and security vulnerability fixes in the dependencies we 
are excited to receive!

If anyone has any insights, that would be greatly appreciated. Thanks!
- ​Paul








Paul Gerver

Streams Software Engineer



This e-mail (including any attachments) may contain privileged, confidential, 
proprietary, private, copyrighted, or other legally protected information. The 
information is intended to be for the use of the individual or entity 
designated above. If you are not the intended recipient (even if the e-mail 
address above is yours), please notify us by return e-mail immediately, and 
delete the message and any attachments. Any disclosure, reproduction, 
distribution or other use of this message or any attachments by an individual 
or entity other than the intended recipient is prohibited.


[SparkListener] Accessing classes loaded via the '--packages' option

2024-04-26 Thread Damien Hawes
Hi folks,

I'm contributing to the OpenLineage project, specifically the Apache Spark
integration. My current focus is on extending the project to support data
lineage extraction for Spark Streaming, beginning with Apache Kafka sources
and sinks.

I've encountered an obstacle when attempting to access information
essential for lineage extraction from Apache Kafka-related classes within
the OpenLineage Spark code base. Specifically, I need to access details
like Kafka topic names and bootstrap servers from objects like
StreamingDataSourceV2Relation.

While I can successfully access these details if the Kafka JARs are placed
directly in the 'spark/jars' directory, I'm unable to do so when using the
`--packages` option for dependency management. This creates a significant
obstacle for users who rely on `--packages` for their Spark applications.

I've taken initial steps to investigate (viewable in this GitHub PR
, the class in
question is *StreamingDataSourceV2RelationVisitor*), but I'd greatly
appreciate any insights or guidance on the following:

*1. Understanding the Issue:* Are there known reasons within Spark that
could explain this difference in behavior when loading dependencies via
`--packages` versus placing JARs directly?
*2. Alternative Approaches:*  Are there recommended techniques or patterns
to access the necessary Kafka class information within a SparkListener
extension, especially when dependencies are managed via `--packages`?

I'm eager to find a solution that avoids heavy reliance on reflection.

Thank you for your time and assistance!

Kind regards,
Damien


unsubscribe

2024-04-25 Thread Code Tutelage



Re:RE: How to add MaxDOP option in spark mssql JDBC

2024-04-25 Thread Elite
Thank you.

My main purpose is to pass "MAXDOP 1" to MSSQL to control the CPU usage. From the
official doc, I guess the problem with my code is that Spark wraps the query as

select * from (SELECT TOP 10 * FROM dbo.Demo with (nolock) WHERE Id = 1 option
(maxdop 1)) spark_gen_alias

Apparently, this violates MSSQL syntax, because "option (maxdop 1)" is no longer
placed at the end of the statement.

May I know how Spark wraps the query if I use prepareQuery?

I do not have a Spark 3.4+ environment right now, so I have not had a chance to
try this option.







At 2024-04-24 20:51:45, "Appel, Kevin"  wrote:

You might be able to leverage the prepareQuery option, that is at 
https://spark.apache.org/docs/3.5.1/sql-data-sources-jdbc.html#data-source-option
 … this was introduced in Spark 3.4.0 to handle temp table query and CTE query 
against MSSQL server since what you send in is not actually what gets sent, 
there is some items that get wrapped.

 

There is more of the technical info in 
https://issues.apache.org/jira/browse/SPARK-37259 with the PR’s linked that had 
the fix done for this

 

 

From: Elite 
Sent: Tuesday, April 23, 2024 10:28 PM
To: user@spark.apache.org
Subject: How to add MaxDOP option in spark mssql JDBC

 

[QUESTION] How to pass MAXDOP option · Issue #2395 · microsoft/mssql-jdbc 
(github.com)

 

Hi team, 

 

I am suggested to require help form spark community.

 

We suspect spark rewerite the query before pass to ms sql, and it lead to 
syntax error.

Is there any work around to let make my codes work? 

 

spark.read()
.format("jdbc")
.option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=")
.option("query", "SELECT TOP 10 * FROM dbo.Demo with (nolock) WHERE Id = 1 
option (maxdop 1)")
.load()
.show();

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the 
keyword 'option'.
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:270)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1778)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:697)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:616)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7775)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:4397)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:293)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:263)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:531)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:61)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)

This message, and any attachment(s), is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/electronic-disclaimer. If you are not the intended 
recipient, please delete this message. For more information about how Bank of 
America protects your privacy, including specific rights that may apply, please 
visit the following pages: 
https://business.bofa.com/en-us/content/global-privacy-notices.html (which 
includes global privacy notices) and 
https://www.bankofamerica.com/security-center/privacy-overview/ (which includes 
US State specific privacy notices such as the 
http://www.bankofamerica.com/ccpa-notice).


Re: [spark-graphframes]: Generating incorrect edges

2024-04-25 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi Mich,

Thanks for your suggestions.
1) It currently runs on one server with plenty of resources assigned. But I 
will keep it in mind to replace monotonically_increasing_id() with uuid() once 
we scale up.
2) I have replaced the null values in origin with a string 
{prefix}-{mnt_by}-{organisation}

replacement_string = psf.concat_ws("-", psf.col("prefix"), psf.col("mnt_by"), 
psf.col("descr"))
df = df.withColumn("origin", psf.coalesce(psf.col("origin"), 
replacement_string))

I have verified my other columns have no Null values.

3) This is the logic for how I generate IDs:

mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID, 
psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID, 
psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID, 
psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
organisation_id = df.select(DESCR).distinct().withColumn(ORGANISATION_ID, 
psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

df = df.join(mnt_by_id, on=MNT_BY, how="left").join(prefix_id, on=PREFIX, 
how="left").join(origin_id, on=ORIGIN, how="left").join(organisation_id, 
on=DESCR, how="left")

I create the ID using the distinct values in the columns "mnt_by", "prefix",
"origin" and "descr". These are the same columns I join on.

4) This is my current resource allocation; I run it on a server at my university.
It has 112 cores and 1.48T RAM. I can request more resources, but in my eyes this
should be plenty.
If you think more resources would help, I will ask for them.

spark_conf = SparkConf().setAppName(f"pyspark-{APP_NAME}-{int(time())}").set(
"spark.submit.deployMode", "client"
).set("spark.sql.parquet.binaryAsString", "true"
).set("spark.driver.bindAddress", "localhost"
).set("spark.driver.host", "127.0.0.1"
# ).set("spark.driver.port", "0"
).set("spark.ui.port", "4041"
).set("spark.executor.instances", "1"
).set("spark.executor.cores", "50"
).set("spark.executor.memory", "128G"
).set("spark.executor.memoryOverhead", "32G"
).set("spark.driver.cores", "16"
).set("spark.driver.memory", "64G"
)

I don't think b) applies as it's a single machine.

Kind regards,
Jelle


From: Mich Talebzadeh 
Sent: Wednesday, April 24, 2024 6:12 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) 
Cc: user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK let us have a look at these

1) You are using monotonically_increasing_id(), which is not 
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) 
for guaranteed uniqueness.

2) Missing values in the Origin column lead to null IDs, potentially causing 
problems downstream. You can handle missing values appropriately, say
   a) Filter out rows with missing origins or b) impute missing values with a 
strategy that preserves relationships (if applicable).

3) With join code, you mentioned left joining on the same column used for ID 
creation, not very clear!

4) Edge Issue, it appears to me the issue seems to occur with larger datasets 
(>100K records). Possible causes could be
   a) Resource Constraints as data size increases, PySpark might struggle with 
joins or computations if resources are limited (memory, CPU).
   b) Data Skew: Uneven distribution of values in certain columns could lead to 
imbalanced processing across machines.  Check Spark UI (4040) on staging and 
execution tabs

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
 Von 
Braun)".


On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) 
mailto:j.g.w.nijl...@student.utwente.nl>> 
wrote:
Hi Mich,

Thanks for your reply,
1) ID generation is done using 
monotonically_increasing_id()
 this is then prefixed with "p_", "m_", "o_" or "org_" depending on the type of 
the value it identifies.
2) There are some missing values in the Origin column, these will result in a 
Null ID
3) The join code is present in [1], I join "left" on the same column I create 
the ID on
4) I dont think the issue is in ID or edge 

DataFrameReader: timestampFormat default value

2024-04-24 Thread keen
Is anyone familiar with [Datetime patterns](
https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) and
`TimestampType` parsing in PySpark?
When reading CSV or JSON files, timestamp columns need to be parsed via the
datasource property `timestampFormat`.
[According to documentation](
https://spark.apache.org/docs/3.3.1/sql-data-sources-json.html#data-source-option:~:text=read/write-,timestampFormat,-%2DMM%2Ddd%27T%27HH)
the default value is `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]`.

However, I noticed some weird behavior:
```python
from pyspark.sql import types as T

json_lines = [
    "{'label': 'no tz'                      , 'value': '2023-12-24T20:00:00'          }",
    "{'label': 'UTC'                        , 'value': '2023-12-24T20:00:00Z'         }",
    "{'label': 'tz offset hour'             , 'value': '2023-12-24T20:00:00+01'       }",
    "{'label': 'tz offset minute no colon'  , 'value': '2023-12-24T20:00:00+0100'     }",
    "{'label': 'tz offset minute with colon', 'value': '2023-12-24T20:00:00+01:00'    }",
    "{'label': 'tz offset second no colon'  , 'value': '2023-12-24T20:00:00+01'       }",
    "{'label': 'tz offset second with colon', 'value': '2023-12-24T20:00:00+01:00:00' }",
]

schema = T.StructType([
    T.StructField("label", T.StringType()),
    T.StructField("value", T.TimestampType()),
    T.StructField("t_corrupt_record", T.StringType()),
])

df = (spark.read
    .schema(schema)
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")  # <-- using the "default" from the doc
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "t_corrupt_record")
    .json(sc.parallelize(json_lines))
)

df.show(truncate=False)
+---------------------------+-------------------+----------------------------------------------------------------------------------+
|label                      |value              |t_corrupt_record                                                                  |
+---------------------------+-------------------+----------------------------------------------------------------------------------+
|no tz                      |2023-12-24 20:00:00|null                                                                              |
|UTC                        |2023-12-24 20:00:00|null                                                                              |
|tz offset hour             |null               |{'label': 'tz offset hour'             , 'value': '2023-12-24T20:00:00+01'       }|
|tz offset minute no colon  |null               |{'label': 'tz offset minute no colon'  , 'value': '2023-12-24T20:00:00+0100'     }|
|tz offset minute with colon|2023-12-24 19:00:00|null                                                                              |
|tz offset second no colon  |null               |{'label': 'tz offset second no colon'  , 'value': '2023-12-24T20:00:00+01'       }|
|tz offset second with colon|null               |{'label': 'tz offset second with colon', 'value': '2023-12-24T20:00:00+01:00:00' }|
+---------------------------+-------------------+----------------------------------------------------------------------------------+
```

However, when omitting timestampFormat, the values are parsed just fine:
```python
df = (spark.read
    .schema(schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "t_corrupt_record")
    .json(sc.parallelize(json_lines))
)

df.show(truncate=False)
+---+---++
|label  |value  |t_corrupt_record|
+---+---++
|no tz  |2023-12-24 20:00:00|null|
|UTC|2023-12-24 20:00:00|null|
|tz offset hour |2023-12-24 19:00:00|null|
|tz offset minute no colon  |2023-12-24 19:00:00|null|
|tz offset minute with colon|2023-12-24 19:00:00|null|
|tz offset second no colon  |2023-12-24 19:00:00|null|
|tz offset second with colon|2023-12-24 19:00:00|null|
+---+---++
```

This is not plausible to me.
Using the default value explicitly should lead to the same results as
omitting it.
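As a side note, a hedged workaround sketch of my own (untested against all of the
cases above): read the column as a string and let Spark's ISO-8601
string-to-timestamp cast handle the different offset spellings, instead of relying
on timestampFormat:

```python
from pyspark.sql import functions as F
from pyspark.sql import types as T

schema_str = T.StructType([
    T.StructField("label", T.StringType()),
    T.StructField("value", T.StringType()),
])

df = (spark.read
    .schema(schema_str)
    .json(sc.parallelize(json_lines))
    # Cast afterwards; the cast parser is more lenient about offsets than timestampFormat
    .withColumn("value_ts", F.col("value").cast(T.TimestampType()))
)
df.show(truncate=False)
```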


Thanks and regards
Martin


Re: [spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Mich Talebzadeh
OK let us have a look at these

1) You are using monotonically_increasing_id(), which is not
collision-resistant in distributed environments like Spark. Multiple hosts
   can generate the same ID. I suggest switching to UUIDs (e.g.,
uuid.uuid4()) for guaranteed uniqueness.

2) Missing values in the Origin column lead to null IDs, potentially
causing problems downstream. You can handle missing values appropriately,
say
   a) Filter out rows with missing origins or b) impute missing values with
a strategy that preserves relationships (if applicable).

3) With join code, you mentioned left joining on the same column used for
ID creation, not very clear!

4) Edge Issue, it appears to me the issue seems to occur with larger
datasets (>100K records). Possible causes could be
   a) Resource Constraints as data size increases, PySpark might struggle
with joins or computations if resources are limited (memory, CPU).
   b) Data Skew: Uneven distribution of values in certain columns could
lead to imbalanced processing across machines.  Check Spark UI (4040) on
staging and execution tabs

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) <
j.g.w.nijl...@student.utwente.nl> wrote:

> Hi Mich,
>
> Thanks for your reply,
> 1) ID generation is done using monotonically_increasing_id()
> 
>  this
> is then prefixed with "p_", "m_", "o_" or "org_" depending on the type of
> the value it identifies.
> 2) There are some missing values in the Origin column, these will result
> in a Null ID
> 3) The join code is present in [1], I join "left" on the same column
> I create the ID on
> 4) I dont think the issue is in ID or edge generation, if i limit my input
> dataframe and union it with my Utwente data row, I can verify those edges
> are created correctly up to 100K records.
> Once I go past that amount of records the results become inconsistent and
> incorrect.
>
> Kind regards,
> Jelle Nijland
>
>
> --
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, April 24, 2024 4:40 PM
> *To:* Nijland, J.G.W. (Jelle, Student M-CS) <
> j.g.w.nijl...@student.utwente.nl>
> *Cc:* user@spark.apache.org 
> *Subject:* Re: [spark-graphframes]: Generating incorrect edges
>
> OK few observations
>
> 1) ID Generation Method: How are you generating unique IDs (UUIDs,
> sequential numbers, etc.)?
> 2) Data Inconsistencies: Have you checked for missing values impacting ID
> generation?
> 3) Join Verification: If relevant, can you share the code for joining data
> points during ID creation? Are joins matching columns correctly?
> 4) Specific Edge Issues: Can you share examples of vertex IDs with
> incorrect connections? Is this related to ID generation or edge creation
> logic?
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI, FinCrime
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von
> Braun )".
>
>
> On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <
> j.g.w.nijl...@student.utwente.nl> wrote:
>
> tags: pyspark,spark-graphframes
>
> Hello,
>
> I am running pyspark in a podman container and I have issues with
> incorrect edges when I build my graph.
> I start with loading a source dataframe from a parquet directory on my
> server. The source dataframe has the following columns:
>
> +-+---+-+-+--+-+--+---+
> |created |descr |last_modified|mnt_by |origin|start_address|prefix
> |external_origin|
>
> +-+---+-+-+--+-+--+---+
>
> I aim to build a graph connecting prefix, mnt_by, origin and descr with
> edges storing the created and last_modified values.
> I start with generating IDs for the prefix, mnt_by, origin and descr using

Re: [spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Nijland, J.G.W. (Jelle, Student M-CS)
Hi Mich,

Thanks for your reply,
1) ID generation is done using monotonically_increasing_id(); this is then
prefixed with "p_", "m_", "o_" or "org_" depending on the type of the value it
identifies.
2) There are some missing values in the Origin column, these will result in a 
Null ID
3) The join code is present in [1], I join "left" on the same column I create 
the ID on
4) I don't think the issue is in ID or edge generation; if I limit my input
dataframe and union it with my Utwente data row, I can verify those edges are
created correctly up to 100K records.
Once I go past that number of records the results become inconsistent and
incorrect.

Kind regards,
Jelle Nijland



From: Mich Talebzadeh 
Sent: Wednesday, April 24, 2024 4:40 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) 
Cc: user@spark.apache.org 
Subject: Re: [spark-graphframes]: Generating incorrect edges

OK few observations

1) ID Generation Method: How are you generating unique IDs (UUIDs, sequential 
numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID 
generation?
3) Join Verification: If relevant, can you share the code for joining data 
points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with incorrect 
connections? Is this related to ID generation or edge creation logic?

HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI, FinCrime
London
United Kingdom


 
   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
 Von 
Braun)".


On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) 
mailto:j.g.w.nijl...@student.utwente.nl>> 
wrote:
tags: pyspark,spark-graphframes

Hello,

I am running pyspark in a podman container and I have issues with incorrect 
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server. 
The source dataframe has the following columns:
+-+---+-+-+--+-+--+---+
|created |descr |last_modified|mnt_by |origin|start_address|prefix 
|external_origin|
+-+---+-+-+--+-+--+---+

I aim to build a graph connecting prefix, mnt_by, origin and descr with edges 
storing the created and last_modified values.
I start with generating IDs for the prefix, mnt_by, origin and descr using 
monotonically_increasing_id() [1]
These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are 
unique IDs across the dataframe.

Then I construct the vertices dataframe by collecting the ID, value and whether 
they are external for each vertex. [2]
These vertices are then unioned together.
Following the vertices, I construct the edges dataframe by selecting the IDs 
that I want to be the src and the dst and union those together. [3]
These edges store the created and last_modified.

Now I am ready to construct the graph. Here is where I run into my issue.

When verifying my graph, I looked at a couple of vertices to see if they have 
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that it 
generates incorrect edges.

I saw edges going out to vertices that are not associated with the utwente 
values at all.
The methods to find the vertices, edges and the output can be found in [4]
We can already observe inconsistencies by viewing the prefix->maintainer and 
origin -> prefix edges. [5]
Depending on what column I filter on the results are inconsistent.
To make matters worse some edges contain IDs that are not connected to the 
original values in the source dataframe at all.

What I have tried to resolve my issue:

  *
Write a checker that verifies edges created against the source dataframe. [6]
The aim of this checker was to determine where the inconsistency comes fro, to 
locate the bug and resolve it.
I ran this checker a limited graphs from n=10 upwards to n=100 (or 1m).
This felt close enough as there are only ~6.5m records in my source dataframe.
This ran correctly, near the 1m it did experience significant slowdown at the 
full dataframe it errors/times out.
I blamed this on the large joins that it performs on the source dataframe.
  *
I found a github issue of 

Re: [spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Mich Talebzadeh
OK few observations

1) ID Generation Method: How are you generating unique IDs (UUIDs,
sequential numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID
generation?
3) Join Verification: If relevant, can you share the code for joining data
points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with
incorrect connections? Is this related to ID generation or edge creation
logic?

HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI, FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <
j.g.w.nijl...@student.utwente.nl> wrote:

> tags: pyspark,spark-graphframes
>
> Hello,
>
> I am running pyspark in a podman container and I have issues with
> incorrect edges when I build my graph.
> I start with loading a source dataframe from a parquet directory on my
> server. The source dataframe has the following columns:
>
> +-+---+-+-+--+-+--+---+
> |created |descr |last_modified|mnt_by |origin|start_address|prefix
> |external_origin|
>
> +-+---+-+-+--+-+--+---+
>
> I aim to build a graph connecting prefix, mnt_by, origin and descr with
> edges storing the created and last_modified values.
> I start with generating IDs for the prefix, mnt_by, origin and descr using
> monotonically_increasing_id() [1]
> These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are
> unique IDs across the dataframe.
>
> Then I construct the vertices dataframe by collecting the ID, value and
> whether they are external for each vertex. [2]
> These vertices are then unioned together.
> Following the vertices, I construct the edges dataframe by selecting the
> IDs that I want to be the src and the dst and union those together. [3]
> These edges store the created and last_modified.
>
> Now I am ready to construct the graph. Here is where I run into my issue.
>
> When verifying my graph, I looked at a couple of vertices to see if they
> have the correct edges.
> I looked at the Utwente prefix, origin, descr and mnt_by and found that it
> generates incorrect edges.
>
> I saw edges going out to vertices that are not associated with the utwente
> values at all.
> The methods to find the vertices, edges and the output can be found in [4]
> We can already observe inconsistencies by viewing the prefix->maintainer
> and origin -> prefix edges. [5]
> Depending on what column I filter on the results are inconsistent.
> To make matters worse some edges contain IDs that are not connected to the
> original values in the source dataframe at all.
>
> What I have tried to resolve my issue:
>
>- Write a checker that verifies edges created against the source
>dataframe. [6]
>The aim of this checker was to determine where the inconsistency comes
>fro, to locate the bug and resolve it.
>I ran this checker a limited graphs from n=10 upwards to n=100 (or
>1m).
>This felt close enough as there are only ~6.5m records in my source
>dataframe.
>This ran correctly, near the 1m it did experience significant slowdown
>at the full dataframe it errors/times out.
>I blamed this on the large joins that it performs on the source
>dataframe.
>- I found a github issue of someone with significantly larger graphs
>have similar issues.
>One suggestion there blamed indexing using strings rather than ints or
>longs.
>I rewrote my system to use int for IDs but I ran into the same issue.
>The amount of incorrect edges was the same, the link to which
>incorrects vertices it links to was the same too.
>- I re-ordered my source dataframe to see what the impact was.
>This results in considerably more incorrect edges using the checker in
>[4]
>If helpful I can post the output of this checker as well.
>
>
> Can you give me any pointers in what I can try or what I can do to clarify
> my situation better?
> Thanks in advance for your time.
>
> Kind regards,
> Jelle Nijland
>
>
>
>
> [1]
> import pyspark.sql.functions as psf
>
> # ID labels
> PREFIX_ID = "prefix_id"
> MAINTAINER_ID = "mnt_by_id"
> ORIGIN_ID = "origin_id"
> ORGANISATION_ID = "organisation_id"
>
> # Source dataframe column names
> MNT_BY = "mnt_by"
> PREFIX = "prefix"
> ORIGIN = "origin"
> DESCR = "descr"
> 

RE: How to add MaxDOP option in spark mssql JDBC

2024-04-24 Thread Appel, Kevin
You might be able to leverage the prepareQuery option, documented at
https://spark.apache.org/docs/3.5.1/sql-data-sources-jdbc.html#data-source-option
... this was introduced in Spark 3.4.0 to handle temp table queries and CTE
queries against MSSQL Server, since what you send in is not exactly what gets
sent; there are some items that get wrapped.

There is more technical info in
https://issues.apache.org/jira/browse/SPARK-37259, with the linked PRs that
implemented the fix.
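As a hedged sketch of how that option might be applied to the MAXDOP case
(untested; the temp-table name and connection details are illustrative
placeholders): run the hinted statement up front via prepareQuery into a temp
table, and let Spark wrap only the simple follow-up query.

df = (spark.read
    .format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=...")
    # Executed as-is ahead of the wrapped query, so the OPTION clause stays at the end
    .option("prepareQuery",
            "SELECT TOP 10 * INTO #Demo10 FROM dbo.Demo WITH (NOLOCK) "
            "WHERE Id = 1 OPTION (MAXDOP 1)")
    # Spark wraps only this simple part as a subquery
    .option("query", "SELECT * FROM #Demo10")
    .load())
df.show()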


From: Elite 
Sent: Tuesday, April 23, 2024 10:28 PM
To: user@spark.apache.org
Subject: How to add MaxDOP option in spark mssql JDBC

[QUESTION] How to pass MAXDOP option * Issue #2395 * microsoft/mssql-jdbc 
(github.com)

Hi team,

I am suggested to require help form spark community.

We suspect spark rewerite the query before pass to ms sql, and it lead to 
syntax error.
Is there any work around to let make my codes work?


spark.read()
.format("jdbc")
.option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=")
.option("query", "SELECT TOP 10 * FROM dbo.Demo with (nolock) WHERE Id = 1 
option (maxdop 1)")
.load()
.show();

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the 
keyword 'option'.
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:270)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1778)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:697)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:616)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7775)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:4397)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:293)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:263)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:531)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:61)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)

--
This message, and any attachment(s), is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/electronic-disclaimer. If you are not the intended 
recipient, please delete this message. For more information about how Bank of 
America protects your privacy, including specific rights that may apply, please 
visit the following pages: 
https://business.bofa.com/en-us/content/global-privacy-notices.html (which 
includes global privacy notices) and 
https://www.bankofamerica.com/security-center/privacy-overview/ (which includes 
US State specific privacy notices such as the 
http://www.bankofamerica.com/ccpa-notice).


[spark-graphframes]: Generating incorrect edges

2024-04-24 Thread Nijland, J.G.W. (Jelle, Student M-CS)
tags: pyspark,spark-graphframes

Hello,

I am running pyspark in a podman container and I have issues with incorrect 
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server. 
The source dataframe has the following columns:
+-+---+-+-+--+-+--+---+
|created |descr |last_modified|mnt_by |origin|start_address|prefix 
|external_origin|
+-+---+-+-+--+-+--+---+

I aim to build a graph connecting prefix, mnt_by, origin and descr with edges 
storing the created and last_modified values.
I start with generating IDs for the prefix, mnt_by, origin and descr using 
monotonically_increasing_id() [1]
These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are 
unique IDs across the dataframe.

Then I construct the vertices dataframe by collecting the ID, value and whether 
they are external for each vertex. [2]
These vertices are then unioned together.
Following the vertices, I construct the edges dataframe by selecting the IDs 
that I want to be the src and the dst and union those together. [3]
These edges store the created and last_modified.

Now I am ready to construct the graph. Here is where I run into my issue.

When verifying my graph, I looked at a couple of vertices to see if they have 
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that it 
generates incorrect edges.

I saw edges going out to vertices that are not associated with the utwente 
values at all.
The methods to find the vertices, edges and the output can be found in [4]
We can already observe inconsistencies by viewing the prefix->maintainer and 
origin -> prefix edges. [5]
Depending on what column I filter on the results are inconsistent.
To make matters worse some edges contain IDs that are not connected to the 
original values in the source dataframe at all.

What I have tried to resolve my issue:

  *
Write a checker that verifies edges created against the source dataframe. [6]
The aim of this checker was to determine where the inconsistency comes from, to
locate the bug and resolve it.
I ran this checker on limited graphs from n=10 upwards to n=100 (or 1m).
This felt close enough as there are only ~6.5m records in my source dataframe.
It ran correctly; near the 1m mark it experienced significant slowdown, and on the
full dataframe it errors/times out.
I blamed this on the large joins that it performs on the source dataframe.
  *
I found a GitHub issue from someone with significantly larger graphs who had
similar issues.
One suggestion there blamed indexing using strings rather than ints or longs.
I rewrote my system to use ints for IDs but I ran into the same issue.
The number of incorrect edges was the same, and the incorrect vertices they link
to were the same too.
  *
I re-ordered my source dataframe to see what the impact was.
This results in considerably more incorrect edges using the checker in [4]
If helpful I can post the output of this checker as well.

Can you give me any pointers in what I can try or what I can do to clarify my 
situation better?
Thanks in advance for your time.

Kind regards,
Jelle Nijland




[1]
import pyspark.sql.functions as psf
from pyspark.sql import DataFrame

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"

# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"


def generate_ids(df: DataFrame) -> DataFrame:
"""
Generates a unique ID for each distinct maintainer, prefix, origin and 
organisation

Parameters
--
df : DataFrame
DataFrame to generate IDs for
"""
mnt_by_id = df.select(MNT_BY).distinct().withColumn(MAINTAINER_ID, 
psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
prefix_id = df.select(PREFIX).distinct().withColumn(PREFIX_ID, 
psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
origin_id = df.select(ORIGIN).distinct().withColumn(ORIGIN_ID, 
psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
organisation_id = df.select(DESCR).distinct().withColumn(ORGANISATION_ID, 
psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

df = df.join(mnt_by_id, on=MNT_BY, how="left").join(prefix_id, on=PREFIX, 
how="left").join(origin_id, on=ORIGIN, how="left").join(organisation_id, 
on=DESCR, how="left")
return df

def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs
    Vertices have the format:
    ID (str) | VALUE (str) | EXTERNAL (bool)

    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID,

How to add MaxDOP option in spark mssql JDBC

2024-04-23 Thread Elite
[QUESTION] How to pass MAXDOP option · Issue #2395 · microsoft/mssql-jdbc 
(github.com)


Hi team, 


I was advised to seek help from the Spark community.


We suspect Spark rewrites the query before passing it to MS SQL, and that this
leads to the syntax error.
Is there any workaround to make my code work?



spark.read()
.format("jdbc")
.option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=")
.option("query", "SELECT TOP 10 * FROM dbo.Demo with (nolock) WHERE Id = 1 
option (maxdop 1)")
.load()
.show();

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the 
keyword 'option'.
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:270)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1778)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:697)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:616)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7775)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:4397)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:293)
at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:263)
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:531)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:61)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
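
A sketch of why the OPTION clause breaks (the wrapper SQL below is an assumption
about what Spark generates, based on the resolveTable frame in the stack trace,
not taken from the original message):

# Spark's JDBC source does not send the "query" option verbatim; for schema
# resolution it wraps it in a derived table, roughly:
#
#   SELECT * FROM (SELECT TOP 10 * FROM dbo.Demo WITH (NOLOCK)
#                  WHERE Id = 1 OPTION (MAXDOP 1)) SPARK_GEN_SUBQ_0 WHERE 1=0
#
# T-SQL only allows OPTION (...) at the very end of the outermost statement, so
# the wrapped form is invalid -- hence "Incorrect syntax near the keyword 'option'".
# One possible workaround is to keep the hint out of the query text and cap
# parallelism on the server side instead (for example a database-scoped MAXDOP
# setting or a Resource Governor workload group), then read with a plain query:
df = (spark.read.format("jdbc")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("url", "jdbc:sqlserver://xxx.database.windows.net;databaseName=...")
      .option("query", "SELECT TOP 10 * FROM dbo.Demo WITH (NOLOCK) WHERE Id = 1")
      .load())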

How to use Structured Streaming in Spark SQL

2024-04-22 Thread ????
In Flink, you can create streaming tables using Flink SQL and connect directly to
CDC and Kafka sources with SQL alone. How can I do this kind of streaming
computation with SQL in Spark?
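
A minimal sketch (not from the original message) of driving a streaming
computation with plain SQL in Spark Structured Streaming. The built-in rate
source keeps the example self-contained; a Kafka or CDC source would be plugged
in the same way through spark.readStream.format(...):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-on-stream").getOrCreate()

# Any streaming source works here; "rate" just generates (timestamp, value) rows.
events = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
events.createOrReplaceTempView("events")   # expose the stream to SQL

# Plain SQL over the streaming view; the result is itself a streaming DataFrame.
agg = spark.sql("""
    SELECT window(timestamp, '1 minute') AS win, count(*) AS cnt
    FROM events
    GROUP BY window(timestamp, '1 minute')
""")

query = (agg.writeStream
            .outputMode("complete")
            .format("console")
            .start())
query.awaitTermination()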



308027...@qq.com





How to access the internal hidden columns of table by spark jdbc

2024-04-20 Thread casel.chen
I want to use Spark JDBC to access the Alibaba Cloud Hologres
(https://www.alibabacloud.com/product/hologres) internal hidden column
`hg_binlog_timestamp_us`, but I get the following error:


Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve 'hg_binlog_timestamp_us' given input columns: 
[top_trans_order.acc_split_bunch, top_trans_order.acct_code, ...]; line 4 pos 
30;


So, my question is: how can I access a table's internal hidden columns through
Spark JDBC (SQL and DataFrame)? Thanks!
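
A hedged sketch of one thing to try (not from the original message): Spark
resolves the JDBC schema from the table's visible columns, so a hidden column
never shows up in "given input columns". Pushing the projection down with the
"query" option and aliasing the hidden column makes it come back as an ordinary
result column -- whether Hologres allows selecting hg_binlog_* columns this way
depends on the table's binlog configuration, so treat this as an approach to
test rather than a guaranteed fix:

df = (spark.read.format("jdbc")
      .option("url", jdbc_url)   # jdbc_url assumed to be defined elsewhere
      .option("query", """
          SELECT t.*, hg_binlog_timestamp_us AS binlog_ts_us
          FROM top_trans_order t
      """)
      .load())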

Accounting the impact of failures in spark jobs

2024-04-19 Thread Faiz Halde
Hello,

In my organization, we have an accounting system for spark jobs that uses
the task execution time to determine how much time a spark job uses the
executors for and we use it as a way to segregate cost. We sum all the task
times per job and apply proportions. Our clusters follow a 1 task per core
model & this works well.

A job goes through several failures during its run, due to executor
failure, node failure ( spot interruptions ), and spark retries tasks &
sometimes entire stages.

We now want to account for this failure and determine what % of a job's
total task time is due to these retries. Basically, if a job with failures
& retries has a total task time of X, there is a X' representing the
goodput of this job – i.e. a hypothetical run of the job with 0 failures &
retries. In this case, ( X-X' ) / X quantifies the cost of failures.

This form of accounting requires tracking execution history of each task
i.e. tasks that compute the same logical partition of some RDD. This was
quite easy with AQE disabled as stage ids never changed, but with AQE
enabled that's no longer the case.

Do you have any suggestions on how I can use the Spark event system?

Thanks
Faiz
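
A sketch (not part of the original message) of one way to get at these numbers
without a custom listener: parse the JSON event log and, for each
(stage id, partition index), treat the last successful task attempt as goodput
and everything else as retry/failure overhead. The field names ("Event",
"Stage ID", "Task Info", ...) are what recent Spark versions emit but should be
verified against your logs; the sketch also assumes a single uncompressed event
log file and does not solve the AQE re-planning problem described above, where
logically identical stages get new stage ids.

import json
from collections import defaultdict

def retry_overhead(event_log_path: str):
    last_good = defaultdict(int)   # (stage id, partition index) -> last successful duration
    total = 0                      # total task time across all attempts
    with open(event_log_path) as f:
        for line in f:
            ev = json.loads(line)
            if ev.get("Event") != "SparkListenerTaskEnd":
                continue
            info = ev["Task Info"]
            duration = info["Finish Time"] - info["Launch Time"]
            total += duration
            if not info.get("Failed", False) and not info.get("Killed", False):
                last_good[(ev["Stage ID"], info["Index"])] = duration
    goodput = sum(last_good.values())
    overhead = (total - goodput) / total if total else 0.0
    return total, goodput, overhead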


StreamingQueryListener integration with Spark native metric sink (JmxSink)

2024-04-18 Thread Mason Chen
Hi all,

Is it possible to integrate StreamingQueryListener with Spark metrics so
that metrics can be reported through Spark's internal metric system?
Ideally, I would like to report some custom metrics through
StreamingQueryListener and export them to Spark's JmxSink.

Best,
Mason
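
A sketch of where those numbers become available in PySpark 3.4+ (the listener
API shown below is an assumption to verify against your version). This does not
wire custom values into Spark's internal metric system -- that needs a metrics
Source registered on the JVM side -- but setting
spark.sql.streaming.metricsEnabled=true at least surfaces the built-in streaming
metrics through the configured sinks, including JmxSink:

from pyspark.sql.streaming import StreamingQueryListener

class ProgressListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"query started: {event.id}")

    def onQueryProgress(self, event):
        p = event.progress
        # Custom metrics derived from the progress report; export them with
        # whatever JMX/StatsD/Prometheus tooling you already run.
        print(f"batch={p.batchId} inputRows={p.numInputRows}")

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        print(f"query terminated: {event.id}")

spark.streams.addListener(ProgressListener())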


[ANNOUNCE] Apache Spark 3.4.3 released

2024-04-18 Thread Dongjoon Hyun
We are happy to announce the availability of Apache Spark 3.4.3!

Spark 3.4.3 is a maintenance release containing many fixes including
security and correctness domains. This release is based on the
branch-3.4 maintenance branch of Spark. We strongly
recommend all 3.4 users to upgrade to this stable release.

To download Spark 3.4.3, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-4-3.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Dongjoon Hyun


[Spark SQL][How-To] Remove builtin function support from Spark

2024-04-17 Thread Matthew McMillian
Hello,

I'm very new to the Spark ecosystem, apologies if this question is a bit
simple.

I want to modify a custom fork of Spark to remove function support. For
example, I want to remove the query runner's ability to call reflect and
java_method. I saw that there exists a data structure in spark-sql called
FunctionRegistry that seems to act as an allowlist on what functions Spark
can execute. If I remove a function from the registry, is that enough to
guarantee that the function can "never" be invoked in Spark, or are there
other areas that would need to be changed as well?

Thanks,
Matthew McMillian


should OutputCommitCoordinator fail stages for authorized committer failures when using s3a optimized committers?

2024-04-17 Thread Dylan McClelland
In https://issues.apache.org/jira/browse/SPARK-39195,
OutputCommitCoordinator was modified to fail a stage if an authorized
committer task fails.

We run our spark jobs on a k8s cluster managed by karpenter and mostly
built from spot instances. As a result, our executors are frequently
killed. With the above change, that leads to expensive stage failures at
the final write stage.

I think I understand why the above is needed when using
FileOutputCommitter, but it seems like we can handle things like the magic
s3a committer differently. For those, we could instead abort the task
attempt, which will discard the data files that are awaiting the final PUT
operation, and remove them from the list of files to be completed during
the job commit phase.

Does this seem reasonable? I think the change could go in
OutputCommitCoordinator (as a case in the taskCompleted block), but there
are other options as well

Any other ideas on how to stop individual failures of authorized committer
tasks from failing the whole job?


[Spark SQL] xxhash64 default seed of 42 confusion

2024-04-16 Thread Igor Calabria
Hi all,

I've noticed that Spark's xxhash64 output doesn't match other tools' due to it
using seed=42 as a default. I've looked at a few libraries and they use 0
as the default seed:

- python https://github.com/ifduyue/python-xxhash
- java https://github.com/OpenHFT/Zero-Allocation-Hashing/
- java (slice library, used by trino)
https://github.com/airlift/slice/blob/master/src/main/java/io/airlift/slice/XxHash64.java

Was there a special motivation behind this, or is 42 just used for the sake
of the Hitchhiker's Guide reference? It's very common for Spark to interact
with other tools (either via data or a direct connection), and this just seems
like an unnecessary footgun.
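
A quick experiment (not from the original message) to see the mismatch directly.
It assumes the third-party "xxhash" package is installed and that Spark hashes
the raw UTF-8 bytes of a string with seed 42, returning a signed 64-bit value,
so the unsigned digest from the library is converted before comparing -- treat
the byte-level equivalence as something to verify rather than a guarantee:

import xxhash
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
s = "hello"

spark_hash = spark.range(1).select(F.xxhash64(F.lit(s))).first()[0]

def to_signed(u):   # unsigned 64-bit -> signed 64-bit, to match Spark's LongType
    return u - (1 << 64) if u >= (1 << 63) else u

print(spark_hash)
print(to_signed(xxhash.xxh64(s.encode("utf-8"), seed=42).intdigest()))   # Spark's default seed
print(to_signed(xxhash.xxh64(s.encode("utf-8"), seed=0).intdigest()))    # what most other tools use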


auto create event log directory if not exist

2024-04-15 Thread second_co...@yahoo.com.INVALID
The Spark history server is set to use s3a, like below:
spark.eventLog.enabled true
spark.eventLog.dir s3a://bucket-test/test-directory-log

Is there any configuration option I can set in the Spark config such that, if the
directory 'test-directory-log' does not exist, it is auto-created before the Spark
history server starts?

Thank you
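
As far as I know there is no Spark or history-server option that creates the
directory automatically; a lightweight pre-step (a sketch assuming boto3 and an
existing bucket) is to drop an empty directory marker so s3a sees the path as a
directory before the history server starts:

import boto3

s3 = boto3.client("s3")
# A zero-byte object whose key ends in "/" acts as a directory marker for s3a.
s3.put_object(Bucket="bucket-test", Key="test-directory-log/")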



Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Kidong Lee
Thanks, Mich for your reply.

I agree, it is not very scalable or efficient. But it works correctly for
Kafka transactions, and there is no problem with committing offsets to Kafka
asynchronously for now.

Let me give you some more details about my streaming job.
CustomReceiver does not receive anything from outside; it just forwards a notice
message to start an executor in which the Kafka consumer will run.
See my CustomReceiver.

    private static class CustomReceiver extends Receiver<String> {

        public CustomReceiver() {
            super(StorageLevel.MEMORY_AND_DISK_2());
        }

        @Override
        public void onStart() {
            new Thread(this::receive).start();
        }

        private void receive() {
            String input = "receiver input " + UUID.randomUUID().toString();
            store(input);
        }

        @Override
        public void onStop() {

        }
    }


Actually, just one Kafka consumer will run, consuming committed messages
from Kafka directly (which is not very scalable, I think).
But the main point of this approach, and what I need, is that the Spark
session is used to save the RDD (the parallelized consumed messages) to an
Iceberg table.
Consumed messages are converted to a Spark RDD, which is then saved to the
Iceberg table using the Spark session.

I have tested this Spark streaming job with transactional producers that
send several million messages. They were consumed and saved to the Iceberg
tables correctly.

- Kidong.



On Sun, 14 Apr 2024 at 23:05, Mich Talebzadeh wrote:

> Interesting
>
> My concern is infinite Loop in* foreachRDD*: The *while(true)* loop
> within foreachRDD creates an infinite loop within each Spark executor. This
> might not be the most efficient approach, especially since offsets are
> committed asynchronously.?
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:
>
>>
>> Because spark streaming for kafk transaction does not work correctly to
>> suit my need, I moved to another approach using raw kafka consumer which
>> handles read_committed messages from kafka correctly.
>>
>> My codes look like the following.
>>
>> JavaDStream stream = ssc.receiverStream(new CustomReceiver()); // 
>> CustomReceiver does nothing special except awaking foreach task.
>>
>> stream.foreachRDD(rdd -> {
>>
>>   KafkaConsumer consumer = new 
>> KafkaConsumer<>(consumerProperties);
>>
>>   consumer.subscribe(Arrays.asList(topic));
>>
>>   while(true){
>>
>> ConsumerRecords records =
>> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>>
>> Map offsetMap = new HashMap<>();
>>
>> List someList = new ArrayList<>();
>>
>> for (ConsumerRecord consumerRecord : records) {
>>
>>   // add something to list.
>>
>>   // put offset to offsetMap.
>>
>> }
>>
>> // process someList.
>>
>> // commit offset.
>>
>> consumer.commitAsync(offsetMap, null);
>>
>>   }
>>
>> });
>>
>>
>> In addition, I increased max.poll.records to 10.
>>
>> Even if this raw kafka consumer approach is not so scalable, it consumes
>> read_committed messages from kafka correctly and is enough for me at the
>> moment.
>>
>> - Kidong.
>>
>>
>>
>> 2024년 4월 12일 (금) 오후 9:19, Kidong Lee 님이 작성:
>>
>>> Hi,
>>>
>>> I have a kafka producer which sends messages transactionally to kafka
>>> and spark streaming job which should consume read_committed messages from
>>> kafka.
>>> But there is a problem for spark streaming to consume read_committed
>>> messages.
>>> The count of messages sent by kafka producer transactionally is not the
>>> same to the count of the read_committed messages consumed by spark
>>> streaming.
>>>
>>> Some consumer properties of my spark streaming job are as follows.
>>>
>>> auto.offset.reset=earliest
>>> enable.auto.commit=false
>>> isolation.level=read_committed
>>>
>>>
>>> I also added the following spark streaming configuration.
>>>
>>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>>> 60 * 1000));
>>>
>>>
>>> My spark streaming is using DirectStream like this.
>>>
>>> JavaInputDStream> stream =
>>> KafkaUtils.createDirectStream(
>>> ssc,
>>> LocationStrategies.PreferConsistent(),
>>> ConsumerStrategies.>> GenericRecord>Subscribe(topics, kafkaParams)
>>> );
>>>
>>>
>>> stream.foreachRDD(rdd -> O
>>>
>>> 

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Mich Talebzadeh
Interesting

My concern is infinite Loop in* foreachRDD*: The *while(true)* loop within
foreachRDD creates an infinite loop within each Spark executor. This might
not be the most efficient approach, especially since offsets are committed
asynchronously.?

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:

>
> Because spark streaming for kafk transaction does not work correctly to
> suit my need, I moved to another approach using raw kafka consumer which
> handles read_committed messages from kafka correctly.
>
> My codes look like the following.
>
> JavaDStream stream = ssc.receiverStream(new CustomReceiver()); // 
> CustomReceiver does nothing special except awaking foreach task.
>
> stream.foreachRDD(rdd -> {
>
>   KafkaConsumer consumer = new 
> KafkaConsumer<>(consumerProperties);
>
>   consumer.subscribe(Arrays.asList(topic));
>
>   while(true){
>
> ConsumerRecords records =
> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>
> Map offsetMap = new HashMap<>();
>
> List someList = new ArrayList<>();
>
> for (ConsumerRecord consumerRecord : records) {
>
>   // add something to list.
>
>   // put offset to offsetMap.
>
> }
>
> // process someList.
>
> // commit offset.
>
> consumer.commitAsync(offsetMap, null);
>
>   }
>
> });
>
>
> In addition, I increased max.poll.records to 10.
>
> Even if this raw kafka consumer approach is not so scalable, it consumes
> read_committed messages from kafka correctly and is enough for me at the
> moment.
>
> - Kidong.
>
>
>
> 2024년 4월 12일 (금) 오후 9:19, Kidong Lee 님이 작성:
>
>> Hi,
>>
>> I have a kafka producer which sends messages transactionally to kafka and
>> spark streaming job which should consume read_committed messages from kafka.
>> But there is a problem for spark streaming to consume read_committed
>> messages.
>> The count of messages sent by kafka producer transactionally is not the
>> same to the count of the read_committed messages consumed by spark
>> streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>> 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream> stream =
>> KafkaUtils.createDirectStream(
>> ssc,
>> LocationStrategies.PreferConsistent(),
>> ConsumerStrategies.Subscribe(topics, 
>> kafkaParams)
>> );
>>
>>
>> stream.foreachRDD(rdd -> O
>>
>>// get offset ranges.
>>
>>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
>> rdd.rdd()).offsetRanges();
>>
>>// process something.
>>
>>
>>// commit offset.
>>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>>
>> }
>> );
>>
>>
>>
>> I have tested with a kafka consumer written with raw kafka-clients jar
>> library without problem that it consumes read_committed messages correctly,
>> and the count of consumed read_committed messages is equal to the count of
>> messages sent by kafka producer.
>>
>>
>> And sometimes, I got the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> after polling for 12
>>
>> at scala.Predef$.require(Predef.scala:281)
>>
>> at
>> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>
>>
>>
>> I have experienced spark streaming job which works fine with kafka
>> messages which are non-transactional, and I 

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Kidong Lee
Because Spark streaming for Kafka transactions does not work correctly for
my needs, I moved to another approach using a raw Kafka consumer, which
handles read_committed messages from Kafka correctly.

My codes look like the following.

JavaDStream stream = ssc.receiverStream(new CustomReceiver());
// CustomReceiver does nothing special except awaking foreach task.

stream.foreachRDD(rdd -> {

  KafkaConsumer consumer = new
KafkaConsumer<>(consumerProperties);

  consumer.subscribe(Arrays.asList(topic));

  while(true){

ConsumerRecords records =
consumer.poll(java.time.Duration.ofMillis(intervalMs));

Map offsetMap = new HashMap<>();

List someList = new ArrayList<>();

for (ConsumerRecord consumerRecord : records) {

  // add something to list.

  // put offset to offsetMap.

}

// process someList.

// commit offset.

consumer.commitAsync(offsetMap, null);

  }

});


In addition, I increased max.poll.records to 10.

Even if this raw kafka consumer approach is not so scalable, it consumes
read_committed messages from kafka correctly and is enough for me at the
moment.

- Kidong.



On Fri, 12 Apr 2024 at 21:19, Kidong Lee wrote:

> Hi,
>
> I have a kafka producer which sends messages transactionally to kafka and
> spark streaming job which should consume read_committed messages from kafka.
> But there is a problem for spark streaming to consume read_committed
> messages.
> The count of messages sent by kafka producer transactionally is not the
> same to the count of the read_committed messages consumed by spark
> streaming.
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 
> * 1000));
>
>
> My spark streaming is using DirectStream like this.
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> ssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(topics, 
> kafkaParams)
> );
>
>
> stream.foreachRDD(rdd -> O
>
>// get offset ranges.
>
>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
> rdd.rdd()).offsetRanges();
>
>// process something.
>
>
>// commit offset.
>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>
> }
> );
>
>
>
> I have tested with a kafka consumer written with raw kafka-clients jar
> library without problem that it consumes read_committed messages correctly,
> and the count of consumed read_committed messages is equal to the count of
> messages sent by kafka producer.
>
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 12
>
> at scala.Predef$.require(Predef.scala:281)
>
> at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>
>
>
> I have experienced spark streaming job which works fine with kafka
> messages which are non-transactional, and I never encountered the
> exceptions like above.
> It seems that spark streaming for kafka transaction does not handle such
> as kafka consumer properties like isolation.level=read_committed and
> enable.auto.commit=false correctly.
>
> Any help appreciated.
>
> - Kidong.
>
>
> --
> *이기동 *
> *Kidong Lee*
>
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
> 
>


-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297



Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-13 Thread Kidong Lee
Thank you Mich for your reply.

Actually, I tried to do most of your advice.

When spark.streaming.kafka.allowNonConsecutiveOffsets=false, I got the
following error.

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 1.0 (TID 3)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Got wrong record
for spark-executor-school-student-group school-student-7 even after seeking
to offset 11206961 got offset 11206962 instead. If this is a compacted
topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets

at scala.Predef$.require(Predef.scala:281)

at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:155)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)


And I tried to increase spark.streaming.kafka.consumer.poll.ms to avoid the
exceptions, but it did not help.


- Kidong.




On Sun, 14 Apr 2024 at 04:25, Mich Talebzadeh wrote:

> Hi Kidong,
>
> There may be few potential reasons why the message counts from your Kafka
> producer and Spark Streaming consumer might not match, especially with
> transactional messages and read_committed isolation level.
>
> 1) Just ensure that both your Spark Streaming job and the Kafka consumer
> written with raw kafka-clients use the same consumer group. Messages are
> delivered to specific consumer groups, and if they differ, Spark Streaming
> might miss messages consumed by the raw consumer.
> 2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
> uses *commitAsync manually*. However, I noted
> *spark.streaming.kafka.allowNonConsecutiveOffsets=true* which may be
> causing the problem. This setting allows Spark Streaming to read offsets
> that are not strictly increasing, which can happen with transactional
> reads. Generally recommended to set this to* false *for transactional
> reads to ensure Spark Streaming only reads committed messages.
> 3) Missed messages, in transactional messages, Kafka guarantees *delivery
> only after the transaction commits successfully. *There could be a slight
> delay between the producer sending the message and it becoming visible to
> consumers under read_committed isolation level. Spark Streaming could
> potentially miss messages during this window.
> 4) The exception Lost task 0.0 in stage 324.0, suggests a problem fetching
> records for a specific topic partition. Review your code handling of
> potential exceptions during rdd.foreachRDD processing. Ensure retries or
> appropriate error handling if encountering issues with specific partitions.
> 5) Try different configurations for *spark.streaming.kafka.consumer.poll.ms
> * to adjust polling
> frequency and potentially improve visibility into committed messages.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Fri, 12 Apr 2024 at 21:38, Kidong Lee  wrote:
>
>> Hi,
>>
>> I have a kafka producer which sends messages transactionally to kafka and
>> spark streaming job which should consume read_committed messages from kafka.
>> But there is a problem for spark streaming to consume read_committed
>> messages.
>> The count of messages sent by kafka producer transactionally is not the
>> same to the count of the read_committed messages consumed by spark
>> streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>> 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream> stream =
>> KafkaUtils.createDirectStream(
>> ssc,
>> LocationStrategies.PreferConsistent(),
>> ConsumerStrategies.Subscribe(topics, 
>> kafkaParams)
>> );
>>
>>
>> stream.foreachRDD(rdd -> O
>>
>>// get offset ranges.
>>
>> 

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-13 Thread Mich Talebzadeh
Hi Kidong,

There may be few potential reasons why the message counts from your Kafka
producer and Spark Streaming consumer might not match, especially with
transactional messages and read_committed isolation level.

1) Just ensure that both your Spark Streaming job and the Kafka consumer
written with raw kafka-clients use the same consumer group. Messages are
delivered to specific consumer groups, and if they differ, Spark Streaming
might miss messages consumed by the raw consumer.
2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
uses *commitAsync manually*. However, I noted
*spark.streaming.kafka.allowNonConsecutiveOffsets=true* which may be
causing the problem. This setting allows Spark Streaming to read offsets
that are not strictly increasing, which can happen with transactional
reads. Generally recommended to set this to* false *for transactional reads
to ensure Spark Streaming only reads committed messages.
3) Missed messages, in transactional messages, Kafka guarantees *delivery
only after the transaction commits successfully. *There could be a slight
delay between the producer sending the message and it becoming visible to
consumers under read_committed isolation level. Spark Streaming could
potentially miss messages during this window.
4) The exception Lost task 0.0 in stage 324.0, suggests a problem fetching
records for a specific topic partition. Review your code handling of
potential exceptions during rdd.foreachRDD processing. Ensure retries or
appropriate error handling if encountering issues with specific partitions.
5) Try different configurations for *spark.streaming.kafka.consumer.poll.ms
* to adjust polling
frequency and potentially improve visibility into committed messages.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Fri, 12 Apr 2024 at 21:38, Kidong Lee  wrote:

> Hi,
>
> I have a kafka producer which sends messages transactionally to kafka and
> spark streaming job which should consume read_committed messages from kafka.
> But there is a problem for spark streaming to consume read_committed
> messages.
> The count of messages sent by kafka producer transactionally is not the
> same to the count of the read_committed messages consumed by spark
> streaming.
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 
> * 1000));
>
>
> My spark streaming is using DirectStream like this.
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> ssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(topics, 
> kafkaParams)
> );
>
>
> stream.foreachRDD(rdd -> O
>
>// get offset ranges.
>
>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
> rdd.rdd()).offsetRanges();
>
>// process something.
>
>
>// commit offset.
>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>
> }
> );
>
>
>
> I have tested with a kafka consumer written with raw kafka-clients jar
> library without problem that it consumes read_committed messages correctly,
> and the count of consumed read_committed messages is equal to the count of
> messages sent by kafka producer.
>
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 12
>
> at scala.Predef$.require(Predef.scala:281)
>
> at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>
> at
> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>
> at
> 

Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-12 Thread Kidong Lee
Hi,

I have a Kafka producer which sends messages transactionally to Kafka, and a
Spark streaming job which should consume the read_committed messages from Kafka.
But there is a problem with Spark streaming consuming the read_committed
messages:
the count of messages sent transactionally by the Kafka producer is not the
same as the count of read_committed messages consumed by Spark
streaming.

Some consumer properties of my spark streaming job are as follows.

auto.offset.reset=earliest
enable.auto.commit=false
isolation.level=read_committed


I also added the following spark streaming configuration.

sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
sparkConf.set("spark.streaming.kafka.consumer.poll.ms",
String.valueOf(2 * 60 * 1000));


My spark streaming is using DirectStream like this.

JavaInputDStream> stream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams)
);


stream.foreachRDD(rdd -> {

   // get offset ranges.

   OffsetRange[] offsetRanges = ((HasOffsetRanges)
rdd.rdd()).offsetRanges();

   // process something.


   // commit offset.
   ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

}
);



I have tested with a kafka consumer written with raw kafka-clients jar
library without problem that it consumes read_committed messages correctly,
and the count of consumed read_committed messages is equal to the count of
messages sent by kafka producer.


And sometimes, I got the following exception.

Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Failed to get
records for compacted spark-executor-school-student-group school-student-7
after polling for 12

at scala.Predef$.require(Predef.scala:281)

at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)



I have run Spark streaming jobs that work fine with non-transactional Kafka
messages, and I never encountered exceptions like the above.
It seems that Spark streaming for Kafka transactions does not correctly handle
Kafka consumer properties such as isolation.level=read_committed and
enable.auto.commit=false.

Any help appreciated.

- Kidong.


-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297



Spark column headings, camelCase or snake case?

2024-04-11 Thread Mich Talebzadeh
I know this is a bit of a silly question, but what is the norm for Spark
column headings? Is it camelCase or snake_case? For example, someone suggested,
and I quote: "SumTotalInMillionGBP" accurately conveys the meaning but is a bit
long and uses camelCase, which is not the standard convention for Spark
DataFrames (usually snake_case). Use snake_case for better readability, like
"total_price_in_millions_gbp".

So this is the gist

+--+-+---+
|district  |NumberOfOffshoreOwned|total_price_in_millions_gbp|
+--+-+---+
|CITY OF WESTMINSTER   |4452 |21472.5|
|KENSINGTON AND CHELSEA|2403 |6544.8 |
|CAMDEN|1023 |4275.9 |
|SOUTHWARK |1080 |3938.0 |
|ISLINGTON |627  |3062.0 |
|TOWER HAMLETS |1715 |3008.0 |
|HAMMERSMITH AND FULHAM|765  |2137.2 |

Now, I recently saw a note (if I recall correctly) that Spark should be
using camelCase in new Spark-related documents. What are the accepted views,
or does it matter?
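
A small illustrative helper (not from the original message) for normalising
whatever convention the data arrives in to snake_case:

import re

def to_snake_case(name: str) -> str:
    s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1).lower()

# df = df.toDF(*[to_snake_case(c) for c in df.columns])
# "NumberOfOffshoreOwned" -> "number_of_offshore_owned"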

Thanks
Mich Talebzadeh,

Technologist | Solutions Architect | Data Engineer  | Generative AI

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".


Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
I think this answers your question about what to do if you need more space
on nodes.

https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage

Local Storage


Spark supports using volumes to spill data during shuffles and other
operations. To use a volume as local storage, the volume's name should
start with spark-local-dir-, for example:

--conf 
spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=
--conf 
spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false

Specifically, you can use persistent volume claims if the jobs require
large shuffle and sorting operations in executors.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=500Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false

To enable shuffle data recovery feature via the built-in
KubernetesLocalDiskShuffleDataIO plugin, we need to have the followings.
You may want to enable
spark.kubernetes.driver.waitToReusePersistentVolumeClaim additionally.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO

If no volume is set as local storage, Spark uses temporary scratch space to
spill data to disk during shuffles and other operations. When using
Kubernetes as the resource manager the pods will be created with an emptyDir
 volume
mounted for each directory listed in spark.local.dir or the environment
variable SPARK_LOCAL_DIRS . If no directories are explicitly specified then
a default directory is created and configured appropriately.

emptyDir volumes use the ephemeral storage feature of Kubernetes and do not
persist beyond the life of the pod.
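
A hedged sketch of how the PVC-backed local directories above can be combined
with executor decommissioning (which is what the "pvc aware scheduling along
with decommissioning" approach mentioned in this thread refers to). The option
names are from recent Spark releases and should be checked against your version:

spark_config_pvc_decommission = {
    # PVC-backed shuffle/local storage, as in the docs excerpt above
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "OnDemand",
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "gp",
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "500Gi",
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": "/data",
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false",
    # Let the driver own and reuse claims left behind by removed executors
    "spark.kubernetes.driver.ownPersistentVolumeClaim": "true",
    "spark.kubernetes.driver.reusePersistentVolumeClaim": "true",
    # Graceful decommissioning so shuffle/RDD blocks are migrated off spot nodes
    "spark.decommission.enabled": "true",
    "spark.storage.decommission.enabled": "true",
    "spark.storage.decommission.shuffleBlocks.enabled": "true",
    "spark.storage.decommission.rddBlocks.enabled": "true",
}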

On Thu, 11 Apr 2024 at 10:29, Bjørn Jørgensen wrote:

> " In the end for my usecase I started using pvcs and pvc aware scheduling
> along with decommissioning. So far performance is good with this choice."
> How did you do this?
>
>
> tor. 11. apr. 2024 kl. 04:13 skrev Arun Ravi :
>
>> Hi Everyone,
>>
>> I had to explored IBM's and AWS's S3 shuffle plugins (some time back), I
>> had also explored AWS FSX lustre in few of my production jobs which has
>> ~20TB of shuffle operations with 200-300 executors. What I have observed is
>> S3 and fax behaviour was fine during the write phase, however I faced iops
>> throttling during the read phase(read taking forever to complete). I think
>> this might be contributed by the heavy use of shuffle index file (I didn't
>> perform any extensive research on this), so I believe the shuffle manager
>> logic have to be intelligent enough to reduce the fetching of files from
>> object store. In the end for my usecase I started using pvcs and pvc aware
>> scheduling along with decommissioning. So far performance is good with this
>> choice.
>>
>> Thank you
>>
>> On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh, 
>> wrote:
>>
>>> Hi,
>>>
>>> First thanks everyone for their contributions
>>>
>>> I was going to reply to @Enrico Minack   but
>>> noticed additional info. As I understand for example,  Apache Uniffle is an
>>> incubating project aimed at providing a pluggable shuffle service for
>>> Spark. So basically, all these "external shuffle services" have in common
>>> is to offload shuffle data management to external services, thus reducing
>>> the memory and CPU overhead on Spark executors. That is great.  While
>>> Uniffle and others enhance shuffle performance and scalability, it would be
>>> great to integrate them with Spark UI. This may require additional
>>> development efforts. I suppose  the interest would be to have these
>>> external matrices incorporated into Spark with one look and feel. This may
>>> require customizing the UI to fetch and display metrics or statistics from
>>> the external shuffle services. Has any project done this?
>>>
>>> Thanks
>>>
>>> Mich Talebzadeh,
>>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> 

Re: [Spark SQL]: Source code for PartitionedFile

2024-04-11 Thread Ashley McManamon
Hi Mich,

Thanks for the reply.

I did come across that file but it didn't align with the appearance of
`PartitionedFile`:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala

In fact, the code snippet you shared also references the type
`PartitionedFile`.

There's actually this javadoc.io page for a `PartitionedFile`
at org.apache.spark.sql.execution.datasources for spark-sql_2.12:3.0.2:
https://javadoc.io/doc/org.apache.spark/spark-sql_2.12/3.0.2/org/apache/spark/sql/execution/datasources/PartitionedFile.html.
I double checked the source code for version 3.0.2 and it doesn't seem to
exist there either:
https://github.com/apache/spark/tree/v3.0.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources

Ashley


On Mon, 8 Apr 2024 at 22:41, Mich Talebzadeh 
wrote:

> Hi,
>
> I believe this is the package
>
>
> https://raw.githubusercontent.com/apache/spark/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
>
> And the code
>
> case class FilePartition(index: Int, files: Array[PartitionedFile])
>   extends Partition with InputPartition {
>   override def preferredLocations(): Array[String] = {
> // Computes total number of bytes that can be retrieved from each host.
> val hostToNumBytes = mutable.HashMap.empty[String, Long]
> files.foreach { file =>
>   file.locations.filter(_ != "localhost").foreach { host =>
> hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) +
> file.length
>   }
> }
>
> // Selects the first 3 hosts with the most data to be retrieved.
> hostToNumBytes.toSeq.sortBy {
>   case (host, numBytes) => numBytes
> }.reverse.take(3).map {
>   case (host, numBytes) => host
> }.toArray
>   }
> }
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Mon, 8 Apr 2024 at 20:31, Ashley McManamon <
> ashley.mcmana...@quantcast.com> wrote:
>
>> Hi All,
>>
>> I've been diving into the source code to get a better understanding of
>> how file splitting works from a user perspective. I've hit a deadend at
>> `PartitionedFile`, for which I cannot seem to find a definition? It appears
>> though it should be found at
>> org.apache.spark.sql.execution.datasources but I find no definition in
>> the entire source code. Am I missing something?
>>
>> I appreciate there may be an obvious answer here, apologies if I'm being
>> naive.
>>
>> Thanks,
>> Ashley McManamon
>>
>>


Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
" In the end for my usecase I started using pvcs and pvc aware scheduling
along with decommissioning. So far performance is good with this choice."
How did you do this?


On Thu, 11 Apr 2024 at 04:13, Arun Ravi wrote:

> Hi Everyone,
>
> I had to explored IBM's and AWS's S3 shuffle plugins (some time back), I
> had also explored AWS FSX lustre in few of my production jobs which has
> ~20TB of shuffle operations with 200-300 executors. What I have observed is
> S3 and fax behaviour was fine during the write phase, however I faced iops
> throttling during the read phase(read taking forever to complete). I think
> this might be contributed by the heavy use of shuffle index file (I didn't
> perform any extensive research on this), so I believe the shuffle manager
> logic have to be intelligent enough to reduce the fetching of files from
> object store. In the end for my usecase I started using pvcs and pvc aware
> scheduling along with decommissioning. So far performance is good with this
> choice.
>
> Thank you
>
> On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh, 
> wrote:
>
>> Hi,
>>
>> First thanks everyone for their contributions
>>
>> I was going to reply to @Enrico Minack   but
>> noticed additional info. As I understand for example,  Apache Uniffle is an
>> incubating project aimed at providing a pluggable shuffle service for
>> Spark. So basically, all these "external shuffle services" have in common
>> is to offload shuffle data management to external services, thus reducing
>> the memory and CPU overhead on Spark executors. That is great.  While
>> Uniffle and others enhance shuffle performance and scalability, it would be
>> great to integrate them with Spark UI. This may require additional
>> development efforts. I suppose  the interest would be to have these
>> external matrices incorporated into Spark with one look and feel. This may
>> require customizing the UI to fetch and display metrics or statistics from
>> the external shuffle services. Has any project done this?
>>
>> Thanks
>>
>> Mich Talebzadeh,
>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Mon, 8 Apr 2024 at 14:19, Vakaris Baškirov <
>> vakaris.bashki...@gmail.com> wrote:
>>
>>> I see that both Uniffle and Celebron support S3/HDFS backends which is
>>> great.
>>> In the case someone is using S3/HDFS, I wonder what would be the
>>> advantages of using Celebron or Uniffle vs IBM shuffle service plugin
>>>  or Cloud Shuffle Storage
>>> Plugin from AWS
>>> 
>>> ?
>>>
>>> These plugins do not require deploying a separate service. Are there any
>>> advantages to using Uniffle/Celebron in the case of using S3 backend, which
>>> would require deploying a separate service?
>>>
>>> Thanks
>>> Vakaris
>>>
>>> On Mon, Apr 8, 2024 at 10:03 AM roryqi  wrote:
>>>
 Apache Uniffle (incubating) may be another solution.
 You can see
 https://github.com/apache/incubator-uniffle

 https://uniffle.apache.org/blog/2023/07/21/Uniffle%20-%20New%20chapter%20for%20the%20shuffle%20in%20the%20cloud%20native%20era

 Mich Talebzadeh  于2024年4月8日周一 07:15写道:

> Splendid
>
> The configurations below can be used with k8s deployments of Spark.
> Spark applications running on k8s can utilize these configurations to
> seamlessly access data stored in Google Cloud Storage (GCS) and Amazon S3.
>
> For Google GCS we may have
>
> spark_config_gcs = {
> "spark.kubernetes.authenticate.driver.serviceAccountName":
> "service_account_name",
> "spark.hadoop.fs.gs.impl":
> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
> "spark.hadoop.google.cloud.auth.service.account.enable": "true",
> "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
> "/path/to/keyfile.json",
> }
>
> For Amazon S3 similar
>
> spark_config_s3 = {
> "spark.kubernetes.authenticate.driver.serviceAccountName":
> "service_account_name",
> "spark.hadoop.fs.s3a.impl":
> "org.apache.hadoop.fs.s3a.S3AFileSystem",
> "spark.hadoop.fs.s3a.access.key": "s3_access_key",
> "spark.hadoop.fs.s3a.secret.key": "secret_key",
> }
>
>
> To implement these configurations and enable Spark applications to
> interact with GCS and S3, I 

Re: External Spark shuffle service for k8s

2024-04-10 Thread Arun Ravi
Hi Everyone,

I have explored IBM's and AWS's S3 shuffle plugins (some time back), and I
have also explored AWS FSx for Lustre in a few of my production jobs, which have
~20TB of shuffle operations with 200-300 executors. What I observed is that
S3 and FSx behaviour was fine during the write phase; however, I faced IOPS
throttling during the read phase (reads taking forever to complete). I think
this might be contributed to by the heavy use of the shuffle index file (I didn't
perform any extensive research on this), so I believe the shuffle manager
logic has to be intelligent enough to reduce the fetching of files from the
object store. In the end, for my use case, I started using PVCs and PVC-aware
scheduling along with decommissioning. So far performance is good with this
choice.

Thank you

On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh, 
wrote:

> Hi,
>
> First thanks everyone for their contributions
>
> I was going to reply to @Enrico Minack   but
> noticed additional info. As I understand for example,  Apache Uniffle is an
> incubating project aimed at providing a pluggable shuffle service for
> Spark. So basically, all these "external shuffle services" have in common
> is to offload shuffle data management to external services, thus reducing
> the memory and CPU overhead on Spark executors. That is great.  While
> Uniffle and others enhance shuffle performance and scalability, it would be
> great to integrate them with Spark UI. This may require additional
> development efforts. I suppose  the interest would be to have these
> external matrices incorporated into Spark with one look and feel. This may
> require customizing the UI to fetch and display metrics or statistics from
> the external shuffle services. Has any project done this?
>
> Thanks
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Mon, 8 Apr 2024 at 14:19, Vakaris Baškirov 
> wrote:
>
>> I see that both Uniffle and Celebron support S3/HDFS backends which is
>> great.
>> In the case someone is using S3/HDFS, I wonder what would be the
>> advantages of using Celebron or Uniffle vs IBM shuffle service plugin
>>  or Cloud Shuffle Storage
>> Plugin from AWS
>> 
>> ?
>>
>> These plugins do not require deploying a separate service. Are there any
>> advantages to using Uniffle/Celebron in the case of using S3 backend, which
>> would require deploying a separate service?
>>
>> Thanks
>> Vakaris
>>
>> On Mon, Apr 8, 2024 at 10:03 AM roryqi  wrote:
>>
>>> Apache Uniffle (incubating) may be another solution.
>>> You can see
>>> https://github.com/apache/incubator-uniffle
>>>
>>> https://uniffle.apache.org/blog/2023/07/21/Uniffle%20-%20New%20chapter%20for%20the%20shuffle%20in%20the%20cloud%20native%20era
>>>
>>> Mich Talebzadeh  于2024年4月8日周一 07:15写道:
>>>
 Splendid

 The configurations below can be used with k8s deployments of Spark.
 Spark applications running on k8s can utilize these configurations to
 seamlessly access data stored in Google Cloud Storage (GCS) and Amazon S3.

 For Google GCS we may have

 spark_config_gcs = {
 "spark.kubernetes.authenticate.driver.serviceAccountName":
 "service_account_name",
 "spark.hadoop.fs.gs.impl":
 "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
 "spark.hadoop.google.cloud.auth.service.account.enable": "true",
 "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
 "/path/to/keyfile.json",
 }

 For Amazon S3 similar

 spark_config_s3 = {
 "spark.kubernetes.authenticate.driver.serviceAccountName":
 "service_account_name",
 "spark.hadoop.fs.s3a.impl":
 "org.apache.hadoop.fs.s3a.S3AFileSystem",
 "spark.hadoop.fs.s3a.access.key": "s3_access_key",
 "spark.hadoop.fs.s3a.secret.key": "secret_key",
 }


 To implement these configurations and enable Spark applications to
 interact with GCS and S3, I guess we can approach it this way

 1) Spark Repository Integration: These configurations need to be added
 to the Spark repository as part of the supported configuration options for
 k8s deployments.

 2) Configuration Settings: Users need to specify these configurations
 when submitting Spark applications to a Kubernetes cluster. They can
 include 

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-09 Thread Mich Talebzadeh
Interesting. So below should be the corrected code, following the suggestion in
[SPARK-47718] ".sql() does not recognize watermark defined upstream" (ASF JIRA,
apache.org):

# Define schema for parsing Kafka messages
schema = StructType([
StructField('createTime', TimestampType(), True),
StructField('orderId', LongType(), True),
StructField('payAmount', DoubleType(), True),
StructField('payPlatform', IntegerType(), True),
StructField('provinceId', IntegerType(), True),
])

# Read streaming data from Kafka source
streaming_df = session.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "payment_msg") \
.option("startingOffsets", "earliest") \
.load() \
.select(from_json(col("value").cast("string"),
schema).alias("parsed_value")) \
.select("parsed_value.*") \
.withWatermark("createTime", "10 seconds")

# Create temporary view for SQL queries
*streaming_df.createOrReplaceTempView("streaming_df")*
# Define SQL query with correct window function usage
query = """
*SELECT*
*window(start, '1 hour', '30 minutes') as window,*
provinceId,
sum(payAmount) as totalPayAmount
FROM streaming_df
GROUP BY provinceId, window(start, '1 hour', '30 minutes')
ORDER BY window.start
"""

# Write the aggregated results to Kafka sink
stream = session.sql(query) \
.writeStream \
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \
.start()


Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 9 Apr 2024 at 21:45, 刘唯  wrote:

> Sorry this is not a bug but essentially a user error. Spark throws a
> really confusing error and I'm also confused. Please see the reply in the
> ticket for how to make things correct.
> https://issues.apache.org/jira/browse/SPARK-47718
>
> 刘唯  于2024年4月6日周六 11:41写道:
>
>> This indeed looks like a bug. I will take some time to look into it.
>>
>> Mich Talebzadeh  于2024年4月3日周三 01:55写道:
>>
>>>
>>> hm. you are getting below
>>>
>>> AnalysisException: Append output mode not supported when there are
>>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>>
>>> The problem seems to be that you are using the append output mode when
>>> writing the streaming query results to Kafka. This mode is designed for
>>> scenarios where you want to append new data to an existing dataset at the
>>> sink (in this case, the "sink" topic in Kafka). However, your query
>>> involves a streaming aggregation: group by provinceId, window('createTime',
>>> '1 hour', '30 minutes'). The problem is that Spark Structured Streaming
>>> requires a watermark to ensure exactly-once processing when using
>>> aggregations with append mode. Your code already defines a watermark on the
>>> "createTime" column with a delay of 10 seconds (withWatermark("createTime",
>>> "10 seconds")). However, the error message indicates it is missing on the
>>> start column. Try adding watermark to "start" Column: Modify your code as
>>> below  to include a watermark on the "start" column generated by the
>>> window function:
>>>
>>> from pyspark.sql.functions import col, from_json, explode, window, sum,
>>> watermark
>>>
>>> streaming_df = session.readStream \
>>>   .format("kafka") \
>>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>>   .option("subscribe", "payment_msg") \
>>>   .option("startingOffsets", "earliest") \
>>>   .load() \
>>>   .select(from_json(col("value").cast("string"),
>>> schema).alias("parsed_value")) \
>>>   .select("parsed_value.*") \
>>>   .withWatermark("createTime", "10 seconds")  # Existing watermark on
>>> createTime
>>>
>>> *# Modified section with watermark on 'start' column*
>>> streaming_df = streaming_df.groupBy(
>>>   col("provinceId"),
>>>   window(col("createTime"), "1 hour", "30 minutes")
>>> ).agg(
>>>   sum(col("payAmount")).alias("totalPayAmount")
>>> ).withWatermark(expr("start"), "10 seconds")  # Watermark on
>>> window-generated 'start'
>>>
>>> # Rest of the code remains the same
>>> streaming_df.createOrReplaceTempView("streaming_df")
>>>
>>> spark.sql("""
>>> SELECT
>>>   window.start, window.end, provinceId, totalPayAmount
>>> FROM streaming_df
>>> ORDER BY window.start
>>> """) \
>>> .writeStream \
>>> .format("kafka") \
>>> .option("checkpointLocation", 
