Re: DataStreamReader cleanSource option

2022-02-03 Thread Jungtaek Lim
Hi,

Could you please set the config
"spark.sql.streaming.fileSource.cleaner.numThreads"
to 0 and see whether it works? (Note: this will slow your processing down, since
the cleaning phase then happens in the foreground. The default is background
cleaning with 1 thread; you can also try more than 1 thread.)
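
For reference, a minimal PySpark sketch of that first step (the config key is the
one above; the session setup itself is illustrative, and on an already-running
session, e.g. in a Synapse notebook, setting it via spark.conf before the query
starts should be equivalent):

from pyspark.sql import SparkSession

# Illustrative session setup; 0 threads means processed source files are
# cleaned up in the foreground of each micro-batch.
spark = (SparkSession.builder
    .appName("cleanSource-check")  # hypothetical app name
    .config("spark.sql.streaming.fileSource.cleaner.numThreads", "0")
    .getOrCreate())

# Equivalent on an existing session, as long as it is set before the
# streaming query is started:
spark.conf.set("spark.sql.streaming.fileSource.cleaner.numThreads", "0")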
If it doesn't help, please turn on the DEBUG log level for the package
"org.apache.spark.sql.execution.streaming"
and grep the log messages from SourceFileArchiver & SourceFileRemover.
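
As a quick sketch, you can raise the log level from the driver (note this makes
all of Spark's logging verbose, not only that package; per-package control would
go into log4j.properties instead):

spark.sparkContext.setLogLevel("DEBUG")
# Re-run the query, then grep the driver/executor logs for
# SourceFileArchiver / SourceFileRemover messages.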

Thanks,
Jungtaek Lim (HeartSaVioR)



Re: DataStreamReader cleanSource option

2022-01-27 Thread Mich Talebzadeh
Hi Gabriela,

I don't know about the data lake side, but this is really a Spark Structured
Streaming question. Are both readStream and writeStream working OK? For example,
can you do df.printSchema() after the read?

It is advisable to wrap the logic inside try/except. This is an example of
wrapping it:
import sys

from pyspark.sql.functions import col, from_json

data_path = "file:///mnt/gs/prices/data/"
checkpoint_path = "file:///mnt/gs/prices/chkpt/"


def sendToSink(df, batchId):
    # Called by foreachBatch for every micro-batch.
    if len(df.take(1)) > 0:
        print(f"batchId is {batchId}")
        rows = df.count()
        print(f"Total records processed in this run = {rows}")
    else:
        print("DataFrame is empty")


try:
    # `spark` is the existing SparkSession and `schema` the StructType
    # describing the JSON payload.
    streamingDataFrame = (spark
        .readStream
        .option("newFilesOnly", "true")
        .option("header", "true")
        .option("maxFilesPerTrigger", 1000)
        .option("latestFirst", "false")
        .text(data_path)
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value")))

    streamingDataFrame.printSchema()

    result = (streamingDataFrame
        .select(col("parsed_value.rowkey").alias("rowkey"),
                col("parsed_value.ticker").alias("ticker"),
                col("parsed_value.timeissued").alias("timeissued"),
                col("parsed_value.price").alias("price"))
        .writeStream
        .outputMode("append")
        .option("truncate", "false")
        .foreachBatch(sendToSink)
        .queryName("trailFiles")
        .trigger(once=True)
        .option("checkpointLocation", checkpoint_path)
        .start())
except Exception as e:
    print(f"{e}, quitting")
    sys.exit(1)

result.awaitTermination()
HTH



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.






DataStreamReader cleanSource option

2022-01-27 Thread Gabriela Dvořáková
Hi,

I am writing to ask for advice regarding the cleanSource option of the
DataStreamReader. I am using PySpark with Spark 3.1 via Azure Synapse. To my
knowledge, the cleanSource option was introduced in Spark 3. I have spent a
significant amount of time trying to configure this option with both the
"archive" and "delete" modes, but the stream only seems to process data from
the source data lake storage account container and store it in the sink
storage account data lake container. The archive folder is never created, nor
are any of the processed source files removed. None of the forums or Stack
Overflow posts have been of any help so far, so I am reaching out in case you
have any tips on how to get it working. Here is my code:

Reading:
df = (spark
    .readStream
    .option("sourceArchiveDir", f'abfss://{TRANSIENT_DATA_LAKE_CONTAINER_NAME}@{DATA_LAKE_ACCOUNT_NAME}.dfs.core.windows.net/budget-app/budgetOutput/archived-v5')
    .option("cleanSource", "archive")
    .format('json')
    .schema(schema)
    .load(TRANSIENT_DATA_LAKE_PATH))
-- 

...Processing...

Writing:
(
    df.writeStream
    .format("delta")
    .outputMode('append')
    .option("checkpointLocation", RAW_DATA_LAKE_CHECKPOINT_PATH)
    .trigger(once=True)
    .partitionBy("Year", "Month", "clientId")
    .start(RAW_DATA_LAKE_PATH)
    .awaitTermination()
)

Thank you very much for help,
Gabriela

_

Med venlig hilsen / Best regards

Gabriela Dvořáková

Developer | monthio

M: +421902480757

E: gabri...@monthio.com

W: www.monthio.com

Monthio Aps, Ragnagade 7, 2100 Copenhagen


Create personal wealth and healthy economy

for people by changing the ways of banking