using facebook Prophet + pyspark for forecasting - Dataframe has less than 2 non-NaN rows

2023-09-29 Thread karan alang
Hello - Anyone used Prophet + pyspark for forecasting ?
I'm trying to backfill forecasts, and running into issues (error -
Dataframe has less than 2 non-NaN rows)

I'm removing all records with NaN values, yet getting this error.

details are in stackoverflow link ->
https://stackoverflow.com/questions/77205021/facebook-prophet-dataframe-has-less-than-2-non-nan-rows

any ideas on how to fix/debug this?

tia!


Apache Spark with watermark - processing data different LogTypes in same kafka topic

2023-06-24 Thread karan alang
Hello All -

I'm using Apache Spark Structured Streaming to read data from Kafka topic,
and do some processing. I'm using watermark to account for late-coming
records and the code works fine.

Here is the working(sample) code:
```

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import
from_json, col, to_timestamp, window, max,exprfrom pyspark.sql.types
import StructType, StructField, StringType, DoubleType,IntegerType

spark = SparkSession \
.builder \
.master("local[3]") \
.appName("Sliding Window Demo") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.shuffle.partitions", 1) \
.getOrCreate()


stock_schema = StructType([
StructField("LogType", StringType()),
StructField("CreatedTime", StringType()),
StructField("Type", StringType()),
StructField("Amount", IntegerType()),
StructField("BrokerCode", StringType())
])

kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "trades") \
.option("startingOffsets", "earliest") \
.load()

value_df = kafka_df.select(from_json(col("value").cast("string"),
stock_schema).alias("value"))

trade_df = value_df.select("value.*") \
.withColumn("CreatedTime", to_timestamp(col("CreatedTime"),
"-MM-dd HH:mm:ss")) \
.withColumn("Buy", expr("case when Type == 'BUY' then Amount
else 0 end")) \
.withColumn("Sell", expr("case when Type == 'SELL' then Amount
else 0 end"))


window_agg_df = trade_df \
.withWatermark("CreatedTime", "10 minute") \
.groupBy(window(col("CreatedTime"), "10 minute")) \
.agg({"Buy":"sum",
"Sell":"sum"}).withColumnRenamed("sum(Buy)",
"TotalBuy").withColumnRenamed("sum(Sell)", "TotalSell")

output_df = window_agg_df.select("window.start", "window.end",
"TotalBuy", "TotalSell")

window_query = output_df.writeStream \
.format("console") \
.outputMode("append") \
.option("checkpointLocation", "chk-point-dir-mar28") \
.trigger(processingTime="30 second") \
.start()

window_query.awaitTermination()


```

Currently, I'm processing a single LogType, the requirement is to process
multiple LogTypes in the same flow .. LogTypes will be config driven (not
hard-coded). Objective is to have generic code that can process all
logTypes.

As an example, for LogType X, I will need to get groupby columns col1, col2
and get the sum of values 'sent' & 'received'. for LogType Y, the groupBy
columns will remain the same but the sum will be on column col3 instead.

w/o the watermark, I can look at the LogType and do the processing in batch
mode (using foreachBatch). However, with watermark - i'm unable to figure
out how to process based on LogType.

Any inputs on this ?

Here is the stackoverflow for this

https://stackoverflow.com/questions/76547349/apache-spark-with-watermark-processing-data-different-logtypes-in-same-kafka-t

tia!


Apache Spark not reading UTC timestamp from MongoDB correctly

2023-06-08 Thread karan alang
ref :
https://stackoverflow.com/questions/76436159/apache-spark-not-reading-utc-timestamp-from-mongodb-correctly

Hello All,
I've data stored in MongoDB collection and the timestamp column is not
being read by Apache Spark correctly. I'm running Apache Spark on GCP
Dataproc.

Here is sample data :

-

In Mongo :

timeslot_date  :
timeslot  |timeslot_date |
+--+--1683527400|{2023-05-08T06:30:00Z}|


When I use pyspark to read this  :

+--+---+
timeslot  |timeslot_date  |
+--+---+1683527400|2023-05-07 23:30:00|
++---+-

-

My understanding is, data in Mongo is in UTC format i.e.
2023-05-08T06:30:00Z is in UTC format. I'm in PST timezone. I'm not
clear why spark is reading it a different timezone format (neither PST
nor UTC) Note - it is not reading it as PST timezone, if it was doing
that it would advance the time by 7 hours, instead it is doing the
opposite.

Where is the default timezone format taken from, when Spark is reading
data from MongoDB ?

Any ideas on this ?

tia!


Re: Spark StructuredStreaming - watermark not working as expected

2023-03-17 Thread karan alang
Hi Mich,
I'm currently testing this on my mac ..  are you able to reproduce this
issue ?

Note - the code is similar .. except outputMode is set to update.
wrt outputMode - when using aggregation + watermark, the outputMode should
be either append Or update, in your code - you have used 'complete'

any comments on this ?

tia!


On Fri, Mar 17, 2023 at 2:36 AM Mich Talebzadeh 
wrote:

> Hi Karan,
>
> The version tested was 3.1.1. Are you running on Dataproc serverless
> 3.1.3?
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 16 Mar 2023 at 23:49, karan alang  wrote:
>
>> Fyi .. apache spark version is 3.1.3
>>
>> On Wed, Mar 15, 2023 at 4:34 PM karan alang 
>> wrote:
>>
>>> Hi Mich, this doesn't seem to be working for me .. the watermark seems
>>> to be getting ignored !
>>>
>>> Here is the data put into Kafka :
>>>
>>> ```
>>>
>>>
>>> +---++
>>>
>>> |value
>>> |key |
>>>
>>>
>>> +---++
>>>
>>>
>>> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}|null|
>>>
>>>
>>> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>>>
>>>
>>> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>>>
>>> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}
>>> |null|
>>>
>>>
>>> +---++
>>> ```
>>> Note :
>>> insert_ts - specifies when the data was inserted
>>>
>>> Here is the output of the Structured Stream:
>>>
>>> ---
>>>
>>> Batch: 2
>>>
>>> ---
>>>
>>> +---+---+---+
>>>
>>> |startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
>>>
>>> +---+---+---+
>>>
>>> |2023-03-15 16:10:00|2023-03-15 16:15:00|27 |
>>>
>>> |2023-03-15 15:10:00|2023-03-15 15:15:00|14 |
>>>
>>> |2023-03-13 10:10:00|2023-03-13 10:15:00|6  |
>>>
>>> +---+---+---+
>>>
>>> Note: I'm summing up the temperatures (for easy verification)
>>>
>>> As per the above - all the 3 'ts' are included in the DataFrame, even
>>> when I added   "ts":"2023-03-13T10:12:00.000-07:00", as the last record.
>>> Since the wattermark is set to "5 minutes" and the max(ts) ==
>>> 2023-03-15T16:12:00.000-07:00
>>> record with ts = "2023-03-13T10:12:00.000-07:00" should have got
>>> dropped, it is more than 2 days old (i.e. dated - 2023-03-13)!
>>>
>>> Any ideas what needs to be changed to make this work ?
>>>
>>> Here is the code (modified for my requirement, but essentially the same)
>>> ```
>>>
>>> schema = StructType([
>>> StructField("temparature", LongType(), False),
>>> StructField("ts", TimestampType(), False),
>>> StructField("insert_ts", TimestampType(), False)
>>> ])
>>>
>>> streamingDataFrame = spark \
>>>

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-16 Thread karan alang
Fyi .. apache spark version is 3.1.3

On Wed, Mar 15, 2023 at 4:34 PM karan alang  wrote:

> Hi Mich, this doesn't seem to be working for me .. the watermark seems to
> be getting ignored !
>
> Here is the data put into Kafka :
>
> ```
>
>
> +---++
>
> |value
>   |key |
>
>
> +---++
>
>
> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}|null|
>
>
> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>
>
> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
>
> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}
> |null|
>
>
> +---++
> ```
> Note :
> insert_ts - specifies when the data was inserted
>
> Here is the output of the Structured Stream:
>
> ---
>
> Batch: 2
>
> ---
>
> +---+---+---+
>
> |startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
>
> +---+---+---+
>
> |2023-03-15 16:10:00|2023-03-15 16:15:00|27 |
>
> |2023-03-15 15:10:00|2023-03-15 15:15:00|14 |
>
> |2023-03-13 10:10:00|2023-03-13 10:15:00|6  |
>
> +---+---+---+
>
> Note: I'm summing up the temperatures (for easy verification)
>
> As per the above - all the 3 'ts' are included in the DataFrame, even when
> I added   "ts":"2023-03-13T10:12:00.000-07:00", as the last record.
> Since the wattermark is set to "5 minutes" and the max(ts) ==
> 2023-03-15T16:12:00.000-07:00
> record with ts = "2023-03-13T10:12:00.000-07:00" should have got dropped,
> it is more than 2 days old (i.e. dated - 2023-03-13)!
>
> Any ideas what needs to be changed to make this work ?
>
> Here is the code (modified for my requirement, but essentially the same)
> ```
>
> schema = StructType([
> StructField("temparature", LongType(), False),
> StructField("ts", TimestampType(), False),
> StructField("insert_ts", TimestampType(), False)
> ])
>
> streamingDataFrame = spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers", kafkaBrokers) \
> .option("group.id", 'watermark-grp') \
> .option("subscribe", topic) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"), 
> schema=schema).alias("parsed_value"))
>
> resultC = streamingDataFrame.select( 
> col("parsed_value.ts").alias("timestamp") \
>, col("parsed_value.temparature").alias("temparature"), 
> col("parsed_value.insert_ts").alias("insert_ts"))
>
> resultM = resultC. \
> withWatermark("timestamp", "5 minutes"). \
> groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
> agg({'temparature':'sum'})
>
> resultMF = resultM. \
> 
> select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame")
>  \
>   , col("sum(temparature)").alias("Sum_Temperature"))
>
> result = resultMF. \
>  writeStream. \
>  outputMode('complete'). \
>  option("numRows", 1000). \
>  option("truncate", "false"). \
>  format('console'). \
>  option('checkpointLocation', checkpoint_path). \
>

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-15 Thread karan alang
.readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", "temperature") \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
>
> resultC = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.timestamp").alias("timestamp") \
>, col("parsed_value.temperature").alias("temperature"))
>
> """
> We work out the window and the AVG(temperature) in the
> window's timeframe below
> This should return back the following Dataframe as struct
>
>  root
>  |-- window: struct (nullable = false)
>  ||-- start: timestamp (nullable = true)
>  ||-- end: timestamp (nullable = true)
>  |-- avg(temperature): double (nullable = true)
>
> """
> resultM = resultC. \
>  withWatermark("timestamp", "5 minutes"). \
>  groupBy(window(resultC.timestamp, "5 minutes", "5
> minutes")). \
>  avg('temperature')
>
> # We take the above Dataframe and flatten it to get the
> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and
> "AVGTemperature"
> resultMF = resultM. \
>select( \
>
> F.col("window.start").alias("startOfWindowFrame") \
>   , F.col("window.end").alias("endOfWindowFrame") \
>       ,
> F.col("avg(temperature)").alias("AVGTemperature"))
>
> resultMF.printSchema()
>
> result = resultMF. \
>  writeStream. \
>  outputMode('complete'). \
>  option("numRows", 1000). \
>  option("truncate", "false"). \
>  format('console'). \
>  option('checkpointLocation', checkpoint_path). \
>  queryName("temperature"). \
>  start()
>
> except Exception as e:
> print(f"""{e}, quitting""")
> sys.exit(1)
>
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 11 Mar 2023 at 04:33, karan alang  wrote:
>
>> Hi Mich -
>> Here is the output of the ldf.printSchema() & ldf.show() commands.
>>
>> ldf.printSchema()
>>
>> root
>>  |-- applianceName: string (nullable = true)
>>  |-- timeslot: long (nullable = true)
>>  |-- customer: string (nullable = true)
>>  |-- window: struct (nullable = false)
>>  ||-- start: timestamp (nullable = true)
>>  ||-- end: timestamp (nullable = true)
>>  |-- sentOctets: long (nullable = true)
>>  |-- recvdOctets: long (nullable = true)
>>
>>
>> 

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-10 Thread karan alang
Hi Mich -
Here is the output of the ldf.printSchema() & ldf.show() commands.

ldf.printSchema()

root
 |-- applianceName: string (nullable = true)
 |-- timeslot: long (nullable = true)
 |-- customer: string (nullable = true)
 |-- window: struct (nullable = false)
 ||-- start: timestamp (nullable = true)
 ||-- end: timestamp (nullable = true)
 |-- sentOctets: long (nullable = true)
 |-- recvdOctets: long (nullable = true)


 ldf.show() :

 
+--+---++--++--+--+---+
|applianceName |timeslot|customer|window
 |sentOctets|recvdOctets|
+--+---++--++--+--+---+
|abc1  |2797514|cust1 |{2023-03-11 04:15:00, 2023-03-11
04:30:00}|21459264  |32211859   |
|pqrq  |2797513|cust1 |{2023-03-11 04:15:00, 2023-03-11
04:30:00}|17775527  |31331093   |
|xyz|2797514|cust1 |{2023-03-11 04:15:00,
2023-03-11 04:30:00}|12808015  |24191707   |
+--+---++--++--+--+---+

Also, any comment on the outputMode ? I've set it to 'update', since I'm
using aggregation.

thanks!

On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh 
wrote:

>
> Just looking at the code
>
>
> in here
>
>
> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>  
> window(col("ts"), "15 minutes")) \
> .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
> .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
> .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
> .fillna(0)
>
> What does ldf.printSchema() returns
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 10 Mar 2023 at 07:16, karan alang  wrote:
>
>>
>> Hello All -
>>
>> I've a structured Streaming job which has a trigger of 10 minutes, and
>> I'm using watermark to account for late data coming in. However, the
>> watermark is not working - and instead of a single record with total
>> aggregated value, I see 2 records.
>>
>> Here is the code :
>>
>> ```
>>
>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>
>>
>> df_stream = self.spark.readStream.format('kafka') \
>> .option("kafka.security.protocol", "SSL") \
>> .option("kafka.ssl.truststore.location", 
>> self.ssl_truststore_location) \
>> .option("kafka.ssl.truststore.password", 
>> self.ssl_truststore_password) \
>> .option("kafka.ssl.keystore.location", 
>> self.ssl_keystore_location_bandwidth_intermediate) \
>> .option("kafka.ssl.keystore.password", 
>> self.ssl_keystore_password_bandwidth_intermediate) \
>> .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>> .option("subscribe", topic) \
>> .option("startingOffsets", "latest") \
>> .option("failOnDataLoss", "false") \
>> .option("kafka.metadata.max.age.ms", "1000") \
>> .option("kafka.ssl.keystore.type", "PKCS12") \
>> .option("kafka.ssl.truststore.type", "PKCS12") \
>> .load()
>>
>> 2. calling foreachBatch(self.process)
>> # note - outputMode is set to "update" (tried setting outputMode = 
>> append as well)
>>
>> # 03/09 ::: outputMode - update instead of append
>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", 
>> "topic").writeStream \
>> .outputMode("update") \
>> .trigger(processingTime='10 minutes') \
>> .option("truncate", "false

Spark StructuredStreaming - watermark not working as expected

2023-03-09 Thread karan alang
Hello All -

I've a structured Streaming job which has a trigger of 10 minutes, and I'm
using watermark to account for late data coming in. However, the watermark
is not working - and instead of a single record with total aggregated
value, I see 2 records.

Here is the code :

```

1) StructuredStreaming - Reading from Kafka every 10 mins


df_stream = self.spark.readStream.format('kafka') \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location",
self.ssl_truststore_location) \
.option("kafka.ssl.truststore.password",
self.ssl_truststore_password) \
.option("kafka.ssl.keystore.location",
self.ssl_keystore_location_bandwidth_intermediate) \
.option("kafka.ssl.keystore.password",
self.ssl_keystore_password_bandwidth_intermediate) \
.option("kafka.bootstrap.servers", self.kafkaBrokers) \
.option("subscribe", topic) \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.option("kafka.metadata.max.age.ms", "1000") \
.option("kafka.ssl.keystore.type", "PKCS12") \
.option("kafka.ssl.truststore.type", "PKCS12") \
.load()

2. calling foreachBatch(self.process)
# note - outputMode is set to "update" (tried setting
outputMode = append as well)

# 03/09 ::: outputMode - update instead of append
query = df_stream.selectExpr("CAST(value AS STRING)",
"timestamp", "topic").writeStream \
.outputMode("update") \
.trigger(processingTime='10 minutes') \
.option("truncate", "false") \
.option("checkpointLocation", self.checkpoint) \
.foreachBatch(self.process) \
.start()


self.process - where i do the bulk of the  processing, which calls the
function  'aggIntfLogs'

In function aggIntfLogs - i'm using watermark of 15 mins, and doing
groupBy to calculate the sum of sentOctets & recvdOctets


def aggIntfLogs(ldf):
if ldf and ldf.count() > 0:

ldf = ldf.select('applianceName', 'timeslot',
'sentOctets', 'recvdOctets','ts', 'customer') \
.withColumn('sentOctets',
ldf["sentOctets"].cast(LongType())) \
.withColumn('recvdOctets',
ldf["recvdOctets"].cast(LongType())) \
.withWatermark("ts", "15 minutes")

ldf = ldf.groupBy("applianceName", "timeslot", "customer",

window(col("ts"), "15 minutes")) \
.agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
.withColumnRenamed('sum(sentOctets)', 'sentOctets') \
.withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
.fillna(0)
return ldf
return ldf


Dataframe 'ldf' returned from the function aggIntfLogs - is
written to Kafka topic

```

I was expecting that using the watermark will account for late coming data
.. i.e. the sentOctets & recvdOctets are calculated for the consolidated
data
(including late-coming data, since the late coming data comes within 15
mins), however, I'm seeing 2 records for some of the data (i.e. key -
applianceName/timeslot/customer) i.e. the aggregated data is calculated
individually for the records and I see 2 records instead of single record
accounting for late coming data within watermark.

What needs to be done to fix this & make this work as desired?

tia!


Here is the Stackoverflow link as well -

https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected


Re: Online classes for spark topics

2023-03-08 Thread karan alang
+1 .. I'm happy to be part of these discussions as well !




On Wed, Mar 8, 2023 at 12:27 PM Mich Talebzadeh 
wrote:

> Hi,
>
> I guess I can schedule this work over a course of time. I for myself can
> contribute plus learn from others.
>
> So +1 for me.
>
> Let us see if anyone else is interested.
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 8 Mar 2023 at 17:48, ashok34...@yahoo.com 
> wrote:
>
>>
>> Hello Mich.
>>
>> Greetings. Would you be able to arrange for Spark Structured Streaming
>> learning webinar.?
>>
>> This is something I haven been struggling with recently. it will be very
>> helpful.
>>
>> Thanks and Regard
>>
>> AK
>> On Tuesday, 7 March 2023 at 20:24:36 GMT, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>
>> Hi,
>>
>> This might  be a worthwhile exercise on the assumption that the
>> contributors will find the time and bandwidth to chip in so to speak.
>>
>> I am sure there are many but on top of my head I can think of Holden
>> Karau for k8s, and Sean Owen for data science stuff. They are both very
>> experienced.
>>
>> Anyone else 樂
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 7 Mar 2023 at 19:17, ashok34...@yahoo.com.INVALID
>>  wrote:
>>
>> Hello gurus,
>>
>> Does Spark arranges online webinars for special topics like Spark on K8s,
>> data science and Spark Structured Streaming?
>>
>> I would be most grateful if experts can share their experience with
>> learners with intermediate knowledge like myself. Hopefully we will find
>> the practical experiences told valuable.
>>
>> Respectively,
>>
>> AK
>>
>>


Re: Running Spark on Kubernetes (GKE) - failing on spark-submit

2023-02-15 Thread karan alang
thnks, Mich .. let me check this



On Wed, Feb 15, 2023 at 1:42 AM Mich Talebzadeh 
wrote:

>
> It may help to check this article of mine
>
>
> Spark on Kubernetes, A Practitioner’s Guide
> <https://www.linkedin.com/pulse/spark-kubernetes-practitioners-guide-mich-talebzadeh-ph-d-/?trackingId=FDQORri0TBeJl02p3D%2B2JA%3D%3D>
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 15 Feb 2023 at 09:12, Mich Talebzadeh 
> wrote:
>
>> Your submit command
>>
>> spark-submit --master k8s://https://34.74.22.140:7077 --deploy-mode
>> cluster --name pyspark-example --conf 
>> spark.kubernetes.container.image=pyspark-example:0.1
>> --conf spark.kubernetes.file.upload.path=/myexample
>> src/StructuredStream-on-gke.py
>>
>>
>> pay attention to what it says
>>
>>
>> --conf spark.kubernetes.file.upload.path
>>
>> That refers to your Python package on GCS storage not in the docker itself
>>
>>
>> From
>> https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management
>>
>>
>> "... The app jar file will be uploaded to the S3 and then when the
>> driver is launched it will be downloaded to the driver pod and will be
>> added to its classpath. Spark will generate a subdir under the upload path
>> with a random name to avoid conflicts with spark apps running in parallel.
>> User could manage the subdirs created according to his needs..."
>>
>>
>> In your case it is gs not s3
>>
>>
>> There is no point putting your python file in the docker image itself!
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 15 Feb 2023 at 07:46, karan alang  wrote:
>>
>>> Hi Ye,
>>>
>>> This is the error i get when i don't set the
>>> spark.kubernetes.file.upload.path
>>>
>>> Any ideas on how to fix this ?
>>>
>>> ```
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Please
>>> specify spark.kubernetes.file.upload.path property.
>>>
>>> at
>>> org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:299)
>>>
>>> at
>>> org.apache.spark.deploy.k8s.KubernetesUtils$.$anonfun$uploadAndTransformFileUris$1(KubernetesUtils.scala:248)
>>>
>>> at
>>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>>>
>>> at
>>> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>>>
>>> at
>>> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>>>
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>>>
>>> at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>>>
>>> at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>>>
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>>>
>>> at
>>> org.apache.spark.deploy.k8s.KubernetesUtils$.uploadAndTransformFileUris(KubernetesUtils.scala:247)
>>>
>>> at
>>> org.apache.spark.deploy.k8s.features.BasicDriverFeatureStep.$anonfun$getAdditionalPodSystemProperties$1(BasicDriverFeatureStep.scala:173)
>>>
>>> at scala.collection.immutable.List.foreach(List.scala:392)
>>>
>>> at
>>> org.apache.spark.deploy.k8s.features.BasicDriverFeatureStep.getAdditionalPodSystemProperties(BasicDriverFeatureStep.scala:164)
>>&

Re: Running Spark on Kubernetes (GKE) - failing on spark-submit

2023-02-14 Thread karan alang
Hi Ye,

This is the error i get when i don't set the
spark.kubernetes.file.upload.path

Any ideas on how to fix this ?

```

Exception in thread "main" org.apache.spark.SparkException: Please specify
spark.kubernetes.file.upload.path property.

at
org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:299)

at
org.apache.spark.deploy.k8s.KubernetesUtils$.$anonfun$uploadAndTransformFileUris$1(KubernetesUtils.scala:248)

at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

at scala.collection.TraversableLike.map(TraversableLike.scala:238)

at scala.collection.TraversableLike.map$(TraversableLike.scala:231)

at scala.collection.AbstractTraversable.map(Traversable.scala:108)

at
org.apache.spark.deploy.k8s.KubernetesUtils$.uploadAndTransformFileUris(KubernetesUtils.scala:247)

at
org.apache.spark.deploy.k8s.features.BasicDriverFeatureStep.$anonfun$getAdditionalPodSystemProperties$1(BasicDriverFeatureStep.scala:173)

at scala.collection.immutable.List.foreach(List.scala:392)

at
org.apache.spark.deploy.k8s.features.BasicDriverFeatureStep.getAdditionalPodSystemProperties(BasicDriverFeatureStep.scala:164)

at
org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:60)

at
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)

at
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)

at scala.collection.immutable.List.foldLeft(List.scala:89)

at
org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)

at
org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:106)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3(KubernetesClientApplication.scala:213)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3$adapted(KubernetesClientApplication.scala:207)

at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2622)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:207)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:179)

at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)

at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)

at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)

at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)

at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

On Tue, Feb 14, 2023 at 1:33 AM Ye Xianjin  wrote:

> The configuration of ‘…file.upload.path’ is wrong. it means a distributed
> fs path to store your archives/resource/jars temporarily, then distributed
> by spark to drivers/executors.
> For your cases, you don’t need to set this configuration.
> Sent from my iPhone
>
> On Feb 14, 2023, at 5:43 AM, karan alang  wrote:
>
> 
> Hello All,
>
> I'm trying to run a simple application on GKE (Kubernetes), and it is
> failing:
> Note : I have spark(bitnami spark chart) installed on GKE using helm
> install
>
> Here is what is done :
> 1. created a docker image using Dockerfile
>
> Dockerfile :
> ```
>
> FROM python:3.7-slim
>
> RUN apt-get update && \
> apt-get install -y default-jre && \
> apt-get install -y openjdk-11-jre-headless && \
> apt-get clean
>
> ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64
>
> RUN pip install pyspark
> RUN mkdir -p /myexample && chmod 755 /myexample
> WORKDIR /myexample
>
> COPY src/StructuredStream-on-gke.py /myexample/StructuredStream-on-gke.py
>
> CMD ["pyspark"]
>
> ```
> Simple pyspark application :
> ```
>
> from pyspark.sql import SparkSession
> spark = 
> SparkSession.builder.appName("StructuredStreaming-on-gke").getOrCreate()
>
> data = [('k1', 123000), ('k2', 234000), ('k3', 456000)]
> df = spark.createDataFrame(data, ('id', 'salary'))
>
> df.show(5, False)
>
> ```
>
> Spark-submit command :
> ```
>
> spark-submit --master k8s://https://34.74.22.140:7077 --deploy-mode
> cluster --name pyspark-example --conf
> spark.kubernetes.container.image=pyspark-example:0.1 --conf
> spark.kubernetes.file.upload.path=/myexample src/StructuredStream-on-gke.py
> ```
>
> Error i get :
> ```
>
>

Running Spark on Kubernetes (GKE) - failing on spark-submit

2023-02-13 Thread karan alang
Hello All,

I'm trying to run a simple application on GKE (Kubernetes), and it is
failing:
Note : I have spark(bitnami spark chart) installed on GKE using helm
install

Here is what is done :
1. created a docker image using Dockerfile

Dockerfile :
```

FROM python:3.7-slim

RUN apt-get update && \
apt-get install -y default-jre && \
apt-get install -y openjdk-11-jre-headless && \
apt-get clean

ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64

RUN pip install pyspark
RUN mkdir -p /myexample && chmod 755 /myexample
WORKDIR /myexample

COPY src/StructuredStream-on-gke.py /myexample/StructuredStream-on-gke.py

CMD ["pyspark"]

```
Simple pyspark application :
```

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StructuredStreaming-on-gke").getOrCreate()

data = [('k1', 123000), ('k2', 234000), ('k3', 456000)]
df = spark.createDataFrame(data, ('id', 'salary'))

df.show(5, False)

```

Spark-submit command :
```

spark-submit --master k8s://https://34.74.22.140:7077 --deploy-mode cluster
--name pyspark-example --conf
spark.kubernetes.container.image=pyspark-example:0.1 --conf
spark.kubernetes.file.upload.path=/myexample src/StructuredStream-on-gke.py
```

Error i get :
```

23/02/13 13:18:27 INFO KubernetesUtils: Uploading file:
/Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py
to dest:
/myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a/StructuredStream-on-gke.py...

Exception in thread "main" org.apache.spark.SparkException: Uploading file
/Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py
failed...

at
org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:296)

at
org.apache.spark.deploy.k8s.KubernetesUtils$.renameMainAppResource(KubernetesUtils.scala:270)

at
org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configureForPython(DriverCommandFeatureStep.scala:109)

at
org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configurePod(DriverCommandFeatureStep.scala:44)

at
org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:59)

at
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)

at
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)

at scala.collection.immutable.List.foldLeft(List.scala:89)

at
org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)

at
org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:106)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3(KubernetesClientApplication.scala:213)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3$adapted(KubernetesClientApplication.scala:207)

at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2622)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:207)

at
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:179)

at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)

at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)

at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)

at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)

at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: org.apache.spark.SparkException: Error uploading file
StructuredStream-on-gke.py

at
org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:319)

at
org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:292)

... 21 more

Caused by: java.io.IOException: Mkdirs failed to create
/myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a

at
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:317)

at
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:305)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)

at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:414)

at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:387)

at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2369)

at
org.apache.hadoop.fs.FilterFileSystem.copyFromLocalFile(FilterFileSystem.java:368)

at
org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:316)

... 22 more
```

Any ideas on how to fix this & get it to work ?
tia !

Pls see the stackoverflow link :

https://stackoverflow.com/questions/75441360/running-spark-application-on-gke-failing-on-spark-submit


Re: Spark Structured Streaming - stderr getting filled up

2022-09-19 Thread karan alang
here is the stackoverflow link

https://stackoverflow.com/questions/73780259/spark-structured-streaming-stderr-getting-filled-up

On Mon, Sep 19, 2022 at 4:41 PM karan alang  wrote:

> I've created a stackoverflow ticket for this as well
>
> On Mon, Sep 19, 2022 at 4:37 PM karan alang  wrote:
>
>> Hello All,
>> I've a Spark Structured Streaming job on GCP Dataproc - which picks up
>> data from Kafka, does processing and pushes data back into kafka topics.
>>
>> Couple of questions :
>> 1. Does Spark put all the log (incl. INFO, WARN etc) into stderr ?
>> What I notice is that stdout is empty, while all the logging is put in to
>> stderr
>>
>> 2. Is there a way for me to expire the data in stderr (i.e. expire the
>> older logs) ?
>> Since I've a long running streaming job, the stderr gets filled up over
>> time and nodes/VMs become unavailable.
>>
>> Pls advice.
>>
>> Here is output of the yarn logs command :
>> ```
>>
>> root@versa-structured-stream-v1-w-1:/home/karanalang# yarn logs
>> -applicationId application_1663623368960_0008 -log_files stderr -size -500
>>
>> 2022-09-19 23:26:01,439 INFO client.RMProxy: Connecting to
>> ResourceManager at versa-structured-stream-v1-m/10.142.0.62:8032
>>
>> 2022-09-19 23:26:01,696 INFO client.AHSProxy: Connecting to Application
>> History server at versa-structured-stream-v1-m/10.142.0.62:10200
>>
>> Can not find any log file matching the pattern: [stderr] for the
>> container: container_e01_1663623368960_0008_01_03 within the
>> application: application_1663623368960_0008
>>
>> Container: container_e01_1663623368960_0008_01_02 on
>> versa-structured-stream-v1-w-2.c.versa-sml-googl.internal:8026
>>
>> LogAggregationType: LOCAL
>>
>>
>> ===
>>
>> LogType:stderr
>>
>> LogLastModifiedTime:Mon Sep 19 23:26:02 + 2022
>>
>> LogLength:44309782124
>>
>> LogContents:
>>
>> , tenantId=3, vsnId=0, mstatsTotSentOctets=48210,
>> mstatsTotRecvdOctets=242351, mstatsTotSessDuration=30,
>> mstatsTotSessCount=34, mstatsType=dest-stats, destIp=165.225.216.24,
>> mstatsAttribs=,topic=syslog.ueba-us4.v1.versa.demo3,customer=versa  type(row)
>> is ->  
>>
>> 22/09/19 23:26:02 WARN
>> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer: KafkaDataConsumer
>> is not running in UninterruptibleThread. It may hang when
>> KafkaDataConsumer's methods are interrupted because of KAFKA-1894
>>
>> End of LogType:stderr.This log file belongs to a running container
>> (container_e01_1663623368960_0008_01_02) and so may not be complete.
>>
>> ***
>>
>>
>>
>> Container: container_e01_1663623368960_0008_01_01 on
>> versa-structured-stream-v1-w-1.c.versa-sml-googl.internal:8026
>>
>> LogAggregationType: LOCAL
>>
>>
>> ===
>>
>> LogType:stderr
>>
>> LogLastModifiedTime:Mon Sep 19 22:54:55 + 2022
>>
>> LogLength:17367929
>>
>> LogContents:
>>
>> on syslog.ueba-us4.v1.versa.demo3-2
>>
>> 22/09/19 22:52:52 INFO
>> org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer
>> clientId=consumer-spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor-1,
>> groupId=spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor]
>> Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset
>> 449568676.
>>
>> 22/09/19 22:54:55 ERROR
>> org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>>
>> End of LogType:stderr.
>>
>> ***
>>
>> ```
>>
>>
>>


Re: Spark Structured Streaming - stderr getting filled up

2022-09-19 Thread karan alang
I've created a stackoverflow ticket for this as well

On Mon, Sep 19, 2022 at 4:37 PM karan alang  wrote:

> Hello All,
> I've a Spark Structured Streaming job on GCP Dataproc - which picks up
> data from Kafka, does processing and pushes data back into kafka topics.
>
> Couple of questions :
> 1. Does Spark put all the log (incl. INFO, WARN etc) into stderr ?
> What I notice is that stdout is empty, while all the logging is put in to
> stderr
>
> 2. Is there a way for me to expire the data in stderr (i.e. expire the
> older logs) ?
> Since I've a long running streaming job, the stderr gets filled up over
> time and nodes/VMs become unavailable.
>
> Pls advice.
>
> Here is output of the yarn logs command :
> ```
>
> root@versa-structured-stream-v1-w-1:/home/karanalang# yarn logs
> -applicationId application_1663623368960_0008 -log_files stderr -size -500
>
> 2022-09-19 23:26:01,439 INFO client.RMProxy: Connecting to ResourceManager
> at versa-structured-stream-v1-m/10.142.0.62:8032
>
> 2022-09-19 23:26:01,696 INFO client.AHSProxy: Connecting to Application
> History server at versa-structured-stream-v1-m/10.142.0.62:10200
>
> Can not find any log file matching the pattern: [stderr] for the
> container: container_e01_1663623368960_0008_01_03 within the
> application: application_1663623368960_0008
>
> Container: container_e01_1663623368960_0008_01_02 on
> versa-structured-stream-v1-w-2.c.versa-sml-googl.internal:8026
>
> LogAggregationType: LOCAL
>
>
> ===
>
> LogType:stderr
>
> LogLastModifiedTime:Mon Sep 19 23:26:02 + 2022
>
> LogLength:44309782124
>
> LogContents:
>
> , tenantId=3, vsnId=0, mstatsTotSentOctets=48210,
> mstatsTotRecvdOctets=242351, mstatsTotSessDuration=30,
> mstatsTotSessCount=34, mstatsType=dest-stats, destIp=165.225.216.24,
> mstatsAttribs=,topic=syslog.ueba-us4.v1.versa.demo3,customer=versa  type(row)
> is ->  
>
> 22/09/19 23:26:02 WARN
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer: KafkaDataConsumer
> is not running in UninterruptibleThread. It may hang when
> KafkaDataConsumer's methods are interrupted because of KAFKA-1894
>
> End of LogType:stderr.This log file belongs to a running container
> (container_e01_1663623368960_0008_01_02) and so may not be complete.
>
> ***
>
>
>
> Container: container_e01_1663623368960_0008_01_01 on
> versa-structured-stream-v1-w-1.c.versa-sml-googl.internal:8026
>
> LogAggregationType: LOCAL
>
>
> ===
>
> LogType:stderr
>
> LogLastModifiedTime:Mon Sep 19 22:54:55 + 2022
>
> LogLength:17367929
>
> LogContents:
>
> on syslog.ueba-us4.v1.versa.demo3-2
>
> 22/09/19 22:52:52 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer
> clientId=consumer-spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor-1,
> groupId=spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor]
> Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset
> 449568676.
>
> 22/09/19 22:54:55 ERROR
> org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>
> End of LogType:stderr.
>
> ***
>
> ```
>
>
>


Spark Structured Streaming - stderr getting filled up

2022-09-19 Thread karan alang
Hello All,
I've a Spark Structured Streaming job on GCP Dataproc - which picks up data
from Kafka, does processing and pushes data back into kafka topics.

Couple of questions :
1. Does Spark put all the log (incl. INFO, WARN etc) into stderr ?
What I notice is that stdout is empty, while all the logging is put in to
stderr

2. Is there a way for me to expire the data in stderr (i.e. expire the
older logs) ?
Since I've a long running streaming job, the stderr gets filled up over
time and nodes/VMs become unavailable.

Pls advice.

Here is output of the yarn logs command :
```

root@versa-structured-stream-v1-w-1:/home/karanalang# yarn logs
-applicationId application_1663623368960_0008 -log_files stderr -size -500

2022-09-19 23:26:01,439 INFO client.RMProxy: Connecting to ResourceManager
at versa-structured-stream-v1-m/10.142.0.62:8032

2022-09-19 23:26:01,696 INFO client.AHSProxy: Connecting to Application
History server at versa-structured-stream-v1-m/10.142.0.62:10200

Can not find any log file matching the pattern: [stderr] for the container:
container_e01_1663623368960_0008_01_03 within the application:
application_1663623368960_0008

Container: container_e01_1663623368960_0008_01_02 on
versa-structured-stream-v1-w-2.c.versa-sml-googl.internal:8026

LogAggregationType: LOCAL

===

LogType:stderr

LogLastModifiedTime:Mon Sep 19 23:26:02 + 2022

LogLength:44309782124

LogContents:

, tenantId=3, vsnId=0, mstatsTotSentOctets=48210,
mstatsTotRecvdOctets=242351, mstatsTotSessDuration=30,
mstatsTotSessCount=34, mstatsType=dest-stats, destIp=165.225.216.24,
mstatsAttribs=,topic=syslog.ueba-us4.v1.versa.demo3,customer=versa  type(row)
is ->  

22/09/19 23:26:02 WARN
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer: KafkaDataConsumer
is not running in UninterruptibleThread. It may hang when
KafkaDataConsumer's methods are interrupted because of KAFKA-1894

End of LogType:stderr.This log file belongs to a running container
(container_e01_1663623368960_0008_01_02) and so may not be complete.

***



Container: container_e01_1663623368960_0008_01_01 on
versa-structured-stream-v1-w-1.c.versa-sml-googl.internal:8026

LogAggregationType: LOCAL

===

LogType:stderr

LogLastModifiedTime:Mon Sep 19 22:54:55 + 2022

LogLength:17367929

LogContents:

on syslog.ueba-us4.v1.versa.demo3-2

22/09/19 22:52:52 INFO
org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer
clientId=consumer-spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor-1,
groupId=spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor]
Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset
449568676.

22/09/19 22:54:55 ERROR
org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

End of LogType:stderr.

***

```


Spark Structured Streaming - unable to change max.poll.records (showing as 1)

2022-09-07 Thread karan alang
Hello All,

i've a Spark structured streaming job which reads from Kafka, does
processing and puts data into Mongo/Kafka/GCP Buckets (i.e. it is
processing heavy)

I'm consistently seeing the following warnings:

```

22/09/06 16:55:03 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1,
groupId=spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0]
Member 
consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1-604d740f-16d1-46b3-955c-502be5b02be1
sending LeaveGroup request to coordinator 35.237.40.54:9094 (id:
2147483645 rack: null) due to consumer poll timeout has expired. This
means the time between subsequent calls to poll() was longer than the
configured max.poll.interval.ms, which typically implies that the poll
loop is spending too much time processing messages. You can address
this either by increasing max.poll.interval.ms or by reducing the
maximum size of batches returned in poll() with max.poll.records.

```

I'm trying to change the values of the parameters - max.poll.interval.ms &
max.poll.records in the spark.readStream (shown below)

```

df_stream = spark.readStream.format('kafka') \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location) \
.option("kafka.ssl.keystore.password", ssl_keystore_password) \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("subscribe", topic) \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.option("kafka.metadata.max.age.ms", "1000") \
.option("kafka.ssl.keystore.type", "PKCS12") \
.option("kafka.ssl.truststore.type", "PKCS12") \
.option("kafka.max.poll.interval.ms", 60) \
.option("kafka.max.poll.records", 7000) \
.load()

```

The values that I see in the KafkaConsumer are :

max.poll.interval.ms - 60 (changed from 30) max.poll.records - not
getting changed, it is showing as 1

```

22/09/07 02:29:32 INFO
org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig
values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [34.138.213.152:9094]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id =
consumer-spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0-1
client.rack =
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 
spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 60
max.poll.records = 1
metadata.max.age.ms = 1000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 3
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = syslog-vani-noacl.p12
ssl.keystore.password = [hidden]
ssl.keystore.type = PKCS12
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null

Structured Streaming - data not being read (offsets not getting committed ?)

2022-08-26 Thread karan alang
Hello All,

i've a long-running Apache Spark structured streaming job running in GCP
Dataproc, which reads data from Kafka every 10 mins, and does some
processing. Kafka topic has 3 partitions, and a retention period of 3 days.

The issue i'm facing is that after few hours, the program stops reading
data from Kafka. If i delete the gcp bucket (which is the checkpoint
directory), and then restart the streaming job - it starts consuming the
data again.

here is the code, where I'm reading data from Kafka & using foreachbatch to
call a function where the processing happens

```

df_stream = spark.readStream.format('kafka') \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location) \
.option("kafka.ssl.keystore.password", ssl_keystore_password) \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("subscribe", topic) \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.option("kafka.metadata.max.age.ms", "1000") \
.option("kafka.ssl.keystore.type", "PKCS12") \
.option("kafka.ssl.truststore.type", "PKCS12") \
.option("maxOffsetsPerTrigger", 10) \
.option("max.poll.records", 500) \
.option("max.poll.interval.ms", 100) \
.load()

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp",
"topic").writeStream \
.outputMode("append") \
.trigger(processingTime='3 minutes') \
.option("truncate", "false") \
.option("checkpointLocation", checkpoint) \
.foreachBatch(convertToDictForEachBatch) \
.start()

```

Log snippet, offset being set to latest & data not being read :

```

total time take, convertToDict :  0:00:00.00669522/08/26 17:45:03 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Member 
consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-26755c4c-93d6-4ab6-8799-411439e310bc
sending LeaveGroup request to coordinator 35.185.24.226:9094 (id:
2147483646 rack: null) due to consumer poll timeout has expired. This
means the time between subsequent calls to poll() was longer than the
configured max.poll.interval.ms, which typically implies that the poll
loop is spending too much time processing messages. You can address
this either by increasing max.poll.interval.ms or by reducing the
maximum size of batches returned in poll() with
max.poll.records.22/08/26 17:50:00 INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Giving away all assigned partitions as lost since generation has been
reset,indicating that consumer is no longer part of the group22/08/26
17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Lost previously assigned partitions syslog.ueba-us4.v1.versa.demo3-0,
syslog.ueba-us4.v1.versa.demo3-1,
syslog.ueba-us4.v1.versa.demo3-222/08/26 17:50:00 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
(Re-)joining group22/08/26 17:50:00 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Join group failed with
org.apache.kafka.common.errors.MemberIdRequiredException: The group
member needs to have a valid member id before actually entering a
consumer group.22/08/26 17:50:00 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
(Re-)joining group22/08/26 17:50:03 INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:
[Consumer 
clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1,
groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0]
Finished 

StructuredStreaming - read from Kafka, writing data into Mongo every 10 minutes

2022-06-22 Thread karan alang
Hello All,

I have data in Kafka topic(data published every 10 mins) and I'm planning
to read this data using Apache Spark Structured Stream(batch mode) and push
it in MongoDB.

Pls note : This will be scheduled using Composer/Airflow on GCP - which
will create a Dataproc cluster, run the spark code, and then delete the
cluster

Here is my current code :

```

# read from Kafka, extract json - and write to mongoDB

df_reader = spark.readStream.format('kafka')\
.option("kafka.bootstrap.servers",kafkaBrokers)\
.option("kafka.security.protocol","SSL") \
.option("kafka.ssl.truststore.location",ssl_truststore_location) \
.option("kafka.ssl.truststore.password",ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location)\
.option("kafka.ssl.keystore.password", ssl_keystore_password)\
.option("subscribe", topic) \
.option("kafka.group.id", consumerGroupId)\
.option("failOnDataLoss", "false") \
.option("startingOffsets", "earliest") \
.load()

df = df_reader.selectExpr("CAST(value AS STRING)")
df_data = 
df.select(from_json(col('value'),schema).alias('data')).select("data.*").filter(col('customer')==database)

# write to Mongo
df_data.write\
.format("mongo") \
.option("uri", mongoConnUri) \
.option("database", database) \
.option("collection", collection) \
.mode("append") \
.save()


```

Since this is run as a batch query every 10 minutes, how do I ensure that
duplicate records are not read, and pushed into MongoDB ? When I use
readStream (every time the job is launched using Airflow) - does it read
all the data in Kafka topic OR from the point it last read the data ?

Pls note : mongo datasource does not support streaming query, else i could
have used the checkpoint to enable this ?

Pls advise what is the best way to achieve this ?

tia!


Here is the stackoverflow link :

https://stackoverflow.com/questions/72723137/structuredstreaming-read-from-kafka-writing-data-into-mongo-every-10-minutes


Spark Structured streaming(batch mode) - running dependent jobs concurrently

2022-06-15 Thread karan alang
Hello All,

I've a Structured Streaming program running on GCP dataproc which reads
data from Kafka every 10 mins, and then does processing.
This is a multi-tenant system i.e. the program will read data from multiple
customers.

In my current code, i'm looping over the customers passing it to the 3
programs - P1, P2, P3
P1, P2, P3 are  classes where the bulk of the processing happens, and the
data is pushed back to kafka

```

def convertToDictForEachBatch(df, batchId):

# code to change syslog to required format - this is not  included here
since it is not relevant to the issue

for cust in hm.values():

# tdict_ap - has data specific to P2, filter code is not shown
p1 = P1(tdict_ap, spark, False, cust)


# tdict_ap - has data specific to P2, filter code is not shown
p2 = P2(tdict_ap, spark, False, cust)


# tdict_ap - has data specific to P3, filter code is not shown
p3 = P3(tdict_ap, spark, False, cust)



# df_stream = data read from Kafka, this calls function
convertToDictForEachBatch

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp",
"topic").writeStream \
 .outputMode("append") \
 .trigger(processingTime='10 minutes') \
 .option("truncate", "false") \
 .option("checkpointLocation", checkpoint) \
 .foreachBatch(convertToDictForEachBatch) \
 .start()



```
In the above code, the processing is sequential .. I would like to make the
processing concurrent/asynchronous to the extent possible

Couple of options I'm considering :

1. Using asyncio
- from what i understand, this might improve the performance since for each
customer - it might allow processing
in 3 classes in asynchronous fashion

2.

use windowing to Partition dataframe by 'customer' + groupBy
- this should allow concurrency for the multiple customers if there are
sufficient executors

Here is the same code for this,
I think this might need to be done in each of the 3 classes P1, P2, P3

```

window = Window.partitionBy('cust')

all_DF = all_DF.repartition('applianceName', 'cktName').cache()
results = (
all_DF
.groupBy('cust')
.apply()
)

```

Here is the stackoverflow link with the details :

https://stackoverflow.com/questions/72636814/spark-structured-streamingbatch-mode-running-dependent-jobs-concurrently

Pls advise on what is the best way to achieve this ?

tia!


GCP Dataproc - adding multiple packages(kafka, mongodb) while submitting spark jobs not working

2022-05-24 Thread karan alang
Hello All,
I've a Structured Streaming job on GCP Dataproc, and i'm trying to pass
multiple packages (kafka, mongoDB)  to the dataproc submit command, and
that is not working.

Command that is working (when i add single dependency eg. Kafka) :
```

gcloud dataproc jobs submit pyspark main.py \
  --cluster versa-structured-stream  \
  --properties 
spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,
spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true

```

However, when i add the mongoDB package as well (tried a few options) - it
seems to be failing.
eg.
```
Option 1 :
gcloud dataproc jobs submit pyspark main.py \

  --cluster versa-structured-stream  \
  --properties 
^#^spark:spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2,spark:spark.dynamicAllocation.enabled=true,spark:spark.shuffle.service.enabled=true,spark:spark.executor.memory=20g,spark:spark.driver.memory=5g,spark:spark.executor.cores=2
\
  
--jars=gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar,gs://dataproc-spark-jars/isolation-forest_2.4.3_2.12-2.0.8.jar,gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar
\
  
--files=gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani.p12,gs://kafka-certs/alarm-compression-user.p12,gs://kafka-certs/appstats-user.p12,gs://kafka-certs/insights-user.p12,gs://kafka-certs/intfutil-user.p12,gs://kafka-certs/reloadpred-chkpoint-user.p12,gs://kafka-certs/reloadpred-user.p12,gs://dataproc-spark-configs/topic-customer-map.cfg,gs://dataproc-spark-configs/params.cfg
 \
  --region us-east1 \
  --py-files streams.zip,utils.zip


Option 2 :
gcloud dataproc jobs submit pyspark main.py \
  --cluster versa-structured-stream \
  --properties
spark.jars.packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2',spark:spark.dynamicAllocation.enabled=true,spark:spark.shuffle.service.enabled=true,spark:spark.executor.memory=20g,spark:spark.driver.memory=5g,spark:spark.executor.cores=
2 \

  
--jars=gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar,gs://dataproc-spark-jars/isolation-forest_2.4.3_2.12-2.0.8.jar,gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar
\
  
--files=gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani.p12,gs://kafka-certs/alarm-compression-user.p12,gs://kafka-certs/appstats-user.p12,gs://kafka-certs/insights-user.p12,gs://kafka-certs/intfutil-user.p12,gs://kafka-certs/reloadpred-chkpoint-user.p12,gs://kafka-certs/reloadpred-user.p12,gs://dataproc-spark-configs/topic-customer-map.cfg,gs://dataproc-spark-configs/params.cfg
 \
  --region us-east1 \
  --py-files streams.zip,utils.zip


```

Any pointers on how to fix/debug this ?

details also in the stackoverflow link -
https://stackoverflow.com/questions/72369619/gcp-dataproc-adding-multiple-packageskafka-mongodb-while-submitting-jobs-no

tia!


Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-13 Thread karan alang
Hi Mich,

The code I sent for the function 'convertToDictForEachBatch' is not the
complete code.
It does use the DF to do a bunch of transformations/operations.

Specific to the problem I sent the email for :
One piece of the code reloads the prediction data from Bigquery based on
the 'event' in topic, the event indicates that the prediction data in
Bigquery is changed.
Here is the code with comments, hope that clarifies.

```

# called from - foreachbatch
def convertToDictForEachBatch(df, batchId):

   # Uses the dataframe to do processing of data, that code is Not
added, since it is not relevant to this question

   # Additional processing i.e. reloading of prediction data from Big
query, into DataFrame - based on event in Kafka topic
   # checks for event in topic - topic_reloadpred and further
processing takes place if there is new data in the topic

# requirement : read data from topic - topic_reloadpred, then check if
there are additional rows added, if yes - call method to reload data
from BigQuery

events = spark.read.format('kafka') \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location",
ssl_keystore_location_reloadpred) \
.option("kafka.ssl.keystore.password",
ssl_keystore_password_reloadpred) \
.option("subscribe", topic_reloadpred) \
.option("kafka.group.id", consumerGroupId_reloadpred) \
.load()

# if event dataframe had new records(from last read time), call method
to reload data from BigQuery

reloadDataFromBigQuery()

```
The requirement is to identify that new rows have been added to the topic -
topic_reloadpred, and then reload data from BigQuery to dataframe, if
required.
(pls note - the data loaded from BigQuery is persisted ie df.persist(), and
changes in-frequently)

One idea is to store the maxOffset read from each batch read from
topic_reloadpred, and when the next batch is read - compare that with
'stored' maxOffset,
to determine if new records have been added to the topic.

What is the best way to fulfill this requirement ?

regds,
Karan Alang







On Sat, Mar 12, 2022 at 12:42 PM Mich Talebzadeh 
wrote:

> There are a number of flaws here.
>
> You have defined your trigger based processing time within Spark
> Structured Streaming (SSS) as below
>
> trigger(processingTime='4 minutes')
>
>
> SSS will trigger every 4 minutes, in other words within a micro-batch of 4
> minutes. This is what is known as micro-batch interval. The way this works
> is that SSS is actioned every 4 minutes. If the previous batch finished in
> 1 minute, then SSS will wait for (4-1 = 3) minutes before processing again.
> If the previous processing took 5 minutes to finish, then we have a
> potential backlog and SSS will process immediately after the previous job
> finishes (in other words it kicks off the next micro-batch).
>
>
> Now the function foreachBatch(convertToDictForEachBatch) performs custom
> write logic on each micro-batch through convertToDictForEachBatch
> function. foreachBatch(convertToDictForEachBatch) expects 2 parameters,
> first: micro-batch as DataFrame or Dataset and second: unique id for each
> batch. However, in calling the function convertToDictForEachBatch as
> below
>
> def convertToDictForEachBatch(df, batchId):
>
> # checks for event in topic - events_topic and further processing takes 
> place if there is new data in the topic
> events = spark.read.format('kafka') \
> .option("kafka.bootstrap.servers", kafkaBrokers) \
> .option("kafka.security.protocol", "SSL") \
> .option("kafka.ssl.truststore.location", ssl_truststore_location) \
> .option("kafka.ssl.truststore.password", ssl_truststore_password) \
> .option("kafka.ssl.keystore.location", 
> ssl_keystore_location_reloadpred) \
> .option("kafka.ssl.keystore.password", 
> ssl_keystore_password_reloadpred) \
> .option("subscribe", topic_reloadpred) \
> .option("kafka.group.id", consumerGroupId_reloadpred) \
> .load()
>
> There is no use case for df -> DataFrame in the code? So what are you 
> checking here? What happens if df is empty?
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of 

StructuredStreaming - processing data based on new events in Kafka topic

2022-03-11 Thread karan alang
Hello All,

I have a structured Streaming program, which reads data from Kafka topic,
and does some processing, and finally puts data into target Kafka Topic.

Note : the processing is done in function - convertToDictForEachBatch(),
which is called using - foreachBatch(convertToDictForEachBatch)

As part of the processing, it reads another Kafka Topic (events_topic), and
if there is New record(s) after the last read, it does some additional
processing - reloads data from BigQuery table, and persists it.

Here is the code :

```

df_stream = spark.readStream.format('kafka') \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location) \
.option("kafka.ssl.keystore.password", ssl_keystore_password) \
.option("kafka.bootstrap.servers",kafkaBrokers)\
.option("subscribe", topic) \
.option("kafka.group.id", consumerGroupId)\
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.option("maxOffsetsPerTrigger", 1) \
.load()


print(" df_stream -> ", df_stream)
query = df_stream.selectExpr("CAST(value AS STRING)",
"timestamp").writeStream \
.outputMode("append") \
.trigger(processingTime='4 minutes') \
.option("numRows",1)\
.option("truncate", "false") \
.option("checkpointLocation", checkpoint) \
.foreachBatch(convertToDictForEachBatch) \
.start()

query.awaitTermination()

```

# called from - foreachbatch
def convertToDictForEachBatch(df, batchId):

# checks for event in topic - events_topic and further processing
takes place if there is new data in the topic
events = spark.read.format('kafka') \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location",
ssl_keystore_location_reloadpred) \
.option("kafka.ssl.keystore.password",
ssl_keystore_password_reloadpred) \
.option("subscribe", topic_reloadpred) \
.option("kafka.group.id", consumerGroupId_reloadpred) \
.load()

# events is passed to a function, and processing is done if new
events are generated

```

What is the best way to achieve this ? The current code is reading the
entire data in the kafka topic, i need it to read only the new data.

Additional Details in stackoverflow :

https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic


tia!


Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

2022-02-27 Thread karan alang
Hi Gourav,

Pls see my responses below :

Can you please let us know:
1. the SPARK version, and the kind of streaming query that you are running?

KA : Apache Spark 3.1.2 - on Dataproc using Ubunto 18.04 (the highest Spark
version supported on dataproc is 3.1.2) ,

2. whether you are using at least once, utmost once, or only once concepts?

KA : default value - at-least once delivery semantics
(per my understanding, i don't believe delivery semantics is related to the
issue, though)

3. any additional details that you can provide, regarding the storage
duration in Kafka, etc?

KA : storage duration - 1 day ..
However, as I mentioned in the stackoverflow ticket, on readStream ->
"failOnDataLoss" = "false", so the log retention should not cause this
issue.

4. are your running stateful or stateless operations? If you are using
stateful operations and SPARK 3.2 try to use RocksDB which is now natively
integrated with SPARK :)

KA : Stateful - since i'm using windowing+watermark in the aggregation
queries.

Also, thnx - will check the links you provided.

regds,
Karan Alang

On Sat, Feb 26, 2022 at 3:31 AM Gourav Sengupta 
wrote:

> Hi,
>
> Can you please let us know:
> 1. the SPARK version, and the kind of streaming query that you are
> running?
> 2. whether you are using at least once, utmost once, or only once concepts?
> 3. any additional details that you can provide, regarding the storage
> duration in Kafka, etc?
> 4. are your running stateful or stateless operations? If you are using
> stateful operations and SPARK 3.2 try to use RocksDB which is now natively
> integrated with SPARK :)
>
> Besides the mail sent by Mich, the following are useful:
> 1.
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
> (see the stop operation, and awaitTermination... operation)
> 2. Try to always ensure that you are doing exception handling based on the
> option mentioned in the above link, long running streaming programmes in
> distributed systems do have issues, and handling exceptions is important
> 3. There is another thing which I do, and it is around reading the
> streaming metrics and pushing them for logging, that helps me to know in
> long running system whether there are any performance issues or not (
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively
> or
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis)
> . The following is an interesting reading on the kind of metrics to look
> out for and the way to interpret them (
> https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html
> )
>
>
> Regards,
> Gourav
>
>
> On Sat, Feb 26, 2022 at 10:45 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Check the thread I forwarded on how to gracefully shutdown spark
>> structured streaming
>>
>> HTH
>>
>> On Fri, 25 Feb 2022 at 22:31, karan alang  wrote:
>>
>>> Hello All,
>>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>>> data from Kafka, does some processing and puts processed data back into
>>> Kafka. The program was running fine, when I killed it (to make minor
>>> changes), and then re-started it.
>>>
>>> It is giving me the error -
>>> pyspark.sql.utils.StreamingQueryExceptionace: batch 44 doesn't exist
>>>
>>> Here is the error:
>>>
>>> 22/02/25 22:14:08 ERROR 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>>> 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>>> java.lang.IllegalStateException: batch 44 doesn't exist
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>>> at scala.Option.getOrElse(Option.scala:189)
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>>> at 
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>> at 
>>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>>> at 
>>> org.apache.spark.sql.e

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

2022-02-27 Thread karan alang
Hi Mich,
thnx .. i'll check the thread you forwarded, and revert back.

regds,
Karan Alang

On Sat, Feb 26, 2022 at 2:44 AM Mich Talebzadeh 
wrote:

> Check the thread I forwarded on how to gracefully shutdown spark
> structured streaming
>
> HTH
>
> On Fri, 25 Feb 2022 at 22:31, karan alang  wrote:
>
>> Hello All,
>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>> data from Kafka, does some processing and puts processed data back into
>> Kafka. The program was running fine, when I killed it (to make minor
>> changes), and then re-started it.
>>
>> It is giving me the error - pyspark.sql.utils.StreamingQueryExceptionace:
>> batch 44 doesn't exist
>>
>> Here is the error:
>>
>> 22/02/25 22:14:08 ERROR 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>> 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>> java.lang.IllegalStateException: batch 44 doesn't exist
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>> at scala.Option.getOrElse(Option.scala:189)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>> at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>> at 
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>> Traceback (most recent call last):
>>   File 
>> "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
>>  line 609, in 
>> query.awaitTermination()
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
>> line 101, in awaitTermination
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
>> line 1304, in __call__
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
>> 117, in deco
>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>
>>
>> Question - what is the cause of this error and how to debug/fix ? Also, I
>> notice that the checkpoint location gets corrupted occasionally, when I do
>> multiple restarts. After checkpoint corruption, it does not return any
>> records
>>
>> For the above issue(as well as when the checkpoint was corrupted), when i
>> cleared the checkpoint location and re-started the program, it went trhough
>> fine.
>>
>> Pls note: while doing readStream, i've enabled failOnDataLoss=false
>>
>> Additional details are in stackoverflow :
>>
>>
>> https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44
>>
>> any input on this ?
>>
>> tia!
>>
>>
>> --
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

2022-02-27 Thread karan alang
Hi Gabor,
i just responded to your comment on stackoverflow.

regds,
Karan Alang

On Sat, Feb 26, 2022 at 3:06 PM Gabor Somogyi 
wrote:

> Hi Karan,
>
> Plz have a look at the stackoverflow comment I've had 2 days ago
>
> G
>
> On Fri, 25 Feb 2022, 23:31 karan alang,  wrote:
>
>> Hello All,
>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>> data from Kafka, does some processing and puts processed data back into
>> Kafka. The program was running fine, when I killed it (to make minor
>> changes), and then re-started it.
>>
>> It is giving me the error - pyspark.sql.utils.StreamingQueryException:
>> batch 44 doesn't exist
>>
>> Here is the error:
>>
>> 22/02/25 22:14:08 ERROR 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>> 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>> java.lang.IllegalStateException: batch 44 doesn't exist
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>> at scala.Option.getOrElse(Option.scala:189)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>> at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>> at 
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>> Traceback (most recent call last):
>>   File 
>> "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
>>  line 609, in 
>> query.awaitTermination()
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
>> line 101, in awaitTermination
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
>> line 1304, in __call__
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
>> 117, in deco
>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>
>>
>> Question - what is the cause of this error and how to debug/fix ? Also, I
>> notice that the checkpoint location gets corrupted occasionally, when I do
>> multiple restarts. After checkpoint corruption, it does not return any
>> records
>>
>> For the above issue(as well as when the checkpoint was corrupted), when i
>> cleared the checkpoint location and re-started the program, it went trhough
>> fine.
>>
>> Pls note: while doing readStream, i've enabled failOnDataLoss=false
>>
>> Additional details are in stackoverflow :
>>
>>
>> https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44
>>
>> any input on this ?
>>
>> tia!
>>
>>
>>


StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

2022-02-25 Thread karan alang
Hello All,
I'm running a StructuredStreaming program on GCP Dataproc, which reads data
from Kafka, does some processing and puts processed data back into Kafka.
The program was running fine, when I killed it (to make minor changes), and
then re-started it.

It is giving me the error - pyspark.sql.utils.StreamingQueryException:
batch 44 doesn't exist

Here is the error:

22/02/25 22:14:08 ERROR
org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query
[id = 0b73937f-1140-40fe-b201-cf4b7443b599, runId =
43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
java.lang.IllegalStateException: batch 44 doesn't exist
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Traceback (most recent call last):
  File 
"/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
line 609, in 
query.awaitTermination()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py",
line 101, in awaitTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 117, in deco
pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist


Question - what is the cause of this error and how to debug/fix ? Also, I
notice that the checkpoint location gets corrupted occasionally, when I do
multiple restarts. After checkpoint corruption, it does not return any
records

For the above issue(as well as when the checkpoint was corrupted), when i
cleared the checkpoint location and re-started the program, it went trhough
fine.

Pls note: while doing readStream, i've enabled failOnDataLoss=false

Additional details are in stackoverflow :

https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44

any input on this ?

tia!


Structured Streaming + UDF - logic based on checking if a column is present in the Dataframe

2022-02-23 Thread karan alang
Hello All,

I'm using StructuredStreaming, and am trying to use UDF to parse each row.
Here is the requirement:

   - we can get alerts of a particular KPI with type 'major' OR 'critical'
   - for a KPI, if we get alerts of type 'major' eg _major, and we have a
   critical alert as well _critical, we need to ignore the _major alert, and
   consider _critical alert only

There are ~25 alerts which are stored in the array (AlarmKeys.alarm_all)

UDF Code (draft):

@udf(returnType=StringType())def convertStructToStr(APP_CAUSE,
tenantName, window,,__major,__major,
__critical, five__major, __critical):

res = "{window: "+ str(window) + "type: 10m, applianceName: "+
str(APP_CAUSE)+","
first = True
for curr_alarm in AlarmKeys.alarms_all:
alsplit = curr_alarm.split('__')
if len(alsplit) == 2:
# Only account for critical row if both major & critical are there
if alsplit[1] == 'major':
critical_alarm = alsplit[0] + "__critical"
if int(col(critical_alarm)) > 0:
continue
if int(col(curr_alarm)) > 0:
if first:
mystring = "{} {}({})".format(mystring,
AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
first = False
else:
mystring = "{}, {}({})".format(mystring,
AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
res+="insight: "+mystring +"}"

# structured streaming using udf, this is printing data on console#
eventually, i'll put data into Kafka instead
df.select(convertStructToStr(*df.columns)) \
.write \
.format("console") \
.option("numRows",100)\
.option("checkpointLocation",
"/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
.option("outputMode", "complete")\
.save("output")

Additional Details in stackoverflow :
https://stackoverflow.com/questions/71243726/structured-streaming-udf-logic-based-on-checking-if-a-column-is-present-in-t


Question is -

Can this be done using UDF ? Since I'm passing column values to the UDF, I
have no way to check if a particular KPI of type 'critical' is available in
the dataframe ?

Any suggestions on the best way to solve this problem ?
tia!


Re: StructuredStreaming - foreach/foreachBatch

2022-02-21 Thread karan alang
Thanks, Gourav - will check out the book.

regds,
Karan Alang

On Thu, Feb 17, 2022 at 9:05 AM Gourav Sengupta 
wrote:

> Hi,
>
> The following excellent documentation may help as well:
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> The book from Dr. Zaharia on SPARK does a fantastic job in explaining the
> fundamental thinking behind these concepts.
>
>
> Regards,
> Gourav Sengupta
>
>
>
> On Wed, Feb 9, 2022 at 8:51 PM karan alang  wrote:
>
>> Thanks, Mich .. will check it out
>>
>> regds,
>> Karan Alang
>>
>> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh 
>> wrote:
>>
>>> BTW you can check this Linkedin article of mine on Processing Change
>>> Data Capture with Spark Structured Streaming
>>> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
>>>
>>>
>>> It covers the concept of triggers including trigger(once = True) or
>>> one-time batch in Spark Structured Streaming
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 7 Feb 2022 at 23:06, karan alang  wrote:
>>>
>>>> Thanks, Mich .. that worked fine!
>>>>
>>>>
>>>> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> read below
>>>>>
>>>>> """
>>>>>"foreach" performs custom write logic on each row and
>>>>> "foreachBatch" performs custom write logic on each micro-batch through
>>>>> SendToBigQuery function
>>>>> *foreachBatch(SendToBigQuery) expects 2 parameters,
>>>>> first: micro-batch as DataFrame or Dataset and second: unique id for each
>>>>> batch --> batchId*
>>>>>Using foreachBatch, we write each micro batch to
>>>>> storage defined in our custom logic. In this case, we store the output of
>>>>> our streaming application to Google BigQuery table.
>>>>>Note that we are appending data and column "rowkey" is
>>>>> defined as UUID so it can be used as the primary key
>>>>> """
>>>>> result = streamingDataFrame.select( \
>>>>>  col("parsed_value.rowkey").alias("rowkey") \
>>>>>, col("parsed_value.ticker").alias("ticker") \
>>>>>,
>>>>> col("parsed_value.timeissued").alias("timeissued") \
>>>>>, col("parsed_value.price").alias("price")). \
>>>>>  writeStream. \
>>>>>  outputMode('append'). \
>>>>>  option("truncate", "false"). \
>>>>>  *foreachBatch(SendToBigQuery)*. \
>>>>>  trigger(processingTime='2 seconds'). \
>>>>>  start()
>>>>>
>>>>> now you define your function *SendToBigQuery() *
>>>>>
>>>>>
>>>>> *def SendToBigQuery(df, batchId):*
>>>>>
>>>>> if(len(df.take(1))) > 0:
>>>>>
>>>>> df.printSchema()
>>>>>
>>>>> print(f"""batchId is {batchId}""")
>>>>>
>>>>> rows = df.count()
>>>>>
>>>>> print(f""" Total records processed in this run = {rows}""")
>>>>>
>>>>> ..
>>>>>
>>>>> else:
>>>>>
>>>>> print("DataFrame is empty")
>>>>>
>>>>> *HTH*
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 7 Feb 2022 at 21:06, karan alang 
>>>>> wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> I'm using StructuredStreaming to read data from Kafka, and need to do
>>>>>> transformation on each individual row.
>>>>>>
>>>>>> I'm trying to use 'foreach' (or foreachBatch), and running into
>>>>>> issues.
>>>>>> Basic question - how is the row passed to the function when foreach
>>>>>> is used ?
>>>>>>
>>>>>> Also, when I use foreachBatch, seems the BatchId is available in the
>>>>>> function called ? How do I access individual rows ?
>>>>>>
>>>>>> Details are in stackoverflow :
>>>>>>
>>>>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>>>>
>>>>>> What is the best approach for this use-case ?
>>>>>>
>>>>>> tia!
>>>>>>
>>>>>


GCP Dataproc - error in importing KafkaProducer

2022-02-17 Thread karan alang
Hello All,

I've a GCP Dataproc cluster, and I'm running a Spark StructuredStreaming
job on this.
I'm trying to use KafkaProducer to push aggregated data into a Kafka
topic,  however when i import KafkaProducer
(from kafka import KafkaProducer),
it gives error

```

Traceback (most recent call last):

  File
"/tmp/7e27e272e64b461dbdc2e5083dc23202/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
line 14, in 

from kafka.producer import KafkaProducer

  File "/opt/conda/default/lib/python3.8/site-packages/kafka/__init__.py",
line 23, in 

from kafka.producer import KafkaProducer

  File
"/opt/conda/default/lib/python3.8/site-packages/kafka/producer/__init__.py",
line 4, in 

from .simple import SimpleProducer

  File
"/opt/conda/default/lib/python3.8/site-packages/kafka/producer/simple.py",
line 54

return '' % self.async
```

As part of the initialization actions, i'm installing the following :
---

pip install pypi
pip install kafka-python
pip install google-cloud-storage
pip install pandas

---

Additional details in stackoverflow :
https://stackoverflow.com/questions/71169869/gcp-dataproc-getting-error-in-importing-kafkaproducer

Any ideas on what needs to be to fix this ?
tia!


writing a Dataframe (with one of the columns as struct) into Kafka

2022-02-17 Thread karan alang
Hello All,
I've a pyspark dataframe which i need to write to Kafka topic.

Structure of the DF is :

root
 |-- window: struct (nullable = true)
 ||-- start: timestamp (nullable = false)
 ||-- end: timestamp (nullable = false)
 |-- processedAlarmCnt: integer (nullable = false)
 |-- totalAlarmCnt: integer (nullable = false)

Currently, i'm looping over the rows, and adding the data in a hashmap,
and then using KafkaProducer to push data into Kafka topic.

This does not seem very efficient, since i'm looping over each row,
and using extra space as well.
What is the best way to design/code this ?

Current Code :

def writeCountToKafka(df):
   if df.count()>0:
  hm = {}
  df_pandas = df.toPandas()
  for _, row in df_pandas.iterrows():
   hm["window"] =
[datetime.timestamp(row["window"]["start"]),datetime.timestamp(row["window"]["end"])]
   hm["processedAlarmCnt"] = row["processedAlarmCnt"]
   hm["totalAlarmCnt"] = row["totalAlarmCnt"]

   # Python Kafka Producer
   kafka_producer.send(topic_count,
json.dumps(mymap).encode('utf-8'))
   kafka_producer.flush()


More details are in stackoverflow :

https://stackoverflow.com/questions/71166560/structured-streaming-writing-dataframe-into-kafka-row-by-row-dataframe-has-a

tia !


Re: SparkStructured Streaming using withWatermark - TypeError: 'module' object is not callable

2022-02-16 Thread karan alang
Hi Mich,
the issue was related to incorrect, which is resolved.

However, wrt your comment - 'OK sounds like your watermark is done outside
of your processing.'

In my use-case which primarily deals with syslogs, syslog is a string
which needs to be parsed (with defensive coding built in to ensure records
are in correct format), before it is fed to
3 different classes (AlarmProc being one of them) - where there is
additional parsing + aggregation for specific types of logs.
The way I'm handling this is by using -- foreachBatch(convertToDict) in the
writeStream method, and the parsing + aggregation happens for the
microbatch.
foreachBatch - will wait for the parsing and aggregation to complete for
the microbatch, and then proceed to do the same with the next microbatch.

Since it involves a lot of parsing + aggregation, it requires more than a
df.select() - hence the approach above is taken.
>From what I understand, the watermark is done within the processing ..
since it is done per microbatch pulled with each trigger.

Pls let me know if you have comments/suggestions on this approach.

thanks,
Karan Alang


On Wed, Feb 16, 2022 at 12:52 AM Mich Talebzadeh 
wrote:

> OK sounds like your watermark is done outside of your processing.
>
> Check this
>
> # construct a streaming dataframe streamingDataFrame that
> subscribes to topic temperature
> streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", "temperature") \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
>
> resultM = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.timestamp").alias("timestamp") \
>, col("parsed_value.temperature").alias("temperature"))
> result = resultM. \
>  withWatermark("timestamp", "5 minutes"). \
>  groupBy(window(resultM.timestamp, "5 minutes", "5
> minutes")). \
>  avg('temperature'). \
>  writeStream. \
>  outputMode('complete'). \
>  option("numRows", 1000). \
>  option("truncate", "false"). \
>  format('console'). \
>  option('checkpointLocation', checkpoint_path). \
>  queryName("temperature"). \
>  start()
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 16 Feb 2022 at 06:37, karan alang  wrote:
>
>>
>> Hello All,
>>
>> I have a Structured Streaming pyspark program running on GCP Dataproc,
>> which reads data from Kafka, and does some data massaging, and aggregation.
>> I'm trying to use withWatermark(), and it is giving error.
>>
>> py4j.Py4JException: An exception was raised by the Python Proxy. Return
>> Message: Traceback (most recent call last):
>>
>>   Fi

SparkStructured Streaming using withWatermark - TypeError: 'module' object is not callable

2022-02-15 Thread karan alang
Hello All,

I have a Structured Streaming pyspark program running on GCP Dataproc,
which reads data from Kafka, and does some data massaging, and aggregation.
I'm trying to use withWatermark(), and it is giving error.

py4j.Py4JException: An exception was raised by the Python Proxy. Return
Message: Traceback (most recent call last):

  File
"/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line
2442, in _call_proxy

return_value = getattr(self.pool[obj_id], method)(*params)

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
196, in call

raise e

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
193, in call

self.func(DataFrame(jdf, self.sql_ctx), batch_id)

  File
"/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
line 444, in convertToDictForEachBatch

ap = Alarm(tdict, spark)

  File
"/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
line 356, in __init__

computeCount(l_alarm_df, l_alarm1_df)

  File
"/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
line 262, in computeCount

window(col("timestamp"), "10 minutes").alias("window")

TypeError: 'module' object is not callable

Details are in stackoverflow below :
https://stackoverflow.com/questions/71137296/structuredstreaming-withwatermark-typeerror-module-object-is-not-callable

Any ideas on how to debug/fix this ?
tia !


Re: Unable to access Google buckets using spark-submit

2022-02-13 Thread karan alang
Hi Gaurav, All,
I'm doing a spark-submit from my local system to a GCP Dataproc cluster ..
This is more for dev/testing.
I can run a -- 'gcloud dataproc jobs submit' command as well, which is what
will be done in Production.

Hope that clarifies.

regds,
Karan Alang


On Sat, Feb 12, 2022 at 10:31 PM Gourav Sengupta 
wrote:

> Hi,
>
> agree with Holden, have faced quite a few issues with FUSE.
>
> Also trying to understand "spark-submit from local" . Are you submitting
> your SPARK jobs from a local laptop or in local mode from a GCP dataproc /
> system?
>
> If you are submitting the job from your local laptop, there will be
> performance bottlenecks I guess based on the internet bandwidth and volume
> of data.
>
> Regards,
> Gourav
>
>
> On Sat, Feb 12, 2022 at 7:12 PM Holden Karau  wrote:
>
>> You can also put the GS access jar with your Spark jars — that’s what the
>> class not found exception is pointing you towards.
>>
>> On Fri, Feb 11, 2022 at 11:58 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> BTW I also answered you in in stackoverflow :
>>>
>>>
>>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sat, 12 Feb 2022 at 08:24, Mich Talebzadeh 
>>> wrote:
>>>
>>>> You are trying to access a Google storage bucket gs:// from your local
>>>> host.
>>>>
>>>> It does not see it because spark-submit assumes that it is a local file
>>>> system on the host which is not.
>>>>
>>>> You need to mount gs:// bucket as a local file system.
>>>>
>>>> You can use the tool called gcsfuse
>>>> https://cloud.google.com/storage/docs/gcs-fuse . Cloud Storage FUSE is
>>>> an open source FUSE <http://fuse.sourceforge.net/> adapter that allows
>>>> you to mount Cloud Storage buckets as file systems on Linux or macOS
>>>> systems. You can download gcsfuse from here
>>>> <https://github.com/GoogleCloudPlatform/gcsfuse>
>>>>
>>>>
>>>> Pretty simple.
>>>>
>>>>
>>>> It will be installed as /usr/bin/gcsfuse and you can mount it by
>>>> creating a local mount file like /mnt/gs as root and give permission to
>>>> others to use it.
>>>>
>>>>
>>>> As a normal user that needs to access gs:// bucket (not as root), use
>>>> gcsfuse to mount it. For example I am mounting a gcs bucket called
>>>> spark-jars-karan here
>>>>
>>>>
>>>> Just use the bucket name itself
>>>>
>>>>
>>>> gcsfuse spark-jars-karan /mnt/gs
>>>>
>>>>
>>>> Then you can refer to it as /mnt/gs in spark-submit from on-premise host
>>>>
>>>> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 
>>>> --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
>>>>
>>>> HTH
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, 12 Feb 2022 at 04:31, karan alang 
>>>> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I'm trying to access gcp buckets while running spark-submit from
>>>>> local, and running into issues.
>>>>>
>>>>> I'm getting error :
>>>>> ```
>>>>>
>>>>> 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop 
>>>>> library for your platform... using builtin-java classes where applicable
>>>>> Exception in thread "main" 
>>>>> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for 
>>>>> scheme "gs"
>>>>>
>>>>> ```
>>>>> I tried adding the --conf
>>>>> spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
>>>>>
>>>>> to the spark-submit command, but getting ClassNotFoundException
>>>>>
>>>>> Details are in stackoverflow :
>>>>>
>>>>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>>>>
>>>>> Any ideas on how to fix this ?
>>>>> tia !
>>>>>
>>>>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>


Re: Unable to access Google buckets using spark-submit

2022-02-13 Thread karan alang
Hi Holden,

when you mention - GS Access jar -  which jar is this ?
Can you pls clarify ?

thanks,
Karan Alang

On Sat, Feb 12, 2022 at 11:10 AM Holden Karau  wrote:

> You can also put the GS access jar with your Spark jars — that’s what the
> class not found exception is pointing you towards.
>
> On Fri, Feb 11, 2022 at 11:58 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> BTW I also answered you in in stackoverflow :
>>
>>
>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 12 Feb 2022 at 08:24, Mich Talebzadeh 
>> wrote:
>>
>>> You are trying to access a Google storage bucket gs:// from your local
>>> host.
>>>
>>> It does not see it because spark-submit assumes that it is a local file
>>> system on the host which is not.
>>>
>>> You need to mount gs:// bucket as a local file system.
>>>
>>> You can use the tool called gcsfuse
>>> https://cloud.google.com/storage/docs/gcs-fuse . Cloud Storage FUSE is
>>> an open source FUSE <http://fuse.sourceforge.net/> adapter that allows
>>> you to mount Cloud Storage buckets as file systems on Linux or macOS
>>> systems. You can download gcsfuse from here
>>> <https://github.com/GoogleCloudPlatform/gcsfuse>
>>>
>>>
>>> Pretty simple.
>>>
>>>
>>> It will be installed as /usr/bin/gcsfuse and you can mount it by
>>> creating a local mount file like /mnt/gs as root and give permission to
>>> others to use it.
>>>
>>>
>>> As a normal user that needs to access gs:// bucket (not as root), use
>>> gcsfuse to mount it. For example I am mounting a gcs bucket called
>>> spark-jars-karan here
>>>
>>>
>>> Just use the bucket name itself
>>>
>>>
>>> gcsfuse spark-jars-karan /mnt/gs
>>>
>>>
>>> Then you can refer to it as /mnt/gs in spark-submit from on-premise host
>>>
>>> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 
>>> --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
>>>
>>> HTH
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sat, 12 Feb 2022 at 04:31, karan alang  wrote:
>>>
>>>> Hello All,
>>>>
>>>> I'm trying to access gcp buckets while running spark-submit from local,
>>>> and running into issues.
>>>>
>>>> I'm getting error :
>>>> ```
>>>>
>>>> 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop 
>>>> library for your platform... using builtin-java classes where applicable
>>>> Exception in thread "main" 
>>>> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for 
>>>> scheme "gs"
>>>>
>>>> ```
>>>> I tried adding the --conf
>>>> spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
>>>>
>>>> to the spark-submit command, but getting ClassNotFoundException
>>>>
>>>> Details are in stackoverflow :
>>>>
>>>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>>>
>>>> Any ideas on how to fix this ?
>>>> tia !
>>>>
>>>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: Unable to access Google buckets using spark-submit

2022-02-13 Thread karan alang
Thanks, Mich - will check this and update.

regds,
Karan Alang

On Sat, Feb 12, 2022 at 1:57 AM Mich Talebzadeh 
wrote:

> BTW I also answered you in in stackoverflow :
>
>
> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 12 Feb 2022 at 08:24, Mich Talebzadeh 
> wrote:
>
>> You are trying to access a Google storage bucket gs:// from your local
>> host.
>>
>> It does not see it because spark-submit assumes that it is a local file
>> system on the host which is not.
>>
>> You need to mount gs:// bucket as a local file system.
>>
>> You can use the tool called gcsfuse
>> https://cloud.google.com/storage/docs/gcs-fuse . Cloud Storage FUSE is
>> an open source FUSE <http://fuse.sourceforge.net/> adapter that allows
>> you to mount Cloud Storage buckets as file systems on Linux or macOS
>> systems. You can download gcsfuse from here
>> <https://github.com/GoogleCloudPlatform/gcsfuse>
>>
>>
>> Pretty simple.
>>
>>
>> It will be installed as /usr/bin/gcsfuse and you can mount it by creating
>> a local mount file like /mnt/gs as root and give permission to others to
>> use it.
>>
>>
>> As a normal user that needs to access gs:// bucket (not as root), use
>> gcsfuse to mount it. For example I am mounting a gcs bucket called
>> spark-jars-karan here
>>
>>
>> Just use the bucket name itself
>>
>>
>> gcsfuse spark-jars-karan /mnt/gs
>>
>>
>> Then you can refer to it as /mnt/gs in spark-submit from on-premise host
>>
>> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 
>> --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
>>
>> HTH
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 12 Feb 2022 at 04:31, karan alang  wrote:
>>
>>> Hello All,
>>>
>>> I'm trying to access gcp buckets while running spark-submit from local,
>>> and running into issues.
>>>
>>> I'm getting error :
>>> ```
>>>
>>> 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop 
>>> library for your platform... using builtin-java classes where applicable
>>> Exception in thread "main" 
>>> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for 
>>> scheme "gs"
>>>
>>> ```
>>> I tried adding the --conf
>>> spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
>>>
>>> to the spark-submit command, but getting ClassNotFoundException
>>>
>>> Details are in stackoverflow :
>>>
>>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>>
>>> Any ideas on how to fix this ?
>>> tia !
>>>
>>>


Unable to access Google buckets using spark-submit

2022-02-11 Thread karan alang
Hello All,

I'm trying to access gcp buckets while running spark-submit from local, and
running into issues.

I'm getting error :
```

22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
Exception in thread "main"
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for
scheme "gs"

```
I tried adding the --conf
spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem

to the spark-submit command, but getting ClassNotFoundException

Details are in stackoverflow :
https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit

Any ideas on how to fix this ?
tia !


Re: StructuredStreaming - foreach/foreachBatch

2022-02-09 Thread karan alang
Thanks, Mich .. will check it out

regds,
Karan Alang

On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh 
wrote:

> BTW you can check this Linkedin article of mine on Processing Change Data
> Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
>
>
> It covers the concept of triggers including trigger(once = True) or
> one-time batch in Spark Structured Streaming
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 7 Feb 2022 at 23:06, karan alang  wrote:
>
>> Thanks, Mich .. that worked fine!
>>
>>
>> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh 
>> wrote:
>>
>>> read below
>>>
>>> """
>>>"foreach" performs custom write logic on each row and
>>> "foreachBatch" performs custom write logic on each micro-batch through
>>> SendToBigQuery function
>>> *foreachBatch(SendToBigQuery) expects 2 parameters,
>>> first: micro-batch as DataFrame or Dataset and second: unique id for each
>>> batch --> batchId*
>>>Using foreachBatch, we write each micro batch to storage
>>> defined in our custom logic. In this case, we store the output of our
>>> streaming application to Google BigQuery table.
>>>Note that we are appending data and column "rowkey" is
>>> defined as UUID so it can be used as the primary key
>>> """
>>> result = streamingDataFrame.select( \
>>>  col("parsed_value.rowkey").alias("rowkey") \
>>>, col("parsed_value.ticker").alias("ticker") \
>>>, col("parsed_value.timeissued").alias("timeissued") \
>>>, col("parsed_value.price").alias("price")). \
>>>  writeStream. \
>>>  outputMode('append'). \
>>>  option("truncate", "false"). \
>>>  *foreachBatch(SendToBigQuery)*. \
>>>  trigger(processingTime='2 seconds'). \
>>>  start()
>>>
>>> now you define your function *SendToBigQuery() *
>>>
>>>
>>> *def SendToBigQuery(df, batchId):*
>>>
>>> if(len(df.take(1))) > 0:
>>>
>>> df.printSchema()
>>>
>>> print(f"""batchId is {batchId}""")
>>>
>>> rows = df.count()
>>>
>>> print(f""" Total records processed in this run = {rows}""")
>>>
>>> ..
>>>
>>> else:
>>>
>>> print("DataFrame is empty")
>>>
>>> *HTH*
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 7 Feb 2022 at 21:06, karan alang  wrote:
>>>
>>>> Hello All,
>>>>
>>>> I'm using StructuredStreaming to read data from Kafka, and need to do
>>>> transformation on each individual row.
>>>>
>>>> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
>>>> Basic question - how is the row passed to the function when foreach is
>>>> used ?
>>>>
>>>> Also, when I use foreachBatch, seems the BatchId is available in the
>>>> function called ? How do I access individual rows ?
>>>>
>>>> Details are in stackoverflow :
>>>>
>>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>>
>>>> What is the best approach for this use-case ?
>>>>
>>>> tia!
>>>>
>>>


Re: StructuredStreaming - foreach/foreachBatch

2022-02-07 Thread karan alang
Thanks, Mich .. that worked fine!


On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh 
wrote:

> read below
>
> """
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> SendToBigQuery function
> *foreachBatch(SendToBigQuery) expects 2 parameters,
> first: micro-batch as DataFrame or Dataset and second: unique id for each
> batch --> batchId*
>Using foreachBatch, we write each micro batch to storage
> defined in our custom logic. In this case, we store the output of our
> streaming application to Google BigQuery table.
>Note that we are appending data and column "rowkey" is
> defined as UUID so it can be used as the primary key
> """
> result = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.ticker").alias("ticker") \
>, col("parsed_value.timeissued").alias("timeissued") \
>, col("parsed_value.price").alias("price")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  *foreachBatch(SendToBigQuery)*. \
>  trigger(processingTime='2 seconds'). \
>  start()
>
> now you define your function *SendToBigQuery() *
>
>
> *def SendToBigQuery(df, batchId):*
>
> if(len(df.take(1))) > 0:
>
> df.printSchema()
>
> print(f"""batchId is {batchId}""")
>
> rows = df.count()
>
> print(f""" Total records processed in this run = {rows}""")
>
> ..
>
> else:
>
> print("DataFrame is empty")
>
> *HTH*
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 7 Feb 2022 at 21:06, karan alang  wrote:
>
>> Hello All,
>>
>> I'm using StructuredStreaming to read data from Kafka, and need to do
>> transformation on each individual row.
>>
>> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
>> Basic question - how is the row passed to the function when foreach is
>> used ?
>>
>> Also, when I use foreachBatch, seems the BatchId is available in the
>> function called ? How do I access individual rows ?
>>
>> Details are in stackoverflow :
>>
>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>
>> What is the best approach for this use-case ?
>>
>> tia!
>>
>


StructuredStreaming - foreach/foreachBatch

2022-02-07 Thread karan alang
Hello All,

I'm using StructuredStreaming to read data from Kafka, and need to do
transformation on each individual row.

I'm trying to use 'foreach' (or foreachBatch), and running into issues.
Basic question - how is the row passed to the function when foreach is used
?

Also, when I use foreachBatch, seems the BatchId is available in the
function called ? How do I access individual rows ?

Details are in stackoverflow :
https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working

What is the best approach for this use-case ?

tia!


Re: GCP Dataproc - Failed to construct kafka consumer, Failed to load SSL keystore dataproc-versa-sase-p12-1.jks of type JKS

2022-02-02 Thread karan alang
re-checking to see if there is any suggestion on this issue.



On Wed, Feb 2, 2022 at 3:36 PM karan alang  wrote:

> Hello All,
>
> I'm trying to run a Structured Streaming program on GCP Dataproc, which
> accesses the data from Kafka and prints it.
>
> Access to Kafka is using SSL, and the truststore and keystore files are
> stored in buckets. I'm using Google Storage API to access the bucket, and
> store the file in the current working directory. The truststore and
> keystores are passed onto the Kafka Consumer/Producer. However - i'm
> getting an error
> Failed to construct kafka consumer, Failed to load SSL keystore
> dataproc-versa-sase-p12-1.jks of type JKS
>
> Details in stackoverflow -
> https://stackoverflow.com/questions/70964198/gcp-dataproc-failed-to-construct-kafka-consumer-failed-to-load-ssl-keystore-d
>
> From my local m/c, the same code is working fine .. though i'm using PKCS
> format truststore/keystore, on Dataproc - it is expecting JKS format files.
>
> Any ideas on how to debug/fix this ?
>
> tia!
>
>


GCP Dataproc - Failed to construct kafka consumer, Failed to load SSL keystore dataproc-versa-sase-p12-1.jks of type JKS

2022-02-02 Thread karan alang
Hello All,

I'm trying to run a Structured Streaming program on GCP Dataproc, which
accesses the data from Kafka and prints it.

Access to Kafka is using SSL, and the truststore and keystore files are
stored in buckets. I'm using Google Storage API to access the bucket, and
store the file in the current working directory. The truststore and
keystores are passed onto the Kafka Consumer/Producer. However - i'm
getting an error
Failed to construct kafka consumer, Failed to load SSL keystore
dataproc-versa-sase-p12-1.jks of type JKS

Details in stackoverflow -
https://stackoverflow.com/questions/70964198/gcp-dataproc-failed-to-construct-kafka-consumer-failed-to-load-ssl-keystore-d

>From my local m/c, the same code is working fine .. though i'm using PKCS
format truststore/keystore, on Dataproc - it is expecting JKS format files.

Any ideas on how to debug/fix this ?

tia!


Re: Structured Streaming on GCP Dataproc - java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer

2022-02-02 Thread karan alang
Hi Mitch, All -

thnx, i was able to resolve this using the command below  :

---
gcloud dataproc jobs submit pyspark
/Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2.py
 --cluster dataproc-ss-poc  --properties
spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
--region us-central1



On Wed, Feb 2, 2022 at 1:25 AM Mich Talebzadeh 
wrote:

> The current Spark version on GCP is 3.1.2.
>
> Try using this jar file instead
>
> spark-sql-kafka-0-10_2.12-3.0.1.jar
>
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 2 Feb 2022 at 06:51, karan alang  wrote:
>
>> Hello All,
>>
>> I'm running a simple Structured Streaming on GCP, which reads data from
>> Kafka and prints onto console.
>>
>> Command :
>>
>> cloud dataproc jobs submit pyspark 
>> /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb1.py
>> --cluster dataproc-ss-poc  --jars
>> gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar
>> gs://spark-jars-karan/spark-core_2.12-3.1.2.jar --region us-central1
>>
>> I'm getting error :
>>
>> File
>> "/tmp/01c16a55009a42a0a29da6dde9aae4d5/StructuredStreaming_Kafka_GCP-Batch-feb1.py",
>> line 49, in 
>>
>> df = spark.read.format('kafka')\
>>
>>   File
>> "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line
>> 210, in load
>>
>>   File
>> "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line
>> 1304, in __call__
>>
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
>> line 111, in deco
>>
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
>> line 326, in get_return_value
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
>>
>> : java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/serialization/ByteArraySerializer
>>
>> at
>> org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:599)
>>
>> at
>> org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
>>
>> at org.apache.spark.sql.kafka010.KafkaSourceProvider.org
>> $apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:348)
>>
>> at
>> org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:128)
>>
>> at
>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
>>
>> at
>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
>>
>> at
>> org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
>>
>> at scala.Option.getOrElse(Option.scala:189)
>>
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
>>
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>
>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>
>> at py4j.Gateway.invoke(Gateway.java:282)
>>
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>
>> at py4j.GatewayConnection.run(GatewayConnection.java:238)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.common.serialization.ByteArraySerializer
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> Additional details are in stackoverflow -
>>
>>
>> https://stackoverflow.com/questions/70951195/gcp-dataproc-java-lang-noclassdeffounderror-org-apache-kafka-common-serializa
>>
>> Do we need to pass any other jar ?
>> What needs to be done to debug/fix this ?
>>
>> tia !
>>
>>
>>


Structured Streaming on GCP Dataproc - java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer

2022-02-01 Thread karan alang
Hello All,

I'm running a simple Structured Streaming on GCP, which reads data from
Kafka and prints onto console.

Command :

cloud dataproc jobs submit pyspark
/Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb1.py
--cluster dataproc-ss-poc  --jars
gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar
gs://spark-jars-karan/spark-core_2.12-3.1.2.jar --region us-central1

I'm getting error :

File
"/tmp/01c16a55009a42a0a29da6dde9aae4d5/StructuredStreaming_Kafka_GCP-Batch-feb1.py",
line 49, in 

df = spark.read.format('kafka')\

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
line 210, in load

  File
"/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line
1304, in __call__

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
111, in deco

  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
line 326, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.

: java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArraySerializer

at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:599)

at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)

at org.apache.spark.sql.kafka010.KafkaSourceProvider.org
$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:348)

at
org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:128)

at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)

at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)

at
org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)

at scala.Option.getOrElse(Option.scala:189)

at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)

at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at py4j.Gateway.invoke(Gateway.java:282)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.serialization.ByteArraySerializer

at java.net.URLClassLoader.findClass(URLClassLoader.java:387)

at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

Additional details are in stackoverflow -

https://stackoverflow.com/questions/70951195/gcp-dataproc-java-lang-noclassdeffounderror-org-apache-kafka-common-serializa

Do we need to pass any other jar ?
What needs to be done to debug/fix this ?

tia !


Re: Structured Streaming - not showing records on console

2022-02-01 Thread karan alang
Hi Mich,

thnx, seems 'complete' mode is supported only if there are streaming
aggregations.
I get this error on changing the output mode.

pyspark.sql.utils.AnalysisException: Complete output mode not supported
when there are no streaming aggregations on streaming DataFrames/Datasets;

Project [value#8, topic#9, partition#10, timestamp#12]

On Tue, Feb 1, 2022 at 4:05 PM Mich Talebzadeh 
wrote:

> hm.
>
> I am trying to recall if I am correct  so you should try
> outpudeMode('complete') with format('console')
>
> result = resultMF. \
>  writeStream. \
>  outputMode('complete'). \
>  option("numRows", 1000). \
>  option("truncate", "false"). \
>  format('console'). \
>  option('checkpointLocation', checkpoint_path). \
>  queryName("temperature"). \
>  start()
>
> On another example I have
>
>result = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.timeissued").alias("timeissued") \
>,
> col("parsed_value.temperature").alias("temperature")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(temperatures). \
>  trigger(processingTime='60 seconds'). \
>  option('checkpointLocation', checkpoint_path). \
>  queryName("temperature"). \
>  start()
>
> def temperatures(df, batchId):
> if(len(df.take(1))) > 0:
> df.show(100,False)
> df. persist()
> AvgTemp =
> df.select(round(F.avg(col("temperature".collect()[0][0]
> df.unpersist()
> now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
> print(f"""Average temperature at {now} from batchId {batchId} is
> {AvgTemp} degrees""")
> else:
> print("DataFrame s empty")
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 1 Feb 2022 at 23:45, karan alang  wrote:
>
>> Hello Spark Experts,
>>
>> I've a simple Structured Streaming program, which reads data from Kafka,
>> and writes on the console. This is working in batch mode (i.e spark.read or
>> df.write), not not working in streaming mode.
>>
>> Details are in the stackoverflow
>>
>>
>> https://stackoverflow.com/questions/70948967/structured-streaming-not-writing-records-to-console-when-using-writestream-ba
>>
>> Any inputs on how to fix/debug this ?
>> tia !
>>
>


Structured Streaming - not showing records on console

2022-02-01 Thread karan alang
Hello Spark Experts,

I've a simple Structured Streaming program, which reads data from Kafka,
and writes on the console. This is working in batch mode (i.e spark.read or
df.write), not not working in streaming mode.

Details are in the stackoverflow

https://stackoverflow.com/questions/70948967/structured-streaming-not-writing-records-to-console-when-using-writestream-ba

Any inputs on how to fix/debug this ?
tia !


Databricks notebook - cluster taking a long time to get created, often timing out

2021-08-17 Thread karan alang
Hello - i've been using the Databricks notebook(for pyspark or scala/spark
development), and recently have had issues wherein the cluster creation
takes a long time to get created, often timing out.

Any ideas on how to resolve this ?
Any other alternatives to databricks notebook ?


spark-submit not running on macbook pro

2021-08-16 Thread karan alang
Hello Experts,

i'm trying to run spark-submit on my macbook pro(commandline or using
PyCharm), and it seems to be giving error ->

Exception: Java gateway process exited before sending its port number

i've tried setting values to variable in the program (based on the
recommendations by people on the internet), but the problem still remains.

Any pointers on how to resolve this issue?

# explicitly setting environment variables
os.environ["JAVA_HOME"] =
"/Library/Java/JavaVirtualMachines/applejdk-11.0.7.10.1.jdk/Contents/Home"
os.environ["PYTHONPATH"] =
"/usr/local/Cellar/apache-spark/3.1.2/libexec//python/lib/py4j-0.10.4-src.zip:/usr/local/Cellar/apache-spark/3.1.2/libexec//python/:"
os.environ["PYSPARK_SUBMIT_ARGS"]="--master local[2] pyspark-shell"

Traceback (most recent call last):
  File "", line 1, in 
  File "/Applications/PyCharm
CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_bundle/pydev_umd.py",
line 198, in runfile
pydev_imports.execfile(filename, global_vars, local_vars)  # execute
the script
  File "/Applications/PyCharm
CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py",
line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File
"/Users/karanalang/Documents/Technology/StructuredStreamin_Udemy/Spark-Streaming-In-Python-master/00-HelloSparkSQL/HelloSparkSQL.py",
line 12, in 
spark = SparkSession.builder.master("local[*]").getOrCreate()
  File
"/Users/karanalang/.conda/envs/PythonLeetcode/lib/python3.9/site-packages/pyspark/sql/session.py",
line 228, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
  File
"/Users/karanalang/.conda/envs/PythonLeetcode/lib/python3.9/site-packages/pyspark/context.py",
line 384, in getOrCreate
SparkContext(conf=conf or SparkConf())
  File
"/Users/karanalang/.conda/envs/PythonLeetcode/lib/python3.9/site-packages/pyspark/context.py",
line 144, in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File
"/Users/karanalang/.conda/envs/PythonLeetcode/lib/python3.9/site-packages/pyspark/context.py",
line 331, in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
  File
"/Users/karanalang/.conda/envs/PythonLeetcode/lib/python3.9/site-packages/pyspark/java_gateway.py",
line 108, in launch_gateway
raise Exception("Java gateway process exited before sending its port
number")
Exception: Java gateway process exited before sending its port number


pyspark on pycharm error

2019-05-08 Thread karan alang
Hello - anyone has any ideas on this PySpark/PyCharm error in SO, pls. let
me know.


https://stackoverflow.com/questions/56028402/java-util-nosuchelementexception-key-not-found-pyspark-driver-callback-host



essentially, on pyspark on PyCharm, i get the following error -

java.util.NoSuchElementException: key not found:
_PYSPARK_DRIVER_CALLBACK_HOSTException: Java gateway process
exited before sending its port number


Jupyter Notebook (Scala, kernel - Apache Toree) with Vegas, Graph not showing data

2019-02-01 Thread karan alang
Anybody used Vegas-viz for charting/graphics on Jupyter Spark/Scala
notebook ?

I'm looking for input on the following issue -

https://stackoverflow.com/questions/54473744/jupyter-notebook-scala-kernel-apache-toree-with-vegas-graph-not-showing-da

Pls. let me know.

thanks!


java vs scala for Apache Spark - is there a performance difference ?

2018-10-26 Thread karan alang
Hello
- is there a "performance" difference when using Java or Scala for Apache
Spark ?

I understand, there are other obvious differences (less code with scala,
easier to focus on logic etc),
but wrt performance - i think there would not be much of a difference since
both of them are JVM based,
pls. let me know if this is not the case.

thanks!


Re: Error - Dropping SparkListenerEvent because no remaining room in event queue

2018-10-24 Thread karan alang
Pls note - Spark version is 2.2.0

On Wed, Oct 24, 2018 at 3:57 PM karan alang  wrote:

> Hello -
> we are running a Spark job, and getting the following error -
>
> "LiveListenerBus: Dropping SparkListenerEvent because no remaining room in
> event queue"
>
> As per the recommendation in the Spark Docs -
>
> I've increased the value of property
> spark.scheduler.listenerbus.eventqueue.capacity to 9 (from the default
> 1)
> and also increased the Diver memory
>
> That seems to have mitigated the issue.
>
> The question is - is there is any Code optimization (or any other) that
> can be done to resolve this problem ?
> Pls note - we are primarily using functions like - reduce(),
> collectAsList() and persist() as part of the job.
>


Error - Dropping SparkListenerEvent because no remaining room in event queue

2018-10-24 Thread karan alang
Hello -
we are running a Spark job, and getting the following error -

"LiveListenerBus: Dropping SparkListenerEvent because no remaining room in
event queue"

As per the recommendation in the Spark Docs -

I've increased the value of property
spark.scheduler.listenerbus.eventqueue.capacity to 9 (from the default
1)
and also increased the Diver memory

That seems to have mitigated the issue.

The question is - is there is any Code optimization (or any other) that can
be done to resolve this problem ?
Pls note - we are primarily using functions like - reduce(),
collectAsList() and persist() as part of the job.


modeling timestamp in Avro messages (read using Spark Structured Streaming)

2018-07-29 Thread karan alang
i've a questing regarding modeling timestamp column in Avro messages
The options are
- ISO 8601 "String" (UTC Time)
- "int" 32bit signed UNIX Epoch time
- Long (modeled as Logica datatype - timestamp in schema)
what would be the best way to model the timestamp ?

fyi. we are using Apache Spark(Structured Streaming) to read these messages
from kafka topics.


scala question (in spark project)- not able to call getClassSchema method in avro generated class

2018-02-24 Thread karan alang
i’ve an Avro generated class - com.avro.Person which has a method ->
getClassSchema

I’m passing className to a method, and in the method - i need to get the
Avro schema .  Here is the code i'm trying to use -

val pr = Class.forName(productCls)  //where productCls =
classOf[Product].getName

How do i call the method - getClassSchema of this class ?

Pls. let me know.

thanks!


not able to read git info from Scala Test Suite

2018-02-13 Thread karan alang
Hello - I'm writing a scala unittest for my Spark project
which checks the git information, and somehow it is not working from the
Unit Test

Added in pom.xml
--



pl.project13.maven
git-commit-id-plugin
2.2.4


get-the-git-infos

revision




{g...@github.com}/test.git
flat
true
true
true

true

true






folder structures :

{project_dir}/module/pom.xml
{project_dir}/module/src/main/scala/BuildInfo.scala
  (i'm able to read the git info from this file)
 {project_dir}/module_folder/test/main/scala/BuildInfoSuite.scala
  (i'm NOT able to read the git info from this file)


Any ideas on what i need to do to get this working ?


spark-shell not getting launched - Queue's AM resource limit exceeded.

2017-08-06 Thread karan alang
Hello - i've HDP 2.5.x and i'm trying to launch spark-shell ..
ApplicationMaster gets launched, but YARN is not able to assign containers.

*Command ->*

./bin/spark-shell --master yarn-client --driver-memory 512m
--executor-memory 512m

*Error ->*

[Sun Aug 06 19:33:29 + 2017] Application is added to the scheduler and
is not yet activated. Queue's AM resource limit exceeded. Details : AM
Partition = ; AM Resource Request = ; Queue Resource Limit for AM = ; User AM
Resource Limit of the queue = ; Queue AM Resource
Usage = ;

Any ideas on what parameters to change ?

Pls note -> In YARN, the parameter -
*yarn.scheduler.capacity.maximum-am-resource-percent
= 0.9* , AM should have access sufficient to assign container


Re: spark-shell - modes

2017-08-06 Thread karan alang
update - seems 'spark-shell' does not support mode -> yarn-cluster (i guess
since it is an interactive shell)

The only modes supported include -> yarn-client & local

Pls let me know if my understanding is incorrect.
Thanks!


On Sun, Aug 6, 2017 at 10:07 AM, karan alang <karan.al...@gmail.com> wrote:

> Hello all - i'd a basic question on the modes in which spark-shell can be
> run ..
>
> when i run the following command,
> does Spark run in local mode i.e. outside of YARN & using the local cores ?
> (since '--master' option is missing)
>
> ./bin/spark-shell --driver-memory 512m --executor-memory 512m
>
> Similarly, when i run the following -
>
> 1) ./bin/spark-shell --master yarn-client --driver-memory 512m
> --executor-memory 512m
>
>- Spark is run in Client mode & resources managed by YARN.
>
> 2) ./bin/spark-shell --master yarn-cluster --driver-memory 512m
> --executor-memory 512m
>
> - Spark is run in Cluster mode & resources managed by YARN.
>
>


spark-shell - modes

2017-08-06 Thread karan alang
Hello all - i'd a basic question on the modes in which spark-shell can be
run ..

when i run the following command,
does Spark run in local mode i.e. outside of YARN & using the local cores ?
(since '--master' option is missing)

./bin/spark-shell --driver-memory 512m --executor-memory 512m

Similarly, when i run the following -

1) ./bin/spark-shell --master yarn-client --driver-memory 512m
--executor-memory 512m

   - Spark is run in Client mode & resources managed by YARN.

2) ./bin/spark-shell --master yarn-cluster --driver-memory 512m
--executor-memory 512m

- Spark is run in Cluster mode & resources managed by YARN.


Re: error in running StructuredStreaming-Kafka integration code (Spark 2.x & Kafka 10)

2017-07-10 Thread karan alang
Actually, i've 2 versions of Kafka (0.9 & 0.10) ..
btw, i was able to resolve the issue ..

sbt by default considers src/main/scala as default source location,
I'd changed the location to a different one.

I changed the build.sbt to point to the required location, that fixed the
issue

regds,
Karan Alang

On Mon, Jul 10, 2017 at 11:48 AM, David Newberger <da...@phdata.io> wrote:

> Karen,
>
> It looks like the Kafka version is incorrect. You mention Kafka 0.10
> however the classpath references Kafka 0.9
>
> Thanks,
>
> David
>
> On July 10, 2017 at 1:44:06 PM, karan alang (karan.al...@gmail.com) wrote:
>
> Hi All,
>
> I'm running Spark Streaming - Kafka integration using Spark 2.x & Kafka 10.
> & seems to be running into issues.
>
> I compiled the program using sbt, and the compilation went through fine.
> I was able able to import this into Eclipse & run the program from Eclipse.
>
> However, when i run the program using spark-submit, i'm getting the
> following error :
>
> --
>
>>  $SPARK_HOME/bin/spark-submit --class 
>> "structuredStreaming.kafka.StructuredKafkaWordCount1"
>> --master local[2] /Users/karanalang/Documents/Te
>> chnology/Coursera_spark_scala/structuredStreamingKafka/targe
>> t/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar
>
>
>
>> *java.lang.ClassNotFoundException:
>> structuredStreaming.kafka.StructuredKafkaWordCount1*
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy
>> $SparkSubmit$$runMain(SparkSubmit.scala:695)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit
>> .scala:187)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>
> --
>
> I've put the jar in the classpath, but i still get the error ->
>
> echo $CLASSPATH
>
> .:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.
>> 9.0.1/lib/jopt-simple-3.2.jar:/Users/karanalang/Documents/Te
>> chnology/kafka/kafka_2.11-0.9.0.1/lib/kafka-clients-0.9.0.1.
>> jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-
>> 0.9.0.1/lib/kafka_2.11-0.9.0.1.jar:/Users/karanalang/Documen
>> ts/Technology/kafka/kafka_2.11-0.9.0.1/lib/log4j-1.2.17.
>> jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-
>> 0.9.0.1/lib/metrics-core-2.2.0.jar:/Users/karanalang/
>> Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/scala-
>> library-2.11.7.jar:/Users/karanalang/Documents/
>> Technology/kafka/kafka_2.11-0.9.0.1/lib/slf4j-api-1.7.6.jar:
>> /Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.
>> 9.0.1/lib/slf4j-log4j12-1.7.6.jar:/Users/karanalang/
>> Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/snappy-
>> java-1.1.1.7.jar:/Users/karanalang/Documents/
>> Technology/kafka/kafka_2.11-0.9.0.1/lib/zkclient-0.7.jar:/
>> Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.
>> 9.0.1/lib/zookeeper-3.4.6.jar:/Users/karanalang/Documents/
>> Technology/ApacheSpark-v2.1/spark-2.1.0-bin-hadoop2.7/
>> jars/*.jar:/Users/karanalang/Documents/Technology/kafka/
>> mirrormaker_topic_rename-master/target/mmchangetopic-1.
>> 0-SNAPSHOT.jar:/Users/karanalang/Documents/Technology/
>> *Coursera_spark_scala/structuredStreamingKafka/target/scala-2.11/*
>> *StructuredStreamingKafka-assembly-1.0.jar*
>
>
> When i look inside the jar - *StructuredStreamingKafka-assembly-1.0.jar,
> i don't see the file "*StructuredKafkaWordCount1.class"
>
> Attaching my build.sbt.
>
> Any ideas on what i need to do ?
>
>
>
>
>
>
>
>
>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


error in running StructuredStreaming-Kafka integration code (Spark 2.x & Kafka 10)

2017-07-10 Thread karan alang
Hi All,

I'm running Spark Streaming - Kafka integration using Spark 2.x & Kafka 10.
& seems to be running into issues.

I compiled the program using sbt, and the compilation went through fine.
I was able able to import this into Eclipse & run the program from Eclipse.

However, when i run the program using spark-submit, i'm getting the
following error :

--

>  $SPARK_HOME/bin/spark-submit --class 
> "structuredStreaming.kafka.StructuredKafkaWordCount1"
> --master local[2] /Users/karanalang/Documents/Te
> chnology/Coursera_spark_scala/structuredStreamingKafka/targe
> t/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar



> *java.lang.ClassNotFoundException:
> structuredStreaming.kafka.StructuredKafkaWordCount1*
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy
> $SparkSubmit$$runMain(SparkSubmit.scala:695)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
--

I've put the jar in the classpath, but i still get the error ->

echo $CLASSPATH

.:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.
> 9.0.1/lib/jopt-simple-3.2.jar:/Users/karanalang/Documents/Te
> chnology/kafka/kafka_2.11-0.9.0.1/lib/kafka-clients-0.9.0.1.
> jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-
> 0.9.0.1/lib/kafka_2.11-0.9.0.1.jar:/Users/karanalang/
> Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/log4j-
> 1.2.17.jar:/Users/karanalang/Documents/Technology/kafka/
> kafka_2.11-0.9.0.1/lib/metrics-core-2.2.0.jar:/Users/karanalang/Documents/
> Technology/kafka/kafka_2.11-0.9.0.1/lib/scala-library-2.11.
> 7.jar:/Users/karanalang/Documents/Technology/kafka/
> kafka_2.11-0.9.0.1/lib/slf4j-api-1.7.6.jar:/Users/
> karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/
> lib/slf4j-log4j12-1.7.6.jar:/Users/karanalang/Documents/
> Technology/kafka/kafka_2.11-0.9.0.1/lib/snappy-java-1.1.1.7.
> jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/
> zkclient-0.7.jar:/Users/karanalang/Documents/Technolog
> y/kafka/kafka_2.11-0.9.0.1/lib/zookeeper-3.4.6.jar:/
> Users/karanalang/Documents/Technology/ApacheSpark-v2.1/spark
> -2.1.0-bin-hadoop2.7/jars/*.jar:/Users/karanalang/Document
> s/Technology/kafka/mirrormaker_topic_rename-master/target/
> mmchangetopic-1.0-SNAPSHOT.jar:/Users/karanalang/Documents/Technology/
> *Coursera_spark_scala/structuredStreamingKafka/target/scala-2.11/*
> *StructuredStreamingKafka-assembly-1.0.jar*


When i look inside the jar - *StructuredStreamingKafka-assembly-1.0.jar, i
don't see the file "*StructuredKafkaWordCount1.class"

Attaching my build.sbt.

Any ideas on what i need to do ?


build.sbt
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark-Kafka integration - build failing with sbt

2017-06-19 Thread karan alang
Hi Cody - i do have a additional basic question ..

When i tried to compile the code in Eclipse, i was not able to do that

eg.
import org.apache.spark.streaming.kafka.KafkaUtils

gave errors saying KafaUtils was not part of the package.
However, when i used sbt to compile - the compilation went through fine

So, I assume additional libraries are being downloaded when i provide the
appropriate packages in LibraryDependencies ?
which ones would have helped compile this ?



On Sat, Jun 17, 2017 at 2:53 PM, karan alang <karan.al...@gmail.com> wrote:

> Thanks, Cody .. yes, was able to fix that.
>
> On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> There are different projects for different versions of kafka,
>> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>>
>> See
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>
>> On Fri, Jun 16, 2017 at 6:51 PM, karan alang <karan.al...@gmail.com>
>> wrote:
>> > I'm trying to compile kafka & Spark Streaming integration code i.e.
>> reading
>> > from Kafka using Spark Streaming,
>> >   and the sbt build is failing with error -
>> >
>> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>> >
>> >   Scala version -> 2.10.7
>> >   Spark Version -> 2.1.0
>> >   Kafka version -> 0.9
>> >   sbt version -> 0.13
>> >
>> > Contents of sbt files is as shown below ->
>> >
>> > 1)
>> >   vi spark_kafka_code/project/plugins.sbt
>> >
>> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>> >
>> >  2)
>> >   vi spark_kafka_code/sparkkafka.sbt
>> >
>> > import AssemblyKeys._
>> > assemblySettings
>> >
>> > name := "SparkKafka Project"
>> >
>> > version := "1.0"
>> > scalaVersion := "2.11.7"
>> >
>> > val sparkVers = "2.1.0"
>> >
>> > // Base Spark-provided dependencies
>> > libraryDependencies ++= Seq(
>> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>> >
>> > mergeStrategy in assembly := {
>> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
>> MergeStrategy.discard
>> >   case m if m.toLowerCase.startsWith("META-INF")  =>
>> MergeStrategy.discard
>> >   case "reference.conf"   =>
>> MergeStrategy.concat
>> >   case m if m.endsWith("UnusedStubClass.class")   =>
>> MergeStrategy.discard
>> >   case _ => MergeStrategy.first
>> > }
>> >
>> >   i launch sbt, and then try to create an eclipse project, complete
>> error is
>> > as shown below -
>> >
>> >   -
>> >
>> >   sbt
>> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
>> > [info] Loading project definition from
>> > /Users/karanalang/Documents/Technology/Coursera_spark_scala/
>> spark_kafka_code/project
>> > [info] Set current project to SparkKafka Project (in build
>> > file:/Users/karanalang/Documents/Technology/Coursera_spark_
>> scala/spark_kafka_code/)
>> >> eclipse
>> > [info] About to create Eclipse project files for your project(s).
>> > [info] Updating
>> > {file:/Users/karanalang/Documents/Technology/Coursera_spark_
>> scala/spark_kafka_code/}spark_kafka_code...
>> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
>> > [warn] module not found:
>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
>> > [warn]  local: tried
>> > [warn]
>> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-streami
>> ng-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [warn]  activator-launcher-local: tried
>> > [warn]
>> > /Users/karanalang/.activator/repository/org.apache.spark/spa
>> rk-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [warn]  activator-local: tried
>> > [warn]
>> > /Users/karanalang/Documents/Technology/SCALA/activator-dist-
>> 1.3.10/repository/org.apache.spark/s

Re: Spark-Kafka integration - build failing with sbt

2017-06-17 Thread karan alang
Thanks, Cody .. yes, was able to fix that.

On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger <c...@koeninger.org> wrote:

> There are different projects for different versions of kafka,
> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>
> See
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> On Fri, Jun 16, 2017 at 6:51 PM, karan alang <karan.al...@gmail.com>
> wrote:
> > I'm trying to compile kafka & Spark Streaming integration code i.e.
> reading
> > from Kafka using Spark Streaming,
> >   and the sbt build is failing with error -
> >
> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> >
> >   Scala version -> 2.10.7
> >   Spark Version -> 2.1.0
> >   Kafka version -> 0.9
> >   sbt version -> 0.13
> >
> > Contents of sbt files is as shown below ->
> >
> > 1)
> >   vi spark_kafka_code/project/plugins.sbt
> >
> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
> >
> >  2)
> >   vi spark_kafka_code/sparkkafka.sbt
> >
> > import AssemblyKeys._
> > assemblySettings
> >
> > name := "SparkKafka Project"
> >
> > version := "1.0"
> > scalaVersion := "2.11.7"
> >
> > val sparkVers = "2.1.0"
> >
> > // Base Spark-provided dependencies
> > libraryDependencies ++= Seq(
> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
> >
> > mergeStrategy in assembly := {
> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
> MergeStrategy.discard
> >   case m if m.toLowerCase.startsWith("META-INF")  =>
> MergeStrategy.discard
> >   case "reference.conf"   => MergeStrategy.concat
> >   case m if m.endsWith("UnusedStubClass.class")   =>
> MergeStrategy.discard
> >   case _ => MergeStrategy.first
> > }
> >
> >   i launch sbt, and then try to create an eclipse project, complete
> error is
> > as shown below -
> >
> >   -
> >
> >   sbt
> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
> > [info] Loading project definition from
> > /Users/karanalang/Documents/Technology/Coursera_spark_
> scala/spark_kafka_code/project
> > [info] Set current project to SparkKafka Project (in build
> > file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/)
> >> eclipse
> > [info] About to create Eclipse project files for your project(s).
> > [info] Updating
> > {file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/}spark_kafka_code...
> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> > [warn] module not found:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> > [warn]  local: tried
> > [warn]
> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-
> streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  activator-launcher-local: tried
> > [warn]
> > /Users/karanalang/.activator/repository/org.apache.spark/
> spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  activator-local: tried
> > [warn]
> > /Users/karanalang/Documents/Technology/SCALA/activator-
> dist-1.3.10/repository/org.apache.spark/spark-streaming-
> kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  public: tried
> > [warn]
> > https://repo1.maven.org/maven2/org/apache/spark/spark-
> streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> > [warn]  typesafe-releases: tried
> > [warn]
> > http://repo.typesafe.com/typesafe/releases/org/apache/
> spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-
> kafka_2.11-2.1.0.pom
> > [warn]  typesafe-ivy-releasez: tried
> > [warn]
> > http://repo.typesafe.com/typesafe/ivy-releases/org.
> apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [info] Resolving jline#jline;2.12.1 ...
> > [warn] ::
> > [warn] ::  UNRESOLVED DEPENDENCIES ::
> > [warn] ::
> > [warn] :: org.apache.spark#spark-streaming-kafka_2.11;

Spark-Kafka integration - build failing with sbt

2017-06-16 Thread karan alang
I'm trying to compile kafka & Spark Streaming integration code i.e. reading
from Kafka using Spark Streaming,
  and the sbt build is failing with error -

  [error] (*:update) sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found

  Scala version -> 2.10.7
  Spark Version -> 2.1.0
  Kafka version -> 0.9
  sbt version -> 0.13

Contents of sbt files is as shown below ->

1)
  vi spark_kafka_code/project/plugins.sbt

  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

 2)
  vi spark_kafka_code/sparkkafka.sbt

import AssemblyKeys._
assemblySettings

name := "SparkKafka Project"

version := "1.0"
scalaVersion := "2.11.7"

val sparkVers = "2.1.0"

// Base Spark-provided dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVers % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("META-INF")  => MergeStrategy.discard
  case "reference.conf"   => MergeStrategy.concat
  case m if m.endsWith("UnusedStubClass.class")   => MergeStrategy.discard
  case _ => MergeStrategy.first
}

  i launch sbt, and then try to create an eclipse project, complete error
is as shown below -

  -

  sbt
[info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
[info] Loading project definition from
/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
[info] Set current project to SparkKafka Project (in build
file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
> eclipse
[info] About to create Eclipse project files for your project(s).
[info] Updating
{file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
[info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
[warn] module not found:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0
[warn]  local: tried
[warn]
/Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-launcher-local: tried
[warn]
/Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-local: tried
[warn]
/Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  public: tried
[warn]
https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]  typesafe-releases: tried
[warn]
http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]  typesafe-ivy-releasez: tried
[warn]
http://repo.typesafe.com/typesafe/ivy-releases/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[info] Resolving jline#jline;2.12.1 ...
[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
[warn] ::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] org.apache.spark:spark-streaming-kafka_2.11:2.1.0
(/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/sparkkafka.sbt#L12-16)
[warn]   +- sparkkafka-project:sparkkafka-project_2.11:1.0
[trace] Stack trace suppressed: run last *:update for the full output.
[error] (*:update) sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
[info] Updating
{file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
[info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
[warn] module not found:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0
[warn]  local: tried
[warn]
/Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-launcher-local: tried
[warn]
/Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-local: tried
[warn]
/Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  public: tried
[warn]
https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]  typesafe-releases: tried
[warn]
http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]