[pyspark 2.4+] BucketBy SortBy doesn't retain sort order

2020-03-02 Thread Rishi Shah
Hi All, I have 2 large tables (~1TB). I used the following to save both tables, but when I join them on join_column, Spark still does a shuffle & sort before the join. Could someone please help? df.repartition(2000).write.bucketBy(1,
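A hedged sketch of the bucketed-write pattern being attempted here (dfA/dfB, the table names, and the bucket count are illustrative, not from the thread): both sides need the same number of buckets on the join key and must be written with saveAsTable, and even then Spark 2.4 can still insert a sort when a bucket spans several files.

    val numBuckets = 2000                     // must match on both sides

    dfA.write
      .bucketBy(numBuckets, "join_column")
      .sortBy("join_column")
      .mode("overwrite")
      .saveAsTable("table_a_bucketed")

    dfB.write
      .bucketBy(numBuckets, "join_column")
      .sortBy("join_column")
      .mode("overwrite")
      .saveAsTable("table_b_bucketed")

    // Reading the bucketed tables back via the catalog lets the join reuse the bucketing:
    val joined = spark.table("table_a_bucketed")
      .join(spark.table("table_b_bucketed"), "join_column")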

Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I changed it to Tuple2 and that problem is solved. Any thoughts on this message? *Unapplied methods are only converted to functions when a function type is expected. You can make this conversion explicit by writing `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
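A sketch of what that compiler hint is about. The real updateAcrossEvents in the thread appears to take five parameters (hence `updateAcrossEvents(_,_,_,_,_)`); the three-argument shape, case classes, and field names below are assumptions, chosen only to match what mapGroupsWithState ultimately expects.

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
    import spark.implicits._

    case class Event(user: String, device: String, value: Long)   // assumed input type
    case class DeviceState(count: Long)                           // assumed state type

    def updateAcrossEvents(
        key: (String, String),
        events: Iterator[Event],
        state: GroupState[DeviceState]): DeviceState = {
      val updated = DeviceState(state.getOption.map(_.count).getOrElse(0L) + events.size)
      state.update(updated)
      updated
    }

    val ds = spark.createDataset(Seq(Event("u1", "d1", 1L)))      // stand-in for the real input
    // The trailing underscore converts the method into the function value the API
    // expects, which is exactly the conversion the compiler message describes:
    val result = ds
      .groupByKey(e => (e.user, e.device))
      .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents _)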

Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread lec ssmi
Maybe you can combine the fields you want to use into one field. Something Something wrote on Tue, 3 Mar 2020 at 06:37: > I am writing a Stateful Streaming application in which I am using > mapGroupsWithState to create aggregates for Groups but I need to create > *Groups > based on more than one column in
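That suggestion in code, as a short sketch building on the example above (ds and the user/device field names are assumed): derive a single key from the columns before groupByKey instead of using a multi-column key.

    val grouped = ds.groupByKey(e => s"${e.user}|${e.device}")
    // The state-update function then receives this combined String as its key
    // (its signature changes accordingly from the sketch above).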

Re: SPARK Suitable IDE

2020-03-02 Thread Jeff Evans
For developing Spark itself, or applications built using Spark? In either case, IntelliJ IDEA works well. For the former case, there is even a page explaining how to set it up. https://spark.apache.org/developer-tools.html On Mon, Mar 2, 2020, 4:43 PM Zahid Rahman wrote: > Hi, > > Can you

SPARK Suitable IDE

2020-03-02 Thread Zahid Rahman
Hi, Can you recommend a suitable IDE for Apache Spark from the list below, or do you know a more suitable one? Codeanywhere goormIDE Koding SourceLair ShiftEdit Browxy repl.it PaizaCloud IDE Eclipse Che Visual Studio Online Gitpod Google Cloud Shell Codio Codepen CodeTasty Glitch JSitor

Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I am writing a Stateful Streaming application in which I am using mapGroupsWithState to create aggregates for Groups but I need to create *Groups based on more than one column in the Input Row*. All the examples in the 'Spark: The Definitive Guide' use only one column such as 'User' or 'Device'. I

Re:

2020-03-02 Thread Wim Van Leuven
Ok, good luck! On Mon, 2 Mar 2020 at 10:04, Hamish Whittal wrote: > Enrico, Wim (and privately Neil), thanks for the replies. I will give your > suggestions a whirl. > > Basically Wim recommended a pre-processing step to weed out the > problematic files. I am going to build that into the

Java Spark UDF cast exception

2020-03-02 Thread vishal.verma
*Facing casting issues while working with a Spark UDF* UDF1 mode1 = new UDF1>, String>() { @Override public String call(WrappedArray> maps) throws Exception { List> lis = (List>) JavaConverters.seqAsJavaListConverter(maps).asJava(); java.util.Map a = lis.stream().flatMap(map ->
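Not the poster's Java code, but a Scala sketch of the same idea: Spark hands an array<map<string,string>> column to a Scala UDF as Seq[Map[String, String]] directly, so no WrappedArray-to-Java conversion (and hence no cast) is needed. The element types and the merge logic are assumptions, since the original snippet is truncated and its generics were stripped.

    import org.apache.spark.sql.functions.udf

    val mode1 = udf { maps: Seq[Map[String, String]] =>
      maps.flatten.toMap.toString        // placeholder for the truncated flatMap body
    }
    // df.withColumn("mode", mode1($"mapsColumn"))   // "mapsColumn" is illustrative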

Re: Compute the Hash of each row in new column

2020-03-02 Thread Chetan Khatri
Thanks Enrico. I meant one hash for each single row, in an extra column, something like this: val newDs = typedRows.withColumn("hash", hash(typedRows.columns.map(col): _*)) On Mon, Mar 2, 2020 at 3:51 PM Enrico Minack wrote: > Well, then apply md5 on all columns: > > ds.select(ds.columns.map(col)
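For reference, a hedged sketch of both per-row options that come up in this thread; the "|" separator in the second variant is an assumption, there only to keep adjacent values from running together.

    import org.apache.spark.sql.functions.{col, concat_ws, hash, md5}

    // Built-in Murmur3 hash across every column, as in the snippet above:
    val withHash = ds.withColumn("hash", hash(ds.columns.map(col): _*))

    // Or an MD5 string over the concatenated column values:
    val withMd5 = ds.withColumn("hash", md5(concat_ws("|", ds.columns.map(col): _*)))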

Re: Compute the Hash of each row in new column

2020-03-02 Thread Enrico Minack
Well, then apply md5 on all columns: ds.select(ds.columns.map(col) ++ ds.columns.map(column => md5(col(column)).as(s"$column hash")): _*).show(false) Enrico On 02.03.20 at 11:10, Chetan Khatri wrote: Thanks Enrico, I want to compute a hash of all the column values in the row. On Fri, Feb 28,
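Laid out over several lines, the one-liner from this reply keeps the original columns and appends an md5 column per input column:

    import org.apache.spark.sql.functions.{col, md5}

    ds.select(
      ds.columns.map(col) ++
        ds.columns.map(c => md5(col(c)).as(s"$c hash")): _*
    ).show(false)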

[no subject]

2020-03-02 Thread lucas.wu

Re: Compute the Hash of each row in new column

2020-03-02 Thread Chetan Khatri
Thanks Enrico, I want to compute a hash of all the column values in the row. On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack wrote: > This computes the md5 hash of a given column id of Dataset ds: > > ds.withColumn("id hash", md5($"id")).show(false) > > Test with this Dataset ds: > > import

Re:

2020-03-02 Thread Hamish Whittal
Enrico, Wim (and privately Neil), thanks for the replies. I will give your suggestions a whirl. Basically Wim recommended a pre-processing step to weed out the problematic files. I am going to build that into the pipeline. I am not sure how the problems are creeping in because this is a regular

Re:

2020-03-02 Thread Enrico Minack
Looks like the schema of some files is unexpected. You could either run parquet-tools on each of the files and extract the schema to find the problematic files: hdfs dfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet
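A sketch of one way to locate the odd files (not the parquet-tools route mentioned above): read each part file on its own and compare the printed schemas. The directory path is the one from the thread; the rest is illustrative.

    import org.apache.hadoop.fs.Path

    val dir = new Path("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/")
    val fs  = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)

    fs.listStatus(dir)
      .map(_.getPath.toString)
      .filter(_.endsWith(".parquet"))
      .foreach { f =>
        // A file whose schema differs from the others is one to weed out
        // in the pre-processing step discussed in this thread.
        println(s"$f ->\n${spark.read.parquet(f).schema.treeString}")
      }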