Re: Does Rollups work with spark structured streaming with state.

2021-06-17 Thread Mich Talebzadeh
Great Amit, best of luck

Cheers,

Mich



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 18:39, Amit Joshi  wrote:

> Hi Mich,
>
> Thanks for your email.
> I have tried it in batch mode,
> and am still looking to try it in streaming mode.
> Will update you accordingly.
>
>
> Regards
> Amit Joshi
>
> On Thu, Jun 17, 2021 at 1:07 PM Mich Talebzadeh 
> wrote:
>
>> OK let us start with the basic cube
>>
>> create a DF first
>>
>> scala> val df = Seq(
>>  |   ("bar", 2L),
>>  |   ("bar", 2L),
>>  |   ("foo", 1L),
>>  |   ("foo", 2L)
>>  | ).toDF("word", "num")
>> df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]
>>
>>
>> Now try cube on it
>>
>>
>> scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show
>>
>> +----+----+-----+
>> |word| num|count|
>> +----+----+-----+
>> |null|null|    4|  Total rows in df
>> |null|   1|    1|  Count where num equals 1
>> |null|   2|    3|  Count where num equals 2
>> | bar|null|    2|  Where word equals bar
>> | bar|   2|    2|  Where word equals bar and num equals 2
>> | foo|null|    2|  Where word equals foo
>> | foo|   1|    1|  Where word equals foo and num equals 1
>> | foo|   2|    1|  Where word equals foo and num equals 2
>> +----+----+-----+
>>
>>
>> and rollup
>>
>>
>> scala> df.rollup($"word",$"num").count.sort(asc("word"), asc("num")).show
>>
>>
>> +----+----+-----+
>> |word| num|count|
>> +----+----+-----+
>> |null|null|    4|  Count of all rows
>> | bar|null|    2|  Count when word is bar
>> | bar|   2|    2|  Count when word is bar and num is 2
>> | foo|null|    2|  Count when word is foo
>> | foo|   1|    1|  When word is foo and num is 1
>> | foo|   2|    1|  When word is foo and num is 2
>> +----+----+-----+
>>
>>
>> So rollup() returns a subset of the rows returned by cube(). From the
>> above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
>> missing rows.
>>
>> +----+----+-----+
>> |word| num|count|
>> +----+----+-----+
>> |null|   1|    1|  Word is null and num is 1
>> |null|   2|    3|  Word is null and num is 2
>> +----+----+-----+
>>
>> Now back to Spark Structured Streaming (SSS), we have basic aggregations
>>
>>
>> """
>> We work out the window and the AVG(temperature) in the window's timeframe below.
>> This should return the following DataFrame as a struct:
>>
>>  root
>>  |-- window: struct (nullable = false)
>>  |    |-- start: timestamp (nullable = true)
>>  |    |-- end: timestamp (nullable = true)
>>  |-- avg(temperature): double (nullable = true)
>>
>> """
>> resultM = resultC. \
>>     withWatermark("timestamp", "5 minutes"). \
>>     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>>     avg('temperature')
>>
>> # We take the above DataFrame and flatten it to get the columns aliased as
>> # "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
>> resultMF = resultM. \
>>     select(F.col("window.start").alias("startOfWindowFrame")
>>          , F.col("window.end").alias("endOfWindowFrame")
>>          , F.col("avg(temperature)").alias("AVGTemperature"))
>>
>> Now, basic aggregations on single columns can be done with
>> avg('temperature'), max(), stddev() etc.
>>
>>
>> For cube() and rollup() I will require additional columns, like location, in
>> my Kafka topic. Personally I have not tried it, but it will be interesting to
>> see if it works.
>>
>>
>> Have you tried cube() first?
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 17 Jun 2021 at 07:44, Amit Joshi 
>> wrote:
>>
>>> Hi Mich,
>>>
>>> Yes, you may think of cube rollups.
>>> Let me try to give an example:
>>> If we have a stream of data like (country,area,count, time), we would be
>>> able to get the updated count with different combinations of keys.
>>>
 As example -
  (country - count)
  (country , area - count)
>>>
>>>
>>> We may need to store the state to update the count. So spark structured
>>> streaming states will come into the picture.

Re: Does Rollups work with spark structured streaming with state.

2021-06-17 Thread Amit Joshi
Hi Mich,

Thanks for your email.
I have tried it in batch mode,
and am still looking to try it in streaming mode.
Will update you accordingly.


Regards
Amit Joshi

On Thu, Jun 17, 2021 at 1:07 PM Mich Talebzadeh 
wrote:

> OK let us start with the basic cube
>
> create a DF first
>
> scala> val df = Seq(
>  |   ("bar", 2L),
>  |   ("bar", 2L),
>  |   ("foo", 1L),
>  |   ("foo", 2L)
>  | ).toDF("word", "num")
> df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]
>
>
> Now try cube on it
>
>
> scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4|  Total rows in df
> |null|   1|    1|  Count where num equals 1
> |null|   2|    3|  Count where num equals 2
> | bar|null|    2|  Where word equals bar
> | bar|   2|    2|  Where word equals bar and num equals 2
> | foo|null|    2|  Where word equals foo
> | foo|   1|    1|  Where word equals foo and num equals 1
> | foo|   2|    1|  Where word equals foo and num equals 2
> +----+----+-----+
>
>
> and rollup
>
>
> scala> df.rollup($"word",$"num").count.sort(asc("word"), asc("num")).show
>
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4|  Count of all rows
> | bar|null|    2|  Count when word is bar
> | bar|   2|    2|  Count when word is bar and num is 2
> | foo|null|    2|  Count when word is foo
> | foo|   1|    1|  When word is foo and num is 1
> | foo|   2|    1|  When word is foo and num is 2
> +----+----+-----+
>
>
> So rollup() returns a subset of the rows returned by cube(). From the
> above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
> missing rows.
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|   1|    1|  Word is null and num is 1
> |null|   2|    3|  Word is null and num is 2
> +----+----+-----+
>
> Now back to Spark Structured Streaming (SSS), we have basic aggregations
>
>
> """
> We work out the window and the AVG(temperature) in the window's timeframe below.
> This should return the following DataFrame as a struct:
>
>  root
>  |-- window: struct (nullable = false)
>  |    |-- start: timestamp (nullable = true)
>  |    |-- end: timestamp (nullable = true)
>  |-- avg(temperature): double (nullable = true)
>
> """
> resultM = resultC. \
>     withWatermark("timestamp", "5 minutes"). \
>     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>     avg('temperature')
>
> # We take the above DataFrame and flatten it to get the columns aliased as
> # "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
> resultMF = resultM. \
>     select(F.col("window.start").alias("startOfWindowFrame")
>          , F.col("window.end").alias("endOfWindowFrame")
>          , F.col("avg(temperature)").alias("AVGTemperature"))
>
> Now, basic aggregations on single columns can be done with
> avg('temperature'), max(), stddev() etc.
>
>
> For cube() and rollup() I will require additional columns, like location, in
> my Kafka topic. Personally I have not tried it, but it will be interesting to
> see if it works.
>
>
> Have you tried cube() first?
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 17 Jun 2021 at 07:44, Amit Joshi 
> wrote:
>
>> Hi Mich,
>>
>> Yes, you may think of cube rollups.
>> Let me try to give an example:
>> If we have a stream of data like (country,area,count, time), we would be
>> able to get the updated count with different combinations of keys.
>>
>>> As example -
>>>  (country - count)
>>>  (country , area - count)
>>
>>
>> We may need to store the state to update the count. So spark structured
>> streaming states will come into the picture.
>>
>> As of now, with batch programming, we can do it with
>>
>>> df.rollup(col1,col2).count
>>
>>
>> But if I try to use it with spark structured streaming state, will it
>> store the state of all the groups as well?
>> I hope I was able to make my point clear.
>>
>> Regards
>> Amit Joshi
>>
>> On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> Just to clarify
>>>
>>> Are we talking about *rollup* as a subset of a cube that computes
>>> hierarchical subtotals from left to right?
>>>
>>>
>>>
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>

Re: class KafkaCluster related errors

2021-06-17 Thread Mich Talebzadeh
This is interesting because I am using PySpark but I need these jar files
for Spark 3.1.1 and Kafka 2.7.0 to work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar

Do you have the equivalent of these artifacts in your POM file?
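
For reference, a rough sketch of pulling the same artifacts from the PySpark side
via spark.jars.packages (the Maven coordinates are inferred from the jar names
above, so double-check them against your build):

from pyspark.sql import SparkSession

packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.0",
    "org.apache.spark:spark-streaming_2.12:3.1.1",
    "org.apache.kafka:kafka-clients:2.7.0",
    "org.apache.commons:commons-pool2:2.9.0",
])

# "kafkaTest" is just a placeholder app name
spark = SparkSession.builder \
    .appName("kafkaTest") \
    .config("spark.jars.packages", packages) \
    .getOrCreate()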

HTH




   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 17:30, Kiran Biswal  wrote:

> Hello Mich
>
> I added kafka-clients back, and I am back to seeing the earlier error about
> streaming-start.
>
> +<dependency>
> +    <groupId>org.apache.kafka</groupId>
> +    <artifactId>kafka-clients</artifactId>
> +    <version>0.10.0.0</version>
> +</dependency>
>
> Anything you can think of?
>
> Exception in thread "streaming-start" java.lang.NoClassDefFoundError:
> org/apache/kafka/common/security/JaasContext
>
> at
> org.apache.spark.kafka010.KafkaTokenUtil$.isGlobalJaasConfigurationProvided(KafkaTokenUtil.scala:155)
>
> at
> org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:72)
>
> at
> org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:62)
>
> at
> org.apache.spark.streaming.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:64)
>
> at
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:91)
>
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>
> at
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
>
> at
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
>
> at scala.collection.Iterator.foreach(Iterator.scala:941)
>
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>
> at
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
>
> at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>
> at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>
> at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>
> at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>
> at
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
>
> at
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
>
> at
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
>
> at
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
>
> at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
>
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.common.security.JaasContext
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>
> ... 27 more
>
> Regards
> Kiran
>
> On Thu, Jun 17, 2021 at 7:30 AM Mich Talebzadeh 
> wrote:
>
>> Hi Kiran,
>>
>> You need kafka-clients for the version of Kafka you are using. So if it
>> is the correct version, keep it.
>>
>> Try running and see what the error says.
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 17 Jun 2021 at 15:10, Kiran Biswal  wrote:
>>
>>> Hello Mich
>>>
>>> Thanks for the pointer. I believe they helped. I removed kafka-streams
>>> and kafka-clients. Is kafka-clients needed?
>>>
>>> The streaming application runs in a Kubernetes environment. I see that the
>>> driver starts, but subsequently the executors start, crash and restart in a
>>> loop. I have attached some logs from the spark driver pod (

Re: class KafkaCluster related errors

2021-06-17 Thread Mich Talebzadeh
Hi Kiran,

You need kafka-clients for the version of Kafka you are using. So if it is
the correct version, keep it.

Try running and see what the error says.

HTH



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 15:10, Kiran Biswal  wrote:

> Hello Mich
>
> Thanks for the pointer. I believe they helped. I removed kafka-streams and
> kafka-clients. Is kafka-clients needed?
>
> The streaming application runs in a Kubernetes environment. I see that the
> driver starts, but subsequently the executors start, crash and restart in a
> loop. I have attached some logs from the spark driver pod (see
> spark_driver_pod.txt)
>
> Any thoughts on what's causing the crash? Do these warnings look serious?
>
> Thanks
> Kiran
>
> On Sun, Jun 13, 2021 at 12:48 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Kiran
>>
>> Looking at your pom file I see the following:
>>
>>  <dependency>
>>      <groupId>org.apache.kafka</groupId>
>>      <artifactId>kafka-streams</artifactId>
>>      <version>0.10.0.0</version>
>>  </dependency>
>>
>>
>> Why do you need kafka-streams? I don't think Spark uses it, and it may get
>> the libraries confused.
>>
>> HTH
>>
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 13 Jun 2021 at 05:17, Kiran Biswal  wrote:
>>
>>> Hello Mich
>>>
>>> Thanks a lot for all your help. As per this document, Spark 3.0.1 and
>>> Kafka 0.10 is a supported combination, and that's what I have:
>>>
>>> https://spark.apache.org/docs/3.0.1/streaming-kafka-0-10-integration.html
>>>
>>> I have attached the pom.xml file (I use maven environment). Would you
>>> kindly take a look to see if something needs to change in terms of
>>> versioning or anything additional I need to include to get streaming
>>> working?
>>>
>>> Look forward to your response
>>>
>>> Regards
>>> Kiran
>>>
>>> On Fri, Jun 11, 2021 at 1:09 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hi Kiran. I was using

- Kafka Cluster 2.12-1.1.0
- Spark Streaming 2.3, Spark SQL 2.3
- Scala 2.11.8

 Your Kafka version 0.10 seems to be pretty old. That may be the issue
 here. Try upgrading Kafka in a test environment to see if it helps.


 HTH


view my Linkedin profile
 



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Fri, 11 Jun 2021 at 08:55, Kiran Biswal 
 wrote:

> Hello Mich
>
> When you were using DStreams, what versions of Kafka, Spark and Scala
> were you using?
>
> I am using Kafka 0.10 with Spark 3.0.1 and Scala 2.12. Do you feel
> this combination can reliably work from a streaming point of view?
>
> I get the below error when I invoke createDirectStream. Any
> suggestions on how to move forward here?
>
> Thanks a lot for any help.
> Thanks
> kiran
>
> Exception in thread "streaming-start" java.lang.NoClassDefFoundError:
> org/apache/kafka/common/security/JaasContext
>
>   at 
> org.apache.spark.kafka010.KafkaTokenUtil$.isGlobalJaasConfigurationProvided(KafkaTokenUtil.scala:155)
>   at 
> org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:72)
>   at 
> org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:62)
>   at 
> org.apache.spark.streaming.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:64)
>   at 
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:91)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>   at 
> org.apache.spark.streaming.DS

Re: Migrating from hive to spark

2021-06-17 Thread Mich Talebzadeh
OK, the first link throws some clues:

*"... Hive excels in batch disc processing with a map reduce execution
engine. Actually, Hive can also use Spark as its execution engine which
also has a Hive context allowing us to query Hive tables. Despite all the
great things Hive can solve, this post is to talk about why we move our
ETL’s to the ‘not so new’ player for batch processing, ..."*

Great, so you want to use Spark for ETL, as opposed to Hive, for cleaning up
your data once your upstream CDC files have landed on HDFS. Correct?
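
If so, on the Spark side it is mostly a matter of swapping the old HiveContext.sql
calls for spark.sql on a Hive-enabled session. A minimal sketch (the table names
are hypothetical, and it assumes the Hive metastore is already configured for Spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("hive_etl") \
    .enableHiveSupport() \
    .getOrCreate()

# read the landed CDC data from a Hive table, clean it up in Spark,
# and write the result back as another Hive table (names are made up)
df = spark.sql("SELECT * FROM staging.cdc_landing")
cleaned = df.dropDuplicates()
cleaned.write.mode("overwrite").saveAsTable("curated.cdc_clean")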





   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 08:17, Battula, Brahma Reddy 
wrote:

> Hi Talebzadeh,
>
>
>
> Looks like I caused confusion, sorry. I have now changed the subject to make it clear.
>
> Facebook has tried migrating from Hive to Spark. Check the following links
> for the same.
>
>
>
> https://www.dcsl.com/migrating-from-hive-to-spark/
>
>
> https://databricks.com/session/experiences-migrating-hive-workload-to-sparksql
>
> https://www.cloudwalker.io/2019/02/19/spark-ad-hoc-querying/
>
>
>
>
>
> Would like to know whether anybody else has migrated like this, any
> challenges or prerequisites for migrating (like hardware), and any tools to
> evaluate before we migrate?
>
>
>
>
>
>
>
>
>
> *From: *Mich Talebzadeh 
> *Date: *Tuesday, 15 June 2021 at 10:36 PM
> *To: *Battula, Brahma Reddy 
> *Cc: *Battula, Brahma Reddy , ayan guha <
> guha.a...@gmail.com>, d...@spark.apache.org ,
> user@spark.apache.org 
> *Subject: *Re: Spark-sql can replace Hive ?
>
> OK you mean use spark.sql as opposed to HiveContext.sql?
>
>
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> HiveContext.sql("")
>
>
>
> replace with
>
>
>
> spark.sql("")
>
> ?
>
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Tue, 15 Jun 2021 at 18:00, Battula, Brahma Reddy 
> wrote:
>
> Currently I am using the Hive SQL engine for ad hoc queries. As spark-sql
> also supports this, I want to migrate from Hive.
>
>
>
>
>
>
>
>
>
> *From: *Mich Talebzadeh 
> *Date: *Thursday, 10 June 2021 at 8:12 PM
> *To: *Battula, Brahma Reddy 
> *Cc: *ayan guha , d...@spark.apache.org <
> d...@spark.apache.org>, user@spark.apache.org 
> *Subject: *Re: Spark-sql can replace Hive ?
>
> These are different things. Spark provides a computational layer and a
> dialect of SQL based on Hive.
>
>
>
> Hive is a DW on top of HDFS. What are you trying to replace?
>
>
>
> HTH
>
>
>
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 10 Jun 2021 at 12:09, Battula, Brahma Reddy
>  wrote:
>
> Thanks for the prompt reply.
>
>
>
> I want to replace hive with spark.
>
>
>
>
>
>
>
>
>
> *From: *ayan guha 
> *Date: *Thursday, 10 June 2021 at 4:35 PM
> *To: *Battula, Brahma Reddy 
> *Cc: *d...@spark.apache.org , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: Spark-sql can replace Hive ?
>
> Would you mind expanding on the ask? Spark SQL can use Hive by itself.
>
>
>
> On Thu, 10 Jun 2021 at 8:58 pm, Battula, Brahma Reddy
>  wrote:
>
> Hi
>
>
>
> Would like to know any references/docs to replac

Re: Does Rollups work with spark structured streaming with state.

2021-06-17 Thread Mich Talebzadeh
OK let us start with the basic cube

create a DF first

scala> val df = Seq(
 |   ("bar", 2L),
 |   ("bar", 2L),
 |   ("foo", 1L),
 |   ("foo", 2L)
 | ).toDF("word", "num")
df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]


Now try cube on it


scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show

+----+----+-----+
|word| num|count|
+----+----+-----+
|null|null|    4|  Total rows in df
|null|   1|    1|  Count where num equals 1
|null|   2|    3|  Count where num equals 2
| bar|null|    2|  Where word equals bar
| bar|   2|    2|  Where word equals bar and num equals 2
| foo|null|    2|  Where word equals foo
| foo|   1|    1|  Where word equals foo and num equals 1
| foo|   2|    1|  Where word equals foo and num equals 2
+----+----+-----+


and rollup


scala> df.rollup($"word",$"num").count.sort(asc("word"), asc("num")).show


+----+----+-----+
|word| num|count|
+----+----+-----+
|null|null|    4|  Count of all rows
| bar|null|    2|  Count when word is bar
| bar|   2|    2|  Count when word is bar and num is 2
| foo|null|    2|  Count when word is foo
| foo|   1|    1|  When word is foo and num is 1
| foo|   2|    1|  When word is foo and num is 2
+----+----+-----+


So rollup() returns a subset of the rows returned by cube(). From the
above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
missing rows.

+----+----+-----+
|word| num|count|
+----+----+-----+
|null|   1|    1|  Word is null and num is 1
|null|   2|    3|  Word is null and num is 2
+----+----+-----+
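
As a rough cross-check (a PySpark sketch, assuming an active SparkSession called
spark), rollup("word", "num") is equivalent to GROUPING SETS over the left-to-right
prefixes (word, num), (word) and (); the two missing rows belong to the (num)
grouping, which only cube() generates:

df = spark.createDataFrame([("bar", 2), ("bar", 2), ("foo", 1), ("foo", 2)], ["word", "num"])
df.createOrReplaceTempView("t")
spark.sql("""
    SELECT word, num, count(*) AS count
    FROM t
    GROUP BY word, num GROUPING SETS ((word, num), (word), ())
    ORDER BY word, num
""").show()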

Now back to Spark Structured Streaming (SSS), we have basic aggregations


"""
We work out the window and the AVG(temperature) in the window's timeframe below.
This should return the following DataFrame as a struct:

 root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- avg(temperature): double (nullable = true)

"""
resultM = resultC. \
    withWatermark("timestamp", "5 minutes"). \
    groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
    avg('temperature')

# We take the above DataFrame and flatten it to get the columns aliased as
# "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
resultMF = resultM. \
    select(F.col("window.start").alias("startOfWindowFrame")
         , F.col("window.end").alias("endOfWindowFrame")
         , F.col("avg(temperature)").alias("AVGTemperature"))

Now, basic aggregations on single columns can be done with
avg('temperature'), max(), stddev() etc.
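
If several of these are needed over the same window, they can be combined in a
single agg() call, something like this sketch on the same resultC stream as above:

from pyspark.sql import functions as F
from pyspark.sql.functions import window

resultM = resultC. \
    withWatermark("timestamp", "5 minutes"). \
    groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
    agg(F.avg("temperature").alias("AVGTemperature"),
        F.max("temperature").alias("MAXTemperature"),
        F.stddev("temperature").alias("STDDEVTemperature"))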


For cube() and rollup() I will require additional columns, like location, in
my Kafka topic. Personally I have not tried it, but it will be interesting to
see if it works.
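
If I were to experiment, it would look something like the sketch below (untested,
and it assumes the topic also carries a location column; whether rollup() is even
accepted on a streaming DataFrame, and how its state gets cleaned up, is exactly
the open question here):

from pyspark.sql.functions import window

resultR = resultC. \
    withWatermark("timestamp", "5 minutes"). \
    rollup(window(resultC.timestamp, "5 minutes", "5 minutes"),
           resultC.location). \
    avg('temperature')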


Have you tried cube() first?


HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 07:44, Amit Joshi  wrote:

> Hi Mich,
>
> Yes, you may think of cube rollups.
> Let me try to give an example:
> If we have a stream of data like (country,area,count, time), we would be
> able to get the updated count with different combinations of keys.
>
>> As example -
>>  (country - count)
>>  (country , area - count)
>
>
> We may need to store the state to update the count. So spark structured
> streaming states will come into the picture.
>
> As of now, with batch programming, we can do it with
>
>> df.rollup(col1,col2).count
>
>
> But if I try to use it with spark structured streaming state, will it
> store the state of all the groups as well?
> I hope I was able to make my point clear.
>
> Regards
> Amit Joshi
>
> On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>>
>>
>> Hi,
>>
>> Just to clarify
>>
>> Are we talking about *rollup* as a subset of a cube that computes
>> hierarchical subtotals from left to right?
>>
>>
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 16 Jun 2021 at 16:37, Amit Joshi 
>> wrote:
>>
>>> Appreciate if someone could give some poin

Migrating from hive to spark

2021-06-17 Thread Battula, Brahma Reddy
Hi Talebzadeh,

Looks like I caused confusion, sorry. I have now changed the subject to make it clear.
Facebook has tried migrating from Hive to Spark. Check the following links for the
same.

https://www.dcsl.com/migrating-from-hive-to-spark/
https://databricks.com/session/experiences-migrating-hive-workload-to-sparksql
https://www.cloudwalker.io/2019/02/19/spark-ad-hoc-querying/


Would like to know whether anybody else has migrated like this, any challenges or
prerequisites for migrating (like hardware), and any tools to evaluate before we
migrate?




From: Mich Talebzadeh 
Date: Tuesday, 15 June 2021 at 10:36 PM
To: Battula, Brahma Reddy 
Cc: Battula, Brahma Reddy , ayan guha 
, d...@spark.apache.org , 
user@spark.apache.org 
Subject: Re: Spark-sql can replace Hive ?
OK you mean use spark.sql as opposed to HiveContext.sql?

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
HiveContext.sql("")

replace with

spark.sql("")
?



 
   view my Linkedin 
profile



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Tue, 15 Jun 2021 at 18:00, Battula, Brahma Reddy  wrote:
Currently I am using the Hive SQL engine for ad hoc queries. As spark-sql also
supports this, I want to migrate from Hive.




From: Mich Talebzadeh 
Date: Thursday, 10 June 2021 at 8:12 PM
To: Battula, Brahma Reddy 
Cc: ayan guha , d...@spark.apache.org , user@spark.apache.org 
Subject: Re: Spark-sql can replace Hive ?
These are different things. Spark provides a computational layer and a dialect
of SQL based on Hive.

Hive is a DW on top of HDFS. What are you trying to replace?

HTH





 
   view my Linkedin 
profile



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 10 Jun 2021 at 12:09, Battula, Brahma Reddy  
wrote:
Thanks for the prompt reply.

I want to replace hive with spark.




From: ayan guha 
Date: Thursday, 10 June 2021 at 4:35 PM
To: Battula, Brahma Reddy 
Cc: d...@spark.apache.org , user@spark.apache.org 
Subject: Re: Spark-sql can replace Hive ?
Would you mind expanding on the ask? Spark SQL can use Hive by itself.

On Thu, 10 Jun 2021 at 8:58 pm, Battula, Brahma Reddy 
 wrote:
Hi

Would like to know of any references/docs for replacing Hive with spark-sql
completely, e.g. how to migrate the existing data in Hive?

thanks


--
Best Regards,
Ayan Guha


RE: Small file problem

2021-06-17 Thread Boris Litvak
Compact them and remove the small files.
One messy way of doing this (including some cleanup) looks like the following,
based on rdd.mapPartitions() on the file URLs RDD:

import gzip
import json
import logging
import math
import re
from typing import List

import boto3
from mypy_boto3_s3.client import S3Client
from pyspark.sql import SparkSession

import configuration
import data_types.time_series
from data_types.shared import series
from s3_batcher import get_s3_objects_batches
from functools import partial

logger = logging.getLogger(__name__)

session = boto3.session.Session()
s3 = session.resource('s3')

def merge_json_files(file_names, config):
    total = []
    exceptions = []
    for name in file_names:
        try:
            logger.debug(f'Loading {name} ...')
            obj = s3.Object(config.get_non_concatenated_bucket_name(), name)
            body = obj.get()['Body'].read()
            if name.endswith('.gz'):
                body = gzip.decompress(body)

            company_id = re.search('company_id=(\S+)/', name).group(1)
            clazz = config.get_schema_class()
            loaded = json.loads(body)
            obj: series.SchemaFixer = clazz(company_id=company_id, **loaded)
            jason = obj.fix_value()

            total.append(jason)
        except Exception as ex:
            logger.error(f'{name}: {ex}')
            exceptions.append(name)

    if exceptions:
        logger.warning(f'Exceptions: {exceptions}')
    return iter(total)


def get_json_files_list(s3client: S3Client, config: configuration.Config) -> List[str]:
    """
    returns [{'Key': '$s3prefix'}, ]
    """
    logger.info('Loading file list')
    files = []
    # aws s3 ls --summarize --human-readable --recursive \
    #   s3://ingest-measurements-202006101309296531/TIME_SERIES/ --profile hierarchy_playground > list.txt
    for batch in get_s3_objects_batches(s3client,
                                        Bucket=config.get_non_concatenated_bucket_name(),
                                        Prefix=config.get_non_concatenated_prefix()):
        files_batch = [b['Key'] for b in batch if '=' in b['Key']]
        # TIME_SERIES/company_id=00224d27-b66f-4b62-bae2-f1399f530d94/60514332-0bc0-4bff-8263-eb6b090b9210.json.gz
        files.extend(files_batch)

    logger.info('Finished listing files')
    return files


def run(spark: SparkSession, config: configuration.Config):
    files = get_json_files_list(boto3.client('s3'), config)
    files_num = len(files)
    logger.info(f'Loaded file list with {files_num} files')

    # logger.info(f'Traversing {files}')

    spark.sparkContext.setJobDescription('Parallelize filenames and read/merge files')

    rdd = spark.sparkContext.parallelize(files, math.ceil(files_num / config.small_files_in_partition))
    logger.info(f'Got an rdd with {rdd.getNumPartitions()} partitions')
    func = partial(merge_json_files, config=config)
    loaded_rdd = rdd.mapPartitions(func)

    # destination = r'c:\tmp\jsonresult'
    # shutil.rmtree(destination)
    # print(loaded_rdd.take(2))
    loaded_rdd.saveAsTextFile(config.get_concatenated_path())
    # note: these are not sorted by time, will be hard to etl/read
    # result.write.json(config.get_concatenated_path())

    # df = load_json_df(spark, config.source_path, config.source_partition_keys, config.input_schema)
    # logger.info(f'Schema is {df.schema.json()}')
    # spark.read.json(destination).show(truncate=50)
    pass
    # logger.info(f'Read {spark.read.format("parquet").load(config.get_parquet_dir()).count()} rows from parquet')


Boris

From: Sachit Murarka 
Sent: Wednesday, 16 June 2021 21:25
To: spark users 
Subject: Small file problem

Hello Spark Users,

We are receiving too many small files, about 3 million. Reading them using
spark.read itself is taking a long time and the job is not proceeding further.

Is there any way to speed this up and proceed?

Regards
Sachit Murarka