[pyspark 2.4+] BucketBy SortBy doesn't retain sort order

2020-03-02 Thread Rishi Shah
Hi All,

I have two large tables (~1TB). I used the following to save both tables.
When I then try to join the two tables on join_column, Spark still does a shuffle
& sort before the join. Could someone please help?

df.repartition(2000).write.bucketBy(1, join_column).sortBy(join_column).saveAsTable(tablename)
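
For reference, a minimal sketch (in Scala; the same DataFrameWriter calls exist in PySpark) of the pattern that usually lets the join reuse bucketing: write *both* tables with the same bucket count on the join column and read them back through the catalog with spark.table. Table names and the bucket count of 200 are placeholders. Also note that with bucketBy(1, ...) after repartition(2000) each bucket is spread over many files, and Spark generally only trusts the per-bucket sort order when a bucket is a single file, which is one common reason the sort reappears.

df1.write
  .bucketBy(200, "join_column")
  .sortBy("join_column")
  .saveAsTable("table_a")

df2.write
  .bucketBy(200, "join_column")
  .sortBy("join_column")
  .saveAsTable("table_b")

// Bucketing metadata is only picked up when the tables are read back via the catalog.
val joined = spark.table("table_a").join(spark.table("table_b"), "join_column")
joined.explain()  // check whether Exchange/Sort still show up in the plan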

-- 
Regards,

Rishi Shah


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I changed it to a Tuple2 and that problem is solved.

Any thoughts on this message?

*Unapplied methods are only converted to functions when a function type is
expected.*

*You can make this conversion explicit by writing `updateAcrossEvents _` or
`updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
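
That message usually means the compiler could not eta-expand updateAcrossEvents into the (key, Iterator, GroupState) => output shape that mapGroupsWithState expects. A minimal sketch of a signature that lines up, assuming the key is now a (String, String) tuple and using placeholder MyState/MyOutput case classes (MyReport is the type from the code below):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class MyState(count: Long)                              // placeholder state type
case class MyOutput(key1: String, key2: String, count: Long) // placeholder output type

def updateAcrossEvents(
    key: (String, String),
    events: Iterator[MyReport],
    state: GroupState[MyState]): MyOutput = {
  val updated = MyState(state.getOption.map(_.count).getOrElse(0L) + events.size)
  state.update(updated)
  MyOutput(key._1, key._2, updated.count)
}

withEventTime
  .as[MyReport]
  .groupByKey(r => (r.getKeys.getKey1, r.getKeys.getKey2))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _)

The first parameter type has to match the groupByKey key type exactly, the state and output types need encoders (case classes with spark.implicits._ in scope are enough), and EventTimeTimeout additionally requires a watermark on withEventTime.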

On Mon, Mar 2, 2020 at 5:12 PM lec ssmi  wrote:

> maybe you can combine the fields you want to use into one field
>
> Something Something  于2020年3月3日周二 上午6:37写道:
>
>> I am writing a Stateful Streaming application in which I am using
>> mapGroupsWithState to create aggregates for Groups but I need to create 
>> *Groups
>> based on more than one column in the Input Row*. All the examples in the
>> 'Spark: The Definitive Guide' use only one column such as 'User' or
>> 'Device'. I am using code similar to what's given below. *How do I
>> specify more than one field in the 'groupByKey'?*
>>
>> There are other challenges as well. The book says we can use
>> 'updateAcrossEvents' the way given below but I get a compile-time error
>> saying:
>>
>>
>> *Error:(43, 65) missing argument list for method updateAcrossEvents in
>> object Main
>> Unapplied methods are only converted to functions when a
>> function type is expected. You can make this conversion explicit by writing
>> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
>> `updateAcrossEvents`.
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>>
>> Another challenge: the compiler also complains about my *MyReport*: 
>> *Error:(41,
>> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
>> (Int, String, etc) and Product types (case classes) are supported by
>> importing spark.implicits._  Support for serializing other types will be
>> added in future releases.*
>>
>> Help in resolving these errors would be greatly appreciated. Thanks in
>> advance.
>>
>>
>> withEventTime
>> .as[MyReport]
>>   .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
>>   
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>>   .writeStream
>>   .queryName("test_query")
>>   .format("memory")
>>   .outputMode("update")
>>   .start()
>>
>>


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread lec ssmi
maybe you can combine the fields you want to use into one field

Something Something  于2020年3月3日周二 上午6:37写道:

> I am writing a Stateful Streaming application in which I am using
> mapGroupsWithState to create aggregates for Groups but I need to create 
> *Groups
> based on more than one column in the Input Row*. All the examples in the
> 'Spark: The Definitive Guide' use only one column such as 'User' or
> 'Device'. I am using code similar to what's given below. *How do I
> specify more than one field in the 'groupByKey'?*
>
> There are other challenges as well. The book says we can use
> 'updateAcrossEvents' the way given below but I get a compile-time error
> saying:
>
>
> *Error:(43, 65) missing argument list for method updateAcrossEvents in
> object Main
> Unapplied methods are only converted to functions when a
> function type is expected. You can make this conversion explicit by writing
> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
> `updateAcrossEvents`.
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>
> Another challenge: the compiler also complains about my *MyReport*: 
> *Error:(41,
> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._  Support for serializing other types will be
> added in future releases.*
>
> Help in resolving these errors would be greatly appreciated. Thanks in
> advance.
>
>
> withEventTime
> .as[MyReport]
>   .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
>   
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>   .writeStream
>   .queryName("test_query")
>   .format("memory")
>   .outputMode("update")
>   .start()
>
>


Re: SPARK Suitable IDE

2020-03-02 Thread Jeff Evans
For developing Spark itself, or applications built using Spark? In either
case, IntelliJ IDEA works well. For the former case, there is even a page
explaining how to set it up. https://spark.apache.org/developer-tools.html

On Mon, Mar 2, 2020, 4:43 PM Zahid Rahman  wrote:

> Hi,
>
> Can you recommend a suitable IDE for Apache Spark from the list below, or
> suggest a more suitable one if you know of it?
>
> Codeanywhere
> goormIDE
> Koding
> SourceLair
> ShiftEdit
> Browxy
> repl.it
> PaizaCloud IDE
> Eclipse Che
> Visual Studio Online
> Gitpod
> Google Cloud Shell
> Codio
> Codepen
> CodeTasty
> Glitch
> JSitor
> ICEcoder
> Codiad
> Dirigible
> Orion
> Codiva.io
> Collide
> Codenvy
> AWS Cloud9
> JSFiddle
> GitLab
> SLAppForge Sigma
> Jupyter
> CoCalc
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>


SPARK Suitable IDE

2020-03-02 Thread Zahid Rahman
Hi,

Can you recommend a suitable IDE for Apache Spark from the list below, or
suggest a more suitable one if you know of it?

Codeanywhere
goormIDE
Koding
SourceLair
ShiftEdit
Browxy
repl.it
PaizaCloud IDE
Eclipse Che
Visual Studio Online
Gitpod
Google Cloud Shell
Codio
Codepen
CodeTasty
Glitch
JSitor
ICEcoder
Codiad
Dirigible
Orion
Codiva.io
Collide
Codenvy
AWS Cloud9
JSFiddle
GitLab
SLAppForge Sigma
Jupyter
CoCalc

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I am writing a Stateful Streaming application in which I am using
mapGroupsWithState to create aggregates for Groups but I need to create *Groups
based on more than one column in the Input Row*. All the examples in the
'Spark: The Definitive Guide' use only one column such as 'User' or
'Device'. I am using code similar to what's given below. *How do I specify
more than one field in the 'groupByKey'?*

There are other challenges as well. The book says we can use
'updateAcrossEvents' the way given below but I get a compile-time error
saying:


*Error:(43, 65) missing argument list for method updateAcrossEvents in
object Main
Unapplied methods are only converted to functions when a
function type is expected. You can make this conversion explicit by writing
`updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
`updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*

Another challenge: the compiler also complains about my *MyReport*: *Error:(41,
12) Unable to find encoder for type stored in a Dataset.  Primitive types
(Int, String, etc) and Product types (case classes) are supported by
importing spark.implicits._  Support for serializing other types will be
added in future releases.*

Help in resolving these errors would be greatly appreciated. Thanks in
advance.


withEventTime
.as[MyReport]
  .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
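
On the encoder error: the getKeys.getKey1 accessors suggest MyReport is a generated Java class rather than a Scala case class, in which case spark.implicits._ alone cannot derive an encoder for it. A hedged sketch of supplying one explicitly (Encoders.bean assumes MyReport follows Java-bean conventions; if it does not, Encoders.kryo(classOf[MyReport]) is a slower fallback), combined with a tuple key for the multi-column grouping:

import org.apache.spark.sql.{Encoder, Encoders}

// Assumption: MyReport is a Java bean.
implicit val myReportEncoder: Encoder[MyReport] = Encoders.bean(classOf[MyReport])

withEventTime
  .as[MyReport]
  .groupByKey(r => (r.getKeys.getKey1, r.getKeys.getKey2)) // composite key as a tuple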


Re:

2020-03-02 Thread Wim Van Leuven
Ok, good luck!

On Mon, 2 Mar 2020 at 10:04, Hamish Whittal 
wrote:

> Enrico, Wim (and privately Neil), thanks for the replies. I will give your
> suggestions a whirl.
>
> Basically Wim recommended a pre-processing step to weed out the
> problematic files. I am going to build that into the pipeline. I am not
> sure how the problems are creeping in because this is a regular lift from a
> PGSQL db/table. And so some of these files are correct and some are
> patently wrong.
>
> I'm working around the problem by trying small subsets of the 3000+ files,
> but until I can weed out the problem files the processing is going to fail.
> I need something more bulletproof than what I'm doing. So this is what I'm
> going to try now.
>
> Hamish
>
> On Mon, Mar 2, 2020 at 10:15 AM Enrico Minack 
> wrote:
>
>> Looks like the schema of some files is unexpected.
>>
>> You could either run parquet-tools on each of the files and extract the
>> schema to find the problematic files:
>>
>> hdfs dfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet \
>> | while read file
>> do
>>   echo -n "$file: "
>>   hadoop jar parquet-tools-1.9.0.jar schema $file
>> done
>>
>>
>> https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools
>>
>>
>> Or you can use Spark to investigate the parquet files in parallel:
>>
>> spark.sparkContext
>>   .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
>>   .map { case (path, _) =>
>>     import collection.JavaConverters._
>>     val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
>>     val reader = ParquetFileReader.open(file)
>>     try {
>>       val schema = reader.getFileMetaData().getSchema
>>       (
>>         schema.getName,
>>         schema.getFields.asScala.map(f => (
>>           Option(f.getId).map(_.intValue()),
>>           f.getName,
>>           Option(f.getOriginalType).map(_.name()),
>>           Option(f.getRepetition).map(_.name()))
>>         ).toArray
>>       )
>>     } finally {
>>       reader.close()
>>     }
>>   }
>>   .toDF("schema name", "fields")
>>   .show(false)
>>
>> .binaryFiles provides you all filenames that match the given pattern as
>> an RDD, so the following .map is executed on the Spark executors.
>> The map then opens each parquet file via ParquetFileReader and provides
>> access to its schema and data.
>>
>> I hope this points you in the right direction.
>>
>> Enrico
>>
>>
>> Am 01.03.20 um 22:56 schrieb Hamish Whittal:
>>
>> Hi there,
>>
>> I have an hdfs directory with thousands of files. It seems that some of
>> them - and I don't know which ones - have a problem with their schema and
>> it's causing my Spark application to fail with this error:
>>
>> Caused by: org.apache.spark.sql.execution.QueryExecutionException:
>> Parquet column cannot be converted in file hdfs://
>> ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-0-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
>> Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY
>>
>> The problem is not only that it's causing the application to fail, but
>> every time it does fail, I have to copy that file out of the directory and
>> start the app again.
>>
>> I thought of trying to use try-except, but I can't seem to get that to
>> work.
>>
>> Is there any advice anyone can give me because I really can't see myself
>> going through thousands of files trying to figure out which ones are broken.
>>
>> Thanks in advance,
>>
>> hamish
>>
>>
>>
>
> --
> Cloud-Fundis.co.za
> Cape Town, South Africa
> +27 79 614 4913
>


Java Spark UDF cast exception

2020-03-02 Thread vishal.verma


*Facing casting issues while working with a Spark UDF*

  UDF1<WrappedArray<Map<String, String>>, String> mode1 =
      new UDF1<WrappedArray<Map<String, String>>, String>() {
    @Override
    public String call(WrappedArray<Map<String, String>> maps) throws Exception {
      List<Map<String, String>> lis =
          (List<Map<String, String>>) JavaConverters.seqAsJavaListConverter(maps).asJava();
      java.util.Map<String, String> a = lis.stream()
          .flatMap(map -> map.entrySet().stream())
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
      System.out.println(a.get(key));
      return "";
    }
  };

*Error:*

Caused by: java.lang.ClassCastException:
scala.collection.immutable.Map$Map1 cannot be cast to java.util.Map
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
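
The cast most likely fails because seqAsJavaListConverter only wraps the outer Seq; the elements are still Scala maps (hence scala.collection.immutable.Map$Map1), so treating them as java.util.Map inside the stream throws. One way to sidestep the conversion entirely is to write the UDF in Scala, where Spark hands an array-of-maps column in as Seq[Map[String, String]]. A rough sketch, with the lookup key and the duplicate-key behaviour as assumptions:

import org.apache.spark.sql.functions.udf

val key = "some_key"  // hypothetical lookup key

val mode1 = udf { (maps: Seq[Map[String, String]]) =>
  val merged = maps.flatten.toMap   // on duplicate keys the later map wins
  merged.getOrElse(key, "")
}

In Java, the equivalent fix would be to convert each inner element with JavaConverters.mapAsJavaMapConverter(...).asJava() before streaming over it.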



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Compute the Hash of each row in new column

2020-03-02 Thread Chetan Khatri
Thanks Enrico. I meant one hash of each single row, in an extra column,
something like this:

val newDs = typedRows.withColumn("hash", hash(typedRows.columns.map(col): _*))
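
That snippet should indeed give one hash per row (Spark's built-in hash is a 32-bit Murmur3, so the column is an int). If a cryptographic digest is wanted instead, a rough sketch along the same lines (note that concat_ws silently drops nulls, so the delimiter and null handling are assumptions to adapt):

import org.apache.spark.sql.functions.{col, concat_ws, sha2}

val withSha = typedRows.withColumn(
  "row_sha256",
  sha2(concat_ws("||", typedRows.columns.map(col): _*), 256))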

On Mon, Mar 2, 2020 at 3:51 PM Enrico Minack  wrote:

> Well, then apply md5 on all columns:
>
> ds.select(ds.columns.map(col) ++ ds.columns.map(column =>
> md5(col(column)).as(s"$column hash")): _*).show(false)
>
> Enrico
>
> Am 02.03.20 um 11:10 schrieb Chetan Khatri:
>
> Thanks Enrico
> I want to compute a hash of all the column values in the row.
>
> On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack 
> wrote:
>
>> This computes the md5 hash of a given column id of Dataset ds:
>>
>> ds.withColumn("id hash", md5($"id")).show(false)
>>
>> Test with this Dataset ds:
>>
>> import org.apache.spark.sql.types._
>> val ds = spark.range(10).select($"id".cast(StringType))
>>
>> Available are md5, sha, sha1, sha2 and hash:
>> https://spark.apache.org/docs/2.4.5/api/sql/index.html
>>
>> Enrico
>>
>>
>> Am 28.02.20 um 13:56 schrieb Chetan Khatri:
>> > Hi Spark Users,
>> > How can I compute Hash of each row and store in new column at
>> > Dataframe, could someone help me.
>> >
>> > Thanks
>>
>>
>>
>


Re: Compute the Hash of each row in new column

2020-03-02 Thread Enrico Minack

Well, then apply md5 on all columns:

ds.select(ds.columns.map(col) ++ ds.columns.map(column => 
md5(col(column)).as(s"$column hash")): _*).show(false)


Enrico

Am 02.03.20 um 11:10 schrieb Chetan Khatri:

Thanks Enrico
I want to compute a hash of all the column values in the row.

On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack > wrote:


This computes the md5 hash of a given column id of Dataset ds:

ds.withColumn("id hash", md5($"id")).show(false)

Test with this Dataset ds:

import org.apache.spark.sql.types._
val ds = spark.range(10).select($"id".cast(StringType))

Available are md5, sha, sha1, sha2 and hash:
https://spark.apache.org/docs/2.4.5/api/sql/index.html

Enrico


Am 28.02.20 um 13:56 schrieb Chetan Khatri:
> Hi Spark Users,
> How can I compute Hash of each row and store in new column at
> Dataframe, could someone help me.
>
> Thanks






[no subject]

2020-03-02 Thread lucas.wu


Re: Compute the Hash of each row in new column

2020-03-02 Thread Chetan Khatri
Thanks Enrico
I want to compute a hash of all the column values in the row.

On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack 
wrote:

> This computes the md5 hash of a given column id of Dataset ds:
>
> ds.withColumn("id hash", md5($"id")).show(false)
>
> Test with this Dataset ds:
>
> import org.apache.spark.sql.types._
> val ds = spark.range(10).select($"id".cast(StringType))
>
> Available are md5, sha, sha1, sha2 and hash:
> https://spark.apache.org/docs/2.4.5/api/sql/index.html
>
> Enrico
>
>
> Am 28.02.20 um 13:56 schrieb Chetan Khatri:
> > Hi Spark Users,
> > How can I compute Hash of each row and store in new column at
> > Dataframe, could someone help me.
> >
> > Thanks
>
>
>


Re:

2020-03-02 Thread Hamish Whittal
Enrico, Wim (and privately Neil), thanks for the replies. I will give your
suggestions a whirl.

Basically Wim recommended a pre-processing step to weed out the problematic
files. I am going to build that into the pipeline. I am not sure how the
problems are creeping in because this is a regular lift from a PGSQL
db/table. And so some of these files are correct and some are patently
wrong.

I'm working around the problem by trying small subsets of the 3000+ files,
but until I can weed out the problem files the processing is going to fail.
I need something more bulletproof than what I'm doing. So this is what I'm
going to try now.

Hamish

On Mon, Mar 2, 2020 at 10:15 AM Enrico Minack 
wrote:

> Looks like the schema of some files is unexpected.
>
> You could either run parquet-tools on each of the files and extract the
> schema to find the problematic files:
>
> hdfs dfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet \
> | while read file
> do
>   echo -n "$file: "
>   hadoop jar parquet-tools-1.9.0.jar schema $file
> done
>
>
> https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools
>
>
> Or you can use Spark to investigate the parquet files in parallel:
>
> spark.sparkContext
>   .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
>   .map { case (path, _) =>
>     import collection.JavaConverters._
>     val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
>     val reader = ParquetFileReader.open(file)
>     try {
>       val schema = reader.getFileMetaData().getSchema
>       (
>         schema.getName,
>         schema.getFields.asScala.map(f => (
>           Option(f.getId).map(_.intValue()),
>           f.getName,
>           Option(f.getOriginalType).map(_.name()),
>           Option(f.getRepetition).map(_.name()))
>         ).toArray
>       )
>     } finally {
>       reader.close()
>     }
>   }
>   .toDF("schema name", "fields")
>   .show(false)
>
> .binaryFiles provides you all filenames that match the given pattern as an
> RDD, so the following .map is executed on the Spark executors.
> The map then opens each parquet file via ParquetFileReader and provides
> access to its schema and data.
>
> I hope this points you in the right direction.
>
> Enrico
>
>
> Am 01.03.20 um 22:56 schrieb Hamish Whittal:
>
> Hi there,
>
> I have an hdfs directory with thousands of files. It seems that some of
> them - and I don't know which ones - have a problem with their schema and
> it's causing my Spark application to fail with this error:
>
> Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet
> column cannot be converted in file hdfs://
> ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-0-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
> Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY
>
> The problem is not only that it's causing the application to fail, but
> every time it does fail, I have to copy that file out of the directory and
> start the app again.
>
> I thought of trying to use try-except, but I can't seem to get that to
> work.
>
> Is there any advice anyone can give me because I really can't see myself
> going through thousands of files trying to figure out which ones are broken.
>
> Thanks in advance,
>
> hamish
>
>
>

-- 
Cloud-Fundis.co.za
Cape Town, South Africa
+27 79 614 4913


Re:

2020-03-02 Thread Enrico Minack

Looks like the schema of some files is unexpected.

You could either run parquet-tools on each of the files and extract the 
schema to find the problematic files:


hdfs dfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet \
| while read file
do
   echo -n "$file: "
   hadoop jar parquet-tools-1.9.0.jar schema $file
done

https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools



Or you can use Spark to investigate the parquet files in parallel:

spark.sparkContext
  .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
  .map { case (path, _) =>
    import collection.JavaConverters._
    val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
    val reader = ParquetFileReader.open(file)
    try {
      val schema = reader.getFileMetaData().getSchema
      (
        schema.getName,
        schema.getFields.asScala.map(f => (
          Option(f.getId).map(_.intValue()),
          f.getName,
          Option(f.getOriginalType).map(_.name()),
          Option(f.getRepetition).map(_.name()))
        ).toArray
      )
    } finally {
      reader.close()
    }
  }
  .toDF("schema name", "fields")
  .show(false)

.binaryFiles provides you all filenames that match the given pattern as 
an RDD, so the following .map is executed on the Spark executors.
The map then opens each parquet file via ParquetFileReader and provides 
access to its schema and data.
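
A possible follow-up (hypothetical, building on the snippet above): reduce each file's schema to just the physical type of the price column and collect the paths that are not stored as DOUBLE, so those files can be moved aside before the real job runs.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

val badFiles = spark.sparkContext
  .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
  .map { case (path, _) =>
    val reader = ParquetFileReader.open(
      HadoopInputFile.fromPath(new Path(path), new Configuration()))
    try {
      val priceType = reader.getFileMetaData().getSchema.getFields.asScala
        .find(_.getName == "price")
        .collect { case f if f.isPrimitive => f.asPrimitiveType.getPrimitiveTypeName.name }
      (path, priceType)   // None if the column is missing or is a group type
    } finally {
      reader.close()
    }
  }
  .filter { case (_, t) => !t.contains("DOUBLE") }   // flag anything that is not plain DOUBLE
  .keys
  .collect()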


I hope this points you in the right direction.

Enrico


Am 01.03.20 um 22:56 schrieb Hamish Whittal:

Hi there,

I have an hdfs directory with thousands of files. It seems that some 
of them - and I don't know which ones - have a problem with their 
schema and it's causing my Spark application to fail with this error:


Caused by: org.apache.spark.sql.execution.QueryExecutionException: 
Parquet column cannot be converted in file 
hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-0-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY


The problem is not only that it's causing the application to fail, but 
every time it does fail, I have to copy that file out of the directory 
and start the app again.


I thought of trying to use try-except, but I can't seem to get that to 
work.


Is there any advice anyone can give me because I really can't see 
myself going through thousands of files trying to figure out which 
ones are broken.


Thanks in advance,

hamish