Re: Generic Dataset[T] Query

2019-05-09 Thread Ramandeep Singh Nanda
You need to supply a RowEncoder.
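
For the Dataset[Row] case in the quoted question below, that could look roughly
like this (a sketch assuming Spark 2.x, where RowEncoder lives in
org.apache.spark.sql.catalyst.encoders; the schema is a placeholder for whatever
the source data actually has):

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Case classes get Encoder[T] from spark.implicits._, but Row does not,
// so build one from the expected schema and make it available implicitly
// (or pass it explicitly) when calling read[Row].
val rowSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("value", StringType)
))
implicit val rowEncoder: Encoder[Row] = RowEncoder(rowSchema)

// val rows: Dataset[Row] = read[Row](params)  // Encoder[Row] now resolves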

Regards,
Ramandeep Singh

On Thu, May 9, 2019, 11:33 SNEHASISH DUTTA  wrote:

> Hi ,
>
> I am trying to write a generic method which will return Datasets of custom
> types as well as Dataset[Row].
>
> def read[T](params: Map[String, Any])(implicit encoder: Encoder[T]): Dataset[T]
>
> is my method signature. It works fine for custom types, but when I try to
> obtain a Dataset[Row] it errors out with the following message:
>
> " Unable to find encoder for type org.apache.spark.sql.Row. An implicit
> Encoder[org.apache.spark.sql.Row] is needed to store
> org.apache.spark.sql.Row instances in a Dataset. Primitive types (Int,
> String, etc) and Product types (case classes) are supported by importing
> spark.implicits._ "
>
> Is it possible to make some changes so that it can process both custom
> types and the Row type?
>
> Regards,
> Snehasish
>


Re: Re: How to get all input tables of a SPARK SQL 'select' statement

2019-01-25 Thread Ramandeep Singh Nanda
Hi,

You don't have to run the SQL statement. You can parse it instead; that gives
you the unresolved logical plan.

val logicalPlan = ss.sessionState.sqlParser.parsePlan(sqlText = query)
println(logicalPlan.prettyJson)

[ {
  "class" : "org.apache.spark.sql.catalyst.plans.logical.Project",
  "num-children" : 1,
  "projectList" : [ [ {
"class" : "org.apache.spark.sql.catalyst.analysis.UnresolvedStar",
"num-children" : 0
  } ] ],
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation",
  "num-children" : 0,
  "tableIdentifier" : {
"product-class" : "org.apache.spark.sql.catalyst.TableIdentifier",
"table" : "abc"
  }
} ]
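
To go from the parsed plan to the input table names, one option (a sketch
assuming Spark 2.x, where an unresolved relation carries a TableIdentifier) is
to collect the UnresolvedRelation nodes:

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// Walk the unresolved plan and pull out every table reference.
val inputTables = logicalPlan.collect {
  case r: UnresolvedRelation => r.tableIdentifier.unquotedString
}.distinct
// For the plan above this yields Seq("abc").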



On Fri, Jan 25, 2019 at 6:07 AM  wrote:

> Hi, All,
>
> I tried the suggested approach and it works, but it requires 'running' the
> SQL statement first.
>
> I just want to parse the SQL statement without running it, so I can do
> this in my laptop without connecting to our production environment.
>
> I tried to write a tool which uses the SqlBase.g4 bundled with SPARK SQL
> to extract names of the input tables and it works as expected.
>
> But I have a question:
>
> The parser generated by SqlBase.g4 only accepts 'select' statements with all
> keywords such as 'SELECT' and 'FROM', as well as table names, capitalized,
> e.g. it accepts 'SELECT * FROM FOO', but it doesn't accept 'select * from
> foo'.
>
> But I can run the spark.sql("select * from foo") in the spark2-shell
> without any problem.
>
> Is there another 'layer' in SPARK SQL that upper-cases those 'tokens'
> before invoking the parser?
>
> If so, why not just modify SqlBase.g4 to accept lower-case keywords?
>
> Thanks
>
> Boying
>
>
>
> From: "Shahab Yunus" 
> To: "Ramandeep Singh Nanda" 
> Cc: "Tomas Bartalos" , l...@china-inv.cn, "user
> @spark/'user @spark'/spark users/user@spark" 
> Date: 2019/01/24 06:45
> Subject: Re: How to get all input tables of a SPARK SQL 'select' statement
> --
>
>
>
> Could be a tangential idea but it might help: why not use the queryExecution
> and logicalPlan objects that are available when you execute a query using
> SparkSession and get a DataFrame back? The JSON representation contains
> almost all the info that you need, and you don't need to go to Hive to get
> it.
>
> Some details here:
> https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Dataset.html#queryExecution
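>
> A minimal sketch of that approach (assuming a SparkSession named spark; note
> it requires the tables referenced by the query to be resolvable in the
> current session):
>
> // spark.sql plans and analyzes the query but does not run it.
> val df = spark.sql("select * from abc")
> // The analyzed logical plan, as JSON, lists the resolved input relations.
> println(df.queryExecution.analyzed.prettyJson)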
>
> On Wed, Jan 23, 2019 at 5:35 PM Ramandeep Singh Nanda <ramannan...@gmail.com> wrote:
> Explain extended or explain would list the plan along with the tables. I'm
> not aware of any statement that explicitly lists dependencies or tables
> directly.
>
> Regards,
> Ramandeep Singh
>
> On Wed, Jan 23, 2019, 11:05 Tomas Bartalos <tomas.barta...@gmail.com> wrote:
> This might help:
> show tables;
>
> On Wed, 23 Jan 2019 at 10:43, <l...@china-inv.cn> wrote:
> Hi, All,
>
> We need to get all input tables of several SPARK SQL 'select' statements.
>
> We can get this information for Hive SQL statements by using 'explain
> dependency select', but I can't find the equivalent command for SPARK SQL.
>
> Does anyone know how to get this information for a SPARK SQL 'select'
> statement?
>
> Thanks
>
> Boying

-- 
Regards,
Ramandeep Singh
http://orastack.com
+13474792296
ramannan...@gmail.com


Re: How to get all input tables of a SPARK SQL 'select' statement

2019-01-23 Thread Ramandeep Singh Nanda
Explain extended or explain would list the plan along with the tables. I'm not
aware of any statement that explicitly lists dependencies or tables
directly.
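
For example, a quick sketch (assuming a SparkSession named spark and a table abc):

// EXPLAIN EXTENDED prints the parsed, analyzed, optimized and physical plans;
// the input tables show up in the relation/scan nodes of that output.
spark.sql("EXPLAIN EXTENDED SELECT * FROM abc").show(truncate = false)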

Regards,
Ramandeep Singh

On Wed, Jan 23, 2019, 11:05 Tomas Bartalos  wrote:

> This might help:
>
> show tables;
>
> On Wed, 23 Jan 2019 at 10:43,  wrote:
>
>> Hi, All,
>>
>> We need to get all input tables of several SPARK SQL 'select' statements.
>>
>> We can get this information for Hive SQL statements by using 'explain
>> dependency select', but I can't find the equivalent command for SPARK SQL.
>>
>> Does anyone know how to get this information for a SPARK SQL 'select'
>> statement?
>>
>> Thanks
>>
>> Boying
>>


Re: Is it possible to rate limit a UDF?

2019-01-14 Thread Ramandeep Singh Nanda
Basically, it is zipping two Flowables using the defined function, which takes
two parameters and returns one (hence the name BiFunction).

Alternatively, you could avoid RxJava altogether by using a TimerTask:

import java.util.{Timer, TimerTask}

// Schedule each element 200 ms apart to throttle the downstream calls.
val timer = new Timer()
Seq(1, 2, 3).zipWithIndex.foreach { case (item, idx) =>
  timer.schedule(new TimerTask {
    override def run(): Unit = println(item) // call the service here instead
  }, idx * 200L)
}


On Sat, Jan 12, 2019 at 9:25 PM  wrote:

> Thank you for your suggestion, Ramandeep, but the code is not clear to me.
> Could you please explain it? Particularly this part:
>
>
>
> Flowable.zip[java.lang.Long, Row, Row](delSt, itF, new
> BiFunction[java.lang.Long, Row, Row]() {
>
>
>
> Also, is it possible to achieve this without third-party libraries?
>
>
>
> Thank you
>
>
>
> From: Ramandeep Singh 
> Sent: Thursday, January 10, 2019 1:48 AM
> To: Sonal Goyal 
> Cc: em...@yeikel.com; user 
> Subject: Re: Is it possible to rate limit a UDF?
>
>
>
> Backpressure is the suggested way out here and is the correct approach; it
> rate-limits at the source itself, for safety. Imagine a service with
> throttling enabled: it can outright reject your calls.
>
>
>
> Even if you split your df, that alone won't achieve your purpose. You can
> combine it with a backpressure-enabled API or restrict it by time.
>
>
>
> Here's an example using RxJava, if you don't want to use any streaming API.
>
> import java.util.concurrent.TimeUnit
>
> import scala.collection.JavaConverters._
>
> import io.reactivex.Flowable
> import io.reactivex.functions.BiFunction
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
>
> def main(args: Array[String]): Unit = {
>   val ss = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()
>   import ss.sqlContext.implicits._
>
>   val df = ss.read.json("src/main/resources/person.json")
>   implicit val encoder = RowEncoder(df.schema)
>   // Zip each partition with a 1-second interval so at most one row per
>   // second (per partition) reaches the API call.
>   val throttled = df.repartition(2).mapPartitions(it => {
>     val itF = Flowable.fromIterable[Row](it.toIterable.asJava)
>     val delSt = Flowable.interval(1, TimeUnit.SECONDS)
>     Flowable.zip[java.lang.Long, Row, Row](delSt, itF, new BiFunction[java.lang.Long, Row, Row]() {
>       override def apply(t1: java.lang.Long, t2: Row): Row = {
>         // call the API here
>         t2
>       }
>     }).toList.blockingGet().iterator().asScala
>   })
>   throttled.show()
> }
>
>
>
> On Wed, Jan 9, 2019 at 6:12 AM Sonal Goyal  wrote:
>
> Have you tried controlling the number of partitions of the dataframe? Say
> you have 5 partitions; that means you are making 5 concurrent calls to the
> web service. The throughput of the web service would be your bottleneck and
> Spark workers would be waiting for tasks, but if you can't control the REST
> service, maybe it's worth a shot.
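>
> A rough sketch of that idea (df, the payload column and the placeholder UDF
> are illustrative, not from the original code):
>
> import org.apache.spark.sql.functions.{col, udf}
>
> // Placeholder UDF standing in for the real REST call.
> val callService = udf((payload: String) => payload)
> // With 5 partitions, at most 5 tasks run at once, so at most 5 concurrent calls.
> val limited = df.repartition(5)
> val enriched = limited.withColumn("result", callService(col("payload")))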
>
>
> Thanks,
> Sonal
> Nube Technologies
>
> On Wed, Jan 9, 2019 at 4:51 AM  wrote:
>
> I have a data frame to which I apply a UDF that calls a REST web
> service. This web service is deployed on only a few nodes and it won't
> be able to handle a massive load from Spark.
>
>
>
> Is it possible to rate limit this UDF? For example, something like 100
> ops/s.
>
>
>
> If not, what are the options? Is splitting the df an option?
>
>
>
> I've read a similar question on Stack Overflow [1] and the solution
> suggests Spark Streaming, but my application does not involve streaming.
> Do I need to turn the operations into a streaming workflow to achieve
> something like that?
>
>
>
> Current Workflow : Hive -> Spark ->  Service
>
>
>
> Thank you
>
>
>
> [1]
> https://stackoverflow.com/questions/43953882/how-to-rate-limit-a-spark-map-operation
> 
>
>
>
>
> --
>
> Regards,
>
> Ramandeep Singh
>
> Blog: http://ramannanda.blogspot.com
>


-- 
Regards,
Ramandeep Singh
http://orastack.com
+13474792296
ramannan...@gmail.com


Re: Need help with SparkSQL Query

2018-12-17 Thread Ramandeep Singh Nanda
You can use analytic (window) functions in Spark SQL.

Something like: select * from (select *, row_number() over (partition by id
order by timestamp) as rn from root) where rn = 1
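
A sketch of the same idea through the DataFrame API (shown in Scala for
consistency with the other examples here; the PySpark calls are analogous),
assuming a DataFrame named records with the schema from the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank each id's valid records by timestamp and keep the earliest one.
val byIdEarliestFirst = Window.partitionBy("id").orderBy(col("timestamp").asc)
val earliestValid = records
  .filter(col("isValid"))
  .withColumn("rn", row_number().over(byIdEarliestFirst))
  .filter(col("rn") === 1)
  .drop("rn")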

On Mon, Dec 17, 2018 at 4:03 PM Nikhil Goyal  wrote:

> Hi guys,
>
> I have a dataframe of type Record (id: Long, timestamp: Long, isValid:
> Boolean,  other metrics)
>
> Schema looks like this:
> root
>  |-- id: long (nullable = true)
>  |-- timestamp: long (nullable = true)
>  |-- isValid: boolean (nullable = true)
> .
>
> I need to find the earliest valid record per id. In the RDD world I can do
> groupBy 'id' and find the earliest one, but I am not sure how to do it in
> SQL. Since I am doing this in PySpark I cannot really use the Dataset API
> for this.
>
> One thing I can do is groupBy 'id', find the earliest timestamp available
> and then join with the original dataframe to get the right record (all the
> metrics).
>
> Or I can create a single column with all the records and then implement a
> UDAF in scala and use it in pyspark.
>
> Neither solution seems straightforward. Is there a simpler approach?
>
> Thanks
> Nikhil
>


-- 
Regards,
Ramandeep Singh
http://orastack.com
+13474792296
ramannan...@gmail.com


Is Dataframe write blocking?

2018-11-08 Thread Ramandeep Singh Nanda
Hi,

I have some futures set up to operate in stages, where I expect one stage to
complete before another begins. I was hoping that the DataFrame write call is
blocking, whereas the behavior I see is that the call returns before the data
is persisted. This can cause unintended consequences. I am also using the fair
scheduler so that independent jobs can run in parallel.

Spark 2.3.1.
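
For reference, a stripped-down sketch of the pattern described above (the
DataFrames and output paths are placeholders):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Stage 1: independent writes submitted as futures (run in parallel
// under the fair scheduler).
val stage1 = Future.sequence(Seq(
  Future(df1.write.mode("overwrite").parquet("/tmp/stage1/a")),
  Future(df2.write.mode("overwrite").parquet("/tmp/stage1/b"))
))

// Stage 2 should only start once the stage-1 data is persisted.
Await.result(stage1, Duration.Inf)
// ... kick off stage-2 jobs here ...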

-- 
Regards,
Ramandeep Singh
http://orastack.com