Re: How to split a dataframe into two dataframes based on count

2020-05-18 Thread Vipul Rajan
Hi Mohit,

"Seems like the limit on parent is executed twice and return different
records each time. Not sure why it is executed twice when I mentioned only
once"

That is to be expected. Spark follows lazy evaluation, which means that
execution only happens when you call an action, so every action re-processes
every step of the plan (mostly; some steps are automatically cached and
skipped). You can try this:

var dfChild1 = dfParent.limit(dfChild1Count).cache()

When you sort your dataframe this problem does not occur, because sorting
causes the data to shuffle and Spark effectively caches the shuffled output,
so the limit sees the same rows each time.
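
Something like this, as a rough sketch based on the code from your mail
(untested; the count() is only there to force the cache to materialize before
the except runs):

var dfParent = spark.read.parquet("somelocation") // your parent dataframe
var dfChild1Count = 1000

var dfChild1 = dfParent.limit(dfChild1Count).cache()
dfChild1.count() // materializes the cached rows once

var dfChild2 = dfParent.except(dfChild1)

dfChild1.write.parquet("/outputfilechild1")
dfChild2.write.parquet("/outputfilechild2")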

Let me know if you get it to work.

Regards

On Mon, May 18, 2020 at 10:27 PM Mohit Durgapal 
wrote:

> Dear All,
>
> I would like to know how, in Spark 2.0, I can split a dataframe into two
> dataframes when I know the exact counts the two dataframes should have. I
> tried using limit but got quite weird results. Also, I am looking for exact
> counts in the child dfs, not an approximate %-based split.
>
> *Following is what I have tried:*
>
> var dfParent = spark.read.parquet("somelocation"); // let's say it has 4000
> rows
>
> I want to split the parent into two dfs with the following counts:
>
> var dfChild1Count = 1000
>
> var dfChild2Count = 3000
>
> *I tried this: *
>
> var dfChild1 = dfParent.limit(dfChild1Count);
>
> var dfChild2 = dfParent.except(dfChild1);
>
> *and wrote that to output hdfs directories:*
>
> dfChild1.write.parquet("/outputfilechild1");
>
> dfChild2.write.parquet("/outputfilechild2");
>
> It turns out this results in some duplicates saved in
> files outputfilechild1 & outputfilechild2. Could anyone explain why they
> have duplicates?
>
> When I sorted my parent dataframe before limit, it then worked fine:
>
>
> dfParent = dfParent.sort(col("unique_col").desc)
>
> Seems like the limit on parent is executed twice and returns different
> records each time. Not sure why it is executed twice when I mentioned it only
> once.
>
> Also, is there a better way to split a df into multiple dfs when we know the
> exact counts of the child dfs?
>
>
>
>
> Regards,
> Mohit
>
>
>


Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread Vipul Rajan
Hi Ayan,

You don't have to bother with conversion at all. Functions that work on
numeric columns still work on string columns, as long as all the values in
the column are numbers:
scala> df2.printSchema
root
 |-- id: string (nullable = false)
 |-- id2: string (nullable = false)


scala> df2.show
+---+---+
| id|id2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  5|
|  6|  6|
|  7|  7|
|  8|  8|
|  9|  9|
+---+---+


scala> df2.select($"id" + $"id2").show
+----------+
|(id + id2)|
+----------+
|       0.0|
|       2.0|
|       4.0|
|       6.0|
|       8.0|
|      10.0|
|      12.0|
|      14.0|
|      16.0|
|      18.0|
+----------+


scala> df2.select(sum("id")).show
+-------+
|sum(id)|
+-------+
|   45.0|
+-------+
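
If you want integral results instead of the implicit cast to double shown
above, you can cast the columns yourself; a minimal sketch against the same
df2:

scala> df2.select(($"id".cast("long") + $"id2".cast("long")).alias("sum_ids")).show()

That keeps the sums as longs (0, 2, 4, ..., 18) rather than doubles.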

On Tue, Mar 24, 2020 at 12:11 AM ayan guha  wrote:

> Awesome. Did not know about the conv function, so thanks for that.
>
> On Tue, 24 Mar 2020 at 1:23 am, Enrico Minack 
> wrote:
>
>> Ayan,
>>
>> no need for UDFs, the SQL API provides all you need (sha1, substring,
>> conv):
>> https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html
>>
>> >>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16,
>> 10).cast("long").alias("sha2long")).show()
>> +----------+
>> |  sha2long|
>> +----------+
>> | 478797741|
>> |2520346415|
>> +----------+
>>
>> This creates a lean query plan:
>>
>> >>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16,
>> 10).cast("long").alias("sha2long")).explain()
>> == Physical Plan ==
>> Union
>> :- *(1) Project [478797741 AS sha2long#74L]
>> :  +- Scan OneRowRelation[]
>> +- *(2) Project [2520346415 AS sha2long#76L]
>>    +- Scan OneRowRelation[]
>>
>>
>> Enrico
>>
>>
>> Am 23.03.20 um 06:13 schrieb ayan guha:
>>
>> Hi
>>
>> I am trying to implement simple hashing/checksum logic. The key logic is
>> -
>>
>> 1. Generate sha1 hash
>> 2. Extract last 8 chars
>> 3. Convert 8 chars to Int (using base 16)
>>
>> Here is the cut down version of the code:
>>
>>
>> ---
>>
>> from pyspark.sql.functions import *
>> from pyspark.sql.types import *
>> from hashlib import sha1 as local_sha1
>>
>> df = spark.sql("select '4104003141' value_to_hash union all select '4102859263'")
>>
>> f1 = lambda x: str(int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16))
>> f2 = lambda x: int(local_sha1(x.encode('UTF-8')).hexdigest()[32:], 16)
>> sha2Int1 = udf(f1, StringType())
>> sha2Int2 = udf(f2, IntegerType())
>>
>> print(f1('4102859263'))
>>
>> dfr = df.select(df.value_to_hash, sha2Int1(df.value_to_hash).alias('1'),
>> sha2Int2(df.value_to_hash).alias('2'))
>> dfr.show(truncate=False)
>> ---
>>
>> I was expecting both columns to provide exactly the same values, however
>> that's not the case "always".
>>
>> 2520346415
>>
>> +-------------+----------+-----------+
>> |value_to_hash|1         |2          |
>> +-------------+----------+-----------+
>> |4104003141   |478797741 |478797741  |
>> |4102859263   |2520346415|-1774620881|
>> +-------------+----------+-----------+
>>
>> The function is working fine, as shown in the print statement. However,
>> the values do not match and vary widely.
>>
>> Any pointer?
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: [External]Re: spark 2.x design docs

2019-09-19 Thread Vipul Rajan
Yes,

I realize what you were looking for; I am also looking for the same docs and
haven't found them yet. Also, Jacek Laskowski's gitbooks are the next best
thing to follow, if you haven't tried them already.

Regards

On Thu, Sep 19, 2019 at 12:46 PM  wrote:

> Thanks Vipul,
>
>
>
> I was looking specifically for the documents Spark committers use for reference.
>
>
>
> Currently I’ve put custom logs in the spark-core sources, then I build them
> and run jobs on the result.
>
> From the printed logs I try to understand execution flows.
>
>
>
> *From:* Vipul Rajan 
> *Sent:* Thursday, September 19, 2019 12:23 PM
> *To:* Kamal7 Kumar 
> *Cc:* spark-user 
> *Subject:* [External]Re: spark 2.x design docs
>
>
>
> The e-mail below is from an external source. Please do not open
> attachments or click links from an unknown or suspicious origin.
>
>
> https://github.com/JerryLead/SparkInternals/blob/master/EnglishVersion/2-JobLogicalPlan.md
> This is pretty old, but it might help a little bit. I myself am going
> through the source code and trying to reverse-engineer stuff. Let me know
> if you'd like to pool resources sometime.
>
>
>
> Regards
>
>
>
> On Thu, Sep 19, 2019 at 11:35 AM  wrote:
>
> Hi,
>
> Can someone provide documents/links (apart from the official documentation)
> for understanding the internal workings of spark-core:
>
> documents containing component pseudo-code, class diagrams, execution
> flows, etc.
>
> Thanks, Kamal
>
>
> "*Confidentiality Warning*: This message and any attachments are intended
> only for the use of the intended recipient(s), are confidential and may be
> privileged. If you are not the intended recipient, you are hereby notified
> that any review, re-transmission, conversion to hard copy, copying,
> circulation or other use of this message and any attachments is strictly
> prohibited. If you are not the intended recipient, please notify the sender
> immediately by return email and delete this message and any attachments
> from your system.
>
> *Virus Warning:* Although the company has taken reasonable precautions to
> ensure no viruses are present in this email. The company cannot accept
> responsibility for any loss or damage arising from the use of this email or
> attachment."
>


Re: spark 2.x design docs

2019-09-19 Thread Vipul Rajan
https://github.com/JerryLead/SparkInternals/blob/master/EnglishVersion/2-JobLogicalPlan.md
This is pretty old, but it might help a little bit. I myself am going
through the source code and trying to reverse-engineer stuff. Let me know
if you'd like to pool resources sometime.

Regards

On Thu, Sep 19, 2019 at 11:35 AM  wrote:

> Hi,
>
> Can someone provide documents/links (apart from the official documentation)
> for understanding the internal workings of spark-core:
>
> documents containing component pseudo-code, class diagrams, execution
> flows, etc.
>
> Thanks, Kamal
>
>
> "*Confidentiality Warning*: This message and any attachments are intended
> only for the use of the intended recipient(s), are confidential and may be
> privileged. If you are not the intended recipient, you are hereby notified
> that any review, re-transmission, conversion to hard copy, copying,
> circulation or other use of this message and any attachments is strictly
> prohibited. If you are not the intended recipient, please notify the sender
> immediately by return email and delete this message and any attachments
> from your system.
>
> *Virus Warning:* Although the company has taken reasonable precautions to
> ensure no viruses are present in this email. The company cannot accept
> responsibility for any loss or damage arising from the use of this email or
> attachment."
>


Re: Use derived column for other derived column in the same statement

2019-04-22 Thread Vipul Rajan
Hi Rishi,

TL;DR: using Scala, this would work:
df.withColumn("derived1", lit("something")).withColumn("derived2",
col("derived1") === "something")

Just note that I used three equals signs instead of two. That should be
enough; if you want to understand why, read further.

so "==" gives boolean as a return value, but that is not what you want,
that's why you wrap your string "something" in lit() in the first
withColumn statement. This turns your string type into
org.apache.spark.sql.Column type which the withColumn function would accept.
alternatively
lit(col("derived1") == "something") would syntactically work and not throw
any errors, but it would always be false, since you are not checking the
values in the column derived1, you are merely testing if col("derived1"),
which is of type org.apache.spark.sql.Column is the same as "something",
which is of type string which is obviously false

below is the output of my spark shell:
scala> col("asdf") == col("asdf")
res5: Boolean = true

scala> col("derived1") == "something"
res6: Boolean = false

What you want is for your expression to return an
org.apache.spark.sql.Column. Please take a look here and scroll down to the
"===" function; you'd see that it returns an org.apache.spark.sql.Column:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column@===(other:Any):org.apache.spark.sql.Column

The docs don't explicitly say so, but with "===" you actually compare the
values in column "derived1" against the string "something".

Hope it helps

Regards

On Mon, Apr 22, 2019 at 8:56 AM Shraddha Shah 
wrote:

> Also the same thing for groupby agg operation, how can we use one
> aggregated result (say min(amount)) to derive another aggregated column?
>
> On Sun, Apr 21, 2019 at 11:24 PM Rishi Shah 
> wrote:
>
>> Hello All,
>>
>> How can we use a derived column1 for deriving another column in the same
>> dataframe operation statement?
>>
>> something like:
>>
>> df = df.withColumn('derived1', lit('something'))
>> .withColumn('derived2', col('derived1') == 'something')
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>


Re: Structured Streaming initialized with cached data or others

2019-04-22 Thread Vipul Rajan
Please look into arbitrary stateful aggregation (mapGroupsWithState /
flatMapGroupsWithState). I do not completely understand your problem though;
if you could give me an example, I'd be happy to help.
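
For what it's worth, below is a rough, untested sketch of the kind of thing I
mean: mapGroupsWithState seeding the state with an initial value the first
time a key is seen. The socket source, the Event/RunningTotal case classes and
the initialTotals map are hypothetical placeholders, not your actual setup.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(key: String, value: Long)
case class RunningTotal(key: String, total: Long)

object StatefulInitSketch {
  // Hypothetical initial values, e.g. loaded from a batch table at startup.
  val initialTotals: Map[String, Long] = Map("a" -> 100L, "b" -> 250L)

  def updateTotals(key: String,
                   events: Iterator[Event],
                   state: GroupState[Long]): RunningTotal = {
    // Seed the state with the initial value the first time this key shows up.
    val current = if (state.exists) state.get else initialTotals.getOrElse(key, 0L)
    val updated = current + events.map(_.value).sum
    state.update(updated)
    RunningTotal(key, updated)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("stateful-init-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical source, just to keep the sketch self-contained.
    val events = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999).load()
      .as[String]
      .map { line => val Array(k, v) = line.split(","); Event(k, v.toLong) }

    val totals = events
      .groupByKey(_.key)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateTotals)

    totals.writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()
  }
}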

On Mon, 22 Apr 2019, 15:31 shicheng31...@gmail.com, 
wrote:

> Hi all:
> As we all know, Structured Streaming is used to handle incremental
> problems. However, if I need to compute an increment based on an initial
> value, I need to get a previous state value when the program is
> initialized.
> Is there any way to assign an initial value to the 'state'? Or are there
> other solutions?
> Thanks!
>
> --
> shicheng31...@gmail.com
>