Re: startTimestamp doesn't work when using rate-micro-batch format

2024-01-28 Thread Mich Talebzadeh
OK

This is the equivalent Python code

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when
from pyspark.sql.types import StructType, StructField, LongType
from datetime import datetime

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("StreamingSparkPartitioned") \
    .getOrCreate()

expression = when(expr("value % 3 = 1"), "stupid_event") \
    .otherwise(when(expr("value % 3 = 2"), "smart_event")
               .otherwise("neutral_event"))

# Define the schema to match the rate-micro-batch data source
schema = StructType([StructField("timestamp", LongType()),
                     StructField("value", LongType())])
checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# Convert human-readable timestamp to Unix timestamp in milliseconds
start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)

streamingDF = spark.readStream \
    .format("rate-micro-batch") \
    .option("rowsPerBatch", "100") \
    .option("startTimestamp", start_timestamp) \
    .option("numPartitions", 1) \
    .load() \
    .withColumn("event_type", expression)

query = (
    streamingDF.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

query.awaitTermination()

This is the error I am getting
  File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py",
line 38, in 
query.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py",
line 201, in awaitTermination
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
line 1322, in __call__
  File
"/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py",
line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED]
Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId =
f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable
value for offset
Did not find value which can be converted into long

Seems like there might be an issue with the *rate-micro-batch* source when
using the *startTimestamp* option.

You can try using the socket source for testing purposes.
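For example, a minimal socket-based test sketch (assuming something like
"nc -lk 9999" is running locally; host and port are placeholders, and it
reuses the spark session created above):

# Minimal socket-source test; reads lines from a local TCP socket and prints them
socketDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketQuery = socketDF.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()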

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 28 Jan 2024 at 22:00, Perfect Stranger 
wrote:

> I described the issue here:
>
>
> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>
> Could someone please respond?
>
> The rate-micro-batch format doesn't seem to respect the startTimestamp
> option.
>
> Thanks.
>


startTimestamp doesn't work when using rate-micro-batch format

2024-01-28 Thread Perfect Stranger
I described the issue here:

https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format

Could someone please respond?

The rate-micro-batch format doesn't seem to respect the startTimestamp
option.

Thanks.


subscribe

2024-01-26 Thread Sahib Aulakh
subscribe


subscribe

2024-01-26 Thread Sahib Aulakh



Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-24 Thread Andrzej Zera
Hi,

I'm sorry but I got confused about the inner workings of late events
watermark. You're completely right. Thanks for clarifying.

Regards,
Andrzej

czw., 11 sty 2024 o 13:02 Jungtaek Lim 
napisał(a):

> Hi,
>
> The time window is closed and evicted as long as "eviction watermark"
> passes the end of the window. Late events watermark only deals with
> discarding late events from "inputs". We did not introduce additional delay
> on the work of multiple stateful operators. We just allowed more late
> events to be accepted.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera 
> wrote:
>
>> I'm struggling with the following issue in Spark >=3.4, related to
>> multiple stateful operations.
>>
>> When spark.sql.streaming.statefulOperator.allowMultiple is enabled,
>> Spark keeps track of two types of watermarks:
>> eventTimeWatermarkForEviction and eventTimeWatermarkForLateEvents.
>> Introducing them allowed chaining multiple stateful operations but also
>> introduced an additional delay for getting the output out of the streaming
>> query.
>>
>> I'll show this on the example. Assume we have a stream of click events
>> and we aggregate it first by 1-min window and then by 5-min window. If we
>> have a trigger interval of 30s, then in most cases we'll get output 30s
>> later compared to single stateful operations queries. To find out how,
>> let's look at the following examples:
>>
>> Example 1. Single stateful operation (aggregation by 5-min window, assume
>> watermark is 0 seconds)
>>
>> Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Global watermark | Output
>> 14:10:00 | 14:09:56 | 0        | -
>> 14:10:30 | 14:10:26 | 14:09:56 | -
>> 14:11:00 | 14:10:56 | 14:10:26 | window <14:05, 14:10)
>>
>> Example 2. Multiple stateful operations (aggregation by 1-min window
>> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>>
>> Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Late events watermark | Eviction watermark | Output
>> 14:10:00 | 14:09:56 | 0        | 0        | -
>> 14:10:30 | 14:10:26 | 0        | 14:09:56 | -
>> 14:11:00 | 14:10:56 | 14:09:56 | 14:10:26 | -
>> 14:11:30 | 14:11:26 | 14:10:26 | 14:10:56 | window <14:05, 14:10)
>>
>> In Example 2, we need to wait until both watermarks cross the end of the
>> window to get the output for that window, which happens one iteration later
>> compared to Example 1.
>>
>> Now, in use cases that require near-real-time processing, this one
>> iteration delay can be quite a significant difference.
>>
>> Do we have any option to make streaming queries with multiple stateful
>> operations output data without waiting this extra iteration? One of my
>> ideas was to force an empty microbatch to run and propagate late events
>> watermark without any new data. While this conceptually works, I didn't
>> find a way to trigger an empty microbatch while being connected to Kafka
>> that constantly receives new data and while having a constant 30s trigger
>> interval.
>>
>> Thanks,
>> Andrzej
>>
>


Some optimization questions about our beloved engine Spark

2024-01-23 Thread Aissam Chia
Hi,

I hope this email finds you well.

Currently, I'm working on Spark SQL and I have two main questions that I've
been struggling with for two weeks now. I'm running Spark on AWS EMR:

   1. I'm running 30 Spark applications in the same cluster. My
   applications are basically some SQL transformations that are computed on
   data stored on S3. I monitored the cluster using Ganglia and I noticed that
   most of the time I'm using roughly 2% of the total CPU. How can I
   optimize/maximize resource usage?
   2. I have a Spark application that is computing approximately 2 TB of
   data. I'm using 50 r6g.2xlarge EC2 instances and I'm using 90% of the total
   CPU capacity. My SQL job consists of joining a time-series table with
   another table that I broadcast to the different nodes. Is it possible to
   control which data is loaded onto each node? For example, I want each node
   to group the rows with the same joining key (see the sketch after this list).
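A minimal sketch of that idea (not from the original question; paths and
column names are hypothetical), explicitly co-locating rows by the join key
with repartition before a broadcast join; whether this helps depends on the
join strategy Spark picks:

from pyspark.sql.functions import broadcast

timeseries = spark.read.parquet("s3://bucket/timeseries/")   # placeholder path
dimension = spark.read.parquet("s3://bucket/dimension/")     # placeholder path

# Rows sharing the same join key land in the same partition, then the small
# table is broadcast to every node for the join.
partitioned = timeseries.repartition("join_key")
result = partitioned.join(broadcast(dimension), "join_key")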

Thank you for your time.

Kind regards,
Aissam CHIA


Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-01-17 Thread Abhishek Singla
Hi Team,

Version: 3.2.2
Java Version: 1.8.0_211
Scala Version: 2.12.15
Cluster: Standalone

I am using Spark Streaming to read from Kafka and write to S3. The job
fails with the error below if no records are published to Kafka for a few
days and then some records are published. Could someone help me identify
the root cause of this job failure?

24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id =
72ee1070-7e05-4999-8b55-2a99e216ec51, runId =
0919e548-9706-4757-be94-359848100070] terminated with error
org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find
any valid local directory for s3ablock-0001-
at 
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
at 
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
at 
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
at 
org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
at 
org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
at 
org.apache.hadoop.fs.s3a.S3ABlockOutputStream.(S3ABlockOutputStream.java:182)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
at 
org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
at 
org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.(CheckpointFileManager.scala:140)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.(CheckpointFileManager.scala:143)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
at 
org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at 

Re: unsubscribe

2024-01-17 Thread Крюков Виталий Семенович
unsubscribe


From: Leandro Martelli
Sent: 17 January 2024, 1:53:04
To: user@spark.apache.org
Subject: unsubscribe

unsubscribe


unsubscribe

2024-01-16 Thread Leandro Martelli
unsubscribe


Unsubscribe

2024-01-13 Thread Andrew Redd
Unsubscribe


Re: [spark.local.dir] comma separated list does not work

2024-01-12 Thread Andrew Petersen
Actually, that did work, thanks.
What I previously tried that did not work was
#BSUB -env "all,SPARK_LOCAL_DIRS=/tmp,/share/,SPARK_PID_DIR=..."

However, I am still getting "No space left on device" errors. It seems that
I need hierarchical directories, and round robin distribution is not good
enough. Any suggestions for getting Spark to write to dir2 when dir1 fails?
Or can round robin be implemented so that the first task attempt writes
to dir1 and, if that attempt fails, the second task attempt goes to dir2?

On Fri, Jan 12, 2024 at 10:23 PM Koert Kuipers  wrote:

> try it without spaces?
> export SPARK_LOCAL_DIRS="/tmp,/share/"
>
> On Fri, Jan 12, 2024 at 5:00 PM Andrew Petersen 
> wrote:
>
>> Hello Spark community
>>
>> SPARK_LOCAL_DIRS or
>> spark.local.dir
>> is supposed to accept a list.
>>
>> I want to list one local (fast) drive, followed by a gpfs network drive,
>> similar to what is done here:
>>
>> https://cug.org/proceedings/cug2016_proceedings/includes/files/pap129s2-file1.pdf
>> "Thus it is preferable to bias the data towards faster storage by
>> including multiple directories on the faster devices (e.g., SPARK LOCAL
>> DIRS=/tmp/spark1, /tmp/spark2, /tmp/spark3, /lus/scratch/sparkscratch/)."
>> The purpose of this is to get both benefits of speed and avoiding "out of
>> space" errors.
>>
>> However, for me, Spark is only considering the 1st directory on the list:
>> export SPARK_LOCAL_DIRS="/tmp, /share/"
>>
>> I am using Spark 3.4.1. Does anyone have any experience getting this to
>> work? If so can you suggest a simple example I can try and tell me which
>> version of Spark you are using?
>>
>> Regards
>> Andrew
>>
>>
>>
>>
>> I am trying to use 2 local drives
>>
>> --
>> Andrew Petersen, PhD
>> Advanced Computing, Office of Information Technology
>> 2620 Hillsborough Street
>> datascience.oit.ncsu.edu
>>
>
> CONFIDENTIALITY NOTICE: This electronic communication and any files
> transmitted with it are confidential, privileged and intended solely for
> the use of the individual or entity to whom they are addressed. If you are
> not the intended recipient, you are hereby notified that any disclosure,
> copying, distribution (electronic or otherwise) or forwarding of, or the
> taking of any action in reliance on the contents of this transmission is
> strictly prohibited. Please notify the sender immediately by e-mail if you
> have received this email by mistake and delete this email from your system.
>
> Is it necessary to print this email? If you care about the environment
> like we do, please refrain from printing emails. It helps to keep the
> environment forested and litter-free.



-- 
Andrew Petersen, PhD
Advanced Computing, Office of Information Technology
2620 Hillsborough Street
datascience.oit.ncsu.edu


Re: [spark.local.dir] comma separated list does not work

2024-01-12 Thread Andrew Petersen
Without spaces was the first thing I tried. The information in the pdf file
inspired me to try the space.

On Fri, Jan 12, 2024 at 10:23 PM Koert Kuipers  wrote:

> try it without spaces?
> export SPARK_LOCAL_DIRS="/tmp,/share/"
>
> On Fri, Jan 12, 2024 at 5:00 PM Andrew Petersen 
> wrote:
>
>> Hello Spark community
>>
>> SPARK_LOCAL_DIRS or
>> spark.local.dir
>> is supposed to accept a list.
>>
>> I want to list one local (fast) drive, followed by a gpfs network drive,
>> similar to what is done here:
>>
>> https://cug.org/proceedings/cug2016_proceedings/includes/files/pap129s2-file1.pdf
>> "Thus it is preferable to bias the data towards faster storage by
>> including multiple directories on the faster devices (e.g., SPARK LOCAL
>> DIRS=/tmp/spark1, /tmp/spark2, /tmp/spark3, /lus/scratch/sparkscratch/)."
>> The purpose of this is to get both benefits of speed and avoiding "out of
>> space" errors.
>>
>> However, for me, Spark is only considering the 1st directory on the list:
>> export SPARK_LOCAL_DIRS="/tmp, /share/"
>>
>> I am using Spark 3.4.1. Does anyone have any experience getting this to
>> work? If so can you suggest a simple example I can try and tell me which
>> version of Spark you are using?
>>
>> Regards
>> Andrew
>>
>>
>>
>>
>> I am trying to use 2 local drives
>>
>> --
>> Andrew Petersen, PhD
>> Advanced Computing, Office of Information Technology
>> 2620 Hillsborough Street
>> datascience.oit.ncsu.edu
>>
>
> CONFIDENTIALITY NOTICE: This electronic communication and any files
> transmitted with it are confidential, privileged and intended solely for
> the use of the individual or entity to whom they are addressed. If you are
> not the intended recipient, you are hereby notified that any disclosure,
> copying, distribution (electronic or otherwise) or forwarding of, or the
> taking of any action in reliance on the contents of this transmission is
> strictly prohibited. Please notify the sender immediately by e-mail if you
> have received this email by mistake and delete this email from your system.
>
> Is it necessary to print this email? If you care about the environment
> like we do, please refrain from printing emails. It helps to keep the
> environment forested and litter-free.



-- 
Andrew Petersen, PhD
Advanced Computing, Office of Information Technology
2620 Hillsborough Street
datascience.oit.ncsu.edu


Re: [spark.local.dir] comma separated list does not work

2024-01-12 Thread Koert Kuipers
try it without spaces?
export SPARK_LOCAL_DIRS="/tmp,/share/"

On Fri, Jan 12, 2024 at 5:00 PM Andrew Petersen 
wrote:

> Hello Spark community
>
> SPARK_LOCAL_DIRS or
> spark.local.dir
> is supposed to accept a list.
>
> I want to list one local (fast) drive, followed by a gpfs network drive,
> similar to what is done here:
>
> https://cug.org/proceedings/cug2016_proceedings/includes/files/pap129s2-file1.pdf
> "Thus it is preferable to bias the data towards faster storage by
> including multiple directories on the faster devices (e.g., SPARK LOCAL
> DIRS=/tmp/spark1, /tmp/spark2, /tmp/spark3, /lus/scratch/sparkscratch/)."
> The purpose of this is to get both benefits of speed and avoiding "out of
> space" errors.
>
> However, for me, Spark is only considering the 1st directory on the list:
> export SPARK_LOCAL_DIRS="/tmp, /share/"
>
> I am using Spark 3.4.1. Does anyone have any experience getting this to
> work? If so can you suggest a simple example I can try and tell me which
> version of Spark you are using?
>
> Regards
> Andrew
>
>
>
>
> I am trying to use 2 local drives
>
> --
> Andrew Petersen, PhD
> Advanced Computing, Office of Information Technology
> 2620 Hillsborough Street
> datascience.oit.ncsu.edu
>

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.


[spark.local.dir] comma separated list does not work

2024-01-12 Thread Andrew Petersen
Hello Spark community

SPARK_LOCAL_DIRS or
spark.local.dir
is supposed to accept a list.

I want to list one local (fast) drive, followed by a gpfs network drive,
similar to what is done here:
https://cug.org/proceedings/cug2016_proceedings/includes/files/pap129s2-file1.pdf
"Thus it is preferable to bias the data towards faster storage by including
multiple directories on the faster devices (e.g., SPARK LOCAL
DIRS=/tmp/spark1, /tmp/spark2, /tmp/spark3, /lus/scratch/sparkscratch/)."
The purpose of this is to get both benefits of speed and avoiding "out of
space" errors.

However, for me, Spark is only considering the 1st directory on the list:
export SPARK_LOCAL_DIRS="/tmp, /share/"

I am using Spark 3.4.1. Does anyone have any experience getting this to
work? If so can you suggest a simple example I can try and tell me which
version of Spark you are using?

Regards
Andrew




I am trying to use 2 local drives

-- 
Andrew Petersen, PhD
Advanced Computing, Office of Information Technology
2620 Hillsborough Street
datascience.oit.ncsu.edu


[GraphFrames Spark Package]: Why is there not a distribution for Spark 3.3?

2024-01-12 Thread Boileau, Brad
Hello,

I was hoping to use a distribution of GraphFrames for AWS Glue 4, which has
Spark 3.3, but there is no distribution available for Spark 3.3 at this location:

https://spark-packages.org/package/graphframes/graphframes

Do you have any advice on the best compatible version to use for Spark 3.3?

Sincerely,

Brad Boileau
Senior Product Architect / Architecte produit sénior
Farm Credit Canada | Financement agricole Canada
1820 Hamilton Street / 1820, rue Hamilton
Regina SK  S4P 2B8
Tel/Tél. : 306-359, C/M: 306-737-8900
fcc.ca / fac.ca
FCC social media / 
Médias sociaux FAC



This email, including attachments, is confidential. You may not share this 
email with any third party. If you are not the intended recipient, any 
redistribution or copying of this email is prohibited. If you have received 
this email in error or cannot comply with these restrictions, please delete or 
destroy it entirely and immediately without making a copy and notify us by 
return email.



Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-11 Thread Mich Talebzadeh
Catching up a bit late on this. I mentioned optimising RocksDB as below in
my earlier thread, specifically:

# Add RocksDB configurations here
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
               "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelog", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")  # Example configuration
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864")

   - Maybe a bit more clarity will be useful, although these points can be
   subjective and somewhat debatable.

   1. spark.sql.streaming.stateStore.rocksdb.changelog - Enable Changelog:
      - Benefit: Changelog is essential for maintaining state consistency and
        fault tolerance. Enabling it ensures that changes are logged and can
        be replayed in case of failures.
      - Drawback: The changelog can consume additional storage, especially
        when dealing with frequent state updates.

   2. spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB - Write Buffer Size:
      - Benefit: A larger write buffer can improve write performance and
        reduce write amplification.
      - Drawback: It may lead to increased memory usage. The optimal size
        depends on the characteristics of your workload and available resources.

   3. spark.sql.streaming.stateStore.rocksdb.compaction.style - Compaction Style:
      - Benefit: Choosing an appropriate compaction style (e.g., level) can
        impact read and write performance.
      - Drawback: Different compaction styles have trade-offs, and the optimal
        choice depends on factors like read vs. write workload and available storage.

   4. spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase
      - Target File Size for Level-based Compaction:
      ** There is another compaction style called "uniform" which can be
      beneficial in scenarios with a heavy write workload **

   - Borrowing from colloquial English, your mileage may vary as usual.
   However, since you said you may consider tuning RocksDB, I add the
   following if I may.
   - The choice of RocksDB and its configurations is often dependent on the
   specific requirements and characteristics of your streaming application.
   RocksDB can provide good performance for stateful streaming applications
   with proper tuning. However, memory usage is a crucial consideration,
   especially when dealing with large state sizes. Also note that changelog is
   essential for achieving fault tolerance but may introduce additional
   storage overhead.
   - The optimal configuration depends on factors like the volume and
   characteristics of the streaming data, the frequency of state updates, and
   available resources. In summary, what works well for one workload might
   not be optimal for another.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 15:42, Andrzej Zera  wrote:

> Yes, I agree. But apart from maintaining this state internally (in memory
> or in memory+disk as in case of RocksDB), every trigger it saves some
> information about this state in a checkpoint location. I'm afraid we can't
> do much about this checkpointing operation. I'll continue looking for
> information on how I can decrease the number of LIST requests (ListBucket
> operations) made in this process.
>
> Thank you for your input so far!
> Andrzej
>
> On Wed, 10 Jan 2024 at 16:33, Mich Talebzadeh wrote:
>
>> Hi,
>>
>> You may have a point on scenario 2.
>>
>> Caching Streaming DataFrames: In Spark Streaming, each batch of data is
>> processed incrementally, and it may not fit the typical caching we
>> discussed. Instead, Spark Streaming has its mechanisms to manage and
>> optimize the processing of streaming data. Case in point for caching
>> partial results, one often relies on maintaining state by using stateful
>> operations (see below) on Structured Streaming DataFrames. In such
>> scenarios, Spark maintains state internally based on the operations
>> performed. For example, if you are doing a groupBy followed by an
>> aggregation, Spark Streaming will manage the state of the keys and update
>> them incrementally.
>>
>> Just to clarify, in the context of Spark Structured Streaming stateful
>> operation refers to an operation that 

Re: Structured Streaming Process Each Records Individually

2024-01-11 Thread Mich Talebzadeh
Hi,

Let us visit the approach, as some fellow members correctly highlighted the
use case for Spark Structured Streaming, and the key concepts that I will
mention:


   - foreach: A method for applying custom write logic to each individual
   row in a streaming DataFrame or Dataset.
   - foreachBatch: A method for applying custom write logic to entire
   micro-batches of data, providing more flexibility for complex operations.
   - sendToSink (my chosen name here, custom logic): A user-defined
   function that encapsulates the logic for writing a micro-batch to a sink
   (in my case, the Google BigQuery DW).


Let us create pseudo code (in Python) for the sendToSink function used in
foreachBatch(sendToSink):

from pyspark.sql.functions import col

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:  # Check for empty DataFrame
        try:
            # Extract table names from the "@table" column
            table_names = df.select("@table").rdd.flatMap(lambda row: row).collect()

            # Iterate through each table name
            for table_name in table_names:
                # Filter the DataFrame for rows belonging to the current table
                table_df = df.filter(col("@table") == table_name)

                # Handle nested structures for specific tables
                if table_name in ["product_zones", "product_devices"]:
                    # Extract nested data (e.g., "zones" or "device" columns)
                    nested_data = table_df.select("zones", "device").rdd.flatMap(lambda row: row)

                    # Create a separate DataFrame for nested data
                    nested_df = spark.createDataFrame(nested_data,
                                                      schema=nested_data.first().asDict())

                    # Write the nested DataFrame to its corresponding table
                    write_to_sink(nested_df, table_name)

                # Write the main DataFrame to its table
                write_to_sink(table_df, table_name)

        except Exception as e:
            # Log errors gracefully
            log_error(f"Error processing table {table_name}: {e}")

    else:
        print("DataFrame is empty")  # Handle empty DataFrame

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 18:51, PRASHANT L  wrote:

> Hi
> I have a use case where I need to process json payloads coming from Kafka
> using structured streaming , but thing is json can have different formats ,
> schema is not fixed
> and each json will have a @type tag so based on tag , json has to be
> parsed and loaded to table with tag name  , and if a json has nested sub
> tags , those tags shd go to different table
> so I need to process each json record individually , and determine
> destination tables what would be the best approach
>
>
>> *{*
>> *"os": "andriod",*
>> *"type": "mobile",*
>> *"device": {*
>> *"warrenty": "3 years",*
>> *"replace": "yes"*
>> *},*
>> *"zones": [*
>> *{*
>> *"city": "Bangalore",*
>> *"state": "KA",*
>> *"pin": "577401"*
>> *},*
>> *{*
>> *"city": "Mumbai",*
>> *"state": "MH",*
>> *"pin": "576003"*
>> *}*
>> *],*
>> *"@table": "product"**}*
>
>
> so for the above json , there are 3 tables created
> 1. Product (@type) THis is a parent table
> 2.  poduct_zones and product_devices , child table
>


Best option to process single kafka stream in parallel: PySpark Vs Dask

2024-01-11 Thread lab22
I am creating a setup to process packets from a single Kafka topic in
parallel. For example, I have 3 containers (say, 4 cores each) on one VM,
and from the single Kafka topic stream I create 10 jobs depending on the
packet source. These packets have a small workload.

   1. I can install Dask in each container and execute a set of tasks in
   parallel. So each container with 4 cores can execute 4 jobs. This will
   increase computation and scheduling overhead in execution time.

   2. I can have a stand-alone Spark app in each container and execute a set
   of tasks in parallel. Since the packets are small, I won't be able to
   harness Spark's distributed computing power.

   3. I can have a Spark master node and distribute the set of tasks to
   worker containers, and there execute 1 task per core. Is such scheduling
   in Spark possible?

I have not yet done a POC to compute resource cost, computation overhead,
etc., so I could use your opinion.

Which is the best solution for my use case? I am interested in the
stand-alone Spark app, but if I am using it just for parallelism, is it an
efficient solution?

I have explained everything in detail above.


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-11 Thread Jungtaek Lim
If you use RocksDB state store provider, you can turn on changelog
checkpoint to put the single changelog file per partition per batch. With
disabling changelog checkpoint, Spark uploads newly created SST files and
some log files. If compaction had happened, most SST files have to be
re-uploaded. Using changelog checkpointing would upload the snapshot a lot
less frequently, and actually gives better latency on committing.
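For reference, a minimal sketch of enabling this (assuming Spark 3.4+, where
the flag is spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled;
adapt to your own configuration management):

# Use the RocksDB state store provider and enable changelog checkpointing so
# that per-batch changelog files are uploaded instead of full snapshots.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
    "true")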

On Sat, Jan 6, 2024 at 7:40 PM Andrzej Zera  wrote:

> Hey,
>
> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
> require near-real time accuracy with trigger intervals in the level of 5-10
> seconds. I usually run 3-6 streaming queries as part of the job and each
> query includes at least one stateful operation (and usually two or more).
> My checkpoint location is S3 bucket and I use RocksDB as a state store.
> Unfortunately, checkpointing costs are quite high. It's the main cost item
> of the system and it's roughly 4-5 times the cost of compute.
>
> To save on compute costs, the following things are usually recommended:
>
>- increase trigger interval (as mentioned, I don't have much space
>here)
>- decrease the number of shuffle partitions (I have 2x the number of
>workers)
>
> I'm looking for some other recommendations that I can use to save on
> checkpointing costs. I saw that most requests are LIST requests. Can we cut
> them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS,
> will it help in any way?
>
> Thank you!
> Andrzej
>
>


Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-11 Thread Jungtaek Lim
Hi,

The time window is closed and evicted as long as "eviction watermark"
passes the end of the window. Late events watermark only deals with
discarding late events from "inputs". We did not introduce additional delay
on the work of multiple stateful operators. We just allowed more late
events to be accepted.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera  wrote:

> I'm struggling with the following issue in Spark >=3.4, related to
> multiple stateful operations.
>
> When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
> keeps track of two types of watermarks: eventTimeWatermarkForEviction and
> eventTimeWatermarkForLateEvents. Introducing them allowed chaining
> multiple stateful operations but also introduced an additional delay for
> getting the output out of the streaming query.
>
> I'll show this on the example. Assume we have a stream of click events and
> we aggregate it first by 1-min window and then by 5-min window. If we have
> a trigger interval of 30s, then in most cases we'll get output 30s later
> compared to single stateful operations queries. To find out how, let's look
> at the following examples:
>
> Example 1. Single stateful operation (aggregation by 5-min window, assume
> watermark is 0 seconds)
>
> Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Global watermark | Output
> 14:10:00 | 14:09:56 | 0        | -
> 14:10:30 | 14:10:26 | 14:09:56 | -
> 14:11:00 | 14:10:56 | 14:10:26 | window <14:05, 14:10)
>
> Example 2. Multiple stateful operations (aggregation by 1-min window
> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>
> Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Late events watermark | Eviction watermark | Output
> 14:10:00 | 14:09:56 | 0        | 0        | -
> 14:10:30 | 14:10:26 | 0        | 14:09:56 | -
> 14:11:00 | 14:10:56 | 14:09:56 | 14:10:26 | -
> 14:11:30 | 14:11:26 | 14:10:26 | 14:10:56 | window <14:05, 14:10)
>
> In Example 2, we need to wait until both watermarks cross the end of the
> window to get the output for that window, which happens one iteration later
> compared to Example 1.
>
> Now, in use cases that require near-real-time processing, this one
> iteration delay can be quite a significant difference.
>
> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.
>
> Thanks,
> Andrzej
>


Re: Okio Vulnerability in Spark 3.4.1

2024-01-11 Thread Bjørn Jørgensen
[SPARK-46662][K8S][BUILD] Upgrade kubernetes-client to 6.10.0 - a new version
of kubernetes-client with okio version 1.17.6 is now merged to master and
will be in the Spark 4.0 version.

On Tue, 14 Nov 2023 at 15:21, Bjørn Jørgensen wrote:

> FYI
> I have opened "Update okio to version 1.17.6" for this now.
>
> On Thu, 31 Aug 2023 at 21:18, Sean Owen wrote:
>
>> It's a dependency of some other HTTP library. Use mvn dependency:tree to
>> see where it comes from. It may be more straightforward to upgrade the
>> library that brings it in, assuming a later version brings in a later okio.
>> You can also manage up the version directly with a new entry in
>> 
>>
>> However, does this affect Spark? all else equal it doesn't hurt to
>> upgrade, but wondering if there is even a theory that it needs to be
>> updated.
>>
>>
>> On Thu, Aug 31, 2023 at 7:42 AM Agrawal, Sanket <
>> sankeagra...@deloitte.com> wrote:
>>
>>> I don’t see an entry in pom.xml while building spark. I think it is
>>> being downloaded as part of some other dependency.
>>>
>>>
>>>
>>> *From:* Sean Owen 
>>> *Sent:* Thursday, August 31, 2023 5:10 PM
>>> *To:* Agrawal, Sanket 
>>> *Cc:* user@spark.apache.org
>>> *Subject:* [EXT] Re: Okio Vulnerability in Spark 3.4.1
>>>
>>>
>>>
>>> Does the vulnerability affect Spark?
>>>
>>> In any event, have you tried updating Okio in the Spark build? I don't
>>> believe you could just replace the JAR, as other libraries probably rely on
>>> it and compiled against the current version.
>>>
>>>
>>>
>>> On Thu, Aug 31, 2023 at 6:02 AM Agrawal, Sanket <
>>> sankeagra...@deloitte.com.invalid> wrote:
>>>
>>> Hi All,
>>>
>>>
>>>
>>> Amazon inspector has detected a vulnerability in okio-1.15.0.jar JAR in
>>> Spark 3.4.1. It suggests to upgrade the jar version to 3.4.0. But when we
>>> try this version of jar then the spark application is failing with below
>>> error:
>>>
>>>
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> None.org.apache.spark.api.java.JavaSparkContext.
>>>
>>> : java.lang.NoClassDefFoundError: okio/BufferedSource
>>>
>>> at okhttp3.internal.Util.(Util.java:62)
>>>
>>> at okhttp3.OkHttpClient.(OkHttpClient.java:127)
>>>
>>> at okhttp3.OkHttpClient$Builder.(OkHttpClient.java:475)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newOkHttpClientBuilder(OkHttpClientFactory.java:41)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:56)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:68)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:30)
>>>
>>> at
>>> io.fabric8.kubernetes.client.KubernetesClientBuilder.getHttpClient(KubernetesClientBuilder.java:88)
>>>
>>> at
>>> io.fabric8.kubernetes.client.KubernetesClientBuilder.build(KubernetesClientBuilder.java:78)
>>>
>>> at
>>> org.apache.spark.deploy.k8s.SparkKubernetesClientFactory$.createKubernetesClient(SparkKubernetesClientFactory.scala:120)
>>>
>>> at
>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:111)
>>>
>>> at
>>> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:3037)
>>>
>>> at org.apache.spark.SparkContext.(SparkContext.scala:568)
>>>
>>> at
>>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
>>>
>>> at
>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>>
>>> at
>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
>>> Source)
>>>
>>> at
>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
>>> Source)
>>>
>>> at java.base/java.lang.reflect.Constructor.newInstance(Unknown
>>> Source)
>>>
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
>>>
>>> at
>>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>>>
>>> at py4j.Gateway.invoke(Gateway.java:238)
>>>
>>> at
>>> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
>>>
>>> at
>>> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
>>>
>>> at
>>> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
>>>
>>> at
>>> py4j.ClientServerConnection.run(ClientServerConnection.java:106)
>>>
>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>
>>> Caused by: java.lang.ClassNotFoundException: okio.BufferedSource
>>>
>>> at
>>> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
>>>
>>> at
>>> 

Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-10 Thread Ant Kutschera
Hi

*Do we have any option to make streaming queries with multiple stateful
operations output data without waiting this extra iteration? One of my
ideas was to force an empty microbatch to run and propagate late events
watermark without any new data. While this conceptually works, I didn't
find a way to trigger an empty microbatch while being connected to Kafka
that constantly receives new data and while having a constant 30s trigger
interval.*

Not sure if this helps or works, but in a Kafka streaming Api solution
without Spark that I did a few years ago, we used artificial events
published once a second to ensure that windows were closed because by
design Kafka streaming only closes windows when events are flowing.  So you
could artificially trigger an 'empty' microbatch because it would contain
only artificial events, which you can of course filter out in the
microbatch processing.
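A minimal sketch of that filtering step (hypothetical column and value
names; kafka_df stands for the streaming DataFrame read from Kafka):

from pyspark.sql.functions import col

# Drop the artificial heartbeat events before any aggregation
real_events = kafka_df.filter(col("event_type") != "heartbeat")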




On Thu, 11 Jan 2024, 00:26 Andrzej Zera,  wrote:

> I'm struggling with the following issue in Spark >=3.4, related to
> multiple stateful operations.
>
> When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
> keeps track of two types of watermarks: eventTimeWatermarkForEviction and
> eventTimeWatermarkForLateEvents. Introducing them allowed chaining
> multiple stateful operations but also introduced an additional delay for
> getting the output out of the streaming query.
>
> I'll show this on the example. Assume we have a stream of click events and
> we aggregate it first by 1-min window and then by 5-min window. If we have
> a trigger interval of 30s, then in most cases we'll get output 30s later
> compared to single stateful operations queries. To find out how, let's look
> at the following examples:
>
> Example 1. Single stateful operation (aggregation by 5-min window, assume
> watermark is 0 seconds)
>
> Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Global watermark | Output
> 14:10:00 | 14:09:56 | 0        | -
> 14:10:30 | 14:10:26 | 14:09:56 | -
> 14:11:00 | 14:10:56 | 14:10:26 | window <14:05, 14:10)
>
> Example 2. Multiple stateful operations (aggregation by 1-min window
> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>
> Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Late events watermark | Eviction watermark | Output
> 14:10:00 | 14:09:56 | 0        | 0        | -
> 14:10:30 | 14:10:26 | 0        | 14:09:56 | -
> 14:11:00 | 14:10:56 | 14:09:56 | 14:10:26 | -
> 14:11:30 | 14:11:26 | 14:10:26 | 14:10:56 | window <14:05, 14:10)
>
> In Example 2, we need to wait until both watermarks cross the end of the
> window to get the output for that window, which happens one iteration later
> compared to Example 1.
>
> Now, in use cases that require near-real-time processing, this one
> iteration delay can be quite a significant difference.
>
> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.
>
> Thanks,
> Andrzej
>


Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Ant Kutschera
It might be good to first split the stream up into smaller streams, one per
type.  If ordering of the Kafka records is important, then you could
partition them at the source based on the type, but be careful how you
configure Spark to read from Kafka as that could also influence ordering.

kdf = (spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
       ...
       .option("includeHeaders", "true")
       .option("startingOffsets", "earliest")  # see below where we trigger with availableNow and checkpoint
       .option("subscribe", "topicName")  # the name of the topic
       .load())

kdf = kdf.selectExpr("CAST(key AS STRING)",
                     "CAST(value AS STRING)",
                     "headers",
                     "CAST(topic AS STRING)",
                     "CAST(partition AS STRING)",
                     "CAST(offset AS STRING)")

kdf_one = kdf.filter(kdf.type == 'one')
kdf_two = kdf.filter(kdf.type == 'two')
kdf_three = kdf.filter(kdf.type == 'three')

then transform each one as you need to:

kdf_one = kdf_one.transform(prepare_for_database_one)

and start each DataFrame and use foreachBatch to store the data in the DB:

(kdf_one
   .writeStream
   .queryName("one")
   .foreachBatch(saveastable_one)
   .trigger(availableNow=True)
   .option("checkpointLocation", "s3a://checkpointlocation/")  # very important to be on writeStream!
   .start()
   .awaitTermination())
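A minimal sketch (not from the original reply) of what a foreachBatch handler
like saveastable_one could look like; the target table name is a placeholder:

# Spark calls the handler with the micro-batch DataFrame and the batch id
def saveastable_one(batch_df, batch_id):
    batch_df.write.mode("append").saveAsTable("events_one")   # placeholder table name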


On Wed, 10 Jan 2024 at 21:01, Khalid Mammadov 
wrote:

> Use foreachBatch or foreach methods:
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> On Wed, 10 Jan 2024, 17:42 PRASHANT L,  wrote:
>
>> Hi
>> I have a use case where I need to process json payloads coming from Kafka
>> using structured streaming , but thing is json can have different formats ,
>> schema is not fixed
>> and each json will have a @type tag so based on tag , json has to be
>> parsed and loaded to table with tag name  , and if a json has nested sub
>> tags , those tags shd go to different table
>> so I need to process each json record individually , and determine
>> destination tables what would be the best approach
>>
>>
>>> *{*
>>> *"os": "andriod",*
>>> *"type": "mobile",*
>>> *"device": {*
>>> *"warrenty": "3 years",*
>>> *"replace": "yes"*
>>> *},*
>>> *"zones": [*
>>> *{*
>>> *"city": "Bangalore",*
>>> *"state": "KA",*
>>> *"pin": "577401"*
>>> *},*
>>> *{*
>>> *"city": "Mumbai",*
>>> *"state": "MH",*
>>> *"pin": "576003"*
>>> *}*
>>> *],*
>>> *"@table": "product"**}*
>>
>>
>> so for the above json , there are 3 tables created
>> 1. Product (@type) THis is a parent table
>> 2.  poduct_zones and product_devices , child table
>>
>

-- 
___
Dr Ant Kutschera


Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Mich Talebzadeh
Use an intermediate work table to land the streaming JSON data in the first
place, and then, according to the tag, store the data in the correct table.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 18:51, PRASHANT L  wrote:

> Hi
> I have a use case where I need to process json payloads coming from Kafka
> using structured streaming , but thing is json can have different formats ,
> schema is not fixed
> and each json will have a @type tag so based on tag , json has to be
> parsed and loaded to table with tag name  , and if a json has nested sub
> tags , those tags shd go to different table
> so I need to process each json record individually , and determine
> destination tables what would be the best approach
>
>
>> *{*
>> *"os": "andriod",*
>> *"type": "mobile",*
>> *"device": {*
>> *"warrenty": "3 years",*
>> *"replace": "yes"*
>> *},*
>> *"zones": [*
>> *{*
>> *"city": "Bangalore",*
>> *"state": "KA",*
>> *"pin": "577401"*
>> *},*
>> *{*
>> *"city": "Mumbai",*
>> *"state": "MH",*
>> *"pin": "576003"*
>> *}*
>> *],*
>> *"@table": "product"**}*
>
>
> so for the above json , there are 3 tables created
> 1. Product (@type) THis is a parent table
> 2.  poduct_zones and product_devices , child table
>


Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Khalid Mammadov
Use foreachBatch or foreach methods:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

On Wed, 10 Jan 2024, 17:42 PRASHANT L,  wrote:

> Hi
> I have a use case where I need to process json payloads coming from Kafka
> using structured streaming , but thing is json can have different formats ,
> schema is not fixed
> and each json will have a @type tag so based on tag , json has to be
> parsed and loaded to table with tag name  , and if a json has nested sub
> tags , those tags shd go to different table
> so I need to process each json record individually , and determine
> destination tables what would be the best approach
>
>
>> *{*
>> *"os": "andriod",*
>> *"type": "mobile",*
>> *"device": {*
>> *"warrenty": "3 years",*
>> *"replace": "yes"*
>> *},*
>> *"zones": [*
>> *{*
>> *"city": "Bangalore",*
>> *"state": "KA",*
>> *"pin": "577401"*
>> *},*
>> *{*
>> *"city": "Mumbai",*
>> *"state": "MH",*
>> *"pin": "576003"*
>> *}*
>> *],*
>> *"@table": "product"**}*
>
>
> so for the above json , there are 3 tables created
> 1. Product (@type) THis is a parent table
> 2.  poduct_zones and product_devices , child table
>


Structured Streaming Process Each Records Individually

2024-01-10 Thread PRASHANT L
Hi
I have a use case where I need to process JSON payloads coming from Kafka
using Structured Streaming, but the thing is the JSON can have different
formats; the schema is not fixed.
Each JSON will have a @type tag, so based on the tag, the JSON has to be
parsed and loaded into a table with the tag name, and if a JSON has nested
sub tags, those tags should go to a different table.
So I need to process each JSON record individually and determine the
destination tables. What would be the best approach?


> {
>   "os": "andriod",
>   "type": "mobile",
>   "device": {
>     "warrenty": "3 years",
>     "replace": "yes"
>   },
>   "zones": [
>     {
>       "city": "Bangalore",
>       "state": "KA",
>       "pin": "577401"
>     },
>     {
>       "city": "Mumbai",
>       "state": "MH",
>       "pin": "576003"
>     }
>   ],
>   "@table": "product"
> }


So for the above JSON, there are 3 tables created:
1. Product (@type). This is the parent table.
2. product_zones and product_devices, the child tables.


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Yes, I agree. But apart from maintaining this state internally (in memory
or in memory+disk as in case of RocksDB), every trigger it saves some
information about this state in a checkpoint location. I'm afraid we can't
do much about this checkpointing operation. I'll continue looking for
information on how I can decrease the number of LIST requests (ListBucket
operations) made in this process.

Thank you for your input so far!
Andrzej

On Wed, 10 Jan 2024 at 16:33, Mich Talebzadeh wrote:

> Hi,
>
> You may have a point on scenario 2.
>
> Caching Streaming DataFrames: In Spark Streaming, each batch of data is
> processed incrementally, and it may not fit the typical caching we
> discussed. Instead, Spark Streaming has its mechanisms to manage and
> optimize the processing of streaming data. Case in point for caching
> partial results, one often relies on maintaining state by using stateful
> operations (see below) on Structured Streaming DataFrames. In such
> scenarios, Spark maintains state internally based on the operations
> performed. For example, if you are doing a groupBy followed by an
> aggregation, Spark Streaming will manage the state of the keys and update
> them incrementally.
>
> Just to clarify, in the context of Spark Structured Streaming stateful
> operation refers to an operation that maintains and updates some form of
> state across batches of streaming data. Unlike stateless operations, which
> process each batch independently, stateful operations retain information
> from previous batches and use it to produce results for the current batch.
>
> So, bottom line, while one may not explicitly cache a streaming data
> frame, Spark internally optimizes the processing by maintaining the
> necessary state.
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 10 Jan 2024 at 14:20, Andrzej Zera  wrote:
>
>> Hey,
>>
>> Yes, that's how I understood it (scenario 1). However, I'm not sure if
>> scenario 2 is possible. I think cache on streaming DataFrame is supported
>> only in forEachBatch (in which it's actually no longer a streaming DF).
>>
>> On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh wrote:
>>
>>> Hi,
>>>
>>>  With regard to your point
>>>
>>> - Caching: Can you please explain what you mean by caching? I know that
>>> when you have batch and streaming sources in a streaming query, then you
>>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>>> you mean, and I don't know how to apply what you suggest to streaming data.
>>>
>>> Let us visit this
>>>
>>> Caching purpose in Structured Streaming is to store frequently accessed
>>> data in memory or disk for faster retrieval, reducing repeated reads from
>>> sources.
>>>
>>> - Types:
>>>
>>>    - Memory Caching: Stores data in memory for extremely fast access.
>>>    - Disk Caching: Stores data on disk for larger datasets or
>>>    persistence across triggers.
>>>
>>> - Scenarios:
>>>
>>>    - Joining Streaming Data with Static Data: Cache static datasets
>>>    (e.g., reference tables) to avoid repeated reads for each micro-batch.
>>>    - Reusing Intermediate Results: Cache intermediate dataframes that
>>>    are expensive to compute and used multiple times within the query.
>>>    - Window Operations: Cache data within a window to avoid re-reading
>>>    for subsequent aggregations or calculations within that window.
>>>
>>> - Benefits:
>>>
>>>    - Performance: Faster query execution by reducing I/O operations and
>>>    computation overhead.
>>>    - Cost Optimization: Reduced reads from external sources can lower
>>>    costs, especially for cloud-based sources.
>>>    - Scalability: Helps handle higher data volumes and throughput by
>>>    minimizing expensive re-computations.
>>>
>>> Example code
>>>
>>> scenario 1
>>>
>>> static_data = spark.read.load("path/to/static/data")
>>> static_data.cache()
>>> streaming_data = spark.readStream.format("...").load()
>>> joined_data = streaming_data.join(static_data, ...)  # static data is cached for efficient joins
>>>
>>> scenario 2
>>>
>>> intermediate_df = streaming_data.groupBy(...).count()
>>> intermediate_df.cache()
>>> # Use cached intermediate_df for further transformations or actions
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Mich Talebzadeh
Hi,

You may have a point on scenario 2.

Caching Streaming DataFrames: In Spark Streaming, each batch of data is
processed incrementally, and it may not fit the typical caching we
discussed. Instead, Spark Streaming has its mechanisms to manage and
optimize the processing of streaming data. Case in point for caching
partial results, one often relies on maintaining state by using stateful
operations (see below) on Structured Streaming DataFrames. In such
scenarios, Spark maintains state internally based on the operations
performed. For example, if you are doing a groupBy followed by an
aggregation, Spark Streaming will manage the state of the keys and update
them incrementally.

Just to clarify, in the context of Spark Structured Streaming stateful
operation refers to an operation that maintains and updates some form of
state across batches of streaming data. Unlike stateless operations, which
process each batch independently, stateful operations retain information
from previous batches and use it to produce results for the current batch.

So, bottom line, while one may not explicitly cache a streaming data frame,
Spark internally optimizes the processing by maintaining the necessary
state.
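
As an illustration, here is a minimal sketch of such a stateful groupBy/aggregation (the rate source and the derived key column are assumptions for the example, not taken from an actual job):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("StatefulCountSketch").getOrCreate()

# Built-in rate source stands in for a real stream; value % 3 acts as a key
events = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 10)
          .load()
          .withColumn("key", expr("value % 3")))

# groupBy followed by an aggregation: Spark keeps the running count per key
# as internal state and updates it incrementally on every micro-batch,
# without any explicit cache() call.
counts = events.groupBy("key").count()

query = (counts.writeStream
         .outputMode("update")  # emit only the keys updated in each batch
         .format("console")
         .start())

query.awaitTermination()
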
HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 14:20, Andrzej Zera  wrote:

> Hey,
>
> Yes, that's how I understood it (scenario 1). However, I'm not sure if
> scenario 2 is possible. I think cache on streaming DataFrame is supported
> only in forEachBatch (in which it's actually no longer a streaming DF).
>
> On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh
> wrote:
>
>> Hi,
>>
>>  With regard to your point
>>
>> - Caching: Can you please explain what you mean by caching? I know that
>> when you have batch and streaming sources in a streaming query, then you
>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>> you mean, and I don't know how to apply what you suggest to streaming data.
>>
>> Let us visit this
>>
>> Caching purpose in Structured Streaming is to store frequently accessed
>> data in memory or disk for faster retrieval, reducing repeated reads from
>> sources.
>>
>> - Types:
>>
>>- Memory Caching: Stores data in memory for extremely fast access.
>>- Disk Caching: Stores data on disk for larger datasets or
>>persistence across triggers
>>
>>
>> - Scenarios:
>>
>> Joining Streaming Data with Static Data: Cache static datasets
>> (e.g., reference tables) to avoid repeated reads for each micro-batch.
>>
>>-
>>- Reusing Intermediate Results: Cache intermediate dataframes that
>>are expensive to compute and used multiple times within the query.
>>- Window Operations: Cache data within a window to avoid re-reading
>>for subsequent aggregations or calculations within that window.
>>
>> - Benefits:
>>
>>- Performance: Faster query execution by reducing I/O operations and
>>computation overhead.
>>- Cost Optimization: Reduced reads from external sources can lower
>>costs, especially for cloud-based sources.
>>- Scalability: Helps handle higher data volumes and throughput by
>>minimizing expensive re-computations.
>>
>>
>> Example code
>>
>> scenario 1
>>
>> static_data = spark.read.load("path/to/static/data")
>> static_data.cache()
>> streaming_data = spark.readStream.format("...").load()
>> joined_data = streaming_data.join(static_data, ...)  # static data is cached for efficient joins
>>
>> scenario 2
>>
>> intermediate_df = streaming_data.groupBy(...).count()
>> intermediate_df.cache()
>> # Use cached intermediate_df for further transformations or actions
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera  wrote:
>>
>>> Thank you very much for your suggestions. Yes, my main concern is
>>> checkpointing costs.
>>>
>>> I went through your suggestions and here're my comments:
>>>

[Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-10 Thread Andrzej Zera
I'm struggling with the following issue in Spark >=3.4, related to multiple
stateful operations.

When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
keeps track of two types of watermarks: eventTimeWatermarkForEviction and
eventTimeWatermarkForLateEvents. Introducing them allowed chaining multiple
stateful operations but also introduced an additional delay for getting the
output out of the streaming query.

I'll show this on the example. Assume we have a stream of click events and
we aggregate it first by 1-min window and then by 5-min window. If we have
a trigger interval of 30s, then in most cases we'll get output 30s later
compared to single stateful operations queries. To find out how, let's look
at the following examples:

Example 1. Single stateful operation (aggregation by 5-min window, assume
watermark is 0 seconds)

Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Global watermark | Output
14:10:00 | 14:09:56 | 0        | -
14:10:30 | 14:10:26 | 14:09:56 | -
14:11:00 | 14:10:56 | 14:10:26 | window <14:05, 14:10)

Example 2. Multiple stateful operations (aggregation by 1-min window
followed by aggregation by 5-min window, assume watermark is 0 seconds)

Wall clock (microbatch processing starts) | Max event timestamp at the time of getting data from Kafka | Late events watermark | Eviction watermark | Output
14:10:00 | 14:09:56 | 0        | 0        | -
14:10:30 | 14:10:26 | 0        | 14:09:56 | -
14:11:00 | 14:10:56 | 14:09:56 | 14:10:26 | -
14:11:30 | 14:11:26 | 14:10:26 | 14:10:56 | window <14:05, 14:10)

In Example 2, we need to wait until both watermarks cross the end of the
window to get the output for that window, which happens one iteration later
compared to Example 1.
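
For concreteness, a minimal sketch of the chained aggregation described above (the rate source stands in for my Kafka click stream, and column names are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, sum as sum_

spark = SparkSession.builder.appName("ChainedWindowsSketch").getOrCreate()

# Stand-in source; in the real job this is a Kafka stream of click events
clicks = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 10)
          .load()
          .withColumnRenamed("timestamp", "event_time"))

# First stateful operation: 1-minute window counts
per_minute = (clicks
              .withWatermark("event_time", "0 seconds")
              .groupBy(window(col("event_time"), "1 minute"))
              .count())

# Second stateful operation chained on the first (relies on the
# allowMultiple setting mentioned above)
per_five_minutes = (per_minute
                    .groupBy(window(col("window"), "5 minutes"))
                    .agg(sum_("count").alias("clicks")))

query = (per_five_minutes.writeStream
         .outputMode("append")
         .format("console")
         .trigger(processingTime="30 seconds")
         .start())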

Now, in use cases that require near-real-time processing, this one
iteration delay can be quite a significant difference.

Do we have any option to make streaming queries with multiple stateful
operations output data without waiting this extra iteration? One of my
ideas was to force an empty microbatch to run and propagate late events
watermark without any new data. While this conceptually works, I didn't
find a way to trigger an empty microbatch while being connected to Kafka
that constantly receives new data and while having a constant 30s trigger
interval.

Thanks,
Andrzej


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Hey,

Yes, that's how I understood it (scenario 1). However, I'm not sure if
scenario 2 is possible. I think cache on streaming DataFrame is supported
only in forEachBatch (in which it's actually no longer a streaming DF).

On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh
wrote:

> Hi,
>
>  With regard to your point
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, then you
> can try to cache batch ones to save on reads. But I'm not sure if it's what
> you mean, and I don't know how to apply what you suggest to streaming data.
>
> Let us visit this
>
> Caching purpose in Structured Streaming is to store frequently accessed
> data in memory or disk for faster retrieval, reducing repeated reads from
> sources.
>
> - Types:
>
>- Memory Caching: Stores data in memory for extremely fast access.
>- Disk Caching: Stores data on disk for larger datasets or persistence
>across triggers
>
>
> - Scenarios:
>
> Joining Streaming Data with Static Data: Cache static datasets
> (e.g., reference tables) to avoid repeated reads for each micro-batch.
>
>-
>- Reusing Intermediate Results: Cache intermediate dataframes that are
>expensive to compute and used multiple times within the query.
>- Window Operations: Cache data within a window to avoid re-reading
>for subsequent aggregations or calculations within that window.
>
> - Benefits:
>
>- Performance: Faster query execution by reducing I/O operations and
>computation overhead.
>- Cost Optimization: Reduced reads from external sources can lower
>costs, especially for cloud-based sources.
>- Scalability: Helps handle higher data volumes and throughput by
>minimizing expensive re-computations.
>
>
> Example code
>
> scenario 1
>
> static_data = spark.read.load("path/to/static/data")
> static_data.cache()
> streaming_data = spark.readStream.format("...").load()
> joined_data = streaming_data.join(static_data, ...)  # static data is cached for efficient joins
>
> scenario 2
>
> intermediate_df = streaming_data.groupBy(...).count()
> intermediate_df.cache()
> # Use cached intermediate_df for further transformations or actions
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera  wrote:
>
>> Thank you very much for your suggestions. Yes, my main concern is
>> checkpointing costs.
>>
>> I went through your suggestions and here're my comments:
>>
>> - Caching: Can you please explain what you mean by caching? I know that
>> when you have batch and streaming sources in a streaming query, then you
>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>> you mean, and I don't know how to apply what you suggest to streaming data.
>>
>> - Optimize Checkpointing Frequency: I'm already using changelog
>> checkpointing with RocksDB and increased trigger interval to a maximum
>> acceptable value.
>>
>> - Minimize LIST Request: That's where I can get most savings. My LIST
>> requests account for ~70% of checkpointing costs. From what I see, LIST
>> requests are ~2.5x the number of PUT requests. Unfortunately, when I
>> changed the checkpoint location to DBFS, it didn't help with minimizing LIST
>> requests. They are roughly at the same level. From what I see, S3 Optimized
>> Committer is EMR-specific so I can't use it in Databricks. The fact that I
>> don't see a difference between S3 and DBFS checkpoint location suggests
>> that both must implement the same or similar committer.
>>
>> - Optimizing RocksDB: I still need to do this but I don't suspect it will
>> help much. From what I understand, these settings shouldn't have a
>> significant impact on the number of requests to S3.
>>
>> Any other ideas how to limit the number of LIST requests are appreciated
>>
>> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh
>> wrote:
>>
>>> OK I assume that your main concern is checkpointing costs.
>>>
>>> - Caching: If your queries read the same data multiple times, caching
>>> the data might reduce the amount of data that needs to be checkpointed.
>>>
>>>
>>> - Optimize Checkpointing Frequency i.e
>>>
>>>- Consider Changelog Checkpointing with RocksDB.  This can
>>>potentially reduce checkpoint size and duration by only storing state
>>>changes, rather than the entire state.
>>>- Adjust Trigger Interval (if possible): While not 

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Mich Talebzadeh
Hi,

 With regard to your point

- Caching: Can you please explain what you mean by caching? I know that
when you have batch and streaming sources in a streaming query, then you
can try to cache batch ones to save on reads. But I'm not sure if it's what
you mean, and I don't know how to apply what you suggest to streaming data.

Let us visit this

Caching purpose in Structured Streaming is to store frequently accessed
data in memory or disk for faster retrieval, reducing repeated reads from
sources.

- Types:

   - Memory Caching: Stores data in memory for extremely fast access.
   - Disk Caching: Stores data on disk for larger datasets or persistence
   across triggers


- Scenarios:

Joining Streaming Data with Static Data: Cache static datasets
(e.g., reference tables) to avoid repeated reads for each micro-batch.

   - Reusing Intermediate Results: Cache intermediate dataframes that are
   expensive to compute and used multiple times within the query.
   - Window Operations: Cache data within a window to avoid re-reading for
   subsequent aggregations or calculations within that window.

- Benefits:

   - Performance: Faster query execution by reducing I/O operations and
   computation overhead.
   - Cost Optimization: Reduced reads from external sources can lower
   costs, especially for cloud-based sources.
   - Scalability: Helps handle higher data volumes and throughput by
   minimizing expensive re-computations.


Example code

scenario 1

static_data = spark.read.load("path/to/static/data")
static_data.cache()
streaming_data = spark.readStream.format("...").load()
joined_data = streaming_data.join(static_data, ...)  # static data is cached for efficient joins

scenario 2

intermediate_df = streaming_data.groupBy(...).count()
intermediate_df.cache()
# Use cached intermediate_df for further transformations or actions

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 13:10, Andrzej Zera  wrote:

> Thank you very much for your suggestions. Yes, my main concern is
> checkpointing costs.
>
> I went through your suggestions and here're my comments:
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, then you
> can try to cache batch ones to save on reads. But I'm not sure if it's what
> you mean, and I don't know how to apply what you suggest to streaming data.
>
> - Optimize Checkpointing Frequency: I'm already using changelog
> checkpointing with RocksDB and increased trigger interval to a maximum
> acceptable value.
>
> - Minimize LIST Request: That's where I can get most savings. My LIST
> requests account for ~70% of checkpointing costs. From what I see, LIST
> requests are ~2.5x the number of PUT requests. Unfortunately, when I
> changed the checkpoint location to DBFS, it didn't help with minimizing LIST
> requests. They are roughly at the same level. From what I see, S3 Optimized
> Committer is EMR-specific so I can't use it in Databricks. The fact that I
> don't see a difference between S3 and DBFS checkpoint location suggests
> that both must implement the same or similar committer.
>
> - Optimizing RocksDB: I still need to do this but I don't suspect it will
> help much. From what I understand, these settings shouldn't have a
> significant impact on the number of requests to S3.
>
> Any other ideas how to limit the number of LIST requests are appreciated
>
> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh
> wrote:
>
>> OK I assume that your main concern is checkpointing costs.
>>
>> - Caching: If your queries read the same data multiple times, caching
>> the data might reduce the amount of data that needs to be checkpointed.
>>
>> - Optimize Checkpointing Frequency i.e
>>
>>- Consider Changelog Checkpointing with RocksDB.  This can
>>potentially reduce checkpoint size and duration by only storing state
>>changes, rather than the entire state.
>>- Adjust Trigger Interval (if possible): While not ideal for your
>>near-real time requirement, even a slight increase in the trigger interval
>>(e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>>
>> - Minimize LIST Requests:
>>
>>- Enable S3 Optimized Committer: or as you stated consider DBFS
>>
>> You can also optimise RocksDB. Set your state backend to RocksDB, if not
>> already. Here are what I use
>>
>>   # Add RocksDB configurations here
>> 

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Thank you very much for your suggestions. Yes, my main concern is
checkpointing costs.

I went through your suggestions and here're my comments:

- Caching: Can you please explain what you mean by caching? I know that
when you have batch and streaming sources in a streaming query, then you
can try to cache batch ones to save on reads. But I'm not sure if it's what
you mean, and I don't know how to apply what you suggest to streaming data.

- Optimize Checkpointing Frequency: I'm already using changelog
checkpointing with RocksDB and increased trigger interval to a maximum
acceptable value.

- Minimize LIST Request: That's where I can get most savings. My LIST
requests account for ~70% of checkpointing costs. From what I see, LIST
requests are ~2.5x the number of PUT requests. Unfortunately, when I
changed the checkpoint location to DBFS, it didn't help with minimizing LIST
requests. They are roughly at the same level. From what I see, S3 Optimized
Committer is EMR-specific so I can't use it in Databricks. The fact that I
don't see a difference between S3 and DBFS checkpoint location suggests
that both must implement the same or similar committer.

- Optimizing RocksDB: I still need to do this but I don't suspect it will
help much. From what I understand, these settings shouldn't have a
significant impact on the number of requests to S3.

Any other ideas how to limit the number of LIST requests are appreciated

On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh
wrote:

> OK I assume that your main concern is checkpointing costs.
>
> - Caching: If your queries read the same data multiple times, caching the
> data might reduce the amount of data that needs to be checkpointed.
>
> - Optimize Checkpointing Frequency i.e
>
>- Consider Changelog Checkpointing with RocksDB.  This can
>potentially reduce checkpoint size and duration by only storing state
>changes, rather than the entire state.
>- Adjust Trigger Interval (if possible): While not ideal for your
>near-real time requirement, even a slight increase in the trigger interval
>(e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>
> - Minimize LIST Requests:
>
>- Enable S3 Optimized Committer: or as you stated consider DBFS
>
> You can also optimise RocksDB. Set your state backend to RocksDB, if not
> already. Here are what I use
>
>   # Add RocksDB configurations here
> spark.conf.set("spark.sql.streaming.stateStore.providerClass",
> "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelog",
> "true")
>
> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
> "64")  # Example configuration
>
>  spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style",
> "level")
>
> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase",
> "67108864")
>
> These configurations provide a starting point for tuning RocksDB.
> Depending on your specific use case and requirements, of course, your
> mileage varies.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera  wrote:
>
>> Usually one or two topics per query. Each query has its own checkpoint
>> directory. Each topic has a few partitions.
>>
>> Performance-wise I don't experience any bottlenecks in terms of
>> checkpointing. It's all about the number of requests (including a high
>> number of LIST requests) and the associated cost.
>>
>> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh
>> wrote:
>>
>>> How many topics and checkpoint directories are you dealing with?
>>>
>>> Does each topic has its own checkpoint  on S3?
>>>
>>> All these checkpoints are sequential writes so even SSD would not really
>>> help
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such 

unsubscribe

2024-01-10 Thread Daniel Maangi



[apache-spark] documentation on File Metadata _metadata struct

2024-01-10 Thread Jason Horner
All, the only documentation about the File Metadata (hidden _metadata struct) I can seem to find is on the Databricks website: https://docs.databricks.com/en/ingestion/file-metadata-column.html#file-metadata-column

For reference, here is the struct:

_metadata: struct (nullable = false)
 |-- file_path: string (nullable = false)
 |-- file_name: string (nullable = false)
 |-- file_size: long (nullable = false)
 |-- file_block_start: long (nullable = false)
 |-- file_block_length: long (nullable = false)
 |-- file_modification_time: timestamp (nullable = false)

As far as I can tell, this feature was released as part of Spark 3.2.0, based on this Stack Overflow post: https://stackoverflow.com/questions/62846669/can-i-get-metadata-of-files-reading-by-spark/77238087#77238087. Unfortunately I wasn't able to locate it in the release notes, though I may have missed it somehow. So I have the following questions and am seeking guidance from the list on how best to approach this:

1. Is the documentation "missing" from the Spark 3.2.0 site, or am I just unable to find it?
2. While it provides file_modification_time, there doesn't seem to be a corresponding file_creation_time. Would both of these be issues that should be opened in JIRA?

Both of these seem like simple and useful things to add, but they are above my ability to submit PRs for without some guidance. I'm happy to help, especially with a documentation PR, if someone can confirm and get me started in the right direction. I don't really have the Java/Scala skills needed to implement the feature. Thanks for any pointers.
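
For anyone who wants to try it locally, a minimal PySpark sketch (the path is an assumption, and the column only shows up when selected explicitly):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("file-metadata-example").getOrCreate()

# Any file-based source (parquet, csv, json, ...) exposes the hidden column
df = spark.read.format("parquet").load("/tmp/example-data")

# _metadata is hidden until selected explicitly
df.select("*", "_metadata").show(truncate=False)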

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2024-01-09 Thread qi bryce
Unsubscribe


Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.

2024-01-09 Thread Mich Talebzadeh
Hi Ashok,

Thanks for pointing out the databricks article Scalable Spark Structured
Streaming for REST API Destinations | Databricks Blog


I browsed it and it is basically similar to many of us involved with spark
structure streaming with *foreachBatch. *This article and mine both mention
REST API as part of the architecture. However, there are notable
differences I believe.

In my proposed approach:

   1. Event-Driven Model:


   - Spark Streaming waits until Flask REST API makes a request for events
   to be generated within PySpark.
   - Messages are generated and then fed into any sink based on the Flask
   REST API's request.
   - This creates a more event-driven model where Spark generates data when
   prompted by external requests.





In the Databricks article scenario:

Continuous Data Stream:

   - There is an incoming stream of data from sources like Kafka, AWS
   Kinesis, or Azure Event Hub handled by foreachBatch
   - As messages flow off this stream, calls are made to a REST API with
   some or all of the message data.
   - This suggests a continuous flow of data where messages are sent to a
   REST API as soon as they are available in the streaming source.


*Benefits of Event-Driven Model:*


   1. Responsiveness: Ideal for scenarios where data generation needs to be
   aligned with specific events or user actions.
   2. Resource Optimization: Can reduce resource consumption by processing
   data only when needed.
   3. Flexibility: Allows for dynamic control over data generation based on
   external triggers.

*Benefits of Continuous Data Stream Mode with foreachBatch:*

   1. Real-Time Processing: Facilitates immediate analysis and action on
   incoming data.
   2. Handling High Volumes: Well-suited for scenarios with
   continuous, high-volume data streams.
   3. Low-Latency Applications: Essential for applications requiring near
   real-time responses.

*Potential Use Cases for my approach:*

   - On-Demand Data Generation: Generating data for
   simulations, reports, or visualizations based on user requests.
   - Triggered Analytics: Executing specific analytics tasks only when
   certain events occur, such as detecting anomalies or reaching thresholds
   say fraud detection.
   - Custom ETL Processes: Facilitating data
   extraction, transformation, and loading workflows based on external events
   or triggers


Something to note on latency. Event-driven models like mine can potentially
introduce slight latency compared to continuous processing, as data
generation depends on API calls.

So my approach is more event-triggered and responsive to external requests,
while foreachBatch scenario is more continuous and real-time, processing
and sending data as it becomes available.
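
For illustration, a minimal sketch of the foreachBatch pattern that the article describes (the endpoint URL, payload shape and the streaming_df source are assumptions):

import requests

def post_batch_to_rest(batch_df, batch_id):
    # collect() is acceptable here only because each micro-batch is assumed small
    for row in batch_df.toJSON().collect():
        requests.post("https://example.com/api/events",
                      data=row,
                      headers={"Content-Type": "application/json"},
                      timeout=5)

# streaming_df stands for whatever streaming DataFrame feeds the sink
query = (streaming_df.writeStream
         .foreachBatch(post_batch_to_rest)
         .option("checkpointLocation", "/tmp/chkpt-rest")
         .start())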

In summary, both approaches have their merits and are suited to different
use cases depending on the nature of the data flow and processing
requirements.

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 9 Jan 2024 at 19:11, ashok34...@yahoo.com 
wrote:

> Hey Mich,
>
> Thanks for this introduction on your forthcoming proposal "Spark
> Structured Streaming and Flask REST API for Real-Time Data Ingestion and
> Analytics". I recently came across an article by Databricks with title 
> Scalable
> Spark Structured Streaming for REST API Destinations
> 
> . Their use case is similar to your suggestion but what they are saying
> is that they have incoming stream of data from sources like Kafka, AWS
> Kinesis, or Azure Event Hub. In other words, a continuous flow of data
> where messages are sent to a REST API as soon as they are available in the
> streaming source. Their approach is practical but wanted to get your
> thoughts on their article with a better understanding on your proposal and
> differences.
>
> Thanks
>
>
> On Tuesday, 9 January 2024 at 00:24:19 GMT, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Please also note that Flask, by default, is a single-threaded web
> framework. While it is suitable for development and small-scale
> applications, it may not handle concurrent requests efficiently in a
> production environment.
> In production, one can utilise Gunicorn (Green Unicorn) which is a WSGI (
> Web Server Gateway Interface) that is commonly used to serve Flask
> applications in 

Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.

2024-01-09 Thread ashok34...@yahoo.com.INVALID
 Hey Mich,
Thanks for this introduction on your forthcoming proposal "Spark Structured 
Streaming and Flask REST API for Real-Time Data Ingestion and Analytics". I 
recently came across an article by Databricks with title Scalable Spark 
Structured Streaming for REST API Destinations. Their use case is similar to 
your suggestion but what they are saying is that they have incoming stream of 
data from sources like Kafka, AWS Kinesis, or Azure Event Hub. In other words, 
a continuous flow of data where messages are sent to a REST API as soon as they 
are available in the streaming source. Their approach is practical but wanted 
to get your thoughts on their article with a better understanding on your 
proposal and differences.
Thanks

On Tuesday, 9 January 2024 at 00:24:19 GMT, Mich Talebzadeh 
 wrote:  
 
Please also note that Flask, by default, is a single-threaded web framework.
While it is suitable for development and small-scale applications, it may not
handle concurrent requests efficiently in a production environment. In
production, one can utilise Gunicorn (Green Unicorn) which is a WSGI (Web
Server Gateway Interface) that is commonly used to serve Flask applications in
production. It provides multiple worker processes, each capable of handling a
single request at a time. This makes Gunicorn suitable for handling multiple
simultaneous requests and improves the concurrency and performance of your
Flask application.

HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from relying
on this email's technical content is explicitly disclaimed. The author will in
no case be liable for any monetary damages arising from such loss, damage or
destruction.

 


On Mon, 8 Jan 2024 at 19:30, Mich Talebzadeh  wrote:

Thought it might be useful to share my idea with fellow forum members.  During 
the breaks, I worked on the seamless integration of Spark Structured Streaming 
with Flask REST API for real-time data ingestion and analytics. The use case 
revolves around a scenario where data is generated through REST API requests in 
real time. The Flask REST API efficiently captures and processes this data, 
saving it to a Spark Structured Streaming DataFrame. Subsequently, the 
processed data could be channelled into any sink of your choice including Kafka 
pipeline, showing a robust end-to-end solution for dynamic and responsive data 
streaming. I will delve into the architecture, implementation, and benefits of 
this combination, enabling one to build an agile and efficient real-time data 
application. I will put the code in GitHub for everyone's benefit. Hopefully 
your comments will help me to improve it.
Cheers
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from relying
on this email's technical content is explicitly disclaimed. The author will in
no case be liable for any monetary damages arising from such loss, damage or
destruction.

 

  

Unsubscribe

2024-01-09 Thread mahzad kalantari
Unsubscribe


Unsubscribe

2024-01-09 Thread Kalhara Gurugamage
Unsubscribe


Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.

2024-01-08 Thread Mich Talebzadeh
Please also note that Flask, by default, is a single-threaded web
framework. While it is suitable for development and small-scale
applications, it may not handle concurrent requests efficiently in a
production environment.
In production, one can utilise Gunicorn (Green Unicorn) which is a WSGI (
Web Server Gateway Interface) that is commonly used to serve Flask
applications in production. It provides multiple worker processes, each
capable of handling a single request at a time. This makes Gunicorn
suitable for handling multiple simultaneous requests and improves the
concurrency and performance of your Flask application.
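
As an illustration, a minimal Flask app and the Gunicorn command to serve it with multiple workers (module name, route and port are assumptions):

# app.py
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/events", methods=["POST"])
def ingest():
    payload = request.get_json()
    # hand the payload off to the streaming pipeline here (e.g. publish to Kafka)
    return jsonify({"status": "accepted", "keys": list(payload or {})}), 202

# In production, serve it with several Gunicorn workers instead of app.run():
#   gunicorn -w 4 -b 0.0.0.0:8000 app:app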

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 8 Jan 2024 at 19:30, Mich Talebzadeh 
wrote:

> Thought it might be useful to share my idea with fellow forum members.  During
> the breaks, I worked on the *seamless integration of Spark Structured
> Streaming with Flask REST API for real-time data ingestion and analytics*.
> The use case revolves around a scenario where data is generated through
> REST API requests in real time. The Flask REST API efficiently
> captures and processes this data, saving it to a Spark Structured Streaming
> DataFrame. Subsequently, the processed data could be channelled into any
> sink of your choice including Kafka pipeline, showing a robust end-to-end
> solution for dynamic and responsive data streaming. I will delve into the
> architecture, implementation, and benefits of this combination, enabling
> one to build an agile and efficient real-time data application. I will put
> the code in GitHub for everyone's benefit. Hopefully your comments will
> help me to improve it.
>
> Cheers
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.

2024-01-08 Thread Mich Talebzadeh
Thought it might be useful to share my idea with fellow forum members.  During
the breaks, I worked on the *seamless integration of Spark Structured
Streaming with Flask REST API for real-time data ingestion and analytics*.
The use case revolves around a scenario where data is generated through
REST API requests in real time. The Flask REST API efficiently captures
and processes this data, saving it to a Spark Structured Streaming
DataFrame. Subsequently, the processed data could be channelled into any
sink of your choice including Kafka pipeline, showing a robust end-to-end
solution for dynamic and responsive data streaming. I will delve into the
architecture, implementation, and benefits of this combination, enabling
one to build an agile and efficient real-time data application. I will put
the code in GitHub for everyone's benefit. Hopefully your comments will
help me to improve it.
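
Until then, one possible shape of the Spark side, assuming the Flask endpoint publishes each incoming request to a Kafka topic (topic name, brokers and schema are illustrative, not the final design):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("FlaskIngestSketch").getOrCreate()

schema = StructType([
    StructField("event_id", StringType()),
    StructField("payload", StringType()),
    StructField("created_at", TimestampType()),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "rest_ingest")
       .load())

events = (raw
          .select(from_json(col("value").cast("string"), schema).alias("e"))
          .select("e.*"))

query = (events.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/chkpt-rest-ingest")
         .start())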

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Pyspark UDF as a data source for streaming

2024-01-08 Thread Mich Talebzadeh
Hi,

Have you come back with some ideas for implementing this? Specifically
integrating Spark Structured Streaming with REST API? FYI, I did some work
on it as it can have potential wider use cases, i.e. the seamless
integration of Spark Structured Streaming with Flask REST API for real-time
data ingestion and analytics. My use case revolves around a scenario where
data is generated through REST API requests in real time with PySpark. The
Flask REST API efficiently captures and processes this data, saving it to a
sink of your choice, like a data warehouse or Kafka.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович
 wrote:

> Hello!
>
> Is it possible to write pyspark UDF, generated data to streaming dataframe?
>
> I want to get some data from REST API requests in real time and consider
> to save this data to dataframe.
>
> And then put it to Kafka.
>
> I can't realise how to create streaming dataframe from generated data.
>
>
>
> I am new in spark streaming.
>
> Could you give me some hints?
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>


[ANNOUNCE] Apache Celeborn(incubating) 0.3.2 available

2024-01-07 Thread Nicholas Jiang
Hi all,

Apache Celeborn(Incubating) community is glad to announce the
new release of Apache Celeborn(Incubating) 0.3.2.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

- https://github.com/apache/incubator-celeborn/releases/tag/v0.3.2-incubating

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.3.2


Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Regards,
Nicholas Jiang
On behalf of the Apache Celeborn(incubating) community

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-07 Thread Mich Talebzadeh
OK I assume that your main concern is checkpointing costs.

- Caching: If your queries read the same data multiple times, caching the
data might reduce the amount of data that needs to be checkpointed.

- Optimize Checkpointing Frequency i.e

   - Consider Changelog Checkpointing with RocksDB.  This can
   potentially reduce checkpoint size and duration by only storing state
   changes, rather than the entire state.
   - Adjust Trigger Interval (if possible): While not ideal for your
   near-real time requirement, even a slight increase in the trigger interval
   (e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.

- Minimize LIST Requests:

   - Enable S3 Optimized Committer: or as you stated consider DBFS

You can also optimise RocksDB. Set your state backend to RocksDB, if not
already. Here is what I use:

# Add RocksDB configurations here
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
               "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelog", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")  # example configuration
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864")

These configurations provide a starting point for tuning RocksDB. Depending
on your specific use case and requirements, of course, your mileage varies.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 7 Jan 2024 at 08:07, Andrzej Zera  wrote:

> Usually one or two topics per query. Each query has its own checkpoint
> directory. Each topic has a few partitions.
>
> Performance-wise I don't experience any bottlenecks in terms of
> checkpointing. It's all about the number of requests (including a high
> number of LIST requests) and the associated cost.
>
> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh
> wrote:
>
>> How many topics and checkpoint directories are you dealing with?
>>
>> Does each topic has its own checkpoint  on S3?
>>
>> All these checkpoints are sequential writes so even SSD would not really
>> help
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera  wrote:
>>
>>> Hey,
>>>
>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
>>> require near-real time accuracy with trigger intervals in the level of 5-10
>>> seconds. I usually run 3-6 streaming queries as part of the job and each
>>> query includes at least one stateful operation (and usually two or more).
>>> My checkpoint location is S3 bucket and I use RocksDB as a state store.
>>> Unfortunately, checkpointing costs are quite high. It's the main cost item
>>> of the system and it's roughly 4-5 times the cost of compute.
>>>
>>> To save on compute costs, the following things are usually recommended:
>>>
>>>- increase trigger interval (as mentioned, I don't have much space
>>>here)
>>>- decrease the number of shuffle partitions (I have 2x the number of
>>>workers)
>>>
>>> I'm looking for some other recommendations that I can use to save on
>>> checkpointing costs. I saw that most requests are LIST requests. Can we cut
>>> them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS,
>>> will it help in any way?
>>>
>>> Thank you!
>>> Andrzej
>>>
>>>


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-07 Thread Andrzej Zera
Usually one or two topics per query. Each query has its own checkpoint
directory. Each topic has a few partitions.

Performance-wise I don't experience any bottlenecks in terms of
checkpointing. It's all about the number of requests (including a high
number of LIST requests) and the associated cost.

On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh
wrote:

> How many topics and checkpoint directories are you dealing with?
>
> Does each topic has its own checkpoint  on S3?
>
> All these checkpoints are sequential writes so even SSD would not really
> help
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera  wrote:
>
>> Hey,
>>
>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
>> require near-real time accuracy with trigger intervals in the level of 5-10
>> seconds. I usually run 3-6 streaming queries as part of the job and each
>> query includes at least one stateful operation (and usually two or more).
>> My checkpoint location is S3 bucket and I use RocksDB as a state store.
>> Unfortunately, checkpointing costs are quite high. It's the main cost item
>> of the system and it's roughly 4-5 times the cost of compute.
>>
>> To save on compute costs, the following things are usually recommended:
>>
>>- increase trigger interval (as mentioned, I don't have much space
>>here)
>>- decrease the number of shuffle partitions (I have 2x the number of
>>workers)
>>
>> I'm looking for some other recommendations that I can use to save on
>> checkpointing costs. I saw that most requests are LIST requests. Can we cut
>> them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS,
>> will it help in any way?
>>
>> Thank you!
>> Andrzej
>>
>>


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-06 Thread Mich Talebzadeh
How many topics and checkpoint directories are you dealing with?

Does each topic has its own checkpoint  on S3?

All these checkpoints are sequential writes so even SSD would not really
help

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 6 Jan 2024 at 08:19, Andrzej Zera  wrote:

> Hey,
>
> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
> require near-real time accuracy with trigger intervals in the level of 5-10
> seconds. I usually run 3-6 streaming queries as part of the job and each
> query includes at least one stateful operation (and usually two or more).
> My checkpoint location is S3 bucket and I use RocksDB as a state store.
> Unfortunately, checkpointing costs are quite high. It's the main cost item
> of the system and it's roughly 4-5 times the cost of compute.
>
> To save on compute costs, the following things are usually recommended:
>
>- increase trigger interval (as mentioned, I don't have much space
>here)
>- decrease the number of shuffle partitions (I have 2x the number of
>workers)
>
> I'm looking for some other recommendations that I can use to save on
> checkpointing costs. I saw that most requests are LIST requests. Can we cut
> them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS,
> will it help in any way?
>
> Thank you!
> Andrzej
>
>


[Structured Streaming] Keeping checkpointing cost under control

2024-01-05 Thread Andrzej Zera
Hey,

I'm running a few Structured Streaming jobs (with Spark 3.5.0) that require
near-real time accuracy with trigger intervals in the level of 5-10
seconds. I usually run 3-6 streaming queries as part of the job and each
query includes at least one stateful operation (and usually two or more).
My checkpoint location is S3 bucket and I use RocksDB as a state store.
Unfortunately, checkpointing costs are quite high. It's the main cost item
of the system and it's roughly 4-5 times the cost of compute.

To save on compute costs, the following things are usually recommended:

   - increase trigger interval (as mentioned, I don't have much space here)
   - decrease the number of shuffle partitions (I have 2x the number of
   workers)

I'm looking for some other recommendations that I can use to save on
checkpointing costs. I saw that most requests are LIST requests. Can we cut
them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS,
will it help in any way?

Thank you!
Andrzej


Re: Issue with Spark Session Initialization in Kubernetes Deployment

2024-01-05 Thread Mich Talebzadeh
Hi,

I personally do not use the Spark operator.

 Anyhow, the Spark Operator automates the deployment and management of
Spark applications within Kubernetes. However, it does not eliminate the
need to configure Spark sessions for proper communication with the k8
cluster.  So specifying the master URL is mandatory even when using the
Spark Operator in Kubernetes deployments.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("AppConstants.APPLICATION_NAME") \
    .config("spark.master", "k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>") \
    .getOrCreate()

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 4 Jan 2024 at 21:29, Atul Patil  wrote:

> Hello Team,
>
> I am currently working on initializing a Spark session using Spark
> Structure Streaming within a Kubernetes deployment managed by the Spark
> Operator. During the initialization process, I encountered an error message
> indicating the necessity to set a master URL:
>
> *"Caused by: org.apache.spark.SparkException: A master URL must be set in
> your configuration."*
>
> Could you kindly confirm if specifying the master URL is mandatory in this
> context of using Spark Structure Streaming programming with the Spark
> Operator for Kubernetes deployments?
>
> Below is the code snippet I am using for Spark session initialization:
> SparkSession sparkSession = SparkSession.builder()
> .appName(AppConstants.APPLICATION_NAME)
> .getOrCreate()
>
> Thank you!
>
> Regards,
> Atul
>
>
>


Issue with Spark Session Initialization in Kubernetes Deployment

2024-01-04 Thread Atul Patil
Hello Team,

I am currently working on initializing a Spark session using Spark
Structure Streaming within a Kubernetes deployment managed by the Spark
Operator. During the initialization process, I encountered an error message
indicating the necessity to set a master URL:

*"Caused by: org.apache.spark.SparkException: A master URL must be set in
your configuration."*

Could you kindly confirm if specifying the master URL is mandatory in this
context of using Spark Structure Streaming programming with the Spark
Operator for Kubernetes deployments?

Below is the code snippet I am using for Spark session initialization:
SparkSession sparkSession = SparkSession.builder()
.appName(AppConstants.APPLICATION_NAME)
.getOrCreate()

Thank you!

Regards,
Atul


Unsubscribe

2024-01-02 Thread Atlas - Samir Souidi
Unsubscribe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Select Columns from Dataframe in Java

2023-12-30 Thread Grisha Weintraub
Hi,

Have a look here -
https://repost.aws/knowledge-center/spark-driver-logs-emr-cluster.
Usually, you have application logs out-of-the-box in the driver stdout.
It looks like
"s3://aws-logs--us-east-1/elasticmapreduce/j-35PUYZBQVIJNM/containers/application_1572839353552_0008/container_1572839353552_0008_01_01/stdout.gz".

Grisha

On Sat, Dec 30, 2023 at 2:49 PM PRASHANT L  wrote:

> Hi Grisha
> This is Great :) It worked thanks alot
>
> I have this requirement , I will be running my spark application on EMR
> and build a custom logging to create logs on S3. Any idea what should I do?
> or In general if i create a custom log (with my Application name ), where
> will logs be generated when run in cluster mode (since in cluster mode jobs
> are executed all over different machine)
>
> On Sat, Dec 30, 2023 at 1:56 PM Grisha Weintraub <
> grisha.weintr...@gmail.com> wrote:
>
>> In Java, it expects an array of Columns, so you can simply cast your list
>> to an array:
>>
>> array_df.select(fields.toArray(new Column[0]))
>>
>>
>>
>>
>> On Fri, Dec 29, 2023 at 10:58 PM PRASHANT L 
>> wrote:
>>
>>>
>>> Team
>>> I am using Java and want to select columns from Dataframe , columns are
>>> stored in List
>>> equivalent of below scala code
>>> *  array_df=array_df.select(fields: _*)*
>>>
>>>
>>> When I try array_df=array_df.select(fields) , I get error saying Cast to
>>> Column
>>>
>>> I am using Spark 3.4
>>>
>>


Re: Select Columns from Dataframe in Java

2023-12-30 Thread PRASHANT L
Hi Grisha
This is great :) It worked, thanks a lot.

I have this requirement: I will be running my Spark application on EMR
and building custom logging to create logs on S3. Any idea what I should do?
Or, in general, if I create a custom log (with my application name), where
will the logs be generated when run in cluster mode (since in cluster mode jobs
are executed across different machines)?

On Sat, Dec 30, 2023 at 1:56 PM Grisha Weintraub 
wrote:

> In Java, it expects an array of Columns, so you can simply cast your list
> to an array:
>
> array_df.select(fields.toArray(new Column[0]))
>
>
>
>
> On Fri, Dec 29, 2023 at 10:58 PM PRASHANT L  wrote:
>
>>
>> Team
>> I am using Java and want to select columns from Dataframe , columns are
>> stored in List
>> equivalent of below scala code
>> *  array_df=array_df.select(fields: _*)*
>>
>>
>> When I try array_df=array_df.select(fields) , I get error saying Cast to
>> Column
>>
>> I am using Spark 3.4
>>
>


Re: Select Columns from Dataframe in Java

2023-12-30 Thread Grisha Weintraub
In Java, it expects an array of Columns, so you can simply cast your list
to an array:

array_df.select(fields.toArray(new Column[0]))




On Fri, Dec 29, 2023 at 10:58 PM PRASHANT L  wrote:

>
> Team
> I am using Java and want to select columns from Dataframe , columns are
> stored in List
> equivalent of below scala code
> *  array_df=array_df.select(fields: _*)*
>
>
> When I try array_df=array_df.select(fields) , I get error saying Cast to
> Column
>
> I am using Spark 3.4
>


Unsubscribe

2023-12-29 Thread Vinti Maheshwari



Re: Pyspark UDF as a data source for streaming

2023-12-29 Thread Mich Talebzadeh
Hi,

Do you have more info on this JIRA besides the GitHub link? I don't seem
to be able to find it.

Thanks

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 28 Dec 2023 at 09:33, Hyukjin Kwon  wrote:

> Just fyi streaming python data source is in progress
> https://github.com/apache/spark/pull/44416 we will likely release this in
> spark 4.0
>
> On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович
>  wrote:
>
>> Yes, it's actual data.
>>
>>
>>
>> Best regards,
>>
>> Stanislav Porotikov
>>
>>
>>
>> *From:* Mich Talebzadeh 
>> *Sent:* Wednesday, December 27, 2023 9:43 PM
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Pyspark UDF as a data source for streaming
>>
>>
>>
>> Is this generated data actual data or you are testing the application?
>>
>>
>>
>> Sounds like a form of Lambda architecture here with some
>> decision/processing not far from the attached diagram
>>
>>
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>>
>> Dad | Technologist | Solutions Architect | Engineer
>>
>> London
>>
>> United Kingdom
>>
>>
>>
>>  [image: Рисунок удален отправителем.]  view my Linkedin profile
>> 
>>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>
>>
>>
>> On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович <
>> s.poroti...@skbkontur.ru> wrote:
>>
>> Actually it's json with specific structure from API server.
>>
>> But the task is to check constantly if new data appears on API server and
>> load it to Kafka.
>>
>> Full pipeline can be presented like that:
>>
>> REST API -> Kafka -> some processing -> Kafka/Mongo -> …
>>
>>
>>
>> Best regards,
>>
>> Stanislav Porotikov
>>
>>
>>
>> *From:* Mich Talebzadeh 
>> *Sent:* Wednesday, December 27, 2023 6:17 PM
>> *To:* Поротиков Станислав Вячеславович 
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Pyspark UDF as a data source for streaming
>>
>>
>>
>> Ok so you want to generate some random data and load it into Kafka on a
>> regular interval and the rest?
>>
>>
>>
>> HTH
>>
>> Mich Talebzadeh,
>>
>> Dad | Technologist | Solutions Architect | Engineer
>>
>> London
>>
>> United Kingdom
>>
>>
>>
>>  [image: Рисунок удален отправителем.]  view my Linkedin profile
>> 
>>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>
>>
>>
>> On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович
>>  wrote:
>>
>> Hello!
>>
>> Is it possible to write pyspark UDF, generated data to streaming
>> dataframe?
>>
>> I want to get some data from REST API requests in real time and consider
>> to save this data to dataframe.
>>
>> And then put it to Kafka.
>>
>> I can't realise how to create streaming dataframe from generated data.
>>
>>
>>
>> I am new in spark streaming.
>>
>> Could you give me some hints?
>>
>>
>>
>> Best regards,
>>
>> Stanislav Porotikov
>>
>>
>>
>>


Re: the life cycle shuffle Dependency

2023-12-29 Thread murat migdisoglu
Hello, why would you like to delete the shuffle data yourself in the first
place?

On Thu, Dec 28, 2023, 10:08 yang chen  wrote:

>
> hi, I'm learning spark, and wonder when to delete shuffle data, I find the
> ContextCleaner class which clean the shuffle data when shuffle dependency
> is GC-ed.  Based on source code, the shuffle dependency is gc-ed only when
> active job finish, but i'm not sure,  Could you explain the life cycle of a
> Shuffle-Dependency?
>


Select Columns from Dataframe in Java

2023-12-29 Thread PRASHANT L
Team,
I am using Java and want to select columns from a DataFrame; the columns are
stored in a List. I am looking for the equivalent of the Scala code below:
*  array_df=array_df.select(fields: _*)*


When I try array_df=array_df.select(fields), I get an error saying it cannot
be cast to Column.

I am using Spark 3.4


Re: Pyspark UDF as a data source for streaming

2023-12-28 Thread Mich Talebzadeh
Hi Stanislav,

On the PySpark DF, can you run the following

df.printSchema()

and send the output please

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 28 Dec 2023 at 12:31, Поротиков Станислав Вячеславович <
s.poroti...@skbkontur.ru> wrote:

> Ok. Thank you very much!
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Thursday, December 28, 2023 5:14 PM
> *To:* Hyukjin Kwon 
> *Cc:* Поротиков Станислав Вячеславович ;
> user@spark.apache.org
> *Subject:* Re: Pyspark UDF as a data source for streaming
>
>
>
> You can work around this issue by trying to write your DF to a flat file
> and use Kafka to pick it up from the flat file and stream it in.
>
>
>
> Bear in mind that Kafa will require a unique identifier as K/V pair. Check
> this link how to generate UUID for this purpose
>
>
> https://stackoverflow.com/questions/49785108/spark-streaming-with-python-how-to-add-a-uuid-column
>
>
>
> HTH
>
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>  [image: Рисунок удален отправителем.]  view my Linkedin profile
> 
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 28 Dec 2023 at 09:33, Hyukjin Kwon  wrote:
>
> Just fyi streaming python data source is in progress
>
> https://github.com/apache/spark/pull/44416 we will likely release this in
> spark 4.0
>
>
>
> On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович
>  wrote:
>
> Yes, it's actual data.
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, December 27, 2023 9:43 PM
> *Cc:* user@spark.apache.org
> *Subject:* Re: Pyspark UDF as a data source for streaming
>
>
>
> Is this generated data actual data or you are testing the application?
>
>
>
> Sounds like a form of Lambda architecture here with some
> decision/processing not far from the attached diagram
>
>
>
> HTH
>
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>  [image: Рисунок удален отправителем.]  view my Linkedin profile
> 
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович <
> s.poroti...@skbkontur.ru> wrote:
>
> Actually it's json with specific structure from API server.
>
> But the task is to check constantly if new data appears on API server and
> load it to Kafka.
>
> Full pipeline can be presented like that:
>
> REST API -> Kafka -> some processing -> Kafka/Mongo -> …
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, December 27, 2023 6:17 PM
> *To:* Поротиков Станислав Вячеславович 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Pyspark UDF as a data source for streaming
>
>
>
> Ok so you want to generate some random data and load it into Kafka on a
> regular interval and the rest?
>
>
>
> HTH
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>  [image: Рисунок удален отправителем.]  view my Linkedin profile
> 
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович
>  

RE: Pyspark UDF as a data source for streaming

2023-12-28 Thread Поротиков Станислав Вячеславович
Ok. Thank you very much!

Best regards,
Stanislav Porotikov

From: Mich Talebzadeh 
Sent: Thursday, December 28, 2023 5:14 PM
To: Hyukjin Kwon 
Cc: Поротиков Станислав Вячеславович ; 
user@spark.apache.org
Subject: Re: Pyspark UDF as a data source for streaming

You can work around this issue by trying to write your DF to a flat file and 
use Kafka to pick it up from the flat file and stream it in.

Bear in mind that Kafa will require a unique identifier as K/V pair. Check this 
link how to generate UUID for this purpose

https://stackoverflow.com/questions/49785108/spark-streaming-with-python-how-to-add-a-uuid-column

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 [Рисунок удален отправителем.]   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 28 Dec 2023 at 09:33, Hyukjin Kwon 
mailto:gurwls...@apache.org>> wrote:
Just fyi streaming python data source is in progress
https://github.com/apache/spark/pull/44416 we will likely release this in spark 
4.0

On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович 
 wrote:
Yes, it's actual data.

Best regards,
Stanislav Porotikov

From: Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>>
Sent: Wednesday, December 27, 2023 9:43 PM
Cc: user@spark.apache.org
Subject: Re: Pyspark UDF as a data source for streaming

Is this generated data actual data or you are testing the application?

Sounds like a form of Lambda architecture here with some decision/processing 
not far from the attached diagram

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 [Рисунок удален отправителем.]   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович 
mailto:s.poroti...@skbkontur.ru>> wrote:
Actually it's json with specific structure from API server.
But the task is to check constantly if new data appears on API server and load 
it to Kafka.
Full pipeline can be presented like that:
REST API -> Kafka -> some processing -> Kafka/Mongo -> …

Best regards,
Stanislav Porotikov

From: Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>>
Sent: Wednesday, December 27, 2023 6:17 PM
To: Поротиков Станислав Вячеславович 
Cc: user@spark.apache.org
Subject: Re: Pyspark UDF as a data source for streaming

Ok so you want to generate some random data and load it into Kafka on a regular 
interval and the rest?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 [Рисунок удален отправителем.]   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович 
 wrote:
Hello!
Is it possible to write pyspark UDF, generated data to streaming dataframe?
I want to get some data from REST API requests in real time and consider to 
save this data to dataframe.
And then put it to Kafka.
I can't realise how to create streaming dataframe from generated data.

I am new in spark streaming.
Could you give me some hints?

Best regards,
Stanislav Porotikov



Re: Pyspark UDF as a data source for streaming

2023-12-28 Thread Mich Talebzadeh
You can work around this issue by writing your DF to a flat file and having
Kafka pick it up from the flat file and stream it in.

Bear in mind that Kafka will require a unique identifier as a K/V pair. Check
this link on how to generate a UUID for this purpose:

https://stackoverflow.com/questions/49785108/spark-streaming-with-python-how-to-add-a-uuid-column
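
A rough sketch of that workaround (the schema, paths, topic and brokers below
are placeholders, and the spark-sql-kafka connector package is assumed to be
on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FileToKafkaSketch").getOrCreate()

# 1) Another process drops JSON files ("the flat file") into this directory
incoming = (spark.readStream
            .format("json")
            .schema("id LONG, payload STRING")            # example schema
            .load("/tmp/incoming_json"))                  # placeholder path

# 2) uuid() supplies the unique identifier; the whole row becomes the value
keyed = incoming.selectExpr(
    "uuid() AS key",
    "to_json(struct(*)) AS value",
)

# 3) Stream it into Kafka
(keyed.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
    .option("topic", "incoming_events")                   # placeholder
    .option("checkpointLocation", "/tmp/chkpt_file_to_kafka")
    .start()
    .awaitTermination())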

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 28 Dec 2023 at 09:33, Hyukjin Kwon  wrote:

> Just fyi streaming python data source is in progress
> https://github.com/apache/spark/pull/44416 we will likely release this in
> spark 4.0
>
> On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович
>  wrote:
>
>> Yes, it's actual data.
>>
>>
>>
>> Best regards,
>>
>> Stanislav Porotikov
>>
>>
>>
>> *From:* Mich Talebzadeh 
>> *Sent:* Wednesday, December 27, 2023 9:43 PM
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Pyspark UDF as a data source for streaming
>>
>>
>>
>> Is this generated data actual data or you are testing the application?
>>
>>
>>
>> Sounds like a form of Lambda architecture here with some
>> decision/processing not far from the attached diagram
>>
>>
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>>
>> Dad | Technologist | Solutions Architect | Engineer
>>
>> London
>>
>> United Kingdom
>>
>>
>>
>>  [image: Рисунок удален отправителем.]  view my Linkedin profile
>> 
>>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>
>>
>>
>> On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович <
>> s.poroti...@skbkontur.ru> wrote:
>>
>> Actually it's json with specific structure from API server.
>>
>> But the task is to check constantly if new data appears on API server and
>> load it to Kafka.
>>
>> Full pipeline can be presented like that:
>>
>> REST API -> Kafka -> some processing -> Kafka/Mongo -> …
>>
>>
>>
>> Best regards,
>>
>> Stanislav Porotikov
>>
>>
>>
>> *From:* Mich Talebzadeh 
>> *Sent:* Wednesday, December 27, 2023 6:17 PM
>> *To:* Поротиков Станислав Вячеславович 
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Pyspark UDF as a data source for streaming
>>
>>
>>
>> Ok so you want to generate some random data and load it into Kafka on a
>> regular interval and the rest?
>>
>>
>>
>> HTH
>>
>> Mich Talebzadeh,
>>
>> Dad | Technologist | Solutions Architect | Engineer
>>
>> London
>>
>> United Kingdom
>>
>>
>>
>>  [image: Рисунок удален отправителем.]  view my Linkedin profile
>> 
>>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>
>>
>>
>> On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович
>>  wrote:
>>
>> Hello!
>>
>> Is it possible to write pyspark UDF, generated data to streaming
>> dataframe?
>>
>> I want to get some data from REST API requests in real time and consider
>> to save this data to dataframe.
>>
>> And then put it to Kafka.
>>
>> I can't realise how to create streaming dataframe from generated data.
>>
>>
>>
>> I am new in spark streaming.
>>
>> Could you give me some hints?
>>
>>
>>
>> Best regards,
>>
>> Stanislav Porotikov
>>
>>
>>
>>


Re: Pyspark UDF as a data source for streaming

2023-12-28 Thread Hyukjin Kwon
Just FYI, the streaming Python data source is in progress
(https://github.com/apache/spark/pull/44416); we will likely release this in
Spark 4.0.

On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович
 wrote:

> Yes, it's actual data.
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, December 27, 2023 9:43 PM
> *Cc:* user@spark.apache.org
> *Subject:* Re: Pyspark UDF as a data source for streaming
>
>
>
> Is this generated data actual data or you are testing the application?
>
>
>
> Sounds like a form of Lambda architecture here with some
> decision/processing not far from the attached diagram
>
>
>
> HTH
>
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>  [image: Рисунок удален отправителем.]  view my Linkedin profile
> 
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович <
> s.poroti...@skbkontur.ru> wrote:
>
> Actually it's json with specific structure from API server.
>
> But the task is to check constantly if new data appears on API server and
> load it to Kafka.
>
> Full pipeline can be presented like that:
>
> REST API -> Kafka -> some processing -> Kafka/Mongo -> …
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Wednesday, December 27, 2023 6:17 PM
> *To:* Поротиков Станислав Вячеславович 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Pyspark UDF as a data source for streaming
>
>
>
> Ok so you want to generate some random data and load it into Kafka on a
> regular interval and the rest?
>
>
>
> HTH
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>  [image: Рисунок удален отправителем.]  view my Linkedin profile
> 
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович
>  wrote:
>
> Hello!
>
> Is it possible to write pyspark UDF, generated data to streaming dataframe?
>
> I want to get some data from REST API requests in real time and consider
> to save this data to dataframe.
>
> And then put it to Kafka.
>
> I can't realise how to create streaming dataframe from generated data.
>
>
>
> I am new in spark streaming.
>
> Could you give me some hints?
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>
>


RE: Pyspark UDF as a data source for streaming

2023-12-27 Thread Поротиков Станислав Вячеславович
Yes, it's actual data.

Best regards,
Stanislav Porotikov

From: Mich Talebzadeh 
Sent: Wednesday, December 27, 2023 9:43 PM
Cc: user@spark.apache.org
Subject: Re: Pyspark UDF as a data source for streaming

Is this generated data actual data or you are testing the application?

Sounds like a form of Lambda architecture here with some decision/processing 
not far from the attached diagram

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 [Рисунок удален отправителем.]   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович 
mailto:s.poroti...@skbkontur.ru>> wrote:
Actually it's json with specific structure from API server.
But the task is to check constantly if new data appears on API server and load 
it to Kafka.
Full pipeline can be presented like that:
REST API -> Kafka -> some processing -> Kafka/Mongo -> …

Best regards,
Stanislav Porotikov

From: Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>>
Sent: Wednesday, December 27, 2023 6:17 PM
To: Поротиков Станислав Вячеславович 
Cc: user@spark.apache.org
Subject: Re: Pyspark UDF as a data source for streaming

Ok so you want to generate some random data and load it into Kafka on a regular 
interval and the rest?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 [Рисунок удален отправителем.]   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович 
 wrote:
Hello!
Is it possible to write pyspark UDF, generated data to streaming dataframe?
I want to get some data from REST API requests in real time and consider to 
save this data to dataframe.
And then put it to Kafka.
I can't realise how to create streaming dataframe from generated data.

I am new in spark streaming.
Could you give me some hints?

Best regards,
Stanislav Porotikov



Fwd: the life cycle shuffle Dependency

2023-12-27 Thread yang chen
Hi, I'm learning Spark and wonder when shuffle data gets deleted. I found the
ContextCleaner class, which cleans the shuffle data when the shuffle dependency
is GC-ed. Based on the source code, the shuffle dependency is GC-ed only when
the active job finishes, but I'm not sure. Could you explain the life cycle of
a ShuffleDependency?
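
For what it's worth, a small experiment that makes this behaviour visible
(local mode and default settings assumed; the exact timing depends on when the
JVM actually runs GC, since the cleaner works off weak references and cleans
up asynchronously):

import gc
import time

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[2]")
         .appName("ShuffleCleanupDemo")
         .getOrCreate())
sc = spark.sparkContext

rdd = (sc.parallelize(range(1_000_000))
         .map(lambda x: (x % 10, 1))
         .reduceByKey(lambda a, b: a + b))
rdd.count()      # runs the job and writes shuffle files under spark.local.dir

rdd = None       # drop the last reference to the shuffled RDD
gc.collect()     # once the JVM side also collects the ShuffleDependency, the
                 # ContextCleaner should log a "Cleaned shuffle" message and
                 # remove the shuffle files
time.sleep(30)   # cleanup is asynchronous, so give it a moment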


RE: Pyspark UDF as a data source for streaming

2023-12-27 Thread Поротиков Станислав Вячеславович
Actually it's json with specific structure from API server.
But the task is to check constantly if new data appears on API server and load 
it to Kafka.
Full pipeline can be presented like that:
REST API -> Kafka -> some processing -> Kafka/Mongo -> …

Best regards,
Stanislav Porotikov

From: Mich Talebzadeh 
Sent: Wednesday, December 27, 2023 6:17 PM
To: Поротиков Станислав Вячеславович 
Cc: user@spark.apache.org
Subject: Re: Pyspark UDF as a data source for streaming

Ok so you want to generate some random data and load it into Kafka on a regular 
interval and the rest?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 [Рисунок удален отправителем.]   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович 
 wrote:
Hello!
Is it possible to write pyspark UDF, generated data to streaming dataframe?
I want to get some data from REST API requests in real time and consider to 
save this data to dataframe.
And then put it to Kafka.
I can't realise how to create streaming dataframe from generated data.

I am new in spark streaming.
Could you give me some hints?

Best regards,
Stanislav Porotikov



RE: Pyspark UDF as a data source for streaming

2023-12-27 Thread Поротиков Станислав Вячеславович
Actually, it's JSON with a specific structure from an API server.
The task is to constantly check whether new data appears on the API server and 
load it into Kafka.

The full pipeline can be presented like this:

REST API -> Kafka -> some processing -> Kafka/Mongo -> …
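
For the first hop (REST API -> Kafka), one possible shape is sketched below;
everything in it (endpoint URL, response format, topic, brokers, the requests
library) is just a placeholder assumption, and the built-in rate source is
used only as a timer for foreachBatch:

import json
import requests

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RestToKafkaSketch").getOrCreate()

API_URL = "https://example.com/api/events"   # placeholder endpoint

def poll_api_and_publish(batch_df, batch_id):
    # batch_df is just the rate tick; its contents are ignored
    payload = requests.get(API_URL, timeout=10).json()   # assumed: a JSON list
    if not payload:
        return
    records = spark.createDataFrame([(json.dumps(r),) for r in payload], ["value"])
    (records.write
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
        .option("topic", "api_events")                        # placeholder
        .save())

ticks = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

(ticks.writeStream
    .foreachBatch(poll_api_and_publish)
    .trigger(processingTime="30 seconds")
    .option("checkpointLocation", "/tmp/chkpt_rest_to_kafka")
    .start()
    .awaitTermination())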

Best regards,
Stanislav Porotikov

From: Mich Talebzadeh 
Sent: Wednesday, December 27, 2023 6:17 PM
To: Поротиков Станислав Вячеславович 
Cc: user@spark.apache.org
Subject: Re: Pyspark UDF as a data source for streaming

Ok so you want to generate some random data and load it into Kafka on a regular 
interval and the rest?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 [Рисунок удален отправителем.]   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович 
 wrote:
Hello!
Is it possible to write pyspark UDF, generated data to streaming dataframe?
I want to get some data from REST API requests in real time and consider to 
save this data to dataframe.
And then put it to Kafka.
I can't realise how to create streaming dataframe from generated data.

I am new in spark streaming.
Could you give me some hints?

Best regards,
Stanislav Porotikov



Re: Pyspark UDF as a data source for streaming

2023-12-27 Thread Mich Talebzadeh
Ok so you want to generate some random data and load it into Kafka on a
regular interval and the rest?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович
 wrote:

> Hello!
>
> Is it possible to write pyspark UDF, generated data to streaming dataframe?
>
> I want to get some data from REST API requests in real time and consider
> to save this data to dataframe.
>
> And then put it to Kafka.
>
> I can't realise how to create streaming dataframe from generated data.
>
>
>
> I am new in spark streaming.
>
> Could you give me some hints?
>
>
>
> Best regards,
>
> Stanislav Porotikov
>
>
>


Pyspark UDF as a data source for streaming

2023-12-27 Thread Поротиков Станислав Вячеславович
Hello!
Is it possible to write a PySpark UDF whose generated data feeds a streaming
dataframe? I want to fetch some data from REST API requests in real time, save 
this data to a dataframe, and then put it into Kafka.
I can't figure out how to create a streaming dataframe from generated data.

I am new to Spark streaming.
Could you give me some hints?

Best regards,
Stanislav Porotikov



Re: Validate spark sql

2023-12-26 Thread Gourav Sengupta
Dear friend,

thanks a ton. I was looking for linting for SQL for a long time; it looks like
https://sqlfluff.com/ is something that can be used :)

Thank you so much, and wish you all a wonderful new year.

Regards,
Gourav

On Tue, Dec 26, 2023 at 4:42 AM Bjørn Jørgensen 
wrote:

> You can try sqlfluff  it's a linter for SQL code
> and it seems to have support for sparksql
> 
>
>
> On Mon, 25 Dec 2023 at 17:13, ram manickam  wrote:
>
>> Thanks Mich, Nicholas. I tried looking over the stack overflow post and
>> none of them
>> Seems to cover the syntax validation. Do you know if it's even possible
>> to do syntax validation in spark?
>>
>> Thanks
>> Ram
>>
>> On Sun, Dec 24, 2023 at 12:49 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well not to put too finer point on it, in a public forum, one ought to
>>> respect the importance of open communication. Everyone has the right to ask
>>> questions, seek information, and engage in discussions without facing
>>> unnecessary patronization.
>>>
>>>
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 This is a user-list question, not a dev-list question. Moving this
 conversation to the user list and BCC-ing the dev list.

 Also, this statement

 > We are not validating against table or column existence.

 is not correct. When you call spark.sql(…), Spark will lookup the table
 references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.

 Also, when you run DDL via spark.sql(…), Spark will actually run it. So
 spark.sql(“drop table my_table”) will actually drop my_table. It’s not a
 validation-only operation.

 This question of validating SQL is already discussed on Stack Overflow
 . You may find some
 useful tips there.

 Nick


 On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh 
 wrote:


 Yes, you can validate the syntax of your PySpark SQL queries without
 connecting to an actual dataset or running the queries on a cluster.
 PySpark provides a method for syntax validation without executing the
 query. Something like below
   __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/__ / .__/\_,_/_/ /_/\_\   version 3.4.0
   /_/

 Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
 Spark context Web UI available at http://rhes75:4040
 Spark context available as 'sc' (master = local[*], app id =
 local-1703410019374).
 SparkSession available as 'spark'.
 >>> from pyspark.sql import SparkSession
 >>> spark = SparkSession.builder.appName("validate").getOrCreate()
 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session;
 only runtime SQL configurations will take effect.
 >>> sql = "SELECT * FROM  WHERE  = some value"
 >>> try:
 ...   spark.sql(sql)
 ...   print("is working")
 ... except Exception as e:
 ...   print(f"Syntax error: {e}")
 ...
 Syntax error:
 [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)

 == SQL ==
 SELECT * FROM  WHERE  = some value
 --^^^

 Here we only check for syntax errors and not the actual existence of
 query semantics. We are not validating against table or column existence.

 This method is useful when you want to catch obvious syntax errors
 before submitting your PySpark job to a cluster, especially when you don't
 have access to the actual data.

 In summary

- Theis method validates syntax but will not catch semantic errors
- If you need more comprehensive validation, consider using a
testing framework and a small dataset.
- For complex queries, using a linter or code analysis tool can
help identify potential issues.

 HTH


 Mich Talebzadeh,
 Dad | Technologist | Solutions Architect | Engineer
 London
 United Kingdom

view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh


 *Disclaimer:* Use it at 

Re: Validate spark sql

2023-12-26 Thread Mich Talebzadeh
Worth trying the EXPLAIN statement, as suggested by @tianlangstudio.
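
A minimal sketch of that approach (the query below is a placeholder): EXPLAIN
parses and plans the statement without executing it, so syntax problems
surface as a ParseException, while the returned text shows the plan (or the
planning error) when the syntax is fine.

from pyspark.sql import SparkSession
from pyspark.errors import ParseException  # pyspark.sql.utils.ParseException on Spark < 3.4

spark = SparkSession.builder.appName("explain_check").getOrCreate()

candidate_sql = "SELECT id, name FROM my_table WHERE id > 10"  # placeholder query

try:
    # EXPLAIN returns a single-row, single-column DataFrame with the plan text
    plan_text = spark.sql(f"EXPLAIN {candidate_sql}").collect()[0][0]
    print(plan_text)
except ParseException as e:
    print(f"syntax error: {e}")
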
HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 25 Dec 2023 at 11:15, tianlangstudio 
wrote:

>
> What about EXPLAIN?
> https://spark.apache.org/docs/3.5.0/sql-ref-syntax-qry-explain.html#content
>
>
>
>
> Fusion Zhu 
>
> --
> From: ram manickam 
> Sent: Monday, 25 December 2023 12:58
> To: Mich Talebzadeh
> Cc: Nicholas Chammas; user<
> user@spark.apache.org>
> Subject: Re: Validate spark sql
>
> Thanks Mich, Nicholas. I tried looking over the stack overflow post and
> none of them
> Seems to cover the syntax validation. Do you know if it's even possible to
> do syntax validation in spark?
>
> Thanks
> Ram
>
> On Sun, Dec 24, 2023 at 12:49 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
> Well not to put too finer point on it, in a public forum, one ought to
> respect the importance of open communication. Everyone has the right to ask
> questions, seek information, and engage in discussions without facing
> unnecessary patronization.
>
>
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: Use it at your own risk.Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas 
> wrote:
> This is a user-list question, not a dev-list question. Moving this
> conversation to the user list and BCC-ing the dev list.
>
> Also, this statement
>
> > We are not validating against table or column existence.
>
> is not correct. When you call spark.sql(…), Spark will lookup the table
> references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.
>
> Also, when you run DDL via spark.sql(…), Spark will actually run it. So
> spark.sql(“drop table my_table”) will actually drop my_table. It’s not a
> validation-only operation.
>
> This question of validating SQL is already discussed on Stack Overflow
> . You may find some useful
> tips there.
>
> Nick
>
>
> On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh 
> wrote:
>
>
> Yes, you can validate the syntax of your PySpark SQL queries without
> connecting to an actual dataset or running the queries on a cluster.
> PySpark provides a method for syntax validation without executing the
> query. Something like below
>   __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.4.0
>   /_/
> Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
> Spark context Web UI available at http://rhes75:4040
> Spark context available as 'sc' (master = local[*], app id =
> local-1703410019374).
> SparkSession available as 'spark'.
> >>> from pyspark.sql import SparkSession
> >>> spark = SparkSession.builder.appName("validate").getOrCreate()
> 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session; only
> runtime SQL configurations will take effect.
> >>> sql = "SELECT * FROM  WHERE  = some value"
> >>> try:
> ...   spark.sql(sql)
> ...   print("is working")
> ... except Exception as e:
> ...   print(f"Syntax error: {e}")
> ...
> Syntax error:
> [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)
> == SQL ==
> SELECT * FROM  WHERE  = some value
> --^^^
>
> Here we only check for syntax errors and not the actual existence of query
> semantics. We are not validating against table or column existence.
>
> This method is useful when you want to catch obvious syntax errors before
> submitting your PySpark job to a cluster, especially when you don't have
> access to the actual data.
>
> In summary
>
>- Theis method validates syntax but will not catch semantic errors
>- If you need more comprehensive validation, consider using a testing
>framework and a small dataset.
>- For complex queries, using a linter or code analysis 

Re: Validate spark sql

2023-12-25 Thread Bjørn Jørgensen
You can try sqlfluff; it's a linter for SQL code and
it seems to have support for Spark SQL.
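
A small sketch of using it offline (assuming sqlfluff is pip-installed and
that its "sparksql" dialect is the right one here); no Spark session is
involved:

import sqlfluff

sql = "SELECT * FROM  WHERE  = some value"   # the broken query from the thread

# returns a list of violations (an empty list means the statement lints cleanly);
# the CLI equivalent would be: sqlfluff lint query.sql --dialect sparksql
for violation in sqlfluff.lint(sql, dialect="sparksql"):
    print(violation)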


On Mon, 25 Dec 2023 at 17:13, ram manickam  wrote:

> Thanks Mich, Nicholas. I tried looking over the stack overflow post and
> none of them
> Seems to cover the syntax validation. Do you know if it's even possible to
> do syntax validation in spark?
>
> Thanks
> Ram
>
> On Sun, Dec 24, 2023 at 12:49 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Well not to put too finer point on it, in a public forum, one ought to
>> respect the importance of open communication. Everyone has the right to ask
>> questions, seek information, and engage in discussions without facing
>> unnecessary patronization.
>>
>>
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> This is a user-list question, not a dev-list question. Moving this
>>> conversation to the user list and BCC-ing the dev list.
>>>
>>> Also, this statement
>>>
>>> > We are not validating against table or column existence.
>>>
>>> is not correct. When you call spark.sql(…), Spark will lookup the table
>>> references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.
>>>
>>> Also, when you run DDL via spark.sql(…), Spark will actually run it. So
>>> spark.sql(“drop table my_table”) will actually drop my_table. It’s not a
>>> validation-only operation.
>>>
>>> This question of validating SQL is already discussed on Stack Overflow
>>> . You may find some useful
>>> tips there.
>>>
>>> Nick
>>>
>>>
>>> On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh 
>>> wrote:
>>>
>>>
>>> Yes, you can validate the syntax of your PySpark SQL queries without
>>> connecting to an actual dataset or running the queries on a cluster.
>>> PySpark provides a method for syntax validation without executing the
>>> query. Something like below
>>>   __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/__ / .__/\_,_/_/ /_/\_\   version 3.4.0
>>>   /_/
>>>
>>> Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
>>> Spark context Web UI available at http://rhes75:4040
>>> Spark context available as 'sc' (master = local[*], app id =
>>> local-1703410019374).
>>> SparkSession available as 'spark'.
>>> >>> from pyspark.sql import SparkSession
>>> >>> spark = SparkSession.builder.appName("validate").getOrCreate()
>>> 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session;
>>> only runtime SQL configurations will take effect.
>>> >>> sql = "SELECT * FROM  WHERE  = some value"
>>> >>> try:
>>> ...   spark.sql(sql)
>>> ...   print("is working")
>>> ... except Exception as e:
>>> ...   print(f"Syntax error: {e}")
>>> ...
>>> Syntax error:
>>> [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)
>>>
>>> == SQL ==
>>> SELECT * FROM  WHERE  = some value
>>> --^^^
>>>
>>> Here we only check for syntax errors and not the actual existence of
>>> query semantics. We are not validating against table or column existence.
>>>
>>> This method is useful when you want to catch obvious syntax errors
>>> before submitting your PySpark job to a cluster, especially when you don't
>>> have access to the actual data.
>>>
>>> In summary
>>>
>>>- Theis method validates syntax but will not catch semantic errors
>>>- If you need more comprehensive validation, consider using a
>>>testing framework and a small dataset.
>>>- For complex queries, using a linter or code analysis tool can help
>>>identify potential issues.
>>>
>>> HTH
>>>
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 24 Dec 2023 at 07:57, ram manickam  wrote:
>>>
 Hello,
 Is 

Re: Validate spark sql

2023-12-25 Thread Bjørn Jørgensen
Mailing lists

For broad, opinion based, ask for external resources, debug issues, bugs,
contributing to the project, and scenarios, it is recommended you use the
user@spark.apache.org mailing list.

   - user@spark.apache.org is for usage questions, help, and announcements.
     (subscribe) (unsubscribe) (archives)
   - d...@spark.apache.org is for people who want to contribute code to Spark.
     (subscribe) (unsubscribe) (archives)



On Mon, 25 Dec 2023 at 04:58, Mich Talebzadeh <
mich.talebza...@gmail.com> wrote:

> Well not to put too finer point on it, in a public forum, one ought to
> respect the importance of open communication. Everyone has the right to ask
> questions, seek information, and engage in discussions without facing
> unnecessary patronization.
>
>
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas 
> wrote:
>
>> This is a user-list question, not a dev-list question. Moving this
>> conversation to the user list and BCC-ing the dev list.
>>
>> Also, this statement
>>
>> > We are not validating against table or column existence.
>>
>> is not correct. When you call spark.sql(…), Spark will lookup the table
>> references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.
>>
>> Also, when you run DDL via spark.sql(…), Spark will actually run it. So
>> spark.sql(“drop table my_table”) will actually drop my_table. It’s not a
>> validation-only operation.
>>
>> This question of validating SQL is already discussed on Stack Overflow
>> . You may find some useful
>> tips there.
>>
>> Nick
>>
>>
>> On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh 
>> wrote:
>>
>>
>> Yes, you can validate the syntax of your PySpark SQL queries without
>> connecting to an actual dataset or running the queries on a cluster.
>> PySpark provides a method for syntax validation without executing the
>> query. Something like below
>>   __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 3.4.0
>>   /_/
>>
>> Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
>> Spark context Web UI available at http://rhes75:4040
>> Spark context available as 'sc' (master = local[*], app id =
>> local-1703410019374).
>> SparkSession available as 'spark'.
>> >>> from pyspark.sql import SparkSession
>> >>> spark = SparkSession.builder.appName("validate").getOrCreate()
>> 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session;
>> only runtime SQL configurations will take effect.
>> >>> sql = "SELECT * FROM  WHERE  = some value"
>> >>> try:
>> ...   spark.sql(sql)
>> ...   print("is working")
>> ... except Exception as e:
>> ...   print(f"Syntax error: {e}")
>> ...
>> Syntax error:
>> [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)
>>
>> == SQL ==
>> SELECT * FROM  WHERE  = some value
>> --^^^
>>
>> Here we only check for syntax errors and not the actual existence of
>> query semantics. We are not validating against table or column existence.
>>
>> This method is useful when you want to catch obvious syntax errors before
>> submitting your PySpark job to a cluster, especially when you don't have
>> access to the actual data.
>>
>> In summary
>>
>>- Theis method validates syntax but will not catch semantic errors
>>- If you need more comprehensive validation, consider using a testing
>>framework and a small dataset.
>>- For complex queries, using a linter or code analysis tool can help
>>identify potential issues.
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from 

Re: Validate spark sql

2023-12-25 Thread tianlangstudio
What about EXPLAIN?
https://spark.apache.org/docs/3.5.0/sql-ref-syntax-qry-explain.html#content 

 
 Fusion Zhu 
--
From: ram manickam 
Sent: Monday, 25 December 2023 12:58
To: Mich Talebzadeh
Cc: Nicholas Chammas; user
Subject: Re: Validate spark sql
Thanks Mich, Nicholas. I tried looking over the stack overflow post and none of 
them
Seems to cover the syntax validation. Do you know if it's even possible to do 
syntax validation in spark?
Thanks
Ram
On Sun, Dec 24, 2023 at 12:49 PM Mich Talebzadeh mailto:mich.talebza...@gmail.com >> wrote:
Well not to put too finer point on it, in a public forum, one ought to respect 
the importance of open communication. Everyone has the right to ask questions, 
seek information, and engage in discussions without facing unnecessary 
patronization.
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom
view my Linkedin profile 

https://en.everybodywiki.com/Mich_Talebzadeh 

Disclaimer: Use it at your own risk.Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas mailto:nicholas.cham...@gmail.com >> wrote:
This is a user-list question, not a dev-list question. Moving this conversation 
to the user list and BCC-ing the dev list.
Also, this statement
> We are not validating against table or column existence.
is not correct. When you call spark.sql(…), Spark will lookup the table 
references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.
Also, when you run DDL via spark.sql(…), Spark will actually run it. So 
spark.sql(“drop table my_table”) will actually drop my_table. It’s not a 
validation-only operation.
This question of validating SQL is already discussed on Stack Overflow 
. You may find some useful tips 
there.
Nick
On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh mailto:mich.talebza...@gmail.com >> wrote:
Yes, you can validate the syntax of your PySpark SQL queries without connecting 
to an actual dataset or running the queries on a cluster. 
PySpark provides a method for syntax validation without executing the query. 
Something like below
 __
 / __/__ ___ _/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /__ / .__/\_,_/_/ /_/\_\ version 3.4.0
 /_/
Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
Spark context Web UI available at http://rhes75:4040 
Spark context available as 'sc' (master = local[*], app id = 
local-1703410019374).
SparkSession available as 'spark'.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("validate").getOrCreate()
23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session; only 
runtime SQL configurations will take effect.
>>> sql = "SELECT * FROM  WHERE  = some value"
>>> try:
... spark.sql(sql)
... print("is working")
... except Exception as e:
... print(f"Syntax error: {e}")
...
Syntax error:
[PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)
== SQL ==
SELECT * FROM  WHERE  = some value
--^^^
Here we only check for syntax errors and not the actual existence of query 
semantics. We are not validating against table or column existence.
This method is useful when you want to catch obvious syntax errors before 
submitting your PySpark job to a cluster, especially when you don't have access 
to the actual data.
In summary

 * 
Theis method validates syntax but will not catch semantic errors

 * 
If you need more comprehensive validation, consider using a testing framework 
and a small dataset.

 * 
For complex queries, using a linter or code analysis tool can help identify 
potential issues.
HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom
view my Linkedin profile 

https://en.everybodywiki.com/Mich_Talebzadeh 

Disclaimer: Use it at your own risk.Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
On Sun, 24 Dec 2023 at 07:57, ram manickam mailto:ramsidm...@gmail.com >> wrote:
Hello,
Is there a way to validate pyspark sql to validate only syntax errors?. I 
cannot 

Re: Validate spark sql

2023-12-24 Thread ram manickam
Thanks Mich, Nicholas. I tried looking over the Stack Overflow posts and none
of them seems to cover syntax validation. Do you know if it's even possible to
do syntax validation in Spark?

Thanks
Ram

On Sun, Dec 24, 2023 at 12:49 PM Mich Talebzadeh 
wrote:

> Well not to put too finer point on it, in a public forum, one ought to
> respect the importance of open communication. Everyone has the right to ask
> questions, seek information, and engage in discussions without facing
> unnecessary patronization.
>
>
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas 
> wrote:
>
>> This is a user-list question, not a dev-list question. Moving this
>> conversation to the user list and BCC-ing the dev list.
>>
>> Also, this statement
>>
>> > We are not validating against table or column existence.
>>
>> is not correct. When you call spark.sql(…), Spark will lookup the table
>> references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.
>>
>> Also, when you run DDL via spark.sql(…), Spark will actually run it. So
>> spark.sql(“drop table my_table”) will actually drop my_table. It’s not a
>> validation-only operation.
>>
>> This question of validating SQL is already discussed on Stack Overflow
>> . You may find some useful
>> tips there.
>>
>> Nick
>>
>>
>> On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh 
>> wrote:
>>
>>
>> Yes, you can validate the syntax of your PySpark SQL queries without
>> connecting to an actual dataset or running the queries on a cluster.
>> PySpark provides a method for syntax validation without executing the
>> query. Something like below
>>   __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 3.4.0
>>   /_/
>>
>> Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
>> Spark context Web UI available at http://rhes75:4040
>> Spark context available as 'sc' (master = local[*], app id =
>> local-1703410019374).
>> SparkSession available as 'spark'.
>> >>> from pyspark.sql import SparkSession
>> >>> spark = SparkSession.builder.appName("validate").getOrCreate()
>> 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session;
>> only runtime SQL configurations will take effect.
>> >>> sql = "SELECT * FROM  WHERE  = some value"
>> >>> try:
>> ...   spark.sql(sql)
>> ...   print("is working")
>> ... except Exception as e:
>> ...   print(f"Syntax error: {e}")
>> ...
>> Syntax error:
>> [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)
>>
>> == SQL ==
>> SELECT * FROM  WHERE  = some value
>> --^^^
>>
>> Here we only check for syntax errors and not the actual existence of
>> query semantics. We are not validating against table or column existence.
>>
>> This method is useful when you want to catch obvious syntax errors before
>> submitting your PySpark job to a cluster, especially when you don't have
>> access to the actual data.
>>
>> In summary
>>
>>- Theis method validates syntax but will not catch semantic errors
>>- If you need more comprehensive validation, consider using a testing
>>framework and a small dataset.
>>- For complex queries, using a linter or code analysis tool can help
>>identify potential issues.
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 24 Dec 2023 at 07:57, ram manickam  wrote:
>>
>>> Hello,
>>> Is there a way to validate pyspark sql to validate only syntax errors?.
>>> I cannot connect do actual data set to perform this validation.  Any
>>> help would be appreciated.
>>>
>>>
>>> Thanks
>>> Ram
>>>
>>
>>


Re: Validate spark sql

2023-12-24 Thread Mich Talebzadeh
Well, not to put too fine a point on it: in a public forum, one ought to
respect the importance of open communication. Everyone has the right to ask
questions, seek information, and engage in discussions without facing
unnecessary patronization.



Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas 
wrote:

> This is a user-list question, not a dev-list question. Moving this
> conversation to the user list and BCC-ing the dev list.
>
> Also, this statement
>
> > We are not validating against table or column existence.
>
> is not correct. When you call spark.sql(…), Spark will look up the table
> references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.
>
> Also, when you run DDL via spark.sql(…), Spark will actually run it. So
> spark.sql(“drop table my_table”) will actually drop my_table. It’s not a
> validation-only operation.
>
> This question of validating SQL is already discussed on Stack Overflow
> . You may find some useful
> tips there.
>
> Nick
>
>
> On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh 
> wrote:
>
>
> Yes, you can validate the syntax of your PySpark SQL queries without
> connecting to an actual dataset or running the queries on a cluster.
> PySpark provides a method for syntax validation without executing the
> query. Something like below
>   __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.4.0
>   /_/
>
> Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
> Spark context Web UI available at http://rhes75:4040
> Spark context available as 'sc' (master = local[*], app id =
> local-1703410019374).
> SparkSession available as 'spark'.
> >>> from pyspark.sql import SparkSession
> >>> spark = SparkSession.builder.appName("validate").getOrCreate()
> 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session; only
> runtime SQL configurations will take effect.
> >>> sql = "SELECT * FROM  WHERE  = some value"
> >>> try:
> ...   spark.sql(sql)
> ...   print("is working")
> ... except Exception as e:
> ...   print(f"Syntax error: {e}")
> ...
> Syntax error:
> [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)
>
> == SQL ==
> SELECT * FROM  WHERE  = some value
> --^^^
>
> Here we only check for syntax errors and not the actual existence of query
> semantics. We are not validating against table or column existence.
>
> This method is useful when you want to catch obvious syntax errors before
> submitting your PySpark job to a cluster, especially when you don't have
> access to the actual data.
>
> In summary
>
>    - This method validates syntax but will not catch semantic errors
>- If you need more comprehensive validation, consider using a testing
>framework and a small dataset.
>- For complex queries, using a linter or code analysis tool can help
>identify potential issues.
>
> HTH
>
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 24 Dec 2023 at 07:57, ram manickam  wrote:
>
>> Hello,
>> Is there a way to validate PySpark SQL for syntax errors only? I
>> cannot connect to an actual data set to perform this validation. Any
>> help would be appreciated.
>>
>>
>> Thanks
>> Ram
>>
>
>


Re: Validate spark sql

2023-12-24 Thread Nicholas Chammas
This is a user-list question, not a dev-list question. Moving this conversation 
to the user list and BCC-ing the dev list.

Also, this statement

> We are not validating against table or column existence.

is not correct. When you call spark.sql(…), Spark will look up the table 
references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.

Also, when you run DDL via spark.sql(…), Spark will actually run it. So 
spark.sql(“drop table my_table”) will actually drop my_table. It’s not a 
validation-only operation.

This question of validating SQL is already discussed on Stack Overflow 
. You may find some useful tips 
there.

Nick


> On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh  
> wrote:
> 
>   
> Yes, you can validate the syntax of your PySpark SQL queries without 
> connecting to an actual dataset or running the queries on a cluster.
> PySpark provides a method for syntax validation without executing the query. 
> Something like below
>   __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.4.0
>   /_/
> 
> Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
> Spark context Web UI available at http://rhes75:4040 
> Spark context available as 'sc' (master = local[*], app id = 
> local-1703410019374).
> SparkSession available as 'spark'.
> >>> from pyspark.sql import SparkSession
> >>> spark = SparkSession.builder.appName("validate").getOrCreate()
> 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session; only 
> runtime SQL configurations will take effect.
> >>> sql = "SELECT * FROM  WHERE  = some value"
> >>> try:
> ...   spark.sql(sql)
> ...   print("is working")
> ... except Exception as e:
> ...   print(f"Syntax error: {e}")
> ...
> Syntax error:
> [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)
> 
> == SQL ==
> SELECT * FROM  WHERE  = some value
> --^^^
> 
> Here we only check for syntax errors and not the actual existence of query 
> semantics. We are not validating against table or column existence.
> 
> This method is useful when you want to catch obvious syntax errors before 
> submitting your PySpark job to a cluster, especially when you don't have 
> access to the actual data.
> In summary
> This method validates syntax but will not catch semantic errors
> If you need more comprehensive validation, consider using a testing framework 
> and a small dataset.
> For complex queries, using a linter or code analysis tool can help identify 
> potential issues.
> HTH
> 
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
> 
>view my Linkedin profile 
> 
> 
>  https://en.everybodywiki.com/Mich_Talebzadeh
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Sun, 24 Dec 2023 at 07:57, ram manickam  > wrote:
>> Hello,
>> Is there a way to validate PySpark SQL for syntax errors only? I 
>> cannot connect to an actual data set to perform this validation. Any help 
>> would be appreciated.
>> 
>> 
>> Thanks
>> Ram
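
For illustration, a minimal sketch of the distinction discussed above, assuming Spark 3.4+ (where AnalysisException and ParseException are exposed under pyspark.errors; in older releases they live in pyspark.sql.utils). A parse failure is a pure syntax problem, a missing table surfaces as an analysis error such as TABLE_OR_VIEW_NOT_FOUND, and any valid DDL/DML passed to spark.sql() is actually executed:

from pyspark.sql import SparkSession
from pyspark.errors import AnalysisException, ParseException

spark = SparkSession.builder.appName("sql-check").getOrCreate()

def check_sql(stmt: str) -> str:
    # Rough classification only; note that a valid DDL/DML statement is executed.
    try:
        spark.sql(stmt)
        return "parsed and analyzed (and executed, if it was DDL/DML)"
    except ParseException as e:
        return f"syntax error: {e}"
    except AnalysisException as e:
        return f"analysis error (e.g. missing table or column): {e}"

print(check_sql("SELECT * FROM"))                # syntax error
print(check_sql("SELECT * FROM no_such_table"))  # TABLE_OR_VIEW_NOT_FOUND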



Unsubscribe

2023-12-21 Thread yxj1141

Unsubscribe

India Scala & Big Data Job Referral

2023-12-21 Thread sri hari kali charan Tummala
Hi Community,

I was laid off from Apple in February 2023, which led to my relocation from
the USA due to immigration issues related to my H1B visa.


I have over 12 years of experience as a consultant in Big Data, Spark,
Scala, Python, and Flink.


Despite my move to India, I haven't secured a job yet. I am seeking
referrals within product firms (preferably non-consulting) in India that
work with Flink, Spark, Scala, Big Data, or in the fields of ML & AI. Can
someone assist me with this?

Thanks
Sri


About shuffle partition size

2023-12-20 Thread Nebi Aydin
Hi all,
What happens when the number of unique join keys is less than the number of
shuffle partitions? Are we going to end up with lots of empty partitions?
If yes, is there any point in having more shuffle partitions than the number
of unique join keys?
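
For illustration, a small sketch (numbers are hypothetical) that counts rows per shuffle output partition after a join; with only 10 distinct keys and the default 200 shuffle partitions, at most 10 partitions receive rows and the rest stay empty. Note that with adaptive query execution enabled (the default in recent releases), Spark can coalesce those small or empty shuffle partitions automatically:

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .appName("shuffle-partition-check")
         .config("spark.sql.shuffle.partitions", "200")            # the default
         .config("spark.sql.adaptive.enabled", "false")            # keep all 200 partitions visible
         .config("spark.sql.autoBroadcastJoinThreshold", "-1")     # force a shuffle (sort-merge) join
         .getOrCreate())

left = spark.range(1_000_000).withColumn("key", F.col("id") % 10)  # only 10 distinct keys
right = spark.range(1_000).withColumn("key", F.col("id") % 10)

joined = left.join(right, "key")

# spark_partition_id() is evaluated on the join output, so grouping by it shows
# how many of the 200 shuffle partitions actually received rows (here at most 10).
(joined.withColumn("pid", F.spark_partition_id())
       .groupBy("pid").count()
       .orderBy("pid")
       .show(200))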


[ANNOUNCE] Apache Spark 3.3.4 released

2023-12-16 Thread Dongjoon Hyun
We are happy to announce the availability of Apache Spark 3.3.4!

Spark 3.3.4 is the last maintenance release based on the
branch-3.3 branch of Spark. It contains many fixes,
including in the security and correctness domains. We strongly
recommend that all 3.3 users upgrade to this or a higher stable release.

To download Spark 3.3.4, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-3-4.html

We would like to acknowledge all community members for contributing to
this release. This release would not have been possible without you.

Dongjoon Hyun


Unsubscribe

2023-12-16 Thread Andrew Milkowski



Re: Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)

2023-12-15 Thread Mich Talebzadeh
Apologies Koert!

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 15 Dec 2023 at 14:12, Mich Talebzadeh 
wrote:

> Hi Koert,
>
> I read this document of yours. Indeed interesting and pretty recent (9th
> Dec).
>
> I am more focused on GCP and GKE. But obviously the
> concepts are the same. One thing I noticed: there was no mention
> of Workload Identity federation or equivalent, which is the recommended way
> for workloads running on k8s to access Cloud services in a secure and
> manageable way. Specifically I quote "Workload Identity
> 
> allows workloads in your GKE clusters to impersonate Identity and Access
> Management (IAM) service accounts to access Google Cloud services."
>
> *Cheers*
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 14 Dec 2023 at 07:58, Koert Kuipers  wrote:
>
>> yes it does using IAM roles for service accounts.
>> see:
>>
>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
>>
>> i wrote a little bit about this also here:
>> https://technotes.tresata.com/spark-on-k8s/
>>
>> On Wed, Dec 13, 2023 at 7:52 AM Atul Patil  wrote:
>>
>>> Hello Team,
>>>
>>>
>>>
>>> Does Spark support role-based authentication and access to Amazon S3 for
>>> Kubernetes deployment?
>>>
>>> *Note: we have deployed our spark application in the Kubernetes cluster.*
>>>
>>>
>>>
>>> Below are the Hadoop-AWS dependencies we are using:
>>>
>>> 
>>> <dependency>
>>>    <groupId>org.apache.hadoop</groupId>
>>>    <artifactId>hadoop-aws</artifactId>
>>>    <version>3.3.4</version>
>>> </dependency>
>>> 
>>>
>>>
>>>
>>> We are using the following configuration when creating the spark
>>> session, but it is not working::
>>>
>>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
>>> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>>>
>>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn",
>>> System.getenv("AWS_ROLE_ARN"));
>>>
>>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider",
>>> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider");
>>>
>>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint",
>>> "s3.eu-central-1.amazonaws.com");
>>>
>>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region",
>>> Regions.EU_CENTRAL_1.getName());
>>>
>>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file",
>>> System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE"));
>>>
>>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration",
>>> "30m");
>>>
>>>
>>>
>>> Thank you!
>>>
>>>
>>>
>>> Regards,
>>>
>>> Atul
>>>
>>
>> CONFIDENTIALITY NOTICE: This electronic communication and any files
>> transmitted with it are confidential, privileged and intended solely for
>> the use of the individual or entity to whom they are addressed. If you are
>> not the intended recipient, you are hereby notified that any disclosure,
>> copying, distribution (electronic or otherwise) or forwarding of, or the
>> taking of any action in reliance on the contents of this transmission is
>> strictly prohibited. Please notify the sender immediately by e-mail if you
>> have received this email by mistake and delete this email from your system.
>>
>> Is it necessary to print this email? If you care about the environment
>> like we do, please refrain from printing emails. It helps to keep the
>> environment forested and litter-free.
>
>


Re: Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)

2023-12-15 Thread Mich Talebzadeh
Hi Koert,

I read this document of yours. Indeed interesting and pretty recent (9th
Dec).

I am more focused on GCP and GKE. But obviously the
concepts are the same. One thing I noticed: there was no mention
of Workload Identity federation or equivalent, which is the recommended way
for workloads running on k8s to access Cloud services in a secure and
manageable way. Specifically I quote "Workload Identity

allows workloads in your GKE clusters to impersonate Identity and Access
Management (IAM) service accounts to access Google Cloud services."

*Cheers*

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 14 Dec 2023 at 07:58, Koert Kuipers  wrote:

> yes it does using IAM roles for service accounts.
> see:
>
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
>
> i wrote a little bit about this also here:
> https://technotes.tresata.com/spark-on-k8s/
>
> On Wed, Dec 13, 2023 at 7:52 AM Atul Patil  wrote:
>
>> Hello Team,
>>
>>
>>
>> Does Spark support role-based authentication and access to Amazon S3 for
>> Kubernetes deployment?
>>
>> *Note: we have deployed our spark application in the Kubernetes cluster.*
>>
>>
>>
>> Below are the Hadoop-AWS dependencies we are using:
>>
>> 
>> <dependency>
>>    <groupId>org.apache.hadoop</groupId>
>>    <artifactId>hadoop-aws</artifactId>
>>    <version>3.3.4</version>
>> </dependency>
>> 
>>
>>
>>
>> We are using the following configuration when creating the spark session,
>> but it is not working::
>>
>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
>> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>>
>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn",
>> System.getenv("AWS_ROLE_ARN"));
>>
>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider",
>> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider");
>>
>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint",
>> "s3.eu-central-1.amazonaws.com");
>>
>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region",
>> Regions.EU_CENTRAL_1.getName());
>>
>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file",
>> System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE"));
>>
>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration",
>> "30m");
>>
>>
>>
>> Thank you!
>>
>>
>>
>> Regards,
>>
>> Atul
>>
>
> CONFIDENTIALITY NOTICE: This electronic communication and any files
> transmitted with it are confidential, privileged and intended solely for
> the use of the individual or entity to whom they are addressed. If you are
> not the intended recipient, you are hereby notified that any disclosure,
> copying, distribution (electronic or otherwise) or forwarding of, or the
> taking of any action in reliance on the contents of this transmission is
> strictly prohibited. Please notify the sender immediately by e-mail if you
> have received this email by mistake and delete this email from your system.
>
> Is it necessary to print this email? If you care about the environment
> like we do, please refrain from printing emails. It helps to keep the
> environment forested and litter-free.


Re: Architecture of Spark Connect

2023-12-14 Thread Hyukjin Kwon
By default for now, yes. One Spark Connect server handles multiple Spark
Sessions. To multiplex or run multiple Drivers, you need some extra work,
such as a gateway.
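
As a rough illustration of that single-server, multiple-sessions setup (the script path, package coordinate and port below are the Spark 3.4 defaults, listed here as assumptions rather than a definitive recipe):

# Server side: one long-running Spark Connect server, i.e. one driver/SparkContext,
# started from a Spark 3.4+ distribution:
#   ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0

# Client side: each client builds its own remote SparkSession against that single
# server (default port 15002); the sessions are multiplexed onto the one driver.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()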

On Thu, 14 Dec 2023 at 12:03, Kezhi Xiong  wrote:

> Hi,
>
> My understanding is there is only one driver/spark context for all user
> sessions. When you run the bin/start-connect-server script, you are
> submitting one long standing spark job / application. Every time a new user
> request comes in, a new user session is created under that. Please correct
> me if I am wrong.
>
> Kezhi
>
> On Thu, Dec 14, 2023 at 10:35 AM Nikhil Goyal  wrote:
>
>> [ External sender. Exercise caution. ]
>>
>> If multiple applications are running, would we need multiple Spark Connect
>> servers? If so, is the user responsible for creating these servers, or are
>> they just created on the fly when the user requests a new Spark session?
>>
>> On Thu, Dec 14, 2023 at 10:28 AM Nikhil Goyal 
>> wrote:
>>
>>> Hi folks,
>>> I am trying to understand one question. Does Spark Connect create a new
>>> driver in the backend for every user, or are there a fixed number of drivers
>>> running to which requests are sent?
>>>
>>> Thanks
>>> Nikhil
>>>
>>


Re: Architecture of Spark Connect

2023-12-14 Thread Kezhi Xiong
Hi,

My understanding is there is only one driver/spark context for all user
sessions. When you run the bin/start-connect-server script, you are
submitting one long standing spark job / application. Every time a new user
request comes in, a new user session is created under that. Please correct
me if I am wrong.

Kezhi

On Thu, Dec 14, 2023 at 10:35 AM Nikhil Goyal  wrote:

> [ External sender. Exercise caution. ]
>
> If multiple applications are running, would we need multiple Spark Connect
> servers? If so, is the user responsible for creating these servers, or are
> they just created on the fly when the user requests a new Spark session?
>
> On Thu, Dec 14, 2023 at 10:28 AM Nikhil Goyal  wrote:
>
>> Hi folks,
>> I am trying to understand one question. Does Spark Connect create a new
>> driver in the backend for every user, or are there a fixed number of drivers
>> running to which requests are sent?
>>
>> Thanks
>> Nikhil
>>
>


Re: Architecture of Spark Connect

2023-12-14 Thread Nikhil Goyal
If multiple applications are running, would we need multiple Spark Connect
servers? If so, is the user responsible for creating these servers, or are
they just created on the fly when the user requests a new Spark session?

On Thu, Dec 14, 2023 at 10:28 AM Nikhil Goyal  wrote:

> Hi folks,
> I am trying to understand one question. Does Spark Connect create a new
> driver in the backend for every user, or are there a fixed number of drivers
> running to which requests are sent?
>
> Thanks
> Nikhil
>


Architecture of Spark Connect

2023-12-14 Thread Nikhil Goyal
Hi folks,
I am trying to understand one question. Does Spark Connect create a new
driver in the backend for every user, or are there a fixed number of drivers
running to which requests are sent?

Thanks
Nikhil


Re: Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)

2023-12-13 Thread Koert Kuipers
Yes, it does, using IAM roles for service accounts.
See:
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html

I wrote a little bit about this here:
https://technotes.tresata.com/spark-on-k8s/

On Wed, Dec 13, 2023 at 7:52 AM Atul Patil  wrote:

> Hello Team,
>
>
>
> Does Spark support role-based authentication and access to Amazon S3 for
> Kubernetes deployment?
>
> *Note: we have deployed our spark application in the Kubernetes cluster.*
>
>
>
> Below are the Hadoop-AWS dependencies we are using:
>
> 
> <dependency>
>    <groupId>org.apache.hadoop</groupId>
>    <artifactId>hadoop-aws</artifactId>
>    <version>3.3.4</version>
> </dependency>
> 
>
>
>
> We are using the following configuration when creating the spark session,
> but it is not working::
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn",
> System.getenv("AWS_ROLE_ARN"));
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider",
> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider");
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint",
> "s3.eu-central-1.amazonaws.com");
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region",
> Regions.EU_CENTRAL_1.getName());
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file",
> System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE"));
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration",
> "30m");
>
>
>
> Thank you!
>
>
>
> Regards,
>
> Atul
>

-- 
CONFIDENTIALITY NOTICE: This electronic communication and any files 
transmitted with it are confidential, privileged and intended solely for 
the use of the individual or entity to whom they are addressed. If you are 
not the intended recipient, you are hereby notified that any disclosure, 
copying, distribution (electronic or otherwise) or forwarding of, or the 
taking of any action in reliance on the contents of this transmission is 
strictly prohibited. Please notify the sender immediately by e-mail if you 
have received this email by mistake and delete this email from your system.


Is it necessary to print this email? If you care about the environment 
like we do, please refrain from printing emails. It helps to keep the 
environment forested and litter-free.
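
To make the IRSA path above concrete, a minimal PySpark sketch (the bucket and path are hypothetical, and it assumes the driver and executor pods run under an EKS service account annotated with an IAM role, so AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE are injected into the pods automatically):

from pyspark.sql import SparkSession

# With IRSA in place, pointing s3a at the AWS SDK's web-identity provider is
# often sufficient; the assumed-role provider settings from the question above
# are then not needed.
spark = (SparkSession.builder
         .appName("s3a-irsa-sketch")
         .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                 "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
         .getOrCreate())

df = spark.read.parquet("s3a://my-bucket/some/prefix/")   # hypothetical location
df.show(5)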


Unsubscribe

2023-12-13 Thread kritika jain



Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)

2023-12-13 Thread Atul Patil
Hello Team,



Does Spark support role-based authentication and access to Amazon S3 for
Kubernetes deployment?

*Note: we have deployed our spark application in the Kubernetes cluster.*



Below are the Hadoop-AWS dependencies we are using:


<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.4</version>
</dependency>




We are using the following configuration when creating the spark session,
but it is not working::

sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");

sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn",
System.getenv("AWS_ROLE_ARN"));

sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider",
"com.amazonaws.auth.WebIdentityTokenCredentialsProvider");

sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint",
"s3.eu-central-1.amazonaws.com");

sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region",
Regions.EU_CENTRAL_1.getName());

sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file",
System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE"));

sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration",
"30m");



Thank you!



Regards,

Atul


Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)

2023-12-13 Thread Patil, Atul
Hello Team,

Does Spark support role-based authentication and access to Amazon S3 for 
Kubernetes deployment?
Note: we have deployed our spark application in the Kubernetes cluster.

Below are the Hadoop-AWS dependencies we are using:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.4</version>
</dependency>


We are using the following configuration when creating the spark session, but 
it is not working::
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
 "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn",
 System.getenv("AWS_ROLE_ARN"));
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider",
 "com.amazonaws.auth.WebIdentityTokenCredentialsProvider");
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint",
 "s3.eu-central-1.amazonaws.com");
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region",
 Regions.EU_CENTRAL_1.getName());
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file",
 System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE"));
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration",
 "30m");

Thank you!

Regards,
Atul



Unsubscribe

2023-12-12 Thread Daniel Maangi



Unsubscribe

2023-12-12 Thread Klaus Schaefers
-- 
“Overfitting” is not about an excessive amount of physical exercise...


Unsubscribe

2023-12-12 Thread Sergey Boytsov
Unsubscribe

--


Re: Cluster-mode job compute-time/cost metrics

2023-12-12 Thread murat migdisoglu
Hey Jack,

EMR Serverless is a great fit for this. You can get these metrics for each
job once it completes. Besides that, if you create separate "EMR
applications" per group and tag them appropriately, you can use Cost
Explorer to see the amount of resources being used.
If EMR Serverless is not an option, I would probably work with Spark event
listeners, collect the metrics at runtime, and publish them to a
monitoring tool such as Grafana/Datadog.
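
As an alternative to a custom listener, a rough sketch that derives total executor compute time from Spark's monitoring REST API (the host, port and application id below are placeholders; executorRunTime is reported per stage in milliseconds):

import requests   # plain HTTP client; the Spark UI or history server must be reachable

def total_executor_hours(ui_url: str, app_id: str) -> float:
    # Sum executorRunTime (ms) over all stages of one application.
    stages = requests.get(f"{ui_url}/api/v1/applications/{app_id}/stages").json()
    total_ms = sum(stage.get("executorRunTime", 0) for stage in stages)
    return total_ms / 1000.0 / 3600.0

# Hypothetical usage: multiply by the hourly instance rate divided by the cores
# per instance to get a rough dollar figure for the job.
# hours = total_executor_hours("http://history-server:18080", "application_1700000000000_0001")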


On Tue, Dec 12, 2023 at 9:31 AM Jörn Franke  wrote:

> It could be simpler and faster to use tagging of resources for billing:
>
>
> https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-tags-billing.html
>
> That could also include other resources (eg s3).
>
> Am 12.12.2023 um 04:47 schrieb Jack Wells :
>
> 
> Hello Spark experts - I’m running Spark jobs in cluster mode using a
> dedicated cluster for each job. Is there a way to see how much compute time
> each job takes via Spark APIs, metrics, etc.? In case it makes a
> difference, I’m using AWS EMR - I’d ultimately like to be able to say this
> job costs $X since it took Y minutes on Z instance types (assuming all of
> the nodes are the same instance type), but I figure I could probably need
> to get the Z instance type through EMR APIs.
>
> Thanks!
> Jack
>
>

-- 
"Talkers aren’t good doers. Rest assured that we’re going there to use our
hands, not our tongues."
W. Shakespeare


Re: Cluster-mode job compute-time/cost metrics

2023-12-12 Thread Jörn Franke
It could be simpler and faster to use tagging of resources for billing:

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-tags-billing.html

That could also include other resources (eg s3).

> Am 12.12.2023 um 04:47 schrieb Jack Wells :
> 
> 
> Hello Spark experts - I’m running Spark jobs in cluster mode using a 
> dedicated cluster for each job. Is there a way to see how much compute time 
> each job takes via Spark APIs, metrics, etc.? In case it makes a difference, 
> I’m using AWS EMR - I’d ultimately like to be able to say this job costs $X 
> since it took Y minutes on Z instance types (assuming all of the nodes are 
> the same instance type), but I figure I could probably need to get the Z 
> instance type through EMR APIs.
> 
> Thanks!
> Jack
> 


Cluster-mode job compute-time/cost metrics

2023-12-11 Thread Jack Wells
Hello Spark experts - I’m running Spark jobs in cluster mode using a
dedicated cluster for each job. Is there a way to see how much compute time
each job takes via Spark APIs, metrics, etc.? In case it makes a
difference, I’m using AWS EMR - I’d ultimately like to be able to say this
job costs $X since it took Y minutes on Z instance types (assuming all of
the nodes are the same instance type), but I figure I would probably need
to get the Z instance type through the EMR APIs.

Thanks!
Jack


Unsubscribe

2023-12-11 Thread 18706753459
Unsubscribe

Unsubscribe

2023-12-11 Thread Dusty Williams
Unsubscribe


unsubscribe

2023-12-11 Thread Stevens, Clay
unsubscribe



Spark 3.1.3 with Hive dynamic partitions fails while driver moves the staged files

2023-12-11 Thread Shay Elbaz
Hi all,

Running on Dataproc 2.0/1.3/1.4, we use the INSERT OVERWRITE command to insert 
new (time) partitions into existing Hive tables. But we see too many failures 
coming from org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles. This is where 
the driver moves the successful files from the staging to the final directory.
For some reason, the underlying FS implementation - GoogleCloudStorageImpl in 
this case - fails to move at least one file and the exception is propagated all 
the way through. We see many different failures - from 
hadoop.FileSystem.mkdirs, rename, etc., all coming from Hive.replaceFiles().
I guess FS failures are expected, but nowhere in 
org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions is there a 
try/catch/retry mechanism. Is the FS implementation expected to handle it?
Because the GCS connector does not seem to do that :)
We ended up patching and rebuilding hive-exec jar as an immediate mitigation 
(try/catch/retry), while our platform teams are reaching out to GCP support.

I know this is more of a Hive issue than a Spark one, but I still wonder if 
anybody has encountered this issue, or something similar?

Thanks,
Shay
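
For readers less familiar with the pattern being described, a minimal sketch of a dynamic-partition overwrite into an existing Hive table (table and column names are hypothetical; the failure discussed above happens later, when the driver moves the committed files from the staging area into the final partition directories):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("dynamic-partition-overwrite-sketch")
         .config("hive.exec.dynamic.partition", "true")
         .config("hive.exec.dynamic.partition.mode", "nonstrict")
         .enableHiveSupport()
         .getOrCreate())

# Intended to overwrite only the (time) partitions produced by the SELECT,
# using Hive dynamic partitioning.
spark.sql("""
    INSERT OVERWRITE TABLE events PARTITION (event_date)
    SELECT id, payload, event_date
    FROM staging_events
""")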






