Re: [Spark SQL] Structured Streaming in Python can connect to Cassandra?

2022-03-25 Thread Alex Ott
You don't need to use foreachBatch to write to Cassandra. You just need to
use Spark Cassandra Connector version 2.5.0 or higher - it natively supports
writing streaming data to Cassandra.

Here is an announcement: 
https://www.datastax.com/blog/advanced-apache-cassandra-analytics-now-open-all
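
For illustration, a minimal PySpark sketch of such a native streaming write
(keyspace, table and checkpoint path are placeholders, and `df` stands for the
streaming DataFrame produced by the processing in your gist):

    # assumes connector 2.5+ is on the classpath and
    # spark.cassandra.connection.host is set on the SparkSession
    query = (df.writeStream
             .format("org.apache.spark.sql.cassandra")
             .option("checkpointLocation", "/tmp/checkpoints/twitch")  # placeholder path
             .option("keyspace", "my_ks")       # placeholder keyspace
             .option("table", "messages")       # placeholder table
             .start())
    query.awaitTermination()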

guillaume farcy  at "Mon, 21 Mar 2022 16:33:51 +0100" wrote:
 gf> Hello,

 gf> I am a student and I am currently doing a big data project.
 gf> Here is my code:
 gf> https://gist.github.com/Balykoo/262d94a7073d5a7e16dfb0d0a576b9c3

 gf> My project is to retrieve messages from a Twitch chat and send them into
 gf> Kafka; Spark then reads the Kafka topic to perform the processing in the
 gf> provided gist.

 gf> I want to send these messages to Cassandra.

 gf> I tested a first solution on line 72 of the gist, which works, but when
 gf> there are too many messages Spark crashes - probably because my function
 gf> connects to Cassandra each time it is called.

 gf> I tried the object approach to share the connection object, but without
 gf> success:
 gf> _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle
 gf> '_thread.RLock' object

 gf> Can you please tell me how to do this?
 gf> Or at least give me some advice?

 gf> Sincerely,
 gf> FARCY Guillaume.






-- 
With best wishes, Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 3.2.0 upgrade

2022-01-22 Thread Alex Ott
Show how you execute your code - either you didn't package it as an uberjar,
or you didn't provide all necessary dependencies if you're using the `--jars`
option. You may also try the `-assembly` variant of the connector when
submitting your application.
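
For example, in sbt syntax mirroring the dependency lines quoted below (the
version is only illustrative), the assembly artifact bundles shaded copies of
its dependencies, which is one way to avoid classpath clashes such as the
missing com.codahale.metrics.JmxReporter class:

    "com.datastax.spark" %% "spark-cassandra-connector-assembly" % "3.1.0"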

Amit Sharma  at "Fri, 21 Jan 2022 11:17:38 -0500" wrote:
 AS> Hello, I tried using the Cassandra unshaded connector and the normal
 AS> connector; both give the same error at runtime while connecting to
 AS> Cassandra.

 AS> "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2"

 AS> Or

 AS> "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0"

 AS> Russ, a similar issue is reported here as well, but with no solution:
 AS> https://community.datastax.com/questions/3519/issue-with-spring-boot-starter-data-cassandra-and.html

 AS> Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
 AS> at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
 AS> at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
 AS> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)

 AS> On Thu, Jan 20, 2022 at 5:17 PM Amit Sharma  wrote:

 AS> Hello, I am trying to upgrade my project from Spark 2.3.3 to Spark 3.2.0.
 AS> While running the application locally I am getting the error below.
 AS>
 AS> Could you please let me know which version of the Cassandra connector I
 AS> should use? I am using the shaded connector below, but I think that is
 AS> causing the issue:

 AS> "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2"

 AS> Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
 AS> at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
 AS> at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
 AS> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)

 AS> Thanks
 AS> 
 AS> Amit


-- 
With best wishes, Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Unable to use WriteStream to write to delta file.

2021-12-19 Thread Alex Ott
runBatch$17(MicroBatchExecution.scala:600)
>> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>
>> Traceback (most recent call last):
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\tests\test_mdefbasic.py", line 60, in 
>>
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>> at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>>
>> obj.test_ingest_incremental_data_batch1()
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\tests\test_mdefbasic.py", line 56, in test_ingest_incremental_data_batch1
>> mdef.ingest_incremental_data('example', entity, self.schemas['studentattendance'], 'school_year')
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate/src\MDEFBasic.py", line 109, in ingest_incremental_data
>> query.awaitTermination()   # block until query is terminated, with stop() or with error; A StreamingQueryException will be thrown if an exception occurs.
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\pyspark\sql\streaming.py", line 101, in awaitTermination
>> return self._jsq.awaitTermination()
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\py4j\java_gateway.py", line 1309, in __call__
>> return_value = get_return_value(
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\pyspark\sql\utils.py", line 117, in deco
>> raise converted from None
>>
>> pyspark.sql.utils.StreamingQueryException: org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(Lscala/collection/Seq;)V
>>
>> === Streaming Query ===

-- 
With best wishes, Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)


Re: Spark Structured Streaming Continuous Trigger on multiple sinks

2021-09-12 Thread Alex Ott
Just don't call .awaitTermination(), because it blocks execution of the
next line of code. You can assign the result of .start() to a variable,
or put the handles into a list/array.

And to wait until one of the streams finishes, use
spark.streams.awaitAnyTermination() or something like this
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries)
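
For illustration, a minimal PySpark sketch of that pattern (sink options,
topic names and `mongo_foreach_writer` are placeholders, not taken from the
original code):

    # start both sinks without blocking; keep the query handles
    kafka_query = (dataset1.writeStream
                   .format("kafka")
                   .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
                   .option("topic", "out-topic")                      # placeholder
                   .option("checkpointLocation", "/tmp/ckpt/kafka")   # placeholder
                   .trigger(continuous="1 second")
                   .start())

    mongo_query = (dataset1.writeStream
                   .foreach(mongo_foreach_writer)   # stands in for your ForEachWriter logic
                   .option("checkpointLocation", "/tmp/ckpt/mongo")   # placeholder
                   .trigger(continuous="1 second")
                   .start())

    # block until any of the active streams terminates
    spark.streams.awaitAnyTermination()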
 

S  at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:
 S> Hello,

 S> I have a structured streaming job that needs to be able to write to
 S> multiple sinks. We are using the continuous trigger and not the micro-batch
 S> trigger.

 S> 1. When we use the foreach method:
 S> dataset1.writeStream.foreach(kafka ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S> dataset1.writeStream.foreach(mongo ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
 S> The first statement blocks the second one for obvious reasons, so this does
 S> not serve our purpose.
 S> 2. The next step for this problem would be to use foreachBatch. That is
 S> not supported in continuous mode.
 S> 3. The next step was to use something like this for both sinks:
 S> dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
 S> dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
 S> This does not work either. Only the first query works; the second one does not.

 S> Is there any solution to the problem of being able to write to multiple
 S> sinks in continuous trigger mode using Structured Streaming?



-- 
With best wishes, Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Cassandra row deletion

2020-07-04 Thread Alex Ott
Yes, you can do it using the RDD API of Spark Cassandra Connector:
https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/5_saving.md#deleting-rows-and-columns

Depending on whether you're deleting only specific columns or full rows,
it's recommended to look at the keyColumns parameter - deletion by primary
or partition key is the most efficient way in Cassandra...

Amit Sharma  at "Sat, 4 Jul 2020 10:44:00 -0400" wrote:
 AS> Hi, I have to delete certain rows from Cassandra during my Spark batch
 AS> process. Is there any way to delete rows using the Spark Cassandra
 AS> connector?

 AS> Thanks
 AS> Amit



-- 
With best wishes,        Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Zeppelin integration

2020-06-21 Thread Alex Ott
Can you post what settings you have configured for the Spark interpreter?
I recently did a demo using Zeppelin 0.9.0-preview1 + Structured Streaming
+ Kafka, running in distributed mode on DSE Analytics, and everything
just worked...

P.S. Here is the notebook if you're interested
https://github.com/alexott/zeppelin-demos/blob/master/cassandra-day-russia/Cassandra%20Day%20Russia%20Streaming%20demo.zpln
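
As a point of reference (an assumption on my side, since I haven't seen your
settings): for Structured Streaming the `kafka` source comes from the
spark-sql-kafka-0-10 package, not from the DStream spark-streaming-kafka-0-10
jar, so for Spark 2.4.6 built against Scala 2.11 the Spark interpreter would
need something like

    spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6

in its settings.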

silav...@dtechspace.com  at "Fri, 19 Jun 2020 19:41:45 -0700" wrote:
 s> Hi, here is my question: Spark code run on Zeppelin is unable to find the
 s> Kafka source even though a dependency is specified. Is there any way to fix
 s> this? The Zeppelin version is 0.9.0, the Spark version is 2.4.6, and the
 s> Kafka version is 2.4.1. I have specified the dependency in the packages and
 s> added a jar file that contained the Kafka 0-10 stream classes.


-- 
With best wishes,        Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark structured streaming - performance tuning

2020-04-18 Thread Alex Ott
Just to clarify - I didn't write this explicitly in my answer. When you're
working with Kafka, every Kafka partition is mapped to a Spark partition,
and in Spark every partition is mapped to a task. But you can use
`coalesce` to decrease the number of Spark partitions, so you'll have
fewer tasks...
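
A minimal PySpark sketch of that (bootstrap servers, topic and paths are
placeholders): a source topic with 100 partitions would normally produce 100
tasks per micro-batch, and coalesce() caps the number of Spark partitions -
and therefore tasks - downstream of the source:

    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
          .option("subscribe", "topic1")                     # placeholder
          .load()
          .coalesce(20))   # process with at most 20 tasks instead of 100

    query = (df.writeStream
             .format("parquet")
             .option("path", "/tmp/out")                 # placeholder
             .option("checkpointLocation", "/tmp/ckpt")  # placeholder
             .start())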

Srinivas V  at "Sat, 18 Apr 2020 10:32:33 +0530" wrote:
 SV> Thank you Alex. I will check it out and let you know if I have any
 SV> questions.

 SV> On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:

 SV> http://shop.oreilly.com/product/0636920047568.do has quite good information
 SV> on it.  For Kafka, you need to start with the approximation that processing
 SV> of each partition is a separate task that needs to be executed, so you need
 SV> to plan the number of cores correspondingly.
 SV>
 SV> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
 SV>  SV> Hello,
 SV>  SV> Can someone point me to a good video or document which talks about
 SV>  SV> performance tuning for a structured streaming app?
 SV>  SV> I am looking especially at listening to Kafka topics, say 5 topics
 SV>  SV> each with 100 partitions.
 SV>  SV> Trying to figure out the best cluster size and number of executors
 SV>  SV> and cores required.


-- 
With best wishes, Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark structured streaming - performance tuning

2020-04-17 Thread Alex Ott
http://shop.oreilly.com/product/0636920047568.do has quite good information
on it.  For Kafka, you need to start with the approximation that processing of
each partition is a separate task that needs to be executed, so you need to
plan the number of cores correspondingly.
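
As a back-of-the-envelope illustration using the numbers from the question
below: 5 topics x 100 partitions = 500 Kafka partitions, i.e. up to 500 tasks
per micro-batch; with, say, 125 executor cores, each micro-batch would need
at least 4 waves of tasks.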

Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
 SV> Hello,
 SV> Can someone point me to a good video or document which talks about
 SV> performance tuning for a structured streaming app?
 SV> I am looking especially at listening to Kafka topics, say 5 topics each
 SV> with 100 partitions.
 SV> Trying to figure out the best cluster size and number of executors and
 SV> cores required.


-- 
With best wishes,        Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org