Re: Reading ASN.1 files in Spark

2017-04-06 Thread Yong Zhang
Spark can read any file, as long as you can provide it the Hadoop InputFormat 
implementation.
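
A minimal sketch of that idea (TextInputFormat below is only a stand-in for whatever InputFormat class your ASN.1 decoder library ships; the input path is a placeholder too):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Swap TextInputFormat and the key/value classes for the ASN.1 InputFormat
// from the library you end up using.
val records = sc.newAPIHadoopFile(
  "hdfs:///data/asn1-cdrs",        // placeholder input path
  classOf[TextInputFormat],        // the custom InputFormat goes here
  classOf[LongWritable],           // the InputFormat's key type
  classOf[Text],                   // the InputFormat's value type
  sc.hadoopConfiguration)

records.map { case (_, value) => value.toString }.take(5).foreach(println)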


Did you try this guy's example?


http://awcoleman.blogspot.com/2014/07/processing-asn1-call-detail-records.html

Processing ASN.1 Call Detail Records with Hadoop (using Bouncy Castle) Part 3 (awcoleman.blogspot.com)



Yong




From: vincent gromakowski 
Sent: Thursday, April 6, 2017 5:24 AM
To: Hamza HACHANI
Cc: user@spark.apache.org
Subject: Re: Reading ASN.1 files in Spark

I would also be interested...

2017-04-06 11:09 GMT+02:00 Hamza HACHANI <hamza.hach...@supcom.tn>:
Does any body have a spark code example where he is reading ASN.1 files ?
Thx

Best regards
Hamza



Re: Why spark.sql.autoBroadcastJoinThreshold not available

2017-05-15 Thread Yong Zhang
You should post the execution plan here, so we can provide more accurate 
support.


Since you are building your feature table with a projection ("where ..."), my guess is that 
the following JIRA (SPARK-13383) stops the broadcast join. This is fixed in Spark 2.x. 
Can you try it on Spark 2.0?
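
For reference, a quick sketch (assuming the tables are registered and the HiveContext is available as sqlContext; "id" is a placeholder join key) of how to check which join the planner picks, and how to force the broadcast from the DataFrame API on 1.6:

import org.apache.spark.sql.functions.broadcast

val sample  = sqlContext.table("sample")
val feature = sqlContext.table("feature")

// Look for BroadcastHashJoin vs SortMergeJoin in the printed plan
sample.join(feature, Seq("id")).explain(true)

// Explicitly mark the small side for broadcast, regardless of the size estimate
sample.join(broadcast(feature), Seq("id")).explain(true)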

Yong


From: Jone Zhang 
Sent: Wednesday, May 10, 2017 7:10 AM
To: user@spark.apache.org
Subject: Why spark.sql.autoBroadcastJoinThreshold not available

Now I use Spark 1.6.0 in Java.
I wish the following SQL to be executed as a broadcast join:
select * from sample join feature

These are my steps:
1. set spark.sql.autoBroadcastJoinThreshold=100M
2. HiveContext.sql("cache lazy table feature as select * from src where ...") 
whose result size is only 100K
3. HiveContext.sql("select * from sample join feature")
Why is the join a SortMergeJoin?

Grateful for any idea!
Thanks.


Parquet file generated by Spark, but not compatible read by Hive

2017-06-12 Thread Yong Zhang
We are using Spark 1.6.2 as ETL to generate parquet file for one dataset, and 
partitioned by "brand" (which is a string to represent brand in this dataset).


After the partition files generated in HDFS like "brand=a" folder, we add the 
partitions in the Hive.


The hive version is 1.2.1 (In fact, we are using HDP 2.5.0).


Now the problem is that for 2 brand partitions, we cannot query the data 
generated in Spark, but it works fine for the rest of partitions.


Below is the error in the Hive CLI and hive.log I got if I query the bad 
partitions like "select * from  tablename where brand='BrandA' limit 3;"


Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.LongWritable


Caused by: java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.LongWritable
at 
org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetStringInspector.getPrimitiveWritableObject(ParquetStringInspector.java:52)
at 
org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:222)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:307)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:262)
at 
org.apache.hadoop.hive.serde2.DelimitedJSONSerDe.serializeField(DelimitedJSONSerDe.java:72)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:246)
at 
org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:50)
at 
org.apache.hadoop.hive.ql.exec.DefaultFetchFormatter.convert(DefaultFetchFormatter.java:71)
at 
org.apache.hadoop.hive.ql.exec.DefaultFetchFormatter.convert(DefaultFetchFormatter.java:40)
at 
org.apache.hadoop.hive.ql.exec.ListSinkOperator.process(ListSinkOperator.java:90)
... 22 more

There is not much I can find by googling this error message, but it points 
to the schema in Hive being different from the schema in the parquet file.
But this is a very strange case, as the same schema works fine for the other 
brands, which are just different values of the partition column and share the whole Hive 
schema above.

If I query like "select * from tablename where brand='BrandB' limit 3;", 
everything works fine.

So is this really caused by a Hive schema mismatch with the parquet file 
generated by Spark, by the data within different partition keys, or by 
a compatibility issue between Spark and Hive?

Thanks

Yong




Re: Parquet file generated by Spark, but not compatible read by Hive

2017-06-13 Thread Yong Zhang
The issue is caused by the data, and indeed a type mismatch between the Hive 
schema and Spark. Now it is fixed.


Without that kind of data, the problem isn't triggered in some brands.


Thanks for taking a look at this problem.


Yong



From: ayan guha 
Sent: Tuesday, June 13, 2017 1:54 AM
To: Angel Francisco Orta
Cc: Yong Zhang; user@spark.apache.org
Subject: Re: Parquet file generated by Spark, but not compatible read by Hive

Try setting following Param:

conf.set("spark.sql.hive.convertMetastoreParquet","false")

On Tue, Jun 13, 2017 at 3:34 PM, Angel Francisco Orta <angel.francisco.o...@gmail.com> wrote:
Hello,

Do you use df.write, or do you write with hivecontext.sql("insert into ...")?

Angel.






--
Best Regards,
Ayan Guha


Re: [Spark Sql/ UDFs] Spark and Hive UDFs parity

2017-06-18 Thread Yong Zhang
I assume you use Scala to implement your UDFs.


In this case, Scala language itself provides some options already for you.


If you want more control over the logic when a UDF is initialized, you can define a Scala 
object and define your UDF as part of it; the object in Scala then behaves like 
the singleton pattern for you.


So the Scala object's constructor logic can be treated as the init/configure 
contract in Hive. It will run once per JVM, to init your Scala 
object. That should meet your requirement.


The only tricky part is the context reference for the configure() method, which 
allows you to pass some configuration dynamically to your UDF at runtime. Since 
a Scala object is fixed at compile time, you cannot pass any parameters 
to its constructor. But there is nothing stopping you from building a Scala 
class/companion object that accepts parameters at constructor/init 
time, which can control your UDF's behavior.
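
A minimal sketch of that pattern (the dictionary and names here are made up purely for illustration):

import org.apache.spark.sql.functions.udf

object MyUdfs {
  // Runs once per executor JVM, the first time MyUdfs is referenced there;
  // this plays the role of Hive's initialize()/configure().
  private val dictionary: Set[String] = Set("spark", "hive", "hadoop")

  val isKnownWord = udf { word: String => dictionary.contains(word.toLowerCase) }
}

// usage: df.withColumn("known", MyUdfs.isKnownWord($"word"))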


If you have a concrete example that you cannot do in Spark Scala UDF, you can 
post here.


Yong



From: RD 
Sent: Friday, June 16, 2017 11:37 AM
To: Georg Heiler
Cc: user@spark.apache.org
Subject: Re: [Spark Sql/ UDFs] Spark and Hive UDFs parity

Thanks Georg. But I'm not sure how mapPartitions is relevant here.  Can you 
elaborate?



On Thu, Jun 15, 2017 at 4:18 AM, Georg Heiler <georg.kf.hei...@gmail.com> wrote:
What about using map partitions instead?

RD <rdsr...@gmail.com> wrote on Thu, 15 June 2017 at 06:52:
Hi Spark folks,

Is there any plan to support the richer UDF API that Hive supports for 
Spark UDFs? Hive supports the GenericUDF API which has, among other methods, 
initialize(), configure() (called once on the cluster), etc., which a lot of 
our users use. We now have a lot of UDFs in Hive which make use of these 
methods. We plan to move these UDFs to Spark UDFs but are limited by not 
having similar lifecycle methods.
   Are there plans to address this? Or do people usually adopt some sort of 
workaround?

   If we directly use the Hive UDFs in Spark we pay a performance penalty. I 
think Spark does a conversion from InternalRow to Row and back to 
InternalRow for native Spark UDFs, and for Hive it does InternalRow to Hive 
Object and back to InternalRow, but somehow the conversion for native UDFs is more 
performant.

-Best,
R.



Re: how to call udf with parameters

2017-06-18 Thread Yong Zhang
What version of spark you are using? I cannot reproduce your error:


scala> spark.version
res9: String = 2.1.1
scala> val dataset = Seq((0, "hello"), (1, "world")).toDF("id", "text")
dataset: org.apache.spark.sql.DataFrame = [id: int, text: string]
scala> import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.udf

// define a method in similar way like you did
scala> def len = udf { (data: String) => data.length > 0 }
len: org.apache.spark.sql.expressions.UserDefinedFunction

// use it
scala> dataset.select(len($"text").as('length)).show
+------+
|length|
+------+
|  true|
|  true|
+------+
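
Side note on the original error below: a UDF call expects a Column for every argument, so literal parameters usually need to be wrapped with lit(). A sketch, reusing the ssplit2 and input names from the question:

import org.apache.spark.sql.functions.lit

// wrap non-column literals so they become Column expressions
val output = input.select(ssplit2($"text", lit(true), lit(true), lit(2)).as('words))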


Yong




From: Pralabh Kumar 
Sent: Friday, June 16, 2017 12:19 AM
To: lk_spark
Cc: user.spark
Subject: Re: how to call udf with parameters

sample UDF
val getlength=udf((data:String)=>data.length())
data.select(getlength(data("col1")))

On Fri, Jun 16, 2017 at 9:21 AM, lk_spark <lk_sp...@163.com> wrote:
hi all,
 I define a udf with multiple parameters, but I don't know how to call it 
with a DataFrame.

UDF:

def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, 
minTermLen: Int) =>
val terms = HanLP.segment(sentence).asScala
.

Call :

scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
 ^
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
  ^
:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
   ^

scala> val output = 
input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input 
columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]

I need help!!


2017-06-16

lk_spark



Re: SparkSQL to read XML Blob data to create multiple rows

2017-06-29 Thread Yong Zhang
scala>spark.version
res6: String = 2.1.1

scala> val rdd = sc.parallelize(Seq("""<Comments>
<Comment><Title>Title1.1</Title><Description>Description_1.1</Description></Comment>
<Comment><Title>Title1.2</Title><Description>Description_1.2</Description></Comment>
<Comment><Title>Title1.3</Title><Description>Description_1.3</Description></Comment>
</Comments>"""))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala>import com.databricks.spark.xml.XmlReader

scala>val df = new XmlReader().xmlRdd(spark.sqlContext, rdd)
df: org.apache.spark.sql.DataFrame = [Comments: struct<Comment: array<struct<Description: string, Title: string>>>]

scala>df.printSchema
root
 |-- Comments: struct (nullable = true)
 ||-- Comment: array (nullable = true)
 |||-- element: struct (containsNull = true)
 ||||-- Description: string (nullable = true)
 ||||-- Title: string (nullable = true)

scala> df.show(false)
+---------------------------------------------------------------------------------------------------+
|Comments                                                                                             |
+---------------------------------------------------------------------------------------------------+
|[WrappedArray([Description_1.1,Title1.1], [Description_1.2,Title1.2], [Description_1.3,Title1.3])]  |
+---------------------------------------------------------------------------------------------------+


scala>df.withColumn("comment", 
explode(df("Comments.Comment"))).select($"comment.Description", 
$"comment.Title").show
+---------------+--------+
|    Description|   Title|
+---------------+--------+
|Description_1.1|Title1.1|
|Description_1.2|Title1.2|
|Description_1.3|Title1.3|
+---------------+--------+




From: Talap, Amol 
Sent: Thursday, June 29, 2017 9:38 AM
To: Judit Planas; user@spark.apache.org
Subject: RE: SparkSQL to read XML Blob data to create multiple rows


Thanks Judit, Ayan

Judit,

You almost got it. The explode might help here.

But when I tried I see load() doesn’t like to read from xmlcomment column on 
oracle_data.



scala> val xmlDF = sqlContext.sql("SELECT * FROM oracle_data")

17/06/29 18:31:58 INFO parse.ParseDriver: Parsing command: SELECT * FROM 
oracle_data

17/06/29 18:31:58 INFO parse.ParseDriver: Parse Completed

…

scala> val xmlDF_flattened = 
xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))

:22: error: overloaded method value load with alternatives:

  ()org.apache.spark.sql.DataFrame 

  (path: String)org.apache.spark.sql.DataFrame

cannot be applied to (org.apache.spark.sql.ColumnName)

   val xmlDF_flattened = 
xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))



Ayan,

Output of books_inexp.show was as below
title, author
Midnight Rain,Ralls, Kim
Maeve Ascendant,  Corets, Eva



Regards,

Amol

From: Judit Planas [mailto:judit.pla...@epfl.ch]
Sent: Thursday, June 29, 2017 3:46 AM
To: user@spark.apache.org
Subject: Re: SparkSQL to read XML Blob data to create multiple rows



Hi Amol,

Not sure I understand completely your question, but the SQL function "explode" 
may help you:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.explode





Here you can find a nice example:
https://stackoverflow.com/questions/38210507/explode-in-pyspark




HTH,
Judit

On 29/06/17 09:05, ayan guha wrote:

Hi



Not sure if I follow your issue. Can you please post output of 
books_inexp.show()?



On Thu, Jun 29, 2017 at 2:30 PM, Talap, Amol 
mailto:amol.ta...@capgemini.com>> wrote:

Hi:



We are trying to parse XML data to get below output from given input sample.

Can someone suggest a way to pass one DFrames output into load() function or 
any other alternative to get this output.



Input Data from Oracle Table XMLBlob:

SequenceID | Name    | City      | XMLComment
1          | Amol    | Kolhapur  | Title1.1Description_1.1Title1.2Description_1.2Title1.3Description_1.3
2          | Suresh  | Mumbai    | Title2Description_2
3          | Vishal  | Delhi     | Title3Description_3
4          | Swastik | Bangalore | Title4Description_4




Output Data Expected using Spark SQL:

SequenceID | Name | City     | Title    | Description
1          | Amol | Kolhapur | Title1.1 | Description_1.1
1          | Amol | K

Re: about broadcast join of base table in spark sql

2017-06-30 Thread Yong Zhang
Or since you already use the DataFrame API, instead of SQL, you can add the 
broadcast function to force it.


https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html#broadcast(org.apache.spark.sql.DataFrame)
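
A sketch of how the hint is attached in code, reusing A and B from the question below (whether the planner actually honors it still depends on the join type and the physical plan):

import org.apache.spark.sql.functions.broadcast

// wrap the side you want shipped to every executor
val joined = broadcast(A).join(B, A("key1") === B("key2"), "left")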


Yong







From: Bryan Jeffrey 
Sent: Friday, June 30, 2017 6:57 AM
To: d...@spark.org; user@spark.apache.org; paleyl
Subject: Re: about broadcast join of base table in spark sql

Hello.

If you want to allow broadcast join with larger broadcasts you can set 
spark.sql.autoBroadcastJoinThreshold to a higher value. This will cause the 
plan to allow join despite 'A' being larger than the default threshold.

Get Outlook for Android



From: paleyl
Sent: Wednesday, June 28, 10:42 PM
Subject: about broadcast join of base table in spark sql
To: d...@spark.org, user@spark.apache.org


Hi All,


Recently I met a problem with broadcast join: I want to left join tables A and B; 
A is the smaller one and the left table, so I wrote

A = A.join(B,A("key1") === B("key2"),"left")

but I found that A is not broadcast out, as the shuffle size is still very 
large.

I guess this is a designed mechanism in spark, so could anyone please tell me 
why it is designed like this? I am just very curious.


Best,


Paley





Re: about broadcast join of base table in spark sql

2017-07-02 Thread Yong Zhang
Then you need to tell us the spark version, and post the execution plan here, 
so we can help you better.


Yong



From: Paley Louie 
Sent: Sunday, July 2, 2017 12:36 AM
To: Yong Zhang
Cc: Bryan Jeffrey; d...@spark.org; user@spark.apache.org
Subject: Re: about broadcast join of base table in spark sql

Thank you for your reply. I have tried to add the broadcast hint to the base table, 
but it just cannot be broadcast out.



Re: DataFrameReader read from S3 org.apache.spark.sql.AnalysisException: Path does not exist

2017-07-12 Thread Yong Zhang
Can't you just catch that exception and return an empty dataframe?
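
Two untested sketches of that idea: either pre-filter the paths with the Hadoop FileSystem API, or catch the failure (listOfPaths and sparkSession are the names from the question below):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException

// Option 1: drop the paths that don't exist before reading
val conf = sparkSession.sparkContext.hadoopConfiguration
val existing = listOfPaths.filter { p =>
  val path = new Path(p)
  path.getFileSystem(conf).exists(path)
}
val df = if (existing.nonEmpty) sparkSession.read.parquet(existing: _*)
         else sparkSession.emptyDataFrame

// Option 2: catch the AnalysisException and fall back to an empty dataframe
val df2 = try sparkSession.read.parquet(listOfPaths: _*)
          catch { case _: AnalysisException => sparkSession.emptyDataFrame }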


Yong



From: Sumona Routh 
Sent: Wednesday, July 12, 2017 4:36 PM
To: user
Subject: DataFrameReader read from S3 org.apache.spark.sql.AnalysisException: 
Path does not exist

Hi there,
I'm trying to read a list of paths from S3 into a dataframe for a window of 
time using the following:

sparkSession.read.parquet(listOfPaths:_*)

In some cases, the path may not be there because there is no data, which is an 
acceptable scenario.
However, Spark throws an AnalysisException: Path does not exist. Is there an 
option I can set to tell it to gracefully return an empty dataframe if a 
particular path is missing? Looking at the spark code, there is an option 
checkFilesExist, but I don't believe that is set in the particular flow of code 
that I'm accessing.

Thanks!
Sumona



Re: Parquet files from spark not readable in Cascading

2017-11-16 Thread Yong Zhang
I don't have experience with Cascading, but we saw a similar issue when importing 
data generated by Spark into Hive.


Did you try setting "spark.sql.parquet.writeLegacyFormat" to true?


https://stackoverflow.com/questions/44279870/why-cant-impala-read-parquet-files-after-spark-sqls-write
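
A sketch of where that flag goes, on the job that writes the files Hive/Cascading will read (the output path is a placeholder; on Spark 1.6 the equivalent call is sqlContext.setConf):

// set before the write that produces the parquet files
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")

df.write.mode("overwrite").parquet("hdfs:///path/read/by/cascading")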







From: Vikas Gandham 
Sent: Wednesday, November 15, 2017 2:30 PM
To: user@spark.apache.org
Subject: Parquet files from spark not readable in Cascading


Hi,



When I tried reading parquet data that was generated by Spark in Cascading, it 
throws the following error:







Caused by: 
org.apache.parquet.io.ParquetDecodingException: 
Can not read value at 0 in block -1 in file ""

at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)

at 
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)

at 
org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.(DeprecatedParquetInputFormat.java:103)

at 
org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:47)

at 
cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253)

at 
cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248)

at cascading.util.Util.retry(Util.java:1044)

at 
cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247)

at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)

at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)

at 
org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1

at java.util.ArrayList.elementData(ArrayList.java:418)

at java.util.ArrayList.get(ArrayList.java:431)

at 
org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)

at 
org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)

at 
org.apache.parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:83)

at 
org.apache.parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:77)

at 
org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:293)

at 
org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)

at 
org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)

at 
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)

at 
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)

at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)

at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)



This is mostly seen when parquet has nested structures.



I didnt find any solution to this.



I see some JIRA issues like 
https://issues.apache.org/jira/browse/SPARK-10434 (parquet compatibility/interoperability 
issues), about Spark 1.4 failing to read parquet files that were generated by Spark 1.5. 
This was fixed in later Spark versions, but was it fixed in Cascading?



Not sure if this is something to do with the Parquet version, or Cascading has a bug, 
or Spark is doing something with the Parquet files which Cascading is not accepting.



Note : I am trying to read Parquet with avro schema in Cascading



I hav

Re: CATALYST rule join

2018-02-27 Thread Yong Zhang
I don't fully understand your question, but maybe you want to check out this JIRA 
https://issues.apache.org/jira/browse/SPARK-17728, especially the comments 
area. There is some discussion about why a UDF can be executed multiple 
times by Spark.
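
As an aside (not the Catalyst-rule approach discussed below, and only available since Spark 2.3): with the plain DataFrame API, one common way to stop the optimizer from re-executing or duplicating a UDF is to mark it non-deterministic. A sketch reusing the foo/x/table2 names from the question, and assuming table2 is available as a DataFrame:

import org.apache.spark.sql.functions.{explode, udf}

// hypothetical expensive UDF returning an array
val foo = udf { x: Long => Seq(x, x + 1, x + 2) }.asNondeterministic()

val exploded = table2.withColumn("x_exploded", explode(foo(table2("x"))))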

Yong


From: tan shai 
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join

Hi,

I need to write a rule to customize the join function using the Spark Catalyst 
optimizer. The objective is to duplicate the second dataset using this process:

- Execute a udf on the column called x, this udf returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table :

SELECT EXPLODE(foo(x)) from table2

Where `foo` is a udf that returns an array of elements.

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case join @ Join(left, right, _, Some(condition)) => {
      val attr = right.outputSet.find(x => x.toString().contains("x"))
      val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), Seq(attr.last.toAttribute))
      val explode = Explode(udf)
      val resolvedGenerator = Generate(explode, true, false, qualifier = None, udf.references.toSeq, right)
      var newRight = Project(resolvedGenerator.output, resolvedGenerator)
      Join(left, newRight, Inner, Option(condition))
    }
  }
}

But the problem is that the operation `Generate explode` appears many times in 
the physical plan.


Do you have any other ideas ? Maybe rewriting the code.

Thank you






java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Case class

2018-03-22 Thread Yong Zhang
I am trying to research a custom Aggregator implementation, and following the 
example in the Spark sample code here:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala


But I cannot use it in the agg function, and got an error like 
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to my case class. If I don't use the group by, then it works in the same way 
as the sample code. To make it work with group by, what do I need to change?


This is on Spark 2.2, as shown below. Following the spark example, I can do

rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
without any issue, but if
rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)

I will get the cast exception. But I want to apply my custom Aggregator 
implementation per group. How do I fix this?

Thanks


scala> spark.version
res31: String = 2.2.1

case class FlagChangeLog(date: String, old_flag: Boolean, new_flag: Boolean)
case class DeriveRecord (domain: String, date: String, flag: Boolean, isDelta: 
Boolean, flag_changelog: scala.collection.mutable.ListBuffer[FlagChangeLog])

val rawDS = Seq(
  DeriveRecord("abc.com", "2017-01-09", true, false, ListBuffer.empty),
  DeriveRecord("123.com", "2015-01-01", false, false, ListBuffer.empty),
  DeriveRecord("abc.com", "2018-01-09", false, true, ListBuffer.empty),
  DeriveRecord("123.com", "2017-01-09", true, true, ListBuffer.empty),
  DeriveRecord("xyz.com", "2018-03-09", false, true, ListBuffer.empty)
).toDS

scala> rawDS.show(false)
+-------+----------+-----+-------+--------------+
|domain |date      |flag |isDelta|flag_changelog|
+-------+----------+-----+-------+--------------+
|abc.com|2017-01-09|true |false  |[]            |
|123.com|2015-01-01|false|false  |[]            |
|abc.com|2018-01-09|false|true   |[]            |
|123.com|2017-01-09|true |true   |[]            |
|xyz.com|2018-03-09|false|true   |[]            |
+-------+----------+-----+-------+--------------+

object ChangeLogAggregator extends Aggregator[DeriveRecord, DeriveRecord, 
DeriveRecord] {
  def zero: DeriveRecord = ///

  def reduce(buffer: DeriveRecord, curr: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def merge(b1: DeriveRecord, b2: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def finish(output: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def bufferEncoder: Encoder[DeriveRecord] = Encoders.product
  def outputEncoder: Encoder[DeriveRecord] = Encoders.product
}

scala> rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
+-------+----------+-----+-------+----------------------------------------------------+
|domain |date      |flag |isDelta|flag_changelog                                      |
+-------+----------+-----+-------+----------------------------------------------------+
|abc.com|2018-01-09|false|true   |[[2015-01-01,true,false], [2018-01-09,false,false]] |
+-------+----------+-----+-------+----------------------------------------------------+

scala> 
rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)
18/03/22 22:04:44 ERROR Executor: Exception in task 1.0 in stage 36.0 (TID 48)
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to $line15.$read$$iw$$iw$DeriveRecord
at 
$line110.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ChangeLogAggregator$.reduce(:31)
at 
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.update(TypedAggregateExpression.scala:239)
at 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:524)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:162)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.a

Re: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Case class

2018-03-23 Thread Yong Zhang
I am still stuck with this. Does anyone know the correct way to use a custom 
Aggregator for a case class with agg?


I like to use the Dataset API, but it looks like in aggregation Spark loses the 
type and falls back to GenericRowWithSchema, instead of my case class. Is that right?
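
One thing worth trying (a sketch only, not verified against this exact Aggregator): stay in the typed API, so the Aggregator keeps receiving DeriveRecord instead of a generic Row:

// groupByKey preserves the Dataset element type for the typed Aggregator
val result = rawDS
  .groupByKey(_.domain)
  .agg(ChangeLogAggregator.toColumn.name("change_log"))

result.show(false)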


Thanks



From: Yong Zhang 
Sent: Thursday, March 22, 2018 10:08 PM
To: user@spark.apache.org
Subject: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to Case class



Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-28 Thread Yong Zhang
Your dataframe has an array data type, which is NOT supported by CSV. How could a CSV file 
represent an array or other nested structure?


If you want your data to be human-readable text, write it out as JSON in your case 
then.
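
A minimal sketch of that (the output path is a placeholder; the call is the same shape from PySpark):

// JSON can represent the struct/array columns that CSV cannot
df.coalesce(1).write.mode("overwrite").json("output_json")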


Yong



From: Mina Aslani 
Sent: Wednesday, March 28, 2018 12:22 AM
To: naresh Goud
Cc: user @spark
Subject: Re: java.lang.UnsupportedOperationException: CSV data source does not 
support struct/ERROR RetryingBlockFetcher

Hi Naresh,

Thank you for the quick response, appreciate it.
Removing the option("header","true") and trying

df = spark.read.parquet("test.parquet"), reading the parquet now works. 
However, I would like to find a way to have the data in a csv/readable form.
Still, I cannot save df as csv, as it throws:
java.lang.UnsupportedOperationException: CSV data source does not support 
struct,values:array> data type.

Any idea?

Best regards,

Mina


On Tue, Mar 27, 2018 at 10:51 PM, naresh Goud <nareshgoud.du...@gmail.com> wrote:
In the case of storing as a parquet file, I don't think it requires the header
option("header","true").

Give it a try by removing the header option and then try to read it. I haven't tried it; 
just a thought.

Thank you,
Naresh


On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani <aslanim...@gmail.com> wrote:

Hi,


I am using pyspark. To transform my sample data and create model, I use 
stringIndexer and OneHotEncoder.


However, when I try to write data as csv using below command

df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")


I get UnsupportedOperationException

java.lang.UnsupportedOperationException: CSV data source does not support 
struct,values:array> data type.

Therefore, to save data and avoid getting the error I use


df.coalesce(1).write.option("header","true").mode("overwrite").save("output")


The above command saves data but it's in parquet format.
How can I read parquet file and convert to csv to observe the data?

When I use

df = spark.read.parquet("1.parquet"), it throws:

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding 
blocks

Your input is appreciated.


Best regards,

Mina



--
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/




Re: how to create all possible combinations from an array? how to join and explode row array?

2018-03-30 Thread Yong Zhang
What's wrong with just using a UDF doing a for loop in Scala? You can change the 
for-loop logic for whatever combinations you want.


scala> spark.version
res4: String = 2.2.1

scala> aggDS.printSchema
root
 |-- name: string (nullable = true)
 |-- colors: array (nullable = true)
 ||-- element: string (containsNull = true)


scala> aggDS.show(false)
+----+----------------+
|name|colors          |
+----+----------------+
|john|[red, blue, red]|
|bill|[blue, red]     |
|sam |[gree]          |
+----+----------------+

scala> import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.udf

scala> val loopUDF = udf { x: Seq[String] => for (a <- x; b <-x) yield (a,b) }
loopUDF: org.apache.spark.sql.expressions.UserDefinedFunction = 
UserDefinedFunction(,ArrayType(StructType(StructField(_1,StringType,true),
 StructField(_2,StringType,true)),true),Some(List(ArrayType(StringType,true

scala> aggDS.withColumn("newCol", loopUDF($"colors")).show(false)
+----+----------------+----------------------------------------------------------------------------------------------------------+
|name|colors          |newCol                                                                                                    |
+----+----------------+----------------------------------------------------------------------------------------------------------+
|john|[red, blue, red]|[[red,red], [red,blue], [red,red], [blue,red], [blue,blue], [blue,red], [red,red], [red,blue], [red,red]]|
|bill|[blue, red]     |[[blue,blue], [blue,red], [red,blue], [red,red]]                                                          |
|sam |[gree]          |[[gree,gree]]                                                                                             |
+----+----------------+----------------------------------------------------------------------------------------------------------+

Yong



From: Andy Davidson 
Sent: Friday, March 30, 2018 8:58 PM
To: Andy Davidson; user
Subject: Re: how to create all possible combinations from an array? how to join 
and explode row array?

I was a little sloppy when I created the sample output. It's missing a few pairs.

Assume for a given row I have [a, b, c] I want to create something like the 
cartesian join

From: Andrew Davidson 
mailto:a...@santacruzintegration.com>>
Date: Friday, March 30, 2018 at 5:54 PM
To: "user @spark" mailto:user@spark.apache.org>>
Subject: how to create all possible combinations from an array? how to join and 
explode row array?

I have a dataframe and execute  df.groupBy(“xyzy”).agg( collect_list(“abc”)

This produces a column of type array. Now for each row I want to create a 
multiple pairs/tuples from the array so that I can create a contingency table.  
Any idea how I can transform my data so that call crosstab() ? The join 
transformation operate on the entire dataframe. I need something at the row 
array level?


Bellow is some sample python and describes what I would like my results to be?

Kind regards

Andy


c1 = ["john", "bill", "sam"]
c2 = [['red', 'blue', 'red'], ['blue', 'red'], ['green']]
p = pd.DataFrame({"a":c1, "b":c2})

df = sqlContext.createDataFrame(p)
df.printSchema()
df.show()

root
 |-- a: string (nullable = true)
 |-- b: array (nullable = true)
 ||-- element: string (containsNull = true)

+----+----------------+
|   a|               b|
+----+----------------+
|john|[red, blue, red]|
|bill|     [blue, red]|
| sam|         [green]|
+----+----------------+


The output I am trying to create is. I could live with a crossJoin (cartesian 
join) and add my own filtering if it makes the problem easier?


+----+----+
|  x1|  x2|
+----+----+
| red|blue|
| red| red|
|blue| red|
+----+----+




Re: How to read the schema of a partitioned dataframe without listing all the partitions ?

2018-04-27 Thread Yong Zhang
What version of Spark you are using?


You can search "spark.sql.parquet.mergeSchema" on 
https://spark.apache.org/docs/latest/sql-programming-guide.html


Starting from Spark 1.5, the default is already "false", which means Spark 
shouldn't scan all the parquet files to generate the schema.
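
Two hedged sketches for keeping this cheap (the paths are placeholders): be explicit that no schema merging is wanted, or grab the schema from a single day's folder and hand it to the full read so Spark skips schema inference (note that partition discovery itself may still list directories at planning time):

// 1) make sure merging is off for this read
val df = spark.read.option("mergeSchema", "false").parquet("s3://bucket/table")

// 2) read one partition only (basePath keeps the "day" partition column),
//    then reuse its schema for the full path
val oneDay = spark.read
  .option("basePath", "s3://bucket/table")
  .parquet("s3://bucket/table/day=2018-04-26")
val full = spark.read.schema(oneDay.schema).parquet("s3://bucket/table")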


Yong






From: Walid LEZZAR 
Sent: Friday, April 27, 2018 7:42 AM
To: spark users
Subject: How to read the schema of a partitioned dataframe without listing all 
the partitions ?

Hi,

I have a parquet on S3 partitioned by day. I have 2 years of data (-> about 
1000 partitions). With spark, when I just want to know the schema of this 
parquet without even asking for a single row of data, spark tries to list all 
the partitions and the nested partitions of the parquet. Which makes it very 
slow just to build the dataframe object on Zeppelin.

Is there a way to avoid that ? Is there way to tell spark : "hey, just read a 
single partition and give me the schema of that partition and consider it as 
the schema of the whole dataframe" ? (I don't care about schema merge, it's off 
by the way)

Thanks.
Walid.


Re: Why Spark JDBC Writing in a sequential order

2018-05-25 Thread Yong Zhang
I am not sure about Redshift, but I know the target table is not partitioned. 
But we should be able to just insert into non-partitioned remote table from 12 
clients concurrently, right?


Even if Redshift doesn't allow concurrent writes, would the Spark driver 
detect this and coordinate all the tasks and executors, as I observed?


Yong


From: Jörn Franke 
Sent: Friday, May 25, 2018 10:50 AM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: Why Spark JDBC Writing in a sequential order

Can your database receive the writes concurrently ? Ie do you make sure that 
each executor writes into a different partition at database side ?

On 25. May 2018, at 16:42, Yong Zhang 
mailto:java8...@hotmail.com>> wrote:


Spark version 2.2.0


We are trying to write a DataFrame to remote relationship database (AWS 
Redshift). Based on the Spark JDBC document, we already repartition our DF as 
12 and set the spark jdbc to concurrent writing for 12 partitions as 
"numPartitions" parameter.


We run the command as following:

dataframe.repartition(12).write.mode("overwrite").option("batchsize", 
5000).option("numPartitions", 12).jdbc(url=jdbcurl, table="tableName", 
connectionProperties=connectionProps)


Here is the Spark UI:




We found out that the 12 tasks obviously are running in sequential order. They 
are all in "Running" status in the beginning at the same time, but if we check 
the "Duration" and "Shuffle Read Size/Records" of them, it is clear that they 
are run one by one.

For example, task 8 finished first in about 2 hours, and wrote 34732 records to 
remote DB (I knew the speed looks terrible, but that's not the question of this 
post), and task 0 started after task 8, and took 4 hours (first 2 hours waiting 
for task 8).

In this picture, only task 2 and 4 are in running stage, but task 4 is 
obviously waiting for task 2 to finish, then start writing after that.


My question is, in the above Spark command, my understanding that 12 executors 
should open the JDBC connection to the remote DB concurrently, and all 12 tasks 
should start writing also in concurrent, and whole job should finish around 2 
hours overall.


Why are the 12 tasks all in the "RUNNING" stage but apparently waiting for 
something, and can ONLY write to the remote DB sequentially? The 12 executors are 
on different JVMs on different physical nodes. Why is this happening? What 
stops Spark from pushing the data truly concurrently?


Thanks


Yong



Spark 3.1 with spark AVRO

2022-03-10 Thread Yong Zhang
Hi,

I am puzzled by this issue of the Spark 3.1 version reading avro files. Everything 
is done on my local Mac laptop so far, and I really don't know where the issue 
comes from; I googled a lot and cannot find any clue.

I have always been using the Spark 2.4 version, as it is really mature. But for a new 
project, I want to try Spark 3.1, which needs to read AVRO files.

To my surprise, on my local machine, Spark 3.1.3 throws an error when trying to read 
the avro files.

  *   I download the Spark 3.1.2 and 3.1.3 with Hadoop2 or 3 from 
https://spark.apache.org/downloads.html
  *   Use JDK "1.8.0_321" on the Mac
  *   Untar the spark 3.1.x local
  *   And follow https://spark.apache.org/docs/3.1.3/sql-data-sources-avro.html

Start spark-shell with exactly the following command:

spark-3.1.3-bin-hadoop3.2/bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.1.3

And I always get the following error when reading the existing test AVRO files:

scala> val pageview = spark.read.format("avro").load("/Users/user/output/raw/")
java.lang.NoClassDefFoundError: 
org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

I tried different versions of Spark 3.x, from Spark 3.1.2 -> 3.1.3 -> 3.2.1, and 
I believe they are all built with Scala 2.12. I start the spark-shell with 
"--packages org.apache.spark:spark-avro_2.12:x.x.x", where x.x.x matches the 
Spark version, but I got the above weird "NoClassDefFoundError" in all cases.

Meanwhile, downloading Spark 2.4.8 and starting spark-shell with "--packages 
org.apache.spark:spark-avro_2.11:2.4.3", I can read the exact same AVRO file 
without any issue.

I am thinking it must be done wrongly on my end, but:

  *   I downloaded several versions of Spark and untar them directly.
  *   I DIDN'T have any custom "spark-env.sh/spark-default.conf" file to 
include any potential jar files to mess up things
  *   Straight creating a spark session under spark-shell with the correct 
package and try to read avro files. Nothing more.

I have to suspect there is something wrong with the Spark 3.x avro package 
releases, but I know that possibility is very low, especially across multiple 
different versions. And the class 
"org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2" exists under 
"spark-sql_2.12-3.1.3.jar", as below:
jar tvf spark-sql_2.12-3.1.3.jar | grep FileDataSourceV2
15436 Sun Feb 06 22:54:00 EST 2022 
org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.class

So what could be wrong?

Thanks

Yong



Re: Spark 3.1 with spark AVRO

2022-03-10 Thread Yong Zhang
Thank you so much, you absolutely nailed it.

There was a stupid "SPARK_HOME" env variable pointing to Spark 2.4 set in 
zsh, which was the troublemaker.

I totally forgot about it and didn't realize this environment variable could cause 
days of frustration for me.

Yong


From: Artemis User 
Sent: Thursday, March 10, 2022 3:13 PM
To: user 
Subject: Re: Spark 3.1 with spark AVRO

It must be some misconfiguration in your environment.  Do you perhaps have a 
hardwired $SPARK_HOME env variable in your shell?  An easy test would be to 
place the spark-avro jar file you downloaded in the jars directory of Spark and 
run spark-shell again without the packages option.  This will guarantee that 
the jar file is on the classpath of the Spark driver and executors.





Can Spark SQL (not DataFrame or Dataset) aggregate array into map of element of count?

2023-05-05 Thread Yong Zhang
Hi, This is on Spark 3.1 environment.

For some reason, I can ONLY do this in Spark SQL, instead of either Scala or 
PySpark environment.

I want to aggregate an array into a Map of element count, within that array, 
but in Spark SQL.
I know that there is an aggregate function available like

aggregate(expr, start, merge [, finish])

But I want to know if this can be done in the Spark SQL only, and:

  *   How to represent an empty Map as "start" element above
  *   How to merge each element (as String type) into Map (as adding count if 
exist in the Map, or add as (element -> 1) as new entry in the Map if not exist)

Like the following example -> 
https://docs.databricks.com/sql/language-manual/functions/aggregate.html

SELECT aggregate(array(1, 2, 3, 4),
   named_struct('sum', 0, 'cnt', 0),
   (acc, x) -> named_struct('sum', acc.sum + x, 'cnt', acc.cnt 
+ 1),
   acc -> acc.sum / acc.cnt) AS avg

I wonder:
select
  aggregate(
    array('a','b','a'),
    map('', 0),
    (acc, x) -> ???,
    acc -> acc) as output

How do I write the logic after "(acc, x) -> " so that I can output a map of the count of 
each element in the array?
I know I can "explode" and then group by + count, but since I have multiple array 
columns to transform, I want to do it in a higher-order-function way, and 
in pure Spark SQL.
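
For what it's worth, one untested sketch of how that merge step could look in pure SQL, assuming the Spark 3.x higher-order map functions map_filter and map_concat are available: drop any existing entry for x before re-adding it with an incremented count, and give the empty "start" map an explicit type:

SELECT aggregate(
         array('a', 'b', 'a'),
         cast(map() AS map<string, int>),
         (acc, x) -> map_concat(
                       map_filter(acc, (k, v) -> k != x),
                       map(x, coalesce(acc[x], 0) + 1)),
         acc -> acc) AS freq_cnt
-- expected output: {a -> 2, b -> 1}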

Thanks



Re: Can Spark SQL (not DataFrame or Dataset) aggregate array into map of element of count?

2023-05-09 Thread Yong Zhang
Hi, Mich:

Thanks for your reply, but maybe I didn't make my question clear.

I am looking for a solution to compute the count of each element in an array, 
without "exploding" the array, and output a Map structure as a column.
For example, for an array as ('a', 'b', 'a'), I want to output a column as 
Map('a' -> 2, 'b' -> 1).
I think that "aggregate" function should be able to, using the example shown in 
the link of my original email, as

SELECT aggregate(array('a', 'b', 'a'),
   map(),
   (acc, x) -> ???,
   acc -> acc) AS feq_cnt

Here are my questions:

  *   Is using "map()" above the best way? The "start" structure in this case 
should be Map.empty[String, Int], but of course that won't work in pure Spark 
SQL, so the best I can think of is "map()", and it needs to be a mutable Map.
  *   How to implement the logic in the "???" place? If I did it in Scala, I 
would do "acc.update(x, acc.getOrElse(x, 0) + 1)", which means if the element 
exists, add one to the value; otherwise, start the element with a count of 0. 
Of course, the above code won't work in Spark SQL.
  *   As I said, I am NOT running in either Scale or PySpark session, but in a 
pure Spark SQL.
  *   Is it possible to do the above logic in Spark SQL, without using 
"exploding"?

Thanks


From: Mich Talebzadeh 
Sent: Saturday, May 6, 2023 4:52 PM
To: Yong Zhang 
Cc: user@spark.apache.org 
Subject: Re: Can Spark SQL (not DataFrame or Dataset) aggregate array into map 
of element of count?

you can create DF from your SQL RS and work with that in Python the way you want

## you don't need all these
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, current_timestamp, lit
from pyspark.sql.types import *
sqltext = """
SELECT aggregate(array(1, 2, 3, 4),
   named_struct('sum', 0, 'cnt', 0),
   (acc, x) -> named_struct('sum', acc.sum + x, 'cnt', acc.cnt 
+ 1),
   acc -> acc.sum / acc.cnt) AS avg
"""
df = spark.sql(sqltext)
df.printSchema()

root
 |-- avg: double (nullable = true)


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.







Re: Can Spark SQL (not DataFrame or Dataset) aggregate array into map of element of count?

2023-05-09 Thread Yong Zhang
Hi, Mich:

Thanks for your reply, but maybe I didn't make my question clear.

I am looking for a solution to compute the count of each element in an array, 
without "exploding" the array, and output a Map structure as a column.
For example, for an array as ('a', 'b', 'a'), I want to output a column as 
Map('a' -> 2, 'b' -> 1).
I think that the "aggregate" function should be able to, following the example 
shown in the link of my original email, as

SELECT aggregate(array('a', 'b', 'a'),   map(), 
  (acc, x) -> ???,   acc -> acc) AS feq_cnt

Here are my questions:

  *   Is using "map()" above the best way? The "start" structure in this case 
should be Map.empty[String, Int], but of course, it won't work in pure Spark 
SQL, so the best solution I can think of is "map()", and I want a mutable Map.
  *   How to implement the logic in "???" place? If I do it in Scala, I will do 
"acc.update(x, acc.getOrElse(x, 0) + 1)", which means if an element exists, 
plus one for the value; otherwise, start the element with the count of 1. Of 
course, the above code won't work in Spark SQL.
  *   As I said, I am NOT running in either Scale or PySpark session, but in a 
pure Spark SQL.
  *   Is it possible to do the above logic in Spark SQL, without using 
"exploding"?

Thanks




