[no subject]

2023-08-23 Thread ayan guha
Unsubscribe-- Best Regards, Ayan Guha

Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread ayan guha
Marco. >>>>>> >>>>>> >>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh < >>>>>> mich.talebza...@gmail.com> wrote: >>>>>> >>>>>>> Have you thought of using windowing function >>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to >>>>>>> achieve this? >>>>>>> >>>>>>> Effectively all your information is in the orders table. >>>>>>> >>>>>>> HTH >>>>>>> >>>>>>> Mich Talebzadeh, >>>>>>> Lead Solutions Architect/Engineering Lead >>>>>>> Palantir Technologies Limited >>>>>>> London >>>>>>> United Kingdom >>>>>>> >>>>>>> >>>>>>>view my Linkedin profile >>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>> >>>>>>> >>>>>>> https://en.everybodywiki.com/Mich_Talebzadeh >>>>>>> >>>>>>> >>>>>>> >>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>> for any loss, damage or destruction of data or any other property which >>>>>>> may >>>>>>> arise from relying on this email's technical content is explicitly >>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>> damages >>>>>>> arising from such loss, damage or destruction. >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini < >>>>>>> marco.costant...@rocketfncl.com> wrote: >>>>>>> >>>>>>>> I have two tables: {users, orders}. In this example, let's say that >>>>>>>> for each 1 User in the users table, there are 10 Orders in the >>>>>>>> orders >>>>>>>> table. >>>>>>>> >>>>>>>> I have to use pyspark to generate a statement of Orders for each >>>>>>>> User. So, a single user will need his/her own list of Orders. >>>>>>>> Additionally, >>>>>>>> I need to send this statement to the real-world user via email (for >>>>>>>> example). >>>>>>>> >>>>>>>> My first intuition was to apply a DataFrame.foreach() on the users >>>>>>>> DataFrame. This way, I can rely on the spark workers to handle the >>>>>>>> email >>>>>>>> sending individually. However, I now do not know the best way to get >>>>>>>> each >>>>>>>> User's Orders. >>>>>>>> >>>>>>>> I will soon try the following (pseudo-code): >>>>>>>> >>>>>>>> ``` >>>>>>>> users_df = >>>>>>>> orders_df = >>>>>>>> >>>>>>>> #this is poorly named for max understandability in this context >>>>>>>> def foreach_function(row): >>>>>>>> user_id = row.user_id >>>>>>>> user_orders_df = orders_df.select(f'user_id = {user_id}') >>>>>>>> >>>>>>>> #here, I'd get any User info from 'row' >>>>>>>> #then, I'd convert all 'user_orders' to JSON >>>>>>>> #then, I'd prepare the email and send it >>>>>>>> >>>>>>>> users_df.foreach(foreach_function) >>>>>>>> ``` >>>>>>>> >>>>>>>> It is my understanding that if I do my user-specific work in the >>>>>>>> foreach function, I will capitalize on Spark's scalability when doing >>>>>>>> that >>>>>>>> work. However, I am worried of two things: >>>>>>>> >>>>>>>> If I take all Orders up front... >>>>>>>> >>>>>>>> Will that work? >>>>>>>> Will I be taking too much? Will I be taking Orders on partitions >>>>>>>> who won't handle them (different User). >>>>>>>> >>>>>>>> If I create the orders_df (filtered) within the foreach function... >>>>>>>> >>>>>>>> Will it work? >>>>>>>> Will that be too much IO to DB? >>>>>>>> >>>>>>>> The question ultimately is: How can I achieve this goal efficiently? >>>>>>>> >>>>>>>> I have not yet tried anything here. I am doing so as we speak, but >>>>>>>> am suffering from choice-paralysis. >>>>>>>> >>>>>>>> Please and thank you. >>>>>>>> >>>>>>> -- Best Regards, Ayan Guha
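A minimal sketch of an alternative to per-row filtering inside foreach, assuming users_df and orders_df as in the thread: join once, aggregate each user's orders into a single column, and only do the email work per row. The 'email', 'order_id' and 'amount' columns and the send_email helper are assumptions for illustration.

```
from pyspark.sql import functions as F

users_df = spark.table("users")     # assumption: sources already registered
orders_df = spark.table("orders")

# Join once and collect each user's orders, instead of filtering orders_df inside
# the foreach for every row (a DataFrame cannot be used from inside another
# DataFrame's foreach on the executors anyway).
statements_df = (
    users_df.join(orders_df, on="user_id", how="left")
            .groupBy("user_id", "email")   # 'email' is a hypothetical user column
            .agg(F.collect_list(F.struct("order_id", "amount")).alias("orders"))
)

def send_statement(row):
    payload = [o.asDict() for o in row.orders]   # JSON-able list of this user's orders
    # send_email(row.email, payload)             # hypothetical mail helper

statements_df.foreach(send_statement)
```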

Re: Got Error Creating permanent view in Postgresql through Pyspark code

2023-01-05 Thread ayan guha
nt view table in the database.How shall create permanent view >> using Pyspark code. Please do reply. >> >> *Error Message::* >> *Exception has occurred: Analysis Exception* >> Not allowed to create a permanent view `default`.`TEMP1` by referencing a >> temporary view TEMP_VIEW. Please create a temp view instead by CREATE TEMP >> VIEW >> >> >> Regards, >> Vajiha >> Research Analyst >> MW Solutions >> > -- Best Regards, Ayan Guha
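A minimal sketch of one way around the error above, assuming the data behind TEMP_VIEW is available as a DataFrame (here called source_df): a permanent view can only reference permanent objects, so persist the data as a table first and point the view at that table. Table and view names are hypothetical.

```
# Persist the underlying data as a table, then create the permanent view over it.
source_df.write.mode("overwrite").saveAsTable("default.temp1_source")

spark.sql("""
    CREATE OR REPLACE VIEW default.temp1 AS
    SELECT * FROM default.temp1_source
""")
```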

Re: Profiling data quality with Spark

2022-12-27 Thread ayan guha
damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Tue, 27 Dec 2022 at 19:25, rajat kumar >>> wrote: >>> >>>> Hi Folks >>>> Hoping you are doing well, I want to implement data quality to detect >>>> issues in data in advance. I have heard about few frameworks like GE/Deequ. >>>> Can anyone pls suggest which one is good and how do I get started on it? >>>> >>>> Regards >>>> Rajat >>>> >>> -- Best Regards, Ayan Guha
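Before adopting a framework, a minimal hand-rolled profile gives a feel for the kinds of checks Deequ or Great Expectations formalise (completeness, uniqueness, ranges) with rule definitions and reporting on top. This sketch assumes a DataFrame df and a hypothetical key column 'id'.

```
from pyspark.sql import functions as F

# Null count per column.
null_counts = df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns
])
null_counts.show()

# Simple uniqueness check on a key column.
total = df.count()
distinct_keys = df.select("id").distinct().count()   # 'id' is a hypothetical key
print(f"rows={total}, distinct ids={distinct_keys}, duplicates={total - distinct_keys}")
```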

Re: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-06 Thread ayan guha
U stage. We are using dynamic allocation with stage level scheduling, and > Spark tries to maximize the number of executors also during the GPU stage, > causing a bit of resources chaos in the cluster. This forces us to use a > lower value for 'maxExecutors' in the first place, at the cost of the CPU > stages performance. Or try to solve this in the Kubernets scheduler level, > which is not straightforward and doesn't feel like the right way to go. > > Is there a way to effectively use less executors in Stage Level > Scheduling? The API does not seem to include such an option, but maybe > there is some more advanced workaround? > > Thanks, > Shay > > > > > > > > -- Best Regards, Ayan Guha
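A sketch of the stage-level scheduling API being discussed (PySpark, Spark 3.1+, RDD-based). It lets the GPU stage request different executor and task resources; the executor count itself is still governed by dynamic allocation, which is exactly the limitation raised in the thread. The discovery-script path, cpu_df and run_gpu_inference are assumptions for illustration.

```
from pyspark.resource import (ExecutorResourceRequests, TaskResourceRequests,
                              ResourceProfileBuilder)

exec_reqs = (ExecutorResourceRequests()
             .cores(8)
             .resource("gpu", 1, discoveryScript="/opt/spark/scripts/getGpus.sh"))  # path is an assumption
task_reqs = TaskResourceRequests().cpus(8).resource("gpu", 1)   # 1 task per executor on the GPU stage

gpu_profile = ResourceProfileBuilder().require(exec_reqs).require(task_reqs).build

cpu_df = spark.range(1000)          # stand-in for the output of the CPU stages

def run_gpu_inference(row):
    # hypothetical per-record GPU inference
    return row

gpu_rdd = cpu_df.rdd.withResources(gpu_profile).map(run_gpu_inference)
```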

Re: log transfering into hadoop/spark

2022-08-02 Thread ayan guha
e tool to transfer webserver logs into > hdfs/spark? > > thank you. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: [pyspark delta] [delta][Spark SQL]: Getting an Analysis Exception. The associated location (path) is not empty

2022-08-02 Thread ayan guha
>>> self.target_id, self.name) 1306 >>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in >>> deco(*a, **kw)132 # Hide where the exception came from >>> that shows a non-Pythonic133 # JVM exception >>> message.--> 134 raise_from(converted)135 >>> else:136 raise >>> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in >>> raise_from(e) >>> AnalysisException: Cannot create table ('`EXERCISE`.`WORKED_HOURS`'). The >>> associated location ('output/delta') is not empty.; >>> >>> >>> -- >>> Best Wishes, >>> Kumba Janga >>> >>> "The only way of finding the limits of the possible is by going beyond >>> them into the impossible" >>> -Arthur C. Clarke >>> >> > > -- > Best Wishes, > Kumba Janga > > "The only way of finding the limits of the possible is by going beyond > them into the impossible" > -Arthur C. Clarke > -- Best Regards, Ayan Guha
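A minimal sketch of one way past the "associated location is not empty" error above, assuming the intent is to (re)create EXERCISE.WORKED_HOURS over data that already sits at 'output/delta': overwrite the path explicitly, then register the table over the existing Delta location without an AS SELECT.

```
df.write.format("delta").mode("overwrite").save("output/delta")

spark.sql("""
    CREATE TABLE IF NOT EXISTS exercise.worked_hours
    USING DELTA
    LOCATION 'output/delta'
""")
```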

Re: Salting technique doubt

2022-07-31 Thread ayan guha
the below understanding: >>>>> >>>>> So, using the salting technique we can actually change the joining >>>>> column values by appending some random number in a specified range. >>>>> >>>>> So, suppose I have these two values in a partition of two different >>>>> tables: >>>>> >>>>> Table A: >>>>> Partition1: >>>>> x >>>>> . >>>>> . >>>>> . >>>>> x >>>>> >>>>> Table B: >>>>> Partition1: >>>>> x >>>>> . >>>>> . >>>>> . >>>>> x >>>>> >>>>> After Salting it would be something like the below: >>>>> >>>>> Table A: >>>>> Partition1: >>>>> x_1 >>>>> >>>>> Partition 2: >>>>> x_2 >>>>> >>>>> Table B: >>>>> Partition1: >>>>> x_3 >>>>> >>>>> Partition 2: >>>>> x_8 >>>>> >>>>> Now, when I inner join these two tables after salting in order to >>>>> avoid data skewness problems, I won't get a match since the keys are >>>>> different after applying salting techniques. >>>>> >>>>> So how does this resolves the data skewness issue or if there is some >>>>> understanding gap? >>>>> >>>>> Could anyone help me in layman's terms? >>>>> >>>>> TIA, >>>>> Sid >>>>> >>>> -- Best Regards, Ayan Guha
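The missing half of the technique is that only the skewed side gets a random salt; the other side is replicated (exploded) across every salt value, so each salted key still finds its match. A minimal sketch, assuming table_a and table_b as above and a hypothetical 'join_key' column:

```
from pyspark.sql import functions as F

SALT_BUCKETS = 10   # assumption: number of salt values

# Random salt on the skewed side.
a_salted = table_a.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))

# Replicate every row of the other side across all salt values.
b_salted = table_b.withColumn(
    "salt", F.explode(F.array(*[F.lit(i) for i in range(SALT_BUCKETS)]))
)

joined = (a_salted.join(b_salted, on=["join_key", "salt"], how="inner")
                  .drop("salt"))
```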

Re: very simple UI on webpage to display x/y plots+histogram of data stored in hive

2022-07-18 Thread ayan guha
m of data > stored in hive and retrieved with spark into pandas… > > Many thanks for your suggestion! > > > On 18 Jul 2022, at 15:08, Sean Owen wrote: > > You pull your data via Spark to a pandas DF and do whatever you want > > > -- Best Regards, Ayan Guha

Re: reading each JSON file from dataframe...

2022-07-12 Thread ayan guha
ms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr| >> >> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt| >> >> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw| >> >> +-+---+---+ >> >> I would like to read each row from `file_path` and write the result to >> another dataframe containing `entity_id`, `other_useful_id`, >> `json_content`, `file_path`. >> Assume that I already have the required HDFS url libraries in my >> classpath. >> >> Please advice, >> Muthu >> >> >> >> > -- Best Regards, Ayan Guha
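A minimal sketch, assuming the listing DataFrame is called meta_df and the JSON files share a compatible schema: read all the listed paths in one pass, tag each parsed record with its source file via input_file_name(), and join back to the listing. Note that input_file_name() returns the fully qualified URI, so it should match the gs:// values stored in file_path (normalise one side if it does not).

```
from pyspark.sql import functions as F

paths = [r.file_path for r in meta_df.select("file_path").distinct().collect()]

json_df = (spark.read.json(paths)
           .withColumn("file_path", F.input_file_name()))

result = (meta_df.select("entity_id", "other_useful_id", "file_path")
                 .join(json_df, on="file_path", how="left"))
```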

Re: Question about bucketing and custom partitioners

2022-04-11 Thread ayan guha
API dataframe, is it possible to apply custom > partitioner to a dataframe ? > Is it possible to repartition the dataframe with a narrow > transformation like what could be done with RDD ? > Is there some sort of dataframe developer API ? Do you have any > pointers on this ? > > Thanks ! > > David > -- Best Regards, Ayan Guha

Re: how to change data type for columns of dataframe

2022-04-01 Thread ayan guha
or some columns in this dataframe? > > For example, change the column type from Int to Float. > > Thanks. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha
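A minimal sketch of the usual answer: cast a single column with withColumn, or cast several columns in one select. Column names here are assumptions.

```
from pyspark.sql import functions as F

df2 = df.withColumn("price", F.col("price").cast("float"))   # 'price' is a hypothetical column

to_float = {"price", "quantity"}                              # hypothetical column set
df3 = df.select([F.col(c).cast("float").alias(c) if c in to_float else F.col(c)
                 for c in df.columns])
```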

Re: pivoting panda dataframe

2022-03-16 Thread ayan guha
.Series[np.int32]: >>>>> ... return x ** 2 >>>>> >>> df.transform(square) >>>>>A B >>>>> 0 0 1 >>>>> 1 1 4 >>>>> 2 4 9 >>>>> >>>>> You can omit the type hint and let pandas-on-Spark infer its type. >>>>> >>>>> >>> df.transform(lambda x: x ** 2) >>>>>A B >>>>> 0 0 1 >>>>> 1 1 4 >>>>> 2 4 9 >>>>> >>>>> For multi-index columns: >>>>> >>>>> >>> df.columns = [('X', 'A'), ('X', 'B')] >>>>> >>> df.transform(square) # doctest: +NORMALIZE_WHITESPACE >>>>>X >>>>>A B >>>>> 0 0 1 >>>>> 1 1 4 >>>>> 2 4 9 >>>>> >>>>> >>> (df * -1).transform(abs) # doctest: +NORMALIZE_WHITESPACE >>>>>X >>>>>A B >>>>> 0 0 1 >>>>> 1 1 2 >>>>> 2 2 3 >>>>> >>>>> You can also specify extra arguments. >>>>> >>>>> >>> def calculation(x, y, z) -> ps.Series[int]: >>>>> ... return x ** y + z >>>>> >>> df.transform(calculation, y=10, z=20) # doctest: >>>>> >>> +NORMALIZE_WHITESPACE >>>>> X >>>>> A B >>>>> 020 21 >>>>> 121 1044 >>>>> 2 1044 59069File: /opt/spark/python/pyspark/pandas/frame.pyType: >>>>>method >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> tir. 15. mar. 2022 kl. 19:33 skrev Andrew Davidson >>>> >: >>>>> >>>>>> Hi Bjorn >>>>>> >>>>>> >>>>>> >>>>>> I have been looking for spark transform for a while. Can you send me >>>>>> a link to the pyspark function? >>>>>> >>>>>> >>>>>> >>>>>> I assume pandas transform is not really an option. I think it will >>>>>> try to pull the entire dataframe into the drivers memory. >>>>>> >>>>>> >>>>>> >>>>>> Kind regards >>>>>> >>>>>> >>>>>> >>>>>> Andy >>>>>> >>>>>> >>>>>> >>>>>> p.s. My real problem is that spark does not allow you to bind >>>>>> columns. You can use union() to bind rows. I could get the equivalent of >>>>>> cbind() using union().transform() >>>>>> >>>>>> >>>>>> >>>>>> *From: *Bjørn Jørgensen >>>>>> *Date: *Tuesday, March 15, 2022 at 10:37 AM >>>>>> *To: *Mich Talebzadeh >>>>>> *Cc: *"user @spark" >>>>>> *Subject: *Re: pivoting panda dataframe >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html >>>>>> we >>>>>> have that transpose in pandas api for spark to. >>>>>> >>>>>> >>>>>> >>>>>> You also have stack() and multilevel >>>>>> https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> tir. 15. mar. 2022 kl. 17:50 skrev Mich Talebzadeh < >>>>>> mich.talebza...@gmail.com>: >>>>>> >>>>>> >>>>>> hi, >>>>>> >>>>>> >>>>>> >>>>>> Is it possible to pivot a panda dataframe by making the row column >>>>>> heading? >>>>>> >>>>>> >>>>>> >>>>>> thanks >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> [image: Image removed by sender.] view my Linkedin profile >>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>> >>>>>> >>>>>> >>>>>> https://en.everybodywiki.com/Mich_Talebzadeh >>>>>> >>>>>> >>>>>> >>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>> for any loss, damage or destruction of data or any other property which >>>>>> may >>>>>> arise from relying on this email's technical content is explicitly >>>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>>> arising from such loss, damage or destruction. 
>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> Bjørn Jørgensen >>>>>> Vestre Aspehaug 4 >>>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail=g>, >>>>>> 6010 Ålesund >>>>>> Norge >>>>>> >>>>>> +47 480 94 297 >>>>>> >>>>> >>>>> >>>>> -- >>>>> Bjørn Jørgensen >>>>> Vestre Aspehaug 4 >>>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail=g>, >>>>> 6010 Ålesund >>>>> Norge >>>>> >>>>> +47 480 94 297 >>>>> >>>> -- Best Regards, Ayan Guha

Re: Decompress Gzip files from EventHub with Structured Streaming

2022-03-08 Thread ayan guha
3. Should I use Autoloader or just simply stream data into Databricks >using Event Hubs? > > I am especially curious about the trade-offs and the best way forward. I > don't have massive amounts of data. > > Thank you very much in advance! > > Best wishes, > Maurizio Vancho Argall > > -- Best Regards, Ayan Guha

Re: Question about spark.sql min_by

2022-02-21 Thread ayan guha
pView("table") >> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId, >> min(price) from table group by productId") >> >> Is there a way I can rely on min_by directly in groupby ? >> Is there some code missing in pyspark wrapper to make min_by visible >> somehow ? >> >> Thank you in advance for your help. >> >> Cheers >> David >> > -- Best Regards, Ayan Guha
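A minimal sketch, assuming df is the offers DataFrame from the thread: min_by is a SQL function (Spark 3.0+), so it can be used inside agg() through expr() even where the pyspark.sql.functions wrapper is not exposed.

```
from pyspark.sql import functions as F

cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(F.expr("min_by(sellerId, price)").alias("sellerId"),
           F.min("price").alias("price"))
)
```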

Re: Cast int to string not possible?

2022-02-17 Thread ayan guha
--- >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> >> > >> > >> > - >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> > >> >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> -- Best Regards, Ayan Guha

Re: Does spark support something like the bind function in R?

2022-02-08 Thread ayan guha
; *##6 2.883 55 3300* > > ``` > > > > I wonder if this is just a wrapper around join? If so it is probably not > going to help me out. > > > > Also I would prefer to work in python > > > > Any thoughts? > > > > Kind regards > > > > Andy > > > > > -- Best Regards, Ayan Guha

Re: add an auto_increment column

2022-02-07 Thread ayan guha
gt; >> Monotonically_increasing_id() will give the same functionality > >> > >> On Mon, 7 Feb, 2022, 6:57 am , wrote: > >> > >>> For a dataframe object, how to add a column who is auto_increment > >>> like > >>> mysql's behavior? > >>> > >>> Thank you. > >>> > >>> > >> > > - > >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha
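A minimal sketch, assuming a DataFrame df: monotonically_increasing_id() is unique but not consecutive, so if a gap-free 1..N id is required, layer row_number() on top of it (noting that the single-partition window is a bottleneck for very large data).

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_seq = (df.withColumn("_mono", F.monotonically_increasing_id())
            .withColumn("auto_id", F.row_number().over(Window.orderBy("_mono")))
            .drop("_mono"))
```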

Re: add an auto_increment column

2022-02-06 Thread ayan guha
t; Thank you. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: How to delete the record

2022-01-27 Thread ayan guha
> >>>>> On Thu, 27 Jan 2022, 22:44 Sean Owen, wrote: >>>>> >>>>>> This is what storage engines like Delta, Hudi, Iceberg are for. No >>>>>> need to manage it manually or use a DBMS. These formats allow deletes, >>>>>> upserts, etc of data, using Spark, on cloud storage. >>>>>> >>>>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh < >>>>>> mich.talebza...@gmail.com> wrote: >>>>>> >>>>>>> Where ETL data is stored? >>>>>>> >>>>>>> >>>>>>> >>>>>>> *But now the main problem is when the record at the source is >>>>>>> deleted, it should be deleted in my final transformed record too.* >>>>>>> >>>>>>> >>>>>>> If your final sync (storage) is data warehouse, it should be soft >>>>>>> flagged with op_type (Insert/Update/Delete) and op_time (timestamp). >>>>>>> >>>>>>> >>>>>>> >>>>>>> HTH >>>>>>> >>>>>>> >>>>>>>view my Linkedin profile >>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>> >>>>>>> >>>>>>> >>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>> for any loss, damage or destruction of data or any other property which >>>>>>> may >>>>>>> arise from relying on this email's technical content is explicitly >>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>> damages >>>>>>> arising from such loss, damage or destruction. >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal >>>>>>> wrote: >>>>>>> >>>>>>>> I am using Spark incremental approach for bringing the latest data >>>>>>>> everyday. Everything works fine. >>>>>>>> >>>>>>>> But now the main problem is when the record at the source is >>>>>>>> deleted, it should be deleted in my final transformed record too. >>>>>>>> >>>>>>>> How do I capture such changes and change my table too ? >>>>>>>> >>>>>>>> Best regards, >>>>>>>> Sid >>>>>>>> >>>>>>>> -- Best Regards, Ayan Guha
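A sketch of the Delta Lake approach mentioned above, assuming the incremental feed (changes_df) carries an op_type column ('I'/'U'/'D') and the target is already a Delta table; the path and column names are assumptions.

```
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/warehouse/final_table")   # path is an assumption

(target.alias("t")
       .merge(changes_df.alias("s"), "t.id = s.id")
       .whenMatchedDelete(condition="s.op_type = 'D'")
       .whenMatchedUpdateAll(condition="s.op_type = 'U'")
       .whenNotMatchedInsertAll(condition="s.op_type != 'D'")
       .execute())
```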

Re: What happens when a partition that holds data under a task fails

2022-01-23 Thread ayan guha
same partition >>>>>> is >>>>>> processed. >>>>>> >>>>>> On Fri, Jan 21, 2022 at 12:00 PM Siddhesh Kalgaonkar < >>>>>> kalgaonkarsiddh...@gmail.com> wrote: >>>>>> >>>>>>> Hello team, >>>>>>> >>>>>>> I am aware that in case of memory issues when a task fails, it will >>>>>>> try to restart 4 times since it is a default number and if it still >>>>>>> fails >>>>>>> then it will cause the entire job to fail. >>>>>>> >>>>>>> But suppose if I am reading a file that is distributed across nodes >>>>>>> in partitions. So, what will happen if a partition fails that holds some >>>>>>> data? Will it re-read the entire file and get that specific subset of >>>>>>> data >>>>>>> since the driver has the complete information? or will it copy the data >>>>>>> to >>>>>>> the other working nodes or tasks and try to run it? >>>>>>> >>>>>> -- Best Regards, Ayan Guha

Re: Naming files while saving a Dataframe

2021-07-17 Thread ayan guha
ame directory is that the data is > partitioned by 'day' (mmdd) but the job runs hourly. Maybe the only way > to do this is to create an hourly partition (/mmdd/hh). Is that the > only way to solve this? > > On Fri, Jul 16, 2021 at 5:45 PM ayan guha wrote: > >> IM

Re: Naming files while saving a Dataframe

2021-07-16 Thread ayan guha
ch job wrote > which file. Maybe provide a 'prefix' to the file names. I was wondering if > there's any 'option' that allows us to do this. Googling didn't come up > with any solution so thought of asking the Spark experts on this mailing > list. > > Thanks in advance. > -- Best Regards, Ayan Guha

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread ayan guha
gt;>>>>>>>> checkpoint_path). \ >>>>>>>>> queryName(config['MDVariables']['topic']). \ >>>>>>>>> start() >>>>>>>>> print(result) >>>>>>>>> >>>>>>>>> except Exception as e: >>>>>>>>> print(f"""{e}, quitting""") >>>>>>>>> sys.exit(1) >>>>>>>>> >>>>>>>>> Inside that function say *sendToSink *you can get the df and >>>>>>>>> batchId >>>>>>>>> >>>>>>>>> def sendToSink(df, batchId): >>>>>>>>> if(len(df.take(1))) > 0: >>>>>>>>> print(f"""md batchId is {batchId}""") >>>>>>>>> df.show(100,False) >>>>>>>>> df. persist() >>>>>>>>> # write to BigQuery batch table >>>>>>>>> s.writeTableToBQ(df, "append", >>>>>>>>> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable']) >>>>>>>>> df.unpersist() >>>>>>>>> print(f"""wrote to DB""") >>>>>>>>> else: >>>>>>>>> print("DataFrame md is empty") >>>>>>>>> >>>>>>>>> And you have created DF from the other topic newtopic >>>>>>>>> >>>>>>>>> def sendToControl(dfnewtopic, batchId): >>>>>>>>> if(len(dfnewtopic.take(1))) > 0: >>>>>>>>> .. >>>>>>>>> >>>>>>>>> Now you have two dataframe* df* and *dfnewtopic* in the same >>>>>>>>> session. Will you be able to join these two dataframes through common >>>>>>>>> key >>>>>>>>> value? >>>>>>>>> >>>>>>>>> HTH >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>>view my Linkedin profile >>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility >>>>>>>>> for any loss, damage or destruction of data or any other property >>>>>>>>> which may >>>>>>>>> arise from relying on this email's technical content is explicitly >>>>>>>>> disclaimed. The author will in no case be liable for any monetary >>>>>>>>> damages >>>>>>>>> arising from such loss, damage or destruction. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, 9 Jul 2021 at 17:41, Bruno Oliveira >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hello! Sure thing! >>>>>>>>>> >>>>>>>>>> I'm reading them *separately*, both are apps written with Scala >>>>>>>>>> + Spark Structured Streaming. >>>>>>>>>> >>>>>>>>>> I feel like I missed some details on my original thread (sorry it >>>>>>>>>> was past 4 AM) and it was getting frustrating >>>>>>>>>> Please let me try to clarify some points: >>>>>>>>>> >>>>>>>>>> *Transactions Created Consumer* >>>>>>>>>> --- >>>>>>>>>> | Kafka trx-created-topic | <--- (Scala + SparkStructured >>>>>>>>>> Streaming) ConsumerApp ---> Sinks to ---> Postgres DB Table >>>>>>>>>> (Transactions) >>>>>>>>>> --- >>>>>>>>>> >>>>>>>>>> *Transactions Processed Consumer* >>>>>>>>>> - >>>>>>>>>> | Kafka trx-processed-topic | <--- 1) (Scala + SparkStructured >>>>>>>>>> Streaming) AnotherConsumerApp fetches a Dataset (let's call it "a") >>>>>>>>>> - 2) Selects the >>>>>>>>>> Ids >>>>>>>>>> - >>>>>>>>>> | Postgres / Trx table |. 
<--- 3) Fetches the rows w/ >>>>>>>>>> the matching ids that have status 'created (let's call it "b") >>>>>>>>>> - 4) Performs an >>>>>>>>>> intersection between "a" and "b" resulting in a >>>>>>>>>> "b_that_needs_sinking" (but >>>>>>>>>> now there's some "b_leftovers" that were out of the intersection) >>>>>>>>>> 5) Sinks >>>>>>>>>> "b_that_needs_sinking" to DB, but that leaves the "b_leftovers" as >>>>>>>>>> unprocessed (not persisted) >>>>>>>>>> 6) However, >>>>>>>>>> those "b_leftovers" would, ultimately, be processed at some point >>>>>>>>>> (even if >>>>>>>>>> it takes like 1-3 days) - when their corresponding transaction_id are >>>>>>>>>> pushed >>>>>>>>>> to the "trx-created-topic" Kafka topic, and are then processed by >>>>>>>>>> that >>>>>>>>>> first consumer >>>>>>>>>> >>>>>>>>>> So, what I'm trying to accomplish is find a way to reprocess >>>>>>>>>> those "b_leftovers" *without *having to restart the app >>>>>>>>>> Does that make sense? >>>>>>>>>> >>>>>>>>>> PS: It doesn't necessarily have to be real streaming, if >>>>>>>>>> micro-batching (legacy Spark Streaming) would allow such a thing, it >>>>>>>>>> would >>>>>>>>>> technically work (although I keep hearing it's not advisable) >>>>>>>>>> >>>>>>>>>> Thank you so much! >>>>>>>>>> >>>>>>>>>> Kind regards >>>>>>>>>> >>>>>>>>>> On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh < >>>>>>>>>> mich.talebza...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Can you please clarify if you are reading these two topics >>>>>>>>>>> separately or within the same scala or python script in Spark >>>>>>>>>>> Structured >>>>>>>>>>> Streaming? >>>>>>>>>>> >>>>>>>>>>> HTH >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>view my Linkedin profile >>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all >>>>>>>>>>> responsibility for any loss, damage or destruction of data or any >>>>>>>>>>> other >>>>>>>>>>> property which may arise from relying on this email's technical >>>>>>>>>>> content is >>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any >>>>>>>>>>> monetary damages arising from such loss, damage or destruction. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira < >>>>>>>>>>> bruno.ar...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hello guys, >>>>>>>>>>>> >>>>>>>>>>>> I've been struggling with this for some days now, without >>>>>>>>>>>> success, so I would highly appreciate any enlightenment. The >>>>>>>>>>>> simplified >>>>>>>>>>>> scenario is the following: >>>>>>>>>>>> >>>>>>>>>>>>- I've got 2 topics in Kafka (it's already like that in >>>>>>>>>>>>production, can't change it) >>>>>>>>>>>> - transactions-created, >>>>>>>>>>>> - transaction-processed >>>>>>>>>>>>- Even though the schema is not exactly the same, they all >>>>>>>>>>>>share a correlation_id, which is their "transaction_id" >>>>>>>>>>>> >>>>>>>>>>>> So, long story short, I've got 2 consumers, one for each topic, >>>>>>>>>>>> and all I wanna do is sink them in a chain order. I'm writing them >>>>>>>>>>>> w/ Spark >>>>>>>>>>>> Structured Streaming, btw >>>>>>>>>>>> >>>>>>>>>>>> So far so good, the caveat here is: >>>>>>>>>>>> >>>>>>>>>>>> - I cannot write a given "*processed" *transaction unless >>>>>>>>>>>> there is an entry of that same transaction with the status " >>>>>>>>>>>> *created*". 
>>>>>>>>>>>> >>>>>>>>>>>> - There is *no* guarantee that any transactions in the topic >>>>>>>>>>>> "transaction-*processed*" have a match (same transaction_id) >>>>>>>>>>>> in the "transaction-*created*" at the moment the messages are >>>>>>>>>>>> fetched. >>>>>>>>>>>> >>>>>>>>>>>> So the workflow so far is: >>>>>>>>>>>> - Msgs from the "transaction-created" just get synced to >>>>>>>>>>>> postgres, no questions asked >>>>>>>>>>>> >>>>>>>>>>>> - As for the "transaction-processed", it goes as follows: >>>>>>>>>>>> >>>>>>>>>>>>- a) Messages are fetched from the Kafka topic >>>>>>>>>>>>- b) Select the transaction_id of those... >>>>>>>>>>>>- c) Fetch all the rows w/ the corresponding id from a >>>>>>>>>>>>Postgres table AND that have the status "CREATED" >>>>>>>>>>>>- d) Then, a pretty much do a intersection between the two >>>>>>>>>>>>datasets, and sink only on "processed" ones that have with step >>>>>>>>>>>> c >>>>>>>>>>>>- e) Persist the resulting dataset >>>>>>>>>>>> >>>>>>>>>>>> But the rows (from the 'processed') that were not part of the >>>>>>>>>>>> intersection get lost afterwards... >>>>>>>>>>>> >>>>>>>>>>>> So my question is: >>>>>>>>>>>> - Is there ANY way to reprocess/replay them at all WITHOUT >>>>>>>>>>>> restarting the app? >>>>>>>>>>>> - For this scenario, should I fall back to Spark Streaming, >>>>>>>>>>>> instead of Structured Streaming? >>>>>>>>>>>> >>>>>>>>>>>> PS: I was playing around with Spark Streaming (legacy) and >>>>>>>>>>>> managed to commit only the ones in the microbatches that were fully >>>>>>>>>>>> successful (still failed to find a way to "poll" for the >>>>>>>>>>>> uncommitted ones >>>>>>>>>>>> without restarting, though). >>>>>>>>>>>> >>>>>>>>>>>> Thank you very much in advance! >>>>>>>>>>>> >>>>>>>>>>>> -- Best Regards, Ayan Guha

Re: Insert into table with one the value is derived from DB function using spark

2021-06-19 Thread ayan guha
adeh-ph-d-5205b2/> >>> >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Fri, 18 Jun 2021 at 20:49, Anshul Kala wrote: >>> >>>> Hi All, >>>> >>>> I am using spark to ingest data from file to database Oracle table . >>>> For one of the fields , the value to be populated is generated from a >>>> function that is written in database . >>>> >>>> The input to the function is one of the fields of data frame >>>> >>>> I wanted to use spark.dbc.write to perform the operation, which >>>> generates the insert query at back end . >>>> >>>> For example : It can generate the insert query as : >>>> >>>> Insert into table values (?,?, getfunctionvalue(?) ) >>>> >>>> Please advise if it is possible in spark and if yes , how can it be >>>> done >>>> >>>> This is little urgent for me . So any help is appreciated >>>> >>>> Thanks >>>> Anshul >>>> >>> -- Best Regards, Ayan Guha

Re: Spark-sql can replace Hive ?

2021-06-10 Thread ayan guha
e.? > > > > thanks > > > > > -- Best Regards, Ayan Guha

Re: DF blank value fill

2021-05-21 Thread ayan guha
ronize empty cells of all records with the same > combination of MainKey and SubKey with the respective value of other rows > with the same key combination? > > A certain value, if not null, of a col is guaranteed to be unique within > the df. If a col exists then there is at least one row with a not-null > value. > > > > I am using pyspark. > > > > Thanks for any hint, > > Best > > Meikel > -- Best Regards, Ayan Guha
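A minimal sketch in pyspark, assuming (as described) that each non-key column has at most one distinct non-null value per (MainKey, SubKey) combination: propagate that value to the empty cells of the group with first(..., ignorenulls=True) over a window.

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("MainKey", "SubKey")
value_cols = [c for c in df.columns if c not in ("MainKey", "SubKey")]

filled = df.select("MainKey", "SubKey",
                   *[F.first(c, ignorenulls=True).over(w).alias(c) for c in value_cols])
```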

Re: PySpark Write File Container exited with a non-zero exit code 143

2021-05-19 Thread ayan guha
n lastcol): > > lastcol=lastcol.replace('\r','') > > df=df.withColumn(lastcol, > regexp_replace(col("{}\r".format(lastcol)), "[\r]", > "")).drop('{}\r'.format(lastcol)) > > > df.write.format('parquet').mode('overwrite').save("{}/{}".format(HDFS_OUT,fname_noext)) > > > > > > > > Caused by: org.apache.spark.SparkException: Job aborted due to stage > failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task > 0.3 in stage 1.0 (TID 4, DataNode01.mydomain.com, executor 5): > ExecutorLostFailure (executor 5 exited caused by one of the running tasks) > Reason: Container marked as failed: > container_e331_1621375512548_0021_01_06 on host: > DataNode01.mydomain.com. Exit status: 143. Diagnostics: [2021-05-19 > 18:09:06.392]Container killed on request. Exit code is 143 > [2021-05-19 18:09:06.413]Container exited with a non-zero exit code 143. > [2021-05-19 18:09:06.414]Killed by external signal > > > > > > THANKS! CLAY > > > > -- Best Regards, Ayan Guha

Re: Merge two dataframes

2021-05-19 Thread ayan guha
gt;> >>> >> >>> >> >>> -- >> >>> Raghavendra > > >> >>> >> >>> >> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep < >> kushagra94d...@gmail.com> wrote: >> >>>> >> >>>> Hi All, >> >>>> >> >>>> I have two dataframes >> >>>> >> >>>> df1 >> >>>> >> >>>> amount_6m >> >>>> 100 >> >>>> 200 >> >>>> 300 >> >>>> 400 >> >>>> 500 >> >>>> >> >>>> And a second data df2 below >> >>>> >> >>>> amount_9m >> >>>> 500 >> >>>> 600 >> >>>> 700 >> >>>> 800 >> >>>> 900 >> >>>> >> >>>> The number of rows is same in both dataframes. >> >>>> >> >>>> Can I merge the two dataframes to achieve below df >> >>>> >> >>>> df3 >> >>>> >> >>>> amount_6m | amount_9m >> >>>> 100 500 >> >>>> 200 600 >> >>>> 300 700 >> >>>> 400 800 >> >>>> 500 900 >> >>>> >> >>>> Thanks in advance >> >>>> >> >>>> Reg, >> >>>> Kushagra Deep >> >>>> >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> -- Best Regards, Ayan Guha
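A minimal sketch of the usual answer: DataFrames have no positional "column bind", so attach a row number to each side and join on it. Ordering by monotonically_increasing_id() preserves the current order only on a best-effort basis; use a real ordering column if one exists.

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.orderBy(F.monotonically_increasing_id())

a = df1.withColumn("rn", F.row_number().over(w))
b = df2.withColumn("rn", F.row_number().over(w))

df3 = a.join(b, "rn").drop("rn")   # amount_6m | amount_9m
```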

Re: [Spark Catalog API] Support for metadata Backup/Restore

2021-05-07 Thread ayan guha
>>> We want to introduce the backup and restore from an API level. We are >>> thinking of doing this simply by adding backup() and restore() in >>> CatalogImpl, as ExternalCatalog already includes all the methods we need to >>> retrieve and recreate entities. We are wondering if there is any concern or >>> drawback of this approach. Please advise. >>> >>> Thank you in advance, >>> Tianchen >>> >> -- Best Regards, Ayan Guha

Re: Graceful shutdown SPARK Structured Streaming

2021-05-06 Thread ayan guha
t your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> On Wed, 5 May 2021 at 17:30, Gourav Sengupta < >> gourav.sengupta.develo...@gmail.com> wrote: >> >>> Hi, >>> >>> just thought of reaching out once again and seeking out your kind help >>> to find out what is the best way to stop SPARK streaming gracefully. Do we >>> still use the methods of creating a file as in SPARK 2.4.x which is several >>> years old method or do we have a better approach in SPARK 3.1? >>> >>> Regards, >>> Gourav Sengupta >>> >>> -- Forwarded message - >>> From: Gourav Sengupta >>> Date: Wed, Apr 21, 2021 at 10:06 AM >>> Subject: Graceful shutdown SPARK Structured Streaming >>> To: >>> >>> >>> Dear friends, >>> >>> is there any documentation available for gracefully stopping SPARK >>> Structured Streaming in 3.1.x? >>> >>> I am referring to articles which are 4 to 5 years old and was wondering >>> whether there is a better way available today to gracefully shutdown a >>> SPARK streaming job. >>> >>> Thanks a ton in advance for all your kind help. >>> >>> Regards, >>> Gourav Sengupta >>> >> -- Best Regards, Ayan Guha

Spark Streaming with Files

2021-04-23 Thread ayan guha
es on like T1, T2 etc folders. Also, let's assume files are written every 10 mins, but I want to process them every 4 hours. Can I use the streaming method so that it can manage checkpoints on its own? Best - Ayan -- Best Regards, Ayan Guha
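A minimal sketch of the file-source approach, assuming JSON files landing under dated folders and a known schema (input_schema); the paths, the per-batch file cap and the output format are assumptions. The file source tracks already-processed files through the checkpoint, and the trigger controls how often new files are picked up.

```
stream_df = (spark.readStream
             .schema(input_schema)                # schema must be supplied for file streams
             .option("maxFilesPerTrigger", 500)   # assumption: cap per micro-batch
             .json("/landing/t*/"))               # hypothetical path pattern

query = (stream_df.writeStream
         .format("parquet")
         .option("path", "/curated/events")                 # hypothetical output path
         .option("checkpointLocation", "/checkpoints/events")
         .trigger(processingTime="4 hours")
         .start())
```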

Re: Python level of knowledge for Spark and PySpark

2021-04-14 Thread ayan guha
ark? I also know > Pandas and am also familiar with plotting routines like matplotlib. > > Warmest > > Ashok > -- Best Regards, Ayan Guha

Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
over, at least, a single-row UDF. > > On Fri, Apr 9, 2021 at 9:14 AM ayan guha wrote: > >> Hi Sean - absolutely open to suggestions. >> >> My impression was using spark native functions should provide similar >> perf as scala ones because serialization penalty s

Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
> *From:* Sean Owen > *Sent:* Friday, April 9, 2021 6:11 PM > *To:* ayan guha > *Cc:* Rao Bandaru ; User > *Subject:* Re: [Spark SQL]:to calculate distance between four > coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk > dataframe >

Re: [Spark SQL]:to calculate distance between four coordinates(Latitude1, Longtitude1, Latitude2, Longtitude2) in the pysaprk dataframe

2021-04-09 Thread ayan guha
; > Thanks, > > Ankamma Rao B > -- Best Regards, Ayan Guha
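A sketch of the "Spark native functions instead of a UDF" suggestion from this thread: the haversine distance expressed entirely with built-in column functions, so the computation stays in the JVM and avoids Python serialization. R is the Earth's radius in kilometres; the column names follow the subject line.

```
from pyspark.sql import functions as F

R = 6371.0

def haversine_km(lat1, lon1, lat2, lon2):
    dlat = F.radians(lat2 - lat1)
    dlon = F.radians(lon2 - lon1)
    a = (F.sin(dlat / 2) ** 2
         + F.cos(F.radians(lat1)) * F.cos(F.radians(lat2)) * F.sin(dlon / 2) ** 2)
    return F.lit(2 * R) * F.asin(F.sqrt(a))

df = df.withColumn("distance_km",
                   haversine_km(F.col("Latitude1"), F.col("Longtitude1"),
                                F.col("Latitude2"), F.col("Longtitude2")))
```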

Re: Rdd - zip with index

2021-03-24 Thread ayan guha
y Registration No. (3),Proprietorship >>>> Category (3),Country Incorporated (3),Proprietor (3) Address (1),Proprietor >>>> (3) Address (2),Proprietor (3) Address (3),Proprietor Name (4),Company >>>> Registration No. (4),Proprietorship Category (4),Country Incorporated >>>> (4),Proprietor (4) Address (1),Proprietor (4) Address (2),Proprietor (4) >>>> Address (3),Date Proprietor Added,Additional Proprietor Indicator >>>> >>>> >>>> 10GB is not much of a big CSV file >>>> >>>> that will resolve the header anyway. >>>> >>>> >>>> Also how are you running the spark, in a local mode (single jvm) or >>>> other distributed modes (yarn, standalone) ? >>>> >>>> >>>> HTH >>>> >>> -- Best Regards, Ayan Guha

Re: Rdd - zip with index

2021-03-23 Thread ayan guha
.com> wrote: >> >> >> >> Hi, Mohammed >> >> I think that the reason that only one executor is running and have >> single partition is because you have single file that might be read/loaded >> into memory. >> >> >> >> In order to achieve better parallelism I’d suggest to split the csv >> file. >> >> >> >> -- Best Regards, Ayan Guha

Re: How to apply ranger policies on Spark

2020-11-24 Thread ayan guha
xpected when we use hive cli and beeline. But when we access those hive > tables using spark-shell or spark-submit it does not work. > > Any suggestions to make Ranger work with Spark? > > > Regards > > Joyan > > -- Best Regards, Ayan Guha

Re: Apache Spark Connector for SQL Server and Azure SQL

2020-10-26 Thread ayan guha
ll.se > > Please consider the environment before printing this e-mail > > Confidentiality: C2 - Internal > -- Best Regards, Ayan Guha

Re: Why spark-submit works with package not with jar

2020-10-20 Thread ayan guha
user/jars/ddhybrid.jar >>> *-**-packages com.github.samelamin:spark-bigquery_2.11:0.2.6* >>> >>> >>> I have read the write-ups about packages searching the maven >>> libraries etc. Not convinced why using the package should make so much >>> difference between a failure and success. In other words, when to use a >>> package rather than a jar. >>> >>> >>> Any ideas will be appreciated. >>> >>> >>> Thanks >>> >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >> -- Best Regards, Ayan Guha

Re: Count distinct and driver memory

2020-10-19 Thread ayan guha
; > nicolas paris > > > > - > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > > > > > > - > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > -- > nicolas paris > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: Scala vs Python for ETL with Spark

2020-10-11 Thread ayan guha
has folks that can do python seriously why then spark in the first > place. You can do workflow on your own, streaming or batch or what ever you > want. > I would not do anything else aside from python, but that is me. > > On Sat, Oct 10, 2020, 9:42 PM ayan guha wrote: > >

Re: Scala vs Python for ETL with Spark

2020-10-10 Thread ayan guha
>>> >>>> These are my understanding but they are not facts so I would like to >>>> get some informed views on this if I can? >>>> >>>> Many thanks, >>>> >>>> Mich >>>> >>>> >>>> >>>> >>>> LinkedIn * >>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>> >>>> >>>> >>>> >>>> >>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>> any loss, damage or destruction of data or any other property which may >>>> arise from relying on this email's technical content is explicitly >>>> disclaimed. The author will in no case be liable for any monetary damages >>>> arising from such loss, damage or destruction. >>>> >>>> >>>> >>> -- Best Regards, Ayan Guha

https://issues.apache.org/jira/browse/SPARK-18381

2020-09-25 Thread ayan guha
Is anyone aware of any workaround for https://issues.apache.org/jira/browse/SPARK-18381 (other than upgrading to Spark 3)? -- Best Regards, Ayan Guha

Re: Edge AI with Spark

2020-09-24 Thread ayan guha
nning Android/iOS? > > > > > > Best regards, > > > Marco Sassarini > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > *Marco SassariniArtificial Intelligence Department* > > > > > > > office: +39 0434 562 978 > > > > www.overit.it > > > > > > > > > > > > > > > > > > > -- Best Regards, Ayan Guha

Re: Spark :- Update record in partition.

2020-06-07 Thread ayan guha
date the row and overwrite the partition? > > Is there a way to only update 1 row like DBMS. Otherwise 1 row update > takes a long time to rewrite the whole partition ? > > Thanks > Sunil > > > > > -- Best Regards, Ayan Guha

Re: How to pass a constant value to a partitioned hive table in spark

2020-04-16 Thread ayan guha
""" >> > sqltext: String = >> > $INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = >> > $broadcastValue, brand = "dummy") >> > SELECT >> > ocis_party_id AS partyId >> > , target_mobile_no AS phoneNumber >> > FROM tmp >> > >> > >> > scala> spark.sql($sqltext) >> > :41: error: not found: value $sqltext >> > spark.sql($sqltext) >> > >> > >> > Any ideas? >> > >> > >> > Thanks >> > >> > >> > Dr Mich Talebzadeh >> > >> > >> > >> > LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> > < >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> >* >> > >> > >> > >> > http://talebzadehmich.wordpress.com >> > >> > >> > *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any >> > loss, damage or destruction of data or any other property which may >> arise >> > from relying on this email's technical content is explicitly disclaimed. >> > The author will in no case be liable for any monetary damages arising >> from >> > such loss, damage or destruction. >> > -- Best Regards, Ayan Guha

Re: OFF TOPIC LIST CRITERIA

2020-03-28 Thread ayan guha
>> > >> > He has not reported any bugs while I have reported so many in such a >> short space of time. >> > He has warned me as well >> > >> > So that Sean Owen does not put a barrier in place for me in my path to >> free learning and free Apache software >> > I would like somebody to clarify the criteria for me. >> > >> > >> > Backbutton.co.uk >> > ¯\_(ツ)_/¯ >> > ♡۶Java♡۶RMI ♡۶ >> > Make Use Method {MUM} >> > makeuse.org >> > -- Best Regards, Ayan Guha

Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread ayan guha
ery plan: > > >>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16, > 10).cast("long").alias("sha2long")).explain() > == Physical Plan == > Union > :- *(1) Project [478797741 AS sha2long#74L] > : +- Scan OneRowRelation[] &

Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread ayan guha
+ |value_to_hash|1 |2 | >> +-+--+---+ |4104003141 |478797741 |478797741 | >> |4102859263 >> |2520346415|-1774620881| +-+--+---+ >> >> The function working fine, as shown in the print statement. However >> values are not matching and vary widely. >> >> Any pointer? >> >> -- >> Best Regards, >> Ayan Guha >> > -- Best Regards, Ayan Guha

Issue with UDF Int Conversion - Str to Int

2020-03-22 Thread ayan guha
---+ |4104003141 |478797741 |478797741 | |4102859263 |2520346415|-1774620881| +-+--+---+ The function is working fine, as shown in the print statement. However, the values do not match and vary widely. Any pointers? -- Best Regards, Ayan Guha

Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread ayan guha
ashParittioning + >> shuffle during join. All the table join contain `Country` column with some >> extra column based on the table. >> >> Is there any way to avoid these shuffles ? and improve performance ? >> >> >> Thanks and regards >> Manjunath >> > -- Best Regards, Ayan Guha

Re: Performance tuning on the Databricks pyspark 2.4.4

2020-01-21 Thread ayan guha
available resources. > > > > >start = time.time() > pool = multiprocessing.Pool(20) > # This will execute get_counts() parallel, on each element inside > input_paths. > # result (a list of dictionary) is constructed when all executions are > completed. > //result = pool.map(get_counts, input_paths) > > end = time.time() > > result_df = pd.DataFrame(result) > # You can use, result_df.to_csv() to store the results in a csv. > print(result_df) > print('Time take : {}'.format(end - start)) > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: Identify bottleneck

2019-12-20 Thread ayan guha
t isn't any better. The logic all gets processed by > the same engine so to confirm, compare the DAGs generated from both > approaches and see if they're identical. > > On Fri, 20 Dec 2019, 8:56 am ayan guha, wrote: > >> Quick question: Why is it better to use one sql vs multiple wi

Re: Identify bottleneck

2019-12-19 Thread ayan guha
> I also noticed that memory storage is never used during the execution. I > know from several hours of research that bz2 is the only real compression > algorithm usable as an input in spark for parallelization reasons. > > Do you have any idea of why such a behaviour ? > and do you have any idea on how to improve such treatment ? > > Cheers > > Antoine > > > -- Best Regards, Ayan Guha

Re: How more than one spark job can write to same partition in the parquet file

2019-12-11 Thread ayan guha
We partitioned data logically for 2 different jobs...in our use case based on geography... On Thu, 12 Dec 2019 at 3:39 pm, Chetan Khatri wrote: > Thanks, If you can share alternative change in design. I would love to > hear from you. > > On Wed, Dec 11, 2019 at 9:34 PM ayan

Re: How more than one spark job can write to same partition in the parquet file

2019-12-11 Thread ayan guha
No, we faced problems with that setup. On Thu, 12 Dec 2019 at 11:14 am, Chetan Khatri wrote: > Hi Spark Users, > would it be possible to write to the same partition of a parquet file > through two concurrent spark jobs with different spark sessions? > > thanks > -- Best Regards, Ayan Guha

Re: Is there a merge API available for writing DataFrame

2019-11-15 Thread ayan guha
alent of Hive's MERGE INTO but maybe we can have a way of > writing (updating) only those rows present in the DF, with the rest of the > rows/data in the sink untouched. > > Sivaprasanna > -- Best Regards, Ayan Guha

Re: Explode/Flatten Map type Data Using Pyspark

2019-11-14 Thread ayan guha
ack 0.01 0998 > > eve_id, k1 k2k3 k4 k5 > 003, aaa device endpoint - > > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > ----- > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: Explode/Flatten Map type Data Using Pyspark

2019-11-14 Thread ayan guha
ues).The number of key values > will be different for each event id.so i want to flatten the records for > all > the map type(key values) as below > > eve_id k1 k2 k3 > 001abc xyz 10091 > > eve_id, k1 k2 k3 k4 > 002, 12 jack 0.01 0998 > > eve_id, k1 k2k3 k4 k5 > 003, aaa device endpoint - > > > Thanks > Anbu > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha
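A minimal sketch, assuming the key/value pairs live in a MapType column named 'key_values': collect the union of keys first (they differ per event id), then project each key into its own column; events lacking a key get null.

```
from pyspark.sql import functions as F

keys = [r["k"] for r in
        df.select(F.explode(F.map_keys("key_values")).alias("k")).distinct().collect()]

flat = df.select("eve_id",
                 *[F.col("key_values").getItem(k).alias(k) for k in sorted(keys)])
```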

Re: Avro file question

2019-11-04 Thread ayan guha
> >> I’m assuming that a single large avro file can also be split into >> multiple mappers/reducers/executors during processing. >> >> Thanks. >> > -- Best Regards, Ayan Guha

Fwd: Delta with intelligent upsett

2019-10-31 Thread ayan guha
to be an obvious performance booster. Any thoughts? -- Best Regards, Ayan Guha -- Best Regards, Ayan Guha

Re: Driver vs master

2019-10-07 Thread ayan guha
he driver regardless of how many threads you have. >>> >>> >>> So if we will run more jobs then we need more memory on master. Please >>>> correct me if I am wrong. >>>> >>> >>> This depends on your application, but in general more threads will >>> require more memory. >>> >>> >>> >>>> >>>> Thanks >>>> Amit >>>> >>> -- >>> It's dark in this basement. >>> >> -- > It's dark in this basement. > -- Best Regards, Ayan Guha

Re: Convert a line of String into column

2019-10-05 Thread ayan guha
gt;> val query = words. >> writeStream. >> outputMode("append"). >> format("console"). >> start >> query.awaitTermination() >> >> But in fact this code only turns the line into a single column >> >> +---+ >> | value| >> +---+ >> |col1...| >> |col2...| >> | col3..| >> | ... | >> | col6 | >> +--+ >> >> How to achieve the effect that I want to do? >> >> Thanks? >> >> -- Best Regards, Ayan Guha
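A minimal pyspark sketch, assuming the streaming DataFrame is called lines, the record sits in the 'value' column and the fields are comma-delimited (adjust the separator and names): split() plus getItem() turns each line into named columns instead of exploding it into rows.

```
from pyspark.sql import functions as F

col_names = ["col1", "col2", "col3", "col4", "col5", "col6"]
parts = F.split(F.col("value"), ",")

parsed = lines.select(*[parts.getItem(i).alias(name)
                        for i, name in enumerate(col_names)])
```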

Re: Learning Spark

2019-07-04 Thread ayan guha
;> I am new Spark learner. Can someone guide me with the strategy towards >> getting expertise in PySpark. >> >> Thanks!!! >> > -- Best Regards, Ayan Guha

Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread ayan guha
with this workaround, other than during the time when the batch is running the table can provide some wrong information? Best Ayan On Fri, Jun 21, 2019 at 8:03 PM Tathagata Das wrote: > @ayan guha @Gourav Sengupta > > Delta Lake is OSS currently does not support defining tables in Hive &g

Re: Announcing Delta Lake 0.2.0

2019-06-20 Thread ayan guha
: >>> https://docs.delta.io/0.2.0/delta-storage.html >>> >>> *Improved concurrency* >>> Delta Lake now allows concurrent append-only writes while still ensuring >>> serializability. For concurrency control in Delta Lake, please see: >>> https://docs.delta.io/0.2.0/delta-concurrency.html >>> >>> We have also greatly expanded the test coverage as part of this release. >>> >>> We would like to acknowledge all community members for contributing to >>> this release. >>> >>> Best regards, >>> Liwen Sun >>> >>> -- Best Regards, Ayan Guha

Re: Announcing Delta Lake 0.2.0

2019-06-19 Thread ayan guha
gt;> >>> Best regards, >>> Liwen Sun >>> >>> -- >>> You received this message because you are subscribed to the Google >>> Groups "Delta Lake Users and Developers" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to delta-users+unsubscr...@googlegroups.com. >>> To view this discussion on the web visit >>> https://groups.google.com/d/msgid/delta-users/CAE4dWq9g90NkUr_SLs2J6kFPbOpxx4wy6MEgb%3DQ5pBxkUcK%2B-A%40mail.gmail.com >>> <https://groups.google.com/d/msgid/delta-users/CAE4dWq9g90NkUr_SLs2J6kFPbOpxx4wy6MEgb%3DQ5pBxkUcK%2B-A%40mail.gmail.com?utm_medium=email_source=footer> >>> . >>> >> -- Best Regards, Ayan Guha

Re: Databricks - number of executors, shuffle.partitions etc

2019-05-15 Thread ayan guha
oesn't take into account any spark.conf.set properties... it creates 8 >> worker nodes (dat executors) but doesn't honor the supplied conf >> parameters. Any idea? >> >> -- >> Regards, >> >> Rishi Shah >> > > > -- > Regards, > > Rishi Shah > -- Best Regards, Ayan Guha

Re: How to retrieve multiple columns values (in one row) to variables in Spark Scala method

2019-04-05 Thread ayan guha
Try like this:

val primitiveDS = spark.sql("select 1.2 avg, 2.3 stddev").collect().apply(0)
val arr = Array(primitiveDS.getDecimal(0), primitiveDS.getDecimal(1))

primitiveDS: org.apache.spark.sql.Row = [1.2,2.3]
arr: Array[java.math.BigDecimal] = Array(1.2, 2.3)

Re: Where does the Driver run?

2019-03-29 Thread ayan guha
rything seems to run fine and sometimes completes successfully. Frequent > failures are the reason for this question. > > > > Where is the Driver running? I don’t see it in the GUI, I see 2 Executors > taking all cluster resources. With a Yarn cluster I would expect the > “Driver" to run on/in the Yarn Master but I am using the Spark Standalone > Master, where is the Drive part of the Job running? > > > > If is is running in the Master, we are in trouble because I start the > Master on one of my 2 Workers sharing resources with one of the Executors. > Executor mem + driver mem is > available mem on a Worker. I can change this > but need so understand where the Driver part of the Spark Job runs. Is it > in the Spark Master, or inside and Executor, or ??? > > > > The “Driver” creates and broadcasts some large data structures so the need > for an answer is more critical than with more typical tiny Drivers. > > > > Thanks for you help! > > > > > -- > > Cheers! > > > > -- Best Regards, Ayan Guha

SparkR issue

2018-10-09 Thread ayan guha
uld we speed up this bit? 2. We understand better options would be to read data from external sources, but we need this data to be generated for some simulation purpose. Whats possibly going wrong? Best Ayan -- Best Regards, Ayan Guha

Re: Pyspark Partitioning

2018-09-30 Thread ayan guha
Point1 >> 2| id2| Point2 >> >> I want to have a partition for every Group_Id and apply on every >> partition a function defined by me. >> I have tried with partitionBy('Group_Id').mapPartitions() but i receive >> error. >> Could you please advice me how to do it? >> > -- Best Regards, Ayan Guha

Re: Time-Series Forecasting

2018-09-19 Thread ayan guha
2018, at 18:01, Mina Aslani wrote: >> > >> > Hi, >> > I have a question for you. Do we have any Time-Series Forecasting >> library in Spark? >> > >> > Best regards, >> > Mina >> > -- Best Regards, Ayan Guha

Re: Drawing Big Data tech diagrams using Pen Tablets

2018-09-12 Thread ayan guha
t;> that is clumsy and expensive. >>>>> >>>>> I was wondering if anyone has tried some tools like Wacom Intuos Pro >>>>> Paper Edition Pen Tablet >>>>> <https://www.wacom.com/en/products/pen-tablets/wacom-intuos-pro-paper> >>>>> or any similar tools for easier drawing and their recommendation? >>>>> >>>>> Thanks, >>>>> >>>>> Dr Mich Talebzadeh >>>>> >>>>> >>>>> >>>>> LinkedIn * >>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>> >>>>> >>>>> >>>>> http://talebzadehmich.wordpress.com >>>>> >>>>> >>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>>> any loss, damage or destruction of data or any other property which may >>>>> arise from relying on this email's technical content is explicitly >>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>> arising from such loss, damage or destruction. >>>>> >>>>> >>>>> >>>> -- Best Regards, Ayan Guha

Big Burst of Streaming Changes

2018-07-29 Thread ayan guha
tool, is there any spark package which can actually listen to Oracle change stream? So can we use spark as the CDC tool itself? -- Best Regards, Ayan Guha

Re: the best tool to interact with Spark

2018-06-26 Thread ayan guha
t; Thank you, > Donni > -- Best Regards, Ayan Guha

Spark-Mongodb connector issue

2018-06-18 Thread ayan guha
-- Best Regards, Ayan Guha

Re: Append In-Place to S3

2018-06-03 Thread ayan guha
t; target, >>>>> target_id >>>>> FROM new_data i >>>>> LEFT ANTI JOIN existing_data im >>>>> ON i.source = im.source >>>>> AND i.source_id = im.source_id >>>>> AND i.target = im.target >>>>> AND i.target = im.target_id >>>>> """ >>>>> ) >>>>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append', >>>>> compression='gzip’) >>>>> >>>>> >>>>> I thought this would append new rows and keep the data unique, but I >>>>> am see many duplicates. Can someone help me with this and tell me what I >>>>> am >>>>> doing wrong? >>>>> >>>>> Thanks, >>>>> Ben >>>>> >>>> -- Best Regards, Ayan Guha

Re: Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-23 Thread ayan guha
t;> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ >>> >>> - >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>> >>> >> > > -- > Thanks, > Ajay > -- Best Regards, Ayan Guha

Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread ayan guha
> Actually I want spark can just ignore and skip these nonexistent file >> path, and continues to run. I have tried python HDFSCli api to check the >> existence of path , but hdfs cli cannot support wildcard. >> >> >> >> Any good idea to solve my problem? Thanks~ >> >> >> >> Regard, >> Junfeng Chen >> > > -- Best Regards, Ayan Guha
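A sketch of resolving the wildcards on the driver before reading, so missing paths are simply dropped. It reaches into the JVM gateway (an internal but commonly used pattern) because globbing is done by the Hadoop FileSystem API, which does support wildcards; the path pattern is an assumption.

```
jvm = spark.sparkContext._jvm
conf = spark.sparkContext._jsc.hadoopConfiguration()
Path = jvm.org.apache.hadoop.fs.Path

pattern = "/data/2018/05/*/events-*.json"          # hypothetical pattern
fs = Path(pattern).getFileSystem(conf)
statuses = fs.globStatus(Path(pattern)) or []      # null/empty when nothing matches
existing = [s.getPath().toString() for s in statuses]

df = spark.read.json(existing) if existing else None
```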

Re: Submit many spark applications

2018-05-16 Thread ayan guha
ine where I run spark-submit if many spark-submit jvm are running. > Any > > suggestions on how to solve this problem? Thank you! > > > > -- > Marcelo > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha

Re: native-lzo library not available

2018-05-03 Thread ayan guha
ool >> Executor.java:1145) >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >> lExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> Driver stacktrace: >> >> >> >> I see the LZO at GPextras: >> >> ll >> total 104 >> -rw-r--r-- 1 cloudera-scm cloudera-scm 35308 Oct 4 2017 >> COPYING.hadoop-lzo >> -rw-r--r-- 1 cloudera-scm cloudera-scm 62268 Oct 4 2017 >> hadoop-lzo-0.4.15-cdh5.13.0.jar >> lrwxrwxrwx 1 cloudera-scm cloudera-scm31 May 3 07:23 hadoop-lzo.jar >> -> hadoop-lzo-0.4.15-cdh5.13.0.jar >> drwxr-xr-x 2 cloudera-scm cloudera-scm 4096 Oct 4 2017 native >> >> >> >> >> -- >> Take Care >> Fawze Abujaber >> > > > > -- > Take Care > Fawze Abujaber > -- Best Regards, Ayan Guha

Re: Read or save specific blocks of a file

2018-05-03 Thread ayan guha
etrieve data from a specific block? (e.g: using > the blockID). > > Except that, is there any option to write the contents of each block > (or of one block) into separate files? > > Thank you very much, > Thodoris > > > - > To unsubscribe, e-mail: user-unsubscr...@hadoop.apache.org > For additional commands, e-mail: user-h...@hadoop.apache.org > > > > -- Best Regards, Ayan Guha

Re: How to read the schema of a partitioned dataframe without listing all the partitions ?

2018-04-27 Thread ayan guha
l spark : "hey, just > read a single partition and give me the schema of that partition and > consider it as the schema of the whole dataframe" ? (I don't care about > schema merge, it's off by the way) > > Thanks. > Walid. > -- Best Regards, Ayan Guha
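A minimal sketch of that idea, assuming a Hive-style partition layout: read one partition with the basePath option so the partition columns are retained, then reuse that schema for the full read, which skips schema inference across every partition. Paths are assumptions.

```
sample = (spark.read
          .option("basePath", "/data/events")                    # hypothetical table root
          .parquet("/data/events/year=2018/month=04/day=27"))    # any one existing partition

full_df = (spark.read
           .schema(sample.schema)
           .parquet("/data/events"))
```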

Re: How to bulk insert using spark streaming job

2018-04-19 Thread ayan guha
iem...@gmail.com> wrote: > How to bulk insert using spark streaming job > > Sent from my iPhone > -- Best Regards, Ayan Guha

Re: Issue with map function in Spark 2.2.0

2018-04-11 Thread ayan guha
lection.TraversableOnce$class.toArray(TraversableOnce.scala:289) > at > org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) > at > org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141) > at > org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > > > > Please help me in this . Thanks. Nandan Priyadarshi > > -- Best Regards, Ayan Guha

Re: Need config params while doing rdd.foreach or map

2018-03-22 Thread ayan guha
nication in error, please notify > us immediately by responding to this email and then delete it from your > system. The firm is neither liable for the proper and complete transmission > of the information contained in this communication nor for any delay in its > receipt. > -- Best Regards, Ayan Guha

Re: Hive to Oracle using Spark - Type(Date) conversion issue

2018-03-18 Thread ayan guha
t; So is there any utility/api available in Spark to achieve this conversion >> issue? >> >> >> Thanks, >> Guru >> > -- Best Regards, Ayan Guha

Re: Pyspark Error: Unable to read a hive table with transactional property set as 'True'

2018-03-02 Thread ayan guha
nbr` int, > | > | `restr_trk_cls` string, > | > | `tst_hist_cd` string, > | > | `cret_ts` string, > | > | `ylw_grp_nbr` int, > | > | `geo_dfct_grp_nme` string, > | > | `supv_rollup_cd` string, > | > | `dfct_stat_cd` string, > | > | `lst_maint_id` string, > | > | `del_rsn_cd` string, > | > | `umt_prcs_user_id` string, > | > | `gdfct_vinsp_srestr` string, > | > | `gc_opr_init` string) > | > | CLUSTERED BY ( > | > | geo_car_nme) > | > | INTO 2 BUCKETS > | > | ROW FORMAT SERDE > | > | 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' >| > | STORED AS INPUTFORMAT > | > | 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' >| > | OUTPUTFORMAT > | > | 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' > | > | LOCATION > | > | 'hdfs://HADOOP02/apps/hive/warehouse/load_etl.db/trpt_ > geo_defect_prod_dec07_del_blank' | > | TBLPROPERTIES ( > | > | 'numFiles'='4', > | > | 'numRows'='0', > | > | 'rawDataSize'='0', > | > | 'totalSize'='2566942', > | > | 'transactional'='true', > | > | 'transient_lastDdlTime'='1518695199') >| > +--- > +--+ > > > Thanks, > D > -- Best Regards, Ayan Guha

Re: Can spark handle this scenario?

2018-02-16 Thread ayan guha
a UDF which apply random > operation on each record. Is Spark good at handling such scenario? > > > 2. Regarding the compilation error, any fix? I did not find a satisfactory > solution online. > > > Thanks for help! > > > > > > -- Best Regards, Ayan Guha

Re: Can spark handle this scenario?

2018-02-16 Thread ayan guha
** You do NOT need dataframes, I mean. On Sat, Feb 17, 2018 at 3:58 PM, ayan guha <guha.a...@gmail.com> wrote: > Hi > > Couple of suggestions: > > 1. Do not use Dataset, use Dataframe in this scenario. There is no benefit > of dataset features here. Using

Re: mapGroupsWithState in Python

2018-01-31 Thread ayan guha
https://databricks.com/session/deep-dive-into- > stateful-stream-processing-in-structured-streaming > > > > > > > > On Tue, Jan 30, 2018 at 5:14 AM, ayan guha <guha.a...@gmail.com> wrote: > >> Any help would be much appreciated :) >> >> On Mon, Jan 29, 2018 at 6:

Re: mapGroupsWithState in Python

2018-01-30 Thread ayan guha
Any help would be much appreciated :) On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote: > Hi > > I want to write something in Structured streaming: > > 1. I have a dataset which has 3 columns: id, last_update_timestamp, > attribute > 2. I am

mapGroupsWithState in Python

2018-01-28 Thread ayan guha
.alias('last_updated'), unpackedDF.jsonData) dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24 hours') So it is not working. Any help is appreciated. -- Best Regards, Ayan Guha
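A minimal sketch of the dedup pattern in the snippet above: the watermark has to be applied before dropDuplicates so Structured Streaming can expire state outside the 24-hour window, and the documented bounded-state form includes the event-time column in the dedup subset.

```
dedupDF = (dataDF
           .withWatermark("last_updated", "24 hours")
           .dropDuplicates(["id", "last_updated"]))
```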

Re: can HDFS be a streaming source like Kafka in Spark 2.2.0?

2018-01-15 Thread ayan guha
FS can be a streaming source like Kafka in Spark >>>>> 2.2.0? For example can I have stream1 reading from Kafka and writing to >>>>> HDFS and stream2 to read from HDFS and write it back to Kakfa ? such that >>>>> stream2 will be pulling the latest updates written by stream1. >>>>> >>>>> Thanks! >>>>> >>>> >>>> >>> >> > -- Best Regards, Ayan Guha

Re: Passing an array of more than 22 elements in a UDF

2017-12-22 Thread ayan guha
using Java, can anyone please suggest me how to take > more than 22 parameters in an UDF? I mean, if I want to pass all the > parameters as an array of integers? > > Thanks, > Aakash. > -- Best Regards, Ayan Guha
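The thread is about Java, but the same idea in a PySpark sketch: instead of a UDF with 22+ parameters, pass a single ArrayType column built from all the inputs with array(). The column names and the combining logic are assumptions.

```
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

@F.udf(returnType=IntegerType())
def combine(values):
    # hypothetical logic over the packed parameters
    return sum(v for v in values if v is not None)

input_cols = [f"c{i}" for i in range(1, 31)]     # hypothetical 30 input columns
df = df.withColumn("combined", combine(F.array(*[F.col(c) for c in input_cols])))
```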

Re: JDBC to hive batch use case in spark

2017-12-09 Thread ayan guha
o write >> the data in hive. The data read parts works fine but when I ran the save >> call on hive context to write data, it throws the exception and it says the >> table or view does not exists even though the table is precreated in hive. >> >> Please help if anyone tried such scenario. >> >> Thanks >> > -- Best Regards, Ayan Guha

Re: Json Parsing.

2017-12-06 Thread ayan guha
ects in value column. > > Any advice or help would be appreciated. > > Regards, > Satyajit. > -- Best Regards, Ayan Guha

  1   2   3   4   5   6   7   8   >