Re: configuring .sparkStaging with group rwx

2021-02-25 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
spark-submit --conf spark.hadoop.fs.permissions.umask-mode=007
You may also set the sticky bit on the staging dir.
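
The same property can also be set when building the SparkSession from PySpark, a minimal sketch assuming it takes effect before the YARN application (and hence .sparkStaging) is created; the app name here is just a placeholder:

from pyspark.sql import SparkSession

# umask 007 => directories and files the job creates on HDFS (including
# .sparkStaging) get group rwx and no access for others
spark = (SparkSession.builder
         .appName("sharedGroupApp")                                  # placeholder name
         .config("spark.hadoop.fs.permissions.umask-mode", "007")
         .getOrCreate())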

Sent from my iPhone

> On 26 Feb 2021, at 03:29, Bulldog20630405  wrote:
> 
> 
> 
> we have a Spark cluster running with multiple users...
> when running as the user owning the cluster, jobs run fine... however, when
> trying to run pyspark as a different user it fails, because
> .sparkStaging/application_* is written with 700 permissions, so the user
> cannot write to that directory
>
> how do we configure the Spark/YARN cluster so .sparkStaging is written as 770
> instead of 700, so that users in the shared group can execute?
> 
> 


configuring .sparkStaging with group rwx

2021-02-25 Thread Bulldog20630405
we have a Spark cluster running with multiple users...
when running as the user owning the cluster, jobs run fine... however, when
trying to run pyspark as a different user it fails, because
.sparkStaging/application_* is written with 700 permissions, so the user
cannot write to that directory

how do we configure the Spark/YARN cluster so .sparkStaging is written as 770
instead of 700, so that users in the shared group can execute?


Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-25 Thread Mich Talebzadeh
Hi,

I managed to make mine work using the foreachBatch function in
writeStream.

"foreach" performs custom write logic on each row, whereas "foreachBatch"
performs custom write logic on each micro-batch, here through the
SendToBigQuery function. foreachBatch(SendToBigQuery) expects 2 parameters,
first: the micro-batch as a DataFrame or Dataset, and second: a unique id
for each batch. Using foreachBatch, we write each micro-batch to storage
defined in our custom logic. In this case, we store the output of our
streaming application to a Google BigQuery table. Note that we are
appending data and the column "rowkey" is defined as a UUID, so it can be
used as the primary key. batchId is just a counter (a monotonically
increasing number).

This is my code:


from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
from google.cloud import bigquery


def SendToBigQuery(df, batchId):
    """
    Below uses standard Spark-BigQuery API to write to the table
    Additional transformation logic will be performed here
    """
    s.writeTableToBQ(df, "append",
                     config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])


class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        # {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()) \
                             .add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes
            # to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            # streamingDataFrame.printSchema()

            """
            "foreach" performs custom write logic on each row and "foreachBatch" performs
            custom write logic on each micro-batch through the SendToBigQuery function.
            foreachBatch(SendToBigQuery) expects 2 parameters, first: the micro-batch as a
            DataFrame or Dataset, and second: a unique id for each batch.
            Using foreachBatch, we write each micro-batch to storage defined in our custom
            logic. In this case, we store the output of our streaming application to a
            Google BigQuery table.
            Note that we are appending data and column "rowkey" is defined as a UUID, so
            it can be used as the primary key.
            """
            result = streamingDataFrame.select( \
                         col("parsed_value.rowkey").alias("rowkey") \
                       , col("parsed_value.ticker").alias("ticker") \
                       , col("parsed_value.timeissued").alias("timeissued") \
                       , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     foreachBatch(SendToBigQuery). \
                     outputMode("update"). \
                     start()
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

        result.awaitTermination()


if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mds
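
For context, writeTableToBQ above is a small helper in my sparkutils module. A minimal sketch of what such a helper could look like with the spark-bigquery connector (the temporaryGcsBucket option and the 'tmp_bucket' config key are assumptions for illustration, not taken from the code above):

def writeTableToBQ(df, mode, dataset, table):
    # sketch only: write the micro-batch DataFrame to BigQuery via the
    # spark-bigquery connector, which must be on the classpath
    df.write \
      .format("bigquery") \
      .mode(mode) \
      .option("table", f"{dataset}.{table}") \
      .option("temporaryGcsBucket", config['MDVariables']['tmp_bucket']) \
      .save()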

Re: Structured Streaming With Kafka - processing each event

2021-02-25 Thread Mich Talebzadeh
Hi Sachit,

I managed to make mine work using the foreachBatch function in
writeStream.

"foreach" performs custom write logic on each row, whereas "foreachBatch"
performs custom write logic on each micro-batch, here through the
SendToBigQuery function. foreachBatch(SendToBigQuery) expects 2 parameters,
first: the micro-batch as a DataFrame or Dataset, and second: a unique id
for each batch. Using foreachBatch, we write each micro-batch to storage
defined in our custom logic. In this case, we store the output of our
streaming application to a Google BigQuery table. Note that we are
appending data and the column "rowkey" is defined as a UUID, so it can be
used as the primary key. batchId is just a counter (a monotonically
increasing number).

This is my code:


from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
from google.cloud import bigquery


def SendToBigQuery(df, batchId):
    """
    Below uses standard Spark-BigQuery API to write to the table
    Additional transformation logic will be performed here
    """
    s.writeTableToBQ(df, "append",
                     config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])


class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        # {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()) \
                             .add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes
            # to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            # streamingDataFrame.printSchema()

            """
            "foreach" performs custom write logic on each row and "foreachBatch" performs
            custom write logic on each micro-batch through the SendToBigQuery function.
            foreachBatch(SendToBigQuery) expects 2 parameters, first: the micro-batch as a
            DataFrame or Dataset, and second: a unique id for each batch.
            Using foreachBatch, we write each micro-batch to storage defined in our custom
            logic. In this case, we store the output of our streaming application to a
            Google BigQuery table.
            Note that we are appending data and column "rowkey" is defined as a UUID, so
            it can be used as the primary key.
            """
            result = streamingDataFrame.select( \
                         col("parsed_value.rowkey").alias("rowkey") \
                       , col("parsed_value.ticker").alias("ticker") \
                       , col("parsed_value.timeissued").alias("timeissued") \
                       , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     foreachBatch(SendToBigQuery). \
                     outputMode("update"). \
                     start()
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

        result.awaitTermination()


if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFram
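
SendToBigQuery ignores the batchId it receives. A small illustrative variant (not the code I actually run) showing how the batch id could be used, for example to skip empty micro-batches and log progress:

def SendToBigQueryWithBatchId(df, batchId):
    # batchId is the monotonically increasing micro-batch counter passed by foreachBatch
    if df.rdd.isEmpty():
        return                      # nothing arrived in this trigger interval
    print(f"writing micro-batch {batchId}")
    s.writeTableToBQ(df, "append",
                     config['MDVariables']['targetDataset'],
                     config['MDVariables']['targetTable'])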

Re: How to control count / size of output files for

2021-02-25 Thread Gourav Sengupta
Hi Ivan,

sorry, but it always helps to know the version of Spark you are using, its
environment, the format that you are writing your files out in, and any
other details if possible.


Regards,
Gourav Sengupta

On Wed, Feb 24, 2021 at 3:43 PM Ivan Petrov  wrote:

> Hi, I'm trying to control the size and/or count of Spark output.
>
> Here is my code. I expect to get 5 files, but I get dozens of small files.
> Why?
>
> dataset
>   .repartition(5)
>   .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
>   .write
>   .parquet(outputPath)
>


Re: How to control count / size of output files for

2021-02-25 Thread Ivan Petrov
Ah... makes sense, thank you. I tried sortWithinPartitions before and
replaced it with sort. It was a mistake.

On Thu, 25 Feb 2021 at 15:25, Pietro Gentile <
pietro.gentile89.develo...@gmail.com> wrote:

> Hi,
>
> It is because of the *repartition* before the *sort* method invocation. If
> you reverse them, you'll see 5 output files.
>
> Regards,
> Pietro
>
> On Wed, 24 Feb 2021 at 16:43, Ivan Petrov 
> wrote:
>
>> Hi, I'm trying to control the size and/or count of Spark output.
>>
>> Here is my code. I expect to get 5 files, but I get dozens of small files.
>> Why?
>>
>> dataset
>>   .repartition(5)
>>   .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
>>   .write
>>   .parquet(outputPath)
>>
>
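
For the record, a rough PySpark sketch of the variant that keeps exactly 5 output files (the original snippet looks like Scala, but the idea is the same): sortWithinPartitions sorts inside each partition without another shuffle, so the partition count from repartition(5) is preserved.

(dataset
    .repartition(5)                                                # fixes the file count at 5
    .sortWithinPartitions("long_repeated_string_in_this_column")   # sort inside each partition,
                                                                   # no extra shuffle, still 5 files
    .write
    .parquet(outputPath))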


Aggregating large objects and reducing memory pressure

2021-02-25 Thread Augusto
Hi

I am writing here because I need help/advice on how to perform aggregations
more efficiently.

In my current setup I have an Accumulator object which is used as the
zeroValue for the foldByKey function. This Accumulator object can get very
large, since the accumulations also include lists and maps. This in turn
demands that we use machines with more and more memory, or risk jobs
breaking down with out-of-memory errors.

I have been thinking about how to tackle this problem and trying to come up
with solutions, but I would like to hear what the best practices are, or how
others have dealt with this kind of problem.
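
To make the setup concrete, here is a minimal sketch of the pattern I describe (the Accumulator class and its fields are simplified stand-ins, not my real code):

from pyspark import SparkContext

class Accumulator:
    # simplified zeroValue: the list and dict below are what make it large
    def __init__(self, values=None, index=None):
        self.values = values if values is not None else []
        self.index = index if index is not None else {}

    def merge(self, other):
        self.values.extend(other.values)
        self.index.update(other.index)
        return self

sc = SparkContext.getOrCreate()

# foldByKey needs the value type to match the zeroValue type, so each record
# is first wrapped in its own single-element Accumulator
pairs = sc.parallelize([("k1", "a"), ("k1", "b"), ("k2", "c")]) \
          .mapValues(lambda v: Accumulator([v], {v: 1}))

folded = pairs.foldByKey(Accumulator(), lambda acc, other: acc.merge(other))
print(folded.mapValues(lambda a: len(a.values)).collect())   # e.g. [('k1', 2), ('k2', 1)]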

Best regards, 
Augusto






Re: Structured Streaming With Kafka - processing each event

2021-02-25 Thread Mich Talebzadeh
BTW, do you intend to process these in 30 seconds?

processingTime="30 seconds"

So how many rows of data are sent in each micro-batch, and what is the
interval at which you receive the data in batches from the producer?
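
If the producer outpaces the 30-second trigger, one knob worth looking at is the Kafka source option maxOffsetsPerTrigger, which caps how many records each micro-batch reads. A rough sketch against your own readStream (the cap of 10000 is just an example number):

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .option("maxOffsetsPerTrigger", 10000) \
    .load() \
    .selectExpr("CAST(value AS STRING)")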








On Thu, 25 Feb 2021 at 12:21, Mich Talebzadeh 
wrote:

> If you are receiving data from Kafka, wouldn't that be better in JSON
> format?
>
>         try:
>             # construct a streaming dataframe streamingDataFrame that subscribes
>             # to topic config['MDVariables']['topic'] -> md (market data)
>             streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['topic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "earliest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>             return streamingDataFrame
>         except Exception as e:
>             print(f"""{e}, quitting""")
>             sys.exit(1)
>
> and pass a class to the writer
>
>     result = streamingDataFrame. \
>              writeStream. \
>              foreach(ForeachWriter()). \
>              start()
>
> You don't want to use a row-by-row (cursor) approach, as it would leave a
> lot of messages unprocessed (as you correctly stated, it runs on a single
> JVM).
>
>
> I am doing the same, trying to process and write messages to BigQuery.
>
>
> HTH
>
>
>
>
>
>
>
> On Thu, 25 Feb 2021 at 06:27, Sachit Murarka 
> wrote:
>
>> Hello Users,
>>
>> I am using Spark 3.0.1 Structured Streaming with PySpark.
>>
>> My use case:
>> I get so many records in Kafka (essentially some metadata with the
>> location of the actual data). I have to take that metadata from Kafka and
>> apply some processing.
>> Processing includes: reading the actual data location from the metadata,
>> fetching the actual data and applying some operation on the actual data.
>>
>> What I have tried:
>>
>> def process_events(event):
>>     fetch_actual_data()
>>     # many more steps
>>
>> def fetch_actual_data():
>>     # applying operation on actual data
>>
>> df = spark.readStream.format("kafka") \
>>     .option("kafka.bootstrap.servers", KAFKA_URL) \
>>     .option("subscribe", KAFKA_TOPICS) \
>>     .option("startingOffsets", START_OFFSET) \
>>     .load() \
>>     .selectExpr("CAST(value AS STRING)")
>>
>> query = df.writeStream.foreach(process_events) \
>>     .option("checkpointLocation", "/opt/checkpoint") \
>>     .trigger(processingTime="30 seconds") \
>>     .start()
>>
>> My Queries:
>>
>> 1. Will this foreach run across different executor processes? Generally
>> in Spark, foreach means it runs on a single executor.
>>
>> 2. I receive too many records in Kafka, and the above code will run
>> multiple times, once for each single message. If I change it to
>> foreachBatch, will it optimize it?
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Re: Structured Streaming With Kafka - processing each event

2021-02-25 Thread Mich Talebzadeh
If you are receiving data from Kafka, wouldn't that be better in JSON
format?

        try:
            # construct a streaming dataframe streamingDataFrame that subscribes
            # to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "earliest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
            return streamingDataFrame
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

and pass a class to the writer

    result = streamingDataFrame. \
             writeStream. \
             foreach(ForeachWriter()). \
             start()

You don't want to use a row-by-row (cursor) approach, as it would leave a
lot of messages unprocessed (as you correctly stated, it runs on a single
JVM).
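
ForeachWriter() above is just a placeholder name: in PySpark, the object passed to foreach() needs open/process/close methods. A minimal sketch (the class name and the prints are illustrative only):

class RowWriter:
    def open(self, partition_id, epoch_id):
        # called once per partition per epoch; return True to process this partition
        return True

    def process(self, row):
        # called once per row; the per-row write/processing logic goes here
        print(row.asDict())

    def close(self, error):
        # called when the partition finishes; error is None on success
        if error:
            print(f"partition failed: {error}")

# the instance is then passed exactly as above: writeStream.foreach(RowWriter()).start()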


I am doing the same, trying to process and write messages to BigQuery.


HTH







On Thu, 25 Feb 2021 at 06:27, Sachit Murarka 
wrote:

> Hello Users,
>
> I am using Spark 3.0.1 Structured Streaming with PySpark.
>
> My use case:
> I get so many records in Kafka (essentially some metadata with the location
> of the actual data). I have to take that metadata from Kafka and apply some
> processing.
> Processing includes: reading the actual data location from the metadata,
> fetching the actual data and applying some operation on the actual data.
>
> What I have tried:
>
> def process_events(event):
>     fetch_actual_data()
>     # many more steps
>
> def fetch_actual_data():
>     # applying operation on actual data
>
> df = spark.readStream.format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_URL) \
>     .option("subscribe", KAFKA_TOPICS) \
>     .option("startingOffsets", START_OFFSET) \
>     .load() \
>     .selectExpr("CAST(value AS STRING)")
>
> query = df.writeStream.foreach(process_events) \
>     .option("checkpointLocation", "/opt/checkpoint") \
>     .trigger(processingTime="30 seconds") \
>     .start()
>
> My Queries:
>
> 1. Will this foreach run across different executor processes? Generally in
> Spark, foreach means it runs on a single executor.
>
> 2. I receive too many records in Kafka, and the above code will run
> multiple times, once for each single message. If I change it to
> foreachBatch, will it optimize it?
>
>
> Kind Regards,
> Sachit Murarka
>
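
Regarding your query 2, a rough sketch of the foreachBatch variant (process_events_batch is a name I made up; it receives the whole micro-batch as a DataFrame plus the batch id):

def process_events_batch(batch_df, batch_id):
    # one call per micro-batch instead of one call per message
    for event in batch_df.collect():     # rows come back to the driver here;
        process_events(event)            # heavier work could instead stay distributed
                                         # via DataFrame operations on batch_df

query = df.writeStream.foreachBatch(process_events_batch) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()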