Current state of the Dataset API

2021-10-04 Thread Magnus Nilsson
Hi, I tried using the (typed) Dataset API about three years ago. Back then there were limitations with predicate pushdown, serialization overhead and maybe more things I've forgotten. Ultimately we chose the DataFrame API as the sweet spot. Does anyone know of a good overview of the current state of
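
A minimal sketch of the pushdown difference the post refers to, assuming a hypothetical Person case class and Parquet path; whether this still bites in current versions is exactly what the question asks:

    import org.apache.spark.sql.SparkSession

    case class Person(name: String, age: Int)   // hypothetical record type

    val spark = SparkSession.builder().appName("dataset-pushdown").getOrCreate()
    import spark.implicits._

    val people = spark.read.parquet("/data/people").as[Person]   // hypothetical path

    // Typed lambda: opaque to Catalyst, rows are deserialized and the
    // predicate cannot be pushed into the Parquet scan.
    val typedFilter = people.filter(p => p.age > 21)

    // Column expression: Catalyst can push the predicate down.
    val untypedFilter = people.filter($"age" > 21)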

Re: Scala vs Python for ETL with Spark

2020-10-17 Thread Magnus Nilsson
Holy war is a bit dramatic, don't you think? The difference between Scala and Python will always be very relevant when choosing between Spark and PySpark. I wouldn't call it irrelevant to the original question. br, molotch On Sat, 17 Oct 2020 at 16:57, "Yuri Oleynikov (‫יורי אולייניקוב‬‎)" <

Re: Scala vs Python for ETL with Spark

2020-10-17 Thread Magnus Nilsson
I'm sorry you were offended. I'm not an expert in Python and I wasn't trying to attack you personally. It's just an opinion about what makes a language better or worse, it's not the single source of truth. You don't have to take offense. In the end it's about context and what you're trying to

Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Magnus Nilsson
> …d limit per micro-batch on data source (like maxOffsetsPerTrigger) and process all available input as possible. (Data sources should migrate to the new API to take effect, but works for built-in data sources like file and Kafka.) 1. https://issues.apache.org/jira/browse/SPARK-30669

Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Magnus Nilsson
I've always had a question about Trigger.Once that I never got around to asking or testing for myself. If you have a 24/7 stream to a Kafka topic, will Trigger.Once get the last offset(s) when it starts and then quit once it hits those offset(s), or will the job run until no new messages are added to the
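
A sketch of the setup in question, assuming the built-in Kafka source and hypothetical broker, topic and paths:

    import org.apache.spark.sql.streaming.Trigger

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("maxOffsetsPerTrigger", "100000")   // per-micro-batch cap discussed in the reply above
      .load()

    stream.writeStream
      .format("parquet")
      .option("path", "/sink/events")
      .option("checkpointLocation", "/checkpoints/events")
      .trigger(Trigger.Once())                    // the behavior being asked about
      .start()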

Re: Unable to get to_timestamp with Timezone Information

2020-03-31 Thread Magnus Nilsson
And to answer your question (sorry, read too fast): the string is not proper ISO 8601. The extended form must be used throughout, i.e. 2020-04-11T20:40:00-05:00; the colon (:) is missing from the UTC offset. br, Magnus On Tue, Mar 31, 2020 at 7:11 PM Magnus Nilsson wrote: > Timesta
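
For reference, a sketch of the difference, assuming Spark 3.x datetime patterns for the explicit-format case:

    import org.apache.spark.sql.functions.{lit, to_timestamp}

    // Extended ISO 8601 (colon in the offset) parses without an explicit format.
    val ok = to_timestamp(lit("2020-04-11T20:40:00-05:00"))

    // The basic-form offset without the colon needs an explicit pattern.
    val alsoOk = to_timestamp(lit("2020-04-11T20:40:00-0500"), "yyyy-MM-dd'T'HH:mm:ssZ")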

Re: Unable to get to_timestamp with Timezone Information

2020-03-31 Thread Magnus Nilsson
Timestamps aren't timezoned. If you parse ISO 8601 strings they will be converted to UTC automatically. If you parse timestamps without a timezone they will be converted using the timezone of the server Spark is running on. You can change the timezone Spark uses with
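
The cut-off sentence presumably refers to the session time zone setting; a minimal sketch:

    // Makes Spark interpret and render session-local timestamps in UTC
    // instead of the JVM/server default.
    spark.conf.set("spark.sql.session.timeZone", "UTC")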

Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Magnus Nilsson
Been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. Never got around to trying it out
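
A sketch of the trick as described (untested here too), assuming two existing DataFrames left and right with hypothetical column names: plain equality on the bucketed/partitioned key, a UDF for the rest:

    import org.apache.spark.sql.functions.udf

    // Hypothetical custom comparison for the non-key columns.
    val customCompare = udf((a: String, b: String) => a.trim.equalsIgnoreCase(b.trim))

    val joined = left.join(
      right,
      left("bucket_key") === right("bucket_key") &&
        customCompare(left("name"), right("name"))
    )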

Re: Schema store for Parquet

2020-03-04 Thread Magnus Nilsson
> …o use Apache Hive and spark? Thanks. > On Wed, Mar 4, 2020 at 10:40 AM lucas.g...@gmail.com wrote: >> Or AWS glue catalog if you're in AWS >> On Wed, 4 Mar 2020 at 10:35, Magnus Nilsson wrote: >>> Google hive metastore.

Re: Schema store for Parquet

2020-03-04 Thread Magnus Nilsson
Google hive metastore. On Wed, Mar 4, 2020 at 7:29 PM Ruijing Li wrote: > Hi all, > > Has anyone explored efforts to have a centralized storage of schemas of > different parquet files? I know there is schema management for Avro, but > couldn’t find solutions for parquet schema management.
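
The answer ("google hive metastore") in practice means registering the Parquet data as metastore tables so the schema lives centrally; a sketch with hypothetical paths and table name, assuming a metastore is configured:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("parquet-schema-in-metastore")
      .enableHiveSupport()                 // requires a reachable Hive metastore
      .getOrCreate()

    // Registering the Parquet data as a table stores its schema in the metastore.
    spark.read.parquet("/data/events")
      .write
      .mode("overwrite")
      .saveAsTable("analytics.events")     // hypothetical database.table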

Re: Questions for platform to choose

2019-08-21 Thread Magnus Nilsson
Well, you are posting on the Spark mailing list. Though for streaming I'd recommend Flink over Spark any day of the week. Flink was written as a streaming platform from the beginning, quickly aligning its API with the theoretical framework of Google's Dataflow whitepaper. It's awesome for

CPUs per task

2019-07-17 Thread Magnus Nilsson
Hello all, TL;DR: can the number of cores used by a task vary, or is it always one core per task? Is there a UI, metric or log I can check to see the number of cores used by a task? I have an ETL pipeline where I do some transformations. In one of the stages, which ought to be quite CPU-heavy
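
For reference, the knob that controls this is spark.task.cpus (default 1): Spark reserves that many cores per task, and broadly a task only uses more than one core if your own code spawns threads. A minimal sketch, set at application launch:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.task.cpus", "1")       // cores reserved per task (default 1)
      .config("spark.executor.cores", "4")  // concurrent tasks per executor = executor.cores / task.cpus
      .getOrCreate()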

Re: Spark structural streaming sinks output late

2019-07-10 Thread Magnus Nilsson
Well, you should get updates every 10 seconds as long as there are events surviving your quite aggressive watermark condition. Spark will try to drop (not guaranteed) all events whose timestamp falls behind the current watermark, which trails the latest observed event time by only 500 milliseconds. Try to increase the watermark
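
A sketch of the suggested change, assuming a streaming DataFrame named events with an eventTime column:

    import org.apache.spark.sql.functions.{col, window}
    import org.apache.spark.sql.streaming.Trigger

    val counts = events
      .withWatermark("eventTime", "10 minutes")          // far less aggressive than 500 ms
      .groupBy(window(col("eventTime"), "1 minute"))
      .count()

    counts.writeStream
      .outputMode("update")
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()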

Re: Structured Streaming foreach function

2019-06-23 Thread Magnus Nilsson
Row is a generic ordered collection of fields that most likely carries a schema of StructType. You need to keep track of the datatypes of the fields yourself. If you want compile-time safety of datatypes (and IntelliSense support) you need to use RDDs or the Dataset[T] API. Dataset[T] might
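
A minimal sketch of the typed alternative, with a hypothetical case class and an existing DataFrame df:

    import org.apache.spark.sql.Dataset

    case class Reading(id: String, temperature: Double)   // hypothetical record type

    import spark.implicits._
    val typed: Dataset[Reading] = df.as[Reading]           // df is the untyped Dataset[Row]

    // Field names and types are now checked at compile time.
    val fahrenheit = typed.map(r => r.temperature * 1.8 + 32)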

Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Magnus Nilsson
Well, you could do a repartition on cityname/nrOfCities and use the spark_partition_id function or the mapPartitionsWithIndex method to add a city id column. Then just split the dataframe into two subsets. Be careful of hash collisions on the repartition key though, or more than one city
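
A sketch of the first approach, with a hypothetical column name and city count; the hash-collision caveat from the post still applies:

    import org.apache.spark.sql.functions.{col, spark_partition_id}

    val nrOfCities = 200   // hypothetical number of distinct city names

    // One partition per city name; two cities can still hash to the same
    // partition, so the id is not guaranteed to be unique per city.
    val withCityId = df
      .repartition(nrOfCities, col("cityname"))
      .withColumn("cityId", spark_partition_id())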

Re: Upsert for hive tables

2019-05-30 Thread Magnus Nilsson
Since Parquet doesn't support updates you have to backfill your dataset. If that is your regular scenario you should partition your Parquet files so backfilling becomes easier. As the data is structured now you have to rewrite everything just to upsert quite a small amount of changed data. Look at
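
A sketch of the partitioned layout, assuming a date column, hypothetical paths and Spark 2.3+ dynamic partition overwrite (worth verifying for your version):

    // Initial write, partitioned so later backfills only touch the affected folders.
    df.write
      .mode("overwrite")
      .partitionBy("event_date")
      .parquet("/data/facts")

    // Backfill: with dynamic partition overwrite, only the partitions present
    // in changedDf are rewritten instead of the whole dataset.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    changedDf.write
      .mode("overwrite")
      .partitionBy("event_date")
      .parquet("/data/facts")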

Logging DataFrame API pipelines

2019-04-02 Thread Magnus Nilsson
Hello all, how do you log what is happening inside your Spark DataFrame pipelines? I would like to collect statistics along the way, mostly row counts at particular steps, to see where rows were filtered and whatnot. Is there any other way to do this than calling .count on the dataframe?
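
One sketch of doing it without extra count() actions: a pass-through map over a typed Dataset updating a LongAccumulator (hypothetical record type, filter and paths). Accumulator values only settle once an action has run and can over-count on task retries:

    import org.apache.spark.util.LongAccumulator

    case class Order(id: Long, status: String)             // hypothetical record type

    val rowsAfterFilter: LongAccumulator =
      spark.sparkContext.longAccumulator("rows_after_filter")

    import spark.implicits._
    val filtered = orders                                   // orders: Dataset[Order]
      .filter(_.status == "ok")
      .map { o => rowsAfterFilter.add(1); o }               // pass-through count

    filtered.write.mode("overwrite").parquet("/out/orders")
    println(s"rows after filter: ${rowsAfterFilter.value}")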

Re: Windowing LAG function usage in Spark 2.2 Dataset (Scala)

2019-03-14 Thread Magnus Nilsson
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val partitionBy = Window.partitionBy("name", "sit").orderBy("data_date")
val newDf = df.withColumn("PreviousDate", lag("uniq_im", 1).over(partitionBy))

Cheers... On Thu, Mar 14, 2019 at 4:55 AM anbu wrote: > Hi, > To calculate LAG functions

Re: How can I parse an "unnamed" json array present in a column?

2019-02-24 Thread Magnus Nilsson
> …odify the String other than a UDF or map on the string? At the moment I am modifying it returning a generic type “t” so I can use the same UDF for many different JSONs that have the same issue. Also, is there any advantage (if possible) to extract

Re: How can I parse an "unnamed" json array present in a column?

2019-02-24 Thread Magnus Nilsson
> …rt a function from 2.3 to 2.1? What other options do I have? Thank you. > From: Magnus Nilsson > Sent: Saturday, February 23, 2019 3:43 PM > Cc: user@spark.apache.org > Subject: Re: How can I parse an "unnamed" json array presen

Re: How can I parse an "unnamed" json array present in a column?

2019-02-23 Thread Magnus Nilsson
Use spark.sql.types.ArrayType instead of a Scala Array as the root type when you define the schema and it will work. Regards, Magnus On Fri, Feb 22, 2019 at 11:15 PM Yeikel wrote: > I have an "unnamed" json array stored in a *column*. > > The format is the following : > > column name : news >
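
A sketch of that, assuming Spark 2.3+ and a hypothetical element schema for the news column:

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

    // Hypothetical schema for one element of the unnamed JSON array.
    val element = new StructType()
      .add("source", StringType)
      .add("title", StringType)

    // The root type is ArrayType, not a Scala Array.
    val parsed = df.withColumn("news_parsed", from_json(col("news"), ArrayType(element)))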

Structured Streaming restart results in illegal state exception

2018-11-21 Thread Magnus Nilsson
Hello, I'm evaluating Structured Streaming, trying to understand how resilient the pipeline is to failures. I ran a small test streaming data from an Azure Event Hub using Azure Databricks, saving the data into a parquet file on the Databricks filesystem dbfs:/. I did an unclean shutdown by

Structured Streaming to file sink results in illegal state exception

2018-11-21 Thread Magnus Nilsson
I'm evaluating Structured Streaming, trying to understand how resilient the pipeline is. I ran a small test streaming data from an Azure Event Hub using Azure Databricks, saving the data into a parquet file on the Databricks filesystem dbfs:/. I did an unclean shutdown by cancelling the query. Then

Re: Spark DataSets and multiple write(.) calls

2018-11-19 Thread Magnus Nilsson
I had the same requirements. As far as I know the only way is to extend the ForeachWriter, cache the micro-batch result and write to each output. https://docs.databricks.com/spark/latest/structured-streaming/foreach.html Unfortunately it seems
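
In later versions (Spark 2.4+), foreachBatch is the usual way to do exactly this: cache the micro-batch once and write it to several outputs. A sketch with a hypothetical streaming DataFrame named stream and hypothetical paths:

    import org.apache.spark.sql.DataFrame

    // A named function value sidesteps the foreachBatch overload ambiguity
    // some Spark 2.4 / Scala 2.12 builds had with lambda literals.
    val writeToBothSinks: (DataFrame, Long) => Unit = (batch, _) => {
      batch.persist()
      batch.write.mode("append").parquet("/sink/parquet-copy")   // first output
      batch.write.mode("append").json("/sink/json-copy")         // second output
      batch.unpersist()
      ()
    }

    val query = stream.writeStream
      .foreachBatch(writeToBothSinks)
      .option("checkpointLocation", "/checkpoints/multi-sink")
      .start()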

Event Hubs properties kvp-value adds " to strings

2018-10-31 Thread Magnus Nilsson
Hello all, I have this peculiar problem where quote (") characters are added to the beginning and end of my string values. I get the data using Structured Streaming from an Azure Event Hub using a Scala Notebook in Azure Databricks. The DataFrame schema received contains a property of type Map named