Re: Can Spark SQL (not DataFrame or Dataset) aggregate array into map of element of count?

2023-05-09 Thread Yong Zhang
Hi, Mich:

Thanks for your reply, but maybe I didn't make my question clear.

I am looking for a solution to compute the count of each element in an array, 
without "exploding" the array, and output a Map structure as a column.
For example, for an array as ('a', 'b', 'a'), I want to output a column as 
Map('a' -> 2, 'b' -> 1).
I think that the "aggregate" function should be able to, following the example 
shown in the link of my original email, as

SELECT aggregate(array('a', 'b', 'a'),   map(), 
  (acc, x) -> ???,   acc -> acc) AS feq_cnt

Here are my questions:

  *   Is using "map()" above the best way? The "start" structure in this case 
should be Map.empty[String, Int], but of course, it won't work in pure Spark 
SQL, so the best solution I can think of is "map()", and I want a mutable Map.
  *   How to implement the logic in "???" place? If I do it in Scala, I will do 
"acc.update(x, acc.getOrElse(x, 0) + 1)", which means if an element exists, 
plus one for the value; otherwise, start the element with the count of 1. Of 
course, the above code won't work in Spark SQL.
  *   As I said, I am NOT running in either Scale or PySpark session, but in a 
pure Spark SQL.
  *   Is it possible to do the above logic in Spark SQL, without using 
"exploding"?

Thanks

____
From: Mich Talebzadeh 
Sent: Saturday, May 6, 2023 4:52 PM
To: Yong Zhang 
Cc: user@spark.apache.org 
Subject: Re: Can Spark SQL (not DataFrame or Dataset) aggregate array into map 
of element of count?

You can create a DataFrame from your SQL result set and work with it in Python the way you want.

## you don't need all these
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, current_timestamp, lit
from pyspark.sql.types import *
sqltext = """
SELECT aggregate(array(1, 2, 3, 4),
   named_struct('sum', 0, 'cnt', 0),
   (acc, x) -> named_struct('sum', acc.sum + x, 'cnt', acc.cnt 
+ 1),
   acc -> acc.sum / acc.cnt) AS avg
"""
df = spark.sql(sqltext)
df.printSchema()

root
 |-- avg: double (nullable = true)


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 5 May 2023 at 20:33, Yong Zhang <java8...@hotmail.com> wrote:
Hi, This is on Spark 3.1 environment.

For some reason, I can ONLY do this in Spark SQL, instead of either Scala or 
PySpark environment.

I want to aggregate an array into a Map of element count, within that array, 
but in Spark SQL.
I know that there is an aggregate function available like

aggregate(expr, start, merge [, finish])

But I want to know if this can be done in the Spark SQL only, and:

  *   How to represent an empty Map as "start" element above
  *   How to merge each element (a String) into the Map (incrementing the count if the
element already exists in the Map, or adding (element -> 1) as a new entry if it does not)

Like the following example -> 
https://docs.databricks.com/sql/language-manual/functions/aggregate.html

SELECT aggregate(array(1, 2, 3, 4),
                 named_struct('sum', 0, 'cnt', 0),
                 (acc, x) -> named_struct('sum', acc.sum + x, 'cnt', acc.cnt + 1),
                 acc -> acc.sum / acc.cnt) AS avg

I wonder:
select
  aggregate(
    array('a','b','a'),
    map('', 0),
    (acc, x) -> ???,
    acc -> acc) as output

How do I write the logic after "(acc, x) -> ", so I can output a map of the count of
each element in the array?
I know I can "explode", then group by + count, but since I have multiple array
columns to transform, I would rather do it with a higher-order function, in pure
Spark SQL.

Thanks



Can Spark SQL (not DataFrame or Dataset) aggregate array into map of element of count?

2023-05-05 Thread Yong Zhang
Hi, This is on Spark 3.1 environment.

For some reason, I can ONLY do this in Spark SQL, instead of either Scala or 
PySpark environment.

I want to aggregate an array into a Map of element count, within that array, 
but in Spark SQL.
I know that there is an aggregate function available like

aggregate(expr, start, merge [, finish])

But I want to know if this can be done in the Spark SQL only, and:

  *   How to represent an empty Map as "start" element above
  *   How to merge each element (a String) into the Map (incrementing the count if the
element already exists in the Map, or adding (element -> 1) as a new entry if it does not)

Like the following example -> 
https://docs.databricks.com/sql/language-manual/functions/aggregate.html

SELECT aggregate(array(1, 2, 3, 4),
                 named_struct('sum', 0, 'cnt', 0),
                 (acc, x) -> named_struct('sum', acc.sum + x, 'cnt', acc.cnt + 1),
                 acc -> acc.sum / acc.cnt) AS avg

I wonder:
select
  aggregate(
    array('a','b','a'),
    map('', 0),
    (acc, x) -> ???,
    acc -> acc) as output

How do I write the logic after "(acc, x) -> ", so I can output a map of the count of
each element in the array?
I know I can "explode", then group by + count, but since I have multiple array
columns to transform, I would rather do it with a higher-order function, in pure
Spark SQL.

Thanks



Re: Spark 3.1 with spark AVRO

2022-03-10 Thread Yong Zhang
Thank you so much, you absolutely nailed it.

There is a stupid "SPARK_HOME" env variable pointing to Spark2.4 existed on 
zsh, which is the troublemaker.

Totally forgot that and didn't realize this environment variable could cause 
days frustration for me.

Yong
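
(For anyone hitting the same thing, a quick sanity check from inside spark-shell; just a
sketch, nothing Avro-specific:)

// should match the tarball you launched, not an older install pointed to by SPARK_HOME
println(spark.version)
println(sys.env.getOrElse("SPARK_HOME", "(not set)"))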


From: Artemis User 
Sent: Thursday, March 10, 2022 3:13 PM
To: user 
Subject: Re: Spark 3.1 with spark AVRO

It must be some misconfiguration in your environment. Do you perhaps have a
hardwired $SPARK_HOME env variable in your shell? An easy test would be to
place the spark-avro jar file you downloaded in the jars directory of Spark and
run spark-shell again without the packages option. This will guarantee that
the jar file is on the classpath of the Spark driver and executors.

On 3/10/22 1:24 PM, Yong Zhang wrote:
Hi,

I am puzzled by this issue of reading Avro files with Spark 3.1. Everything
is done on my local Mac laptop so far, I really don't know where the issue
comes from, and I have googled a lot without finding any clue.

I have always used Spark 2.4, as it is really mature. But for a new
project, I want to try Spark 3.1, which needs to read Avro files.

To my surprise, on my local machine, Spark 3.1.3 throws an error when trying to read
the Avro files.

  *   I downloaded Spark 3.1.2 and 3.1.3 (with Hadoop 2 or 3) from
https://spark.apache.org/downloads.html
  *   I use JDK "1.8.0_321" on the Mac
  *   I untarred the Spark 3.1.x tarball locally
  *   And followed https://spark.apache.org/docs/3.1.3/sql-data-sources-avro.html

I start spark-shell with exactly the following command:

spark-3.1.3-bin-hadoop3.2/bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.1.3

And I always get the following error when reading the existing test Avro files:

scala> val pageview = spark.read.format("avro").load("/Users/user/output/raw/")
java.lang.NoClassDefFoundError: 
org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

I tried different versions of Spark 3.x, from Spark 3.1.2 -> 3.1.3 -> 3.2.1, which
I believe are all built for Scala 2.12, and I start spark-shell with
"--packages org.apache.spark:spark-avro_2.12:x.x.x", where x.x.x matches the
Spark version, but I got the above weird "NoClassDefFoundError" in all cases.

Meanwhile, if I download Spark 2.4.8 and start spark-shell with "--packages
org.apache.spark:spark-avro_2.11:2.4.3", I can read exactly the same Avro file
without any issue.

I am thinking it must be something done wrongly on my end, but:

  *   I downloaded several versions of Spark and untarred them directly.
  *   I DIDN'T have any custom "spark-env.sh/spark-default.conf" file that could
pull in extra jar files and mess things up.
  *   I simply create a Spark session in spark-shell with the correct
package and try to read the Avro files. Nothing more.

I started to doubt whether something is wrong with the Spark 3.x Avro package
releases, but I know that possibility is very low, especially across multiple
different versions. Moreover, the class
"org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2" does exist inside
"spark-sql_2.12-3.1.3.jar", as below:
jar tvf spark-sql_2.12-3.1.3.jar | grep FileDataSourceV2
15436 Sun Feb 06 22:54:00 EST 2022 
org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.class

So what could be wrong?

Thanks

Yong




Spark 3.1 with spark AVRO

2022-03-10 Thread Yong Zhang
Hi,

I am puzzled by this issue of reading Avro files with Spark 3.1. Everything
is done on my local Mac laptop so far, I really don't know where the issue
comes from, and I have googled a lot without finding any clue.

I have always used Spark 2.4, as it is really mature. But for a new
project, I want to try Spark 3.1, which needs to read Avro files.

To my surprise, on my local machine, Spark 3.1.3 throws an error when trying to read
the Avro files.

  *   I downloaded Spark 3.1.2 and 3.1.3 (with Hadoop 2 or 3) from
https://spark.apache.org/downloads.html
  *   I use JDK "1.8.0_321" on the Mac
  *   I untarred the Spark 3.1.x tarball locally
  *   And followed https://spark.apache.org/docs/3.1.3/sql-data-sources-avro.html

I start spark-shell with exactly the following command:

spark-3.1.3-bin-hadoop3.2/bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.1.3

And I always get the following error when reading the existing test Avro files:

scala> val pageview = spark.read.format("avro").load("/Users/user/output/raw/")
java.lang.NoClassDefFoundError: 
org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

I tried different versions of Spark 3.x, from Spark 3.1.2 -> 3.1.3 -> 3.2.1, which
I believe are all built for Scala 2.12, and I start spark-shell with
"--packages org.apache.spark:spark-avro_2.12:x.x.x", where x.x.x matches the
Spark version, but I got the above weird "NoClassDefFoundError" in all cases.

Meanwhile, if I download Spark 2.4.8 and start spark-shell with "--packages
org.apache.spark:spark-avro_2.11:2.4.3", I can read exactly the same Avro file
without any issue.

I am thinking it must be something done wrongly on my end, but:

  *   I downloaded several versions of Spark and untarred them directly.
  *   I DIDN'T have any custom "spark-env.sh/spark-default.conf" file that could
pull in extra jar files and mess things up.
  *   I simply create a Spark session in spark-shell with the correct
package and try to read the Avro files. Nothing more.

I started to doubt whether something is wrong with the Spark 3.x Avro package
releases, but I know that possibility is very low, especially across multiple
different versions. Moreover, the class
"org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2" does exist inside
"spark-sql_2.12-3.1.3.jar", as below:
jar tvf spark-sql_2.12-3.1.3.jar | grep FileDataSourceV2
15436 Sun Feb 06 22:54:00 EST 2022 
org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.class

So what could be wrong?

Thanks

Yong



Re: Why Spark JDBC Writing in a sequential order

2018-05-25 Thread Yong Zhang
I am not sure about Redshift, but I know the target table is not partitioned. 
But we should be able to just insert into non-partitioned remote table from 12 
clients concurrently, right?


Even if Redshift doesn't allow concurrent writes, would the Spark driver
detect this and coordinate all the tasks and executors the way I observed?


Yong


From: Jörn Franke <jornfra...@gmail.com>
Sent: Friday, May 25, 2018 10:50 AM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: Why Spark JDBC Writing in a sequential order

Can your database receive the writes concurrently ? Ie do you make sure that 
each executor writes into a different partition at database side ?

On 25. May 2018, at 16:42, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:


Spark version 2.2.0


We are trying to write a DataFrame to a remote relational database (AWS
Redshift). Based on the Spark JDBC documentation, we already repartition our DF to
12 partitions and set the Spark JDBC "numPartitions" parameter to 12 for concurrent
writing.


We run the command as following:

dataframe.repartition(12).write.mode("overwrite").option("batchsize", 5000).option("numPartitions", 12).jdbc(url=jdbcurl, table="tableName", connectionProperties=connectionProps)


Here is the Spark UI:




We found out that the 12 tasks obviously are running in sequential order. They 
are all in "Running" status in the beginning at the same time, but if we check 
the "Duration" and "Shuffle Read Size/Records" of them, it is clear that they 
are run one by one.

For example, task 8 finished first in about 2 hours, and wrote 34732 records to 
remote DB (I knew the speed looks terrible, but that's not the question of this 
post), and task 0 started after task 8, and took 4 hours (first 2 hours waiting 
for task 8).

In this picture, only task 2 and 4 are in running stage, but task 4 is 
obviously waiting for task 2 to finish, then start writing after that.


My question is: in the above Spark command, my understanding is that the 12 executors
should open JDBC connections to the remote DB concurrently, all 12 tasks
should start writing concurrently as well, and the whole job should finish in around 2
hours overall.


Why are the 12 tasks indeed in the "RUNNING" stage but apparently waiting for
something, and ONLY able to write to the remote DB sequentially? The 12 executors are
in different JVMs on different physical nodes. Why is this happening? What
stops Spark from pushing the data truly concurrently?


Thanks


Yong



Re: How to read the schema of a partitioned dataframe without listing all the partitions ?

2018-04-27 Thread Yong Zhang
What version of Spark you are using?


You can search "spark.sql.parquet.mergeSchema" on 
https://spark.apache.org/docs/latest/sql-programming-guide.html


Starting from Spark 1.5, the default is already "false", which means Spark 
shouldn't scan all the parquet files to generate the schema.


Yong
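
(For reference, one workaround sketch, not something discussed further in this thread:
infer the schema from a single partition and pass it to the reader for the full path, so
Spark can skip the schema-inference pass over every partition. The paths below are
hypothetical.)

// read one known partition only to obtain the schema ...
val oneDay = spark.read.parquet("s3://bucket/table/day=2018-04-01")
// ... then reuse that schema for the whole table, skipping inference
val full = spark.read.schema(oneDay.schema).parquet("s3://bucket/table")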






From: Walid LEZZAR 
Sent: Friday, April 27, 2018 7:42 AM
To: spark users
Subject: How to read the schema of a partitioned dataframe without listing all 
the partitions ?

Hi,

I have a parquet dataset on S3 partitioned by day. I have 2 years of data (about
1000 partitions). With Spark, when I just want to know the schema of this
parquet without even asking for a single row of data, Spark tries to list all
the partitions and the nested partitions of the parquet, which makes it very
slow just to build the dataframe object in Zeppelin.

Is there a way to avoid that ? Is there way to tell spark : "hey, just read a 
single partition and give me the schema of that partition and consider it as 
the schema of the whole dataframe" ? (I don't care about schema merge, it's off 
by the way)

Thanks.
Walid.


Re: how to create all possible combinations from an array? how to join and explode row array?

2018-03-30 Thread Yong Zhang
What's wrong with just using a UDF that does a for loop in Scala? You can change the
for-loop logic to produce whatever combinations you want.


scala> spark.version
res4: String = 2.2.1

scala> aggDS.printSchema
root
 |-- name: string (nullable = true)
 |-- colors: array (nullable = true)
 ||-- element: string (containsNull = true)


scala> aggDS.show(false)
+----+----------------+
|name|colors          |
+----+----------------+
|john|[red, blue, red]|
|bill|[blue, red]     |
|sam |[gree]          |
+----+----------------+

scala> import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.udf

scala> val loopUDF = udf { x: Seq[String] => for (a <- x; b <-x) yield (a,b) }
loopUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StringType,true)),true),Some(List(ArrayType(StringType,true))))

scala> aggDS.withColumn("newCol", loopUDF($"colors")).show(false)
+----+----------------+----------------------------------------------------------------------------------------------------------+
|name|colors          |newCol                                                                                                    |
+----+----------------+----------------------------------------------------------------------------------------------------------+
|john|[red, blue, red]|[[red,red], [red,blue], [red,red], [blue,red], [blue,blue], [blue,red], [red,red], [red,blue], [red,red]] |
|bill|[blue, red]     |[[blue,blue], [blue,red], [red,blue], [red,red]]                                                          |
|sam |[gree]          |[[gree,gree]]                                                                                             |
+----+----------------+----------------------------------------------------------------------------------------------------------+

Yong



From: Andy Davidson 
Sent: Friday, March 30, 2018 8:58 PM
To: Andy Davidson; user
Subject: Re: how to create all possible combinations from an array? how to join 
and explode row array?

I was a little sloppy when I created the sample output. It's missing a few pairs.

Assume for a given row I have [a, b, c] I want to create something like the 
cartesian join

From: Andrew Davidson 
>
Date: Friday, March 30, 2018 at 5:54 PM
To: "user @spark" >
Subject: how to create all possible combinations from an array? how to join and 
explode row array?

I have a dataframe and execute df.groupBy("xyzy").agg(collect_list("abc"))

This produces a column of type array. Now for each row I want to create
multiple pairs/tuples from the array so that I can create a contingency table.
Any idea how I can transform my data so that I can call crosstab()? The join
transformation operates on the entire dataframe. I need something at the row
array level.


Below is some sample Python that describes what I would like my results to be.

Kind regards

Andy


c1 = ["john", "bill", "sam"]
c2 = [['red', 'blue', 'red'], ['blue', 'red'], ['green']]
p = pd.DataFrame({"a":c1, "b":c2})

df = sqlContext.createDataFrame(p)
df.printSchema()
df.show()

root
 |-- a: string (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----+----------------+
|   a|               b|
+----+----------------+
|john|[red, blue, red]|
|bill|     [blue, red]|
| sam|         [green]|
+----+----------------+


The output I am trying to create is below. I could live with a crossJoin (cartesian
join) and add my own filtering if that makes the problem easier.


+----+----+
|  x1|  x2|
+----+----+
| red|blue|
| red| red|
|blue| red|
+----+----+




Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-28 Thread Yong Zhang
Your dataframe has an array data type, which is NOT supported by CSV. How could a CSV
file include an array or other nested structure?


If you want your data to be human-readable text, then write it out as JSON in your
case.


Yong
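
(A rough sketch of that suggestion in Scala; the original question is PySpark, so treat
the exact calls as illustrative only:)

// nested columns (structs/arrays) round-trip fine through JSON, unlike CSV
df.coalesce(1).write.mode("overwrite").json("output_json")

// and an already-written parquet output can simply be read back to inspect it
spark.read.parquet("output").show(truncate = false)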



From: Mina Aslani 
Sent: Wednesday, March 28, 2018 12:22 AM
To: naresh Goud
Cc: user @spark
Subject: Re: java.lang.UnsupportedOperationException: CSV data source does not 
support struct/ERROR RetryingBlockFetcher

Hi Naresh,

Thank you for the quick response, appreciate it.
After removing the option("header","true") and trying

df = spark.read.parquet("test.parquet"), reading the parquet now works.
However, I would like to find a way to have the data in CSV/readable form.
I still cannot save df as CSV, as it throws:
java.lang.UnsupportedOperationException: CSV data source does not support
struct data type.

Any idea?

Best regards,

Mina


On Tue, Mar 27, 2018 at 10:51 PM, naresh Goud 
> wrote:
In the case of storing as a parquet file, I don't think it requires the header
option("header","true").

Give it a try by removing the header option and then try to read it. I haven't
tried it; just a thought.

Thank you,
Naresh


On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani 
> wrote:

Hi,


I am using pyspark. To transform my sample data and create model, I use 
stringIndexer and OneHotEncoder.


However, when I try to write data as csv using below command

df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")


I get UnsupportedOperationException

java.lang.UnsupportedOperationException: CSV data source does not support 
struct data type.

Therefore, to save data and avoid getting the error I use


df.coalesce(1).write.option("header","true").mode("overwrite").save("output")


The above command saves data but it's in parquet format.
How can I read parquet file and convert to csv to observe the data?

When I use

df = spark.read.parquet("1.parquet"), it throws:

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding 
blocks

Your input is appreciated.


Best regards,

Mina



--
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/




Re: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Case class

2018-03-23 Thread Yong Zhang
I am still stuck with this. Does anyone know the correct way to use a custom
Aggregator for a case class with agg?


I would like to use the Dataset API, but it looks like in the aggregation Spark loses
the type and falls back to GenericRowWithSchema instead of my case class. Is that right?


Thanks
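
(One thing worth trying, offered as a sketch rather than a confirmed answer: keep the
typed API all the way through by grouping with groupByKey instead of the untyped
groupBy, so each group's rows stay DeriveRecord instead of degrading to
GenericRowWithSchema.)

// typed grouping keeps the DeriveRecord encoder, so reduce() receives DeriveRecord
rawDS.groupByKey(_.domain)
     .agg(ChangeLogAggregator.toColumn.name("change_log"))
     .show(false)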



From: Yong Zhang <java8...@hotmail.com>
Sent: Thursday, March 22, 2018 10:08 PM
To: user@spark.apache.org
Subject: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to Case class


I am trying to research a custom Aggregator implementation, and following the 
example in the Spark sample code here:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala


But I cannot use it in the agg function, and I get an error like
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
to my case class. If I don't use group by, then it works the same way as
in the sample code. To make it work with group by, what do I need to change?


This is on Spark 2.2, as shown below. Following the spark example, I can do

rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
without any issue, but if
rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)

I will get the cast exception. But I want to apply my custom Aggregator 
implementation per group. How do I fix this?

Thanks


scala> spark.version
res31: String = 2.2.1

case class FlagChangeLog(date: String, old_flag: Boolean, new_flag: Boolean)
case class DeriveRecord (domain: String, date: String, flag: Boolean, isDelta: 
Boolean, flag_changelog: scala.collection.mutable.ListBuffer[FlagChangeLog])

val rawDS = Seq(
  DeriveRecord("abc.com", "2017-01-09", true, false, ListBuffer.empty),
  DeriveRecord("123.com", "2015-01-01", false, false, ListBuffer.empty),
  DeriveRecord("abc.com", "2018-01-09", false, true, ListBuffer.empty),
  DeriveRecord("123.com", "2017-01-09", true, true, ListBuffer.empty),
  DeriveRecord("xyz.com", "2018-03-09", false, true, ListBuffer.empty)
).toDS

scala> rawDS.show(false)
+---+--+-+---+--+
|domain |date  |flag |isDelta|flag_changelog|
+---+--+-+---+--+
|abc.com|2017-01-09|true |false  |[]|
|123.com|2015-01-01|false|false  |[]|
|abc.com|2018-01-09|false|true   |[]|
|123.com|2017-01-09|true |true   |[]|
|xyz.com|2018-03-09|false|true   |[]|
+---+--+-+---+--+

object ChangeLogAggregator extends Aggregator[DeriveRecord, DeriveRecord, 
DeriveRecord] {
  def zero: DeriveRecord = ///

  def reduce(buffer: DeriveRecord, curr: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def merge(b1: DeriveRecord, b2: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def finish(output: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def bufferEncoder: Encoder[DeriveRecord] = Encoders.product
  def outputEncoder: Encoder[DeriveRecord] = Encoders.product
}

scala> rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
+---+--+-+---+---+
|domain |date  |flag |isDelta|flag_changelog
 |
+---+--+-+---+---+
|abc.com|2018-01-09|false|true   |[[2015-01-01,true,false], 
[2018-01-09,false,false]]|
+---+--+-+---+---+

scala> 
rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)
18/03/22 22:04:44 ERROR Executor: Exception in task 1.0 in stage 36.0 (TID 48)
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to $line15.$read$$iw$$iw$DeriveRecord
at 
$line110.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ChangeLogAggregator$.reduce(:31)
at 
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.update(TypedAggregateExpression.scala:239)
at 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:524)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Case class

2018-03-22 Thread Yong Zhang
I am trying to research a custom Aggregator implementation, and following the 
example in the Spark sample code here:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala


But I cannot use it in the agg function, and I get an error like
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
to my case class. If I don't use group by, then it works the same way as
in the sample code. To make it work with group by, what do I need to change?


This is on Spark 2.2, as shown below. Following the spark example, I can do

rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
without any issue, but if
rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)

I will get the cast exception. But I want to apply my custom Aggregator 
implementation per group. How do I fix this?

Thanks


scala> spark.version
res31: String = 2.2.1

case class FlagChangeLog(date: String, old_flag: Boolean, new_flag: Boolean)
case class DeriveRecord (domain: String, date: String, flag: Boolean, isDelta: 
Boolean, flag_changelog: scala.collection.mutable.ListBuffer[FlagChangeLog])

val rawDS = Seq(
  DeriveRecord("abc.com", "2017-01-09", true, false, ListBuffer.empty),
  DeriveRecord("123.com", "2015-01-01", false, false, ListBuffer.empty),
  DeriveRecord("abc.com", "2018-01-09", false, true, ListBuffer.empty),
  DeriveRecord("123.com", "2017-01-09", true, true, ListBuffer.empty),
  DeriveRecord("xyz.com", "2018-03-09", false, true, ListBuffer.empty)
).toDS

scala> rawDS.show(false)
+---+--+-+---+--+
|domain |date  |flag |isDelta|flag_changelog|
+---+--+-+---+--+
|abc.com|2017-01-09|true |false  |[]|
|123.com|2015-01-01|false|false  |[]|
|abc.com|2018-01-09|false|true   |[]|
|123.com|2017-01-09|true |true   |[]|
|xyz.com|2018-03-09|false|true   |[]|
+---+--+-+---+--+

object ChangeLogAggregator extends Aggregator[DeriveRecord, DeriveRecord, 
DeriveRecord] {
  def zero: DeriveRecord = ///

  def reduce(buffer: DeriveRecord, curr: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def merge(b1: DeriveRecord, b2: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def finish(output: DeriveRecord): DeriveRecord = {
/// ommit
  }

  def bufferEncoder: Encoder[DeriveRecord] = Encoders.product
  def outputEncoder: Encoder[DeriveRecord] = Encoders.product
}

scala> rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
+---+--+-+---+---+
|domain |date  |flag |isDelta|flag_changelog
 |
+---+--+-+---+---+
|abc.com|2018-01-09|false|true   |[[2015-01-01,true,false], 
[2018-01-09,false,false]]|
+---+--+-+---+---+

scala> 
rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)
18/03/22 22:04:44 ERROR Executor: Exception in task 1.0 in stage 36.0 (TID 48)
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to $line15.$read$$iw$$iw$DeriveRecord
at 
$line110.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ChangeLogAggregator$.reduce(:31)
at 
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.update(TypedAggregateExpression.scala:239)
at 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:524)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:162)
at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
at 

Re: CATALYST rule join

2018-02-27 Thread Yong Zhang
I don't fully understand your question, but maybe you want to check out this JIRA,
https://issues.apache.org/jira/browse/SPARK-17728, especially the comments
area. There is some discussion there about why a UDF can be executed multiple
times by Spark.

Yong
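
(If the repeated evaluation is the real pain, one workaround often suggested for that
class of problem, sketched here as an assumption and requiring Spark 2.3+ where
asNondeterministic exists, is to do the expansion at the DataFrame level and mark the
UDF non-deterministic so the optimizer is discouraged from duplicating it:)

import org.apache.spark.sql.functions.{col, explode, udf}

// hypothetical stand-in for the real foo(x)
val fooUdf = udf { x: Long => Seq(x, x + 1) }.asNondeterministic()

// table2 is the DataFrame from the question below
val expanded = table2.withColumn("x_arr", fooUdf(col("x")))
                     .withColumn("x_exp", explode(col("x_arr")))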


From: tan shai 
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join

Hi,

I need to write a rule to customize the join function using the Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this process:

- Execute a udf on the column called x, this udf returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table :

SELECT EXPLODE(foo(x)) from table2

Where `foo` is a UDF that returns an array of elements.

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

case join@Join(left, right, _, Some(condition)) =>

{

val attr = right.outputSet.find(x => x.toString().contains("x"))

val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), 
Seq(attr.last.toAttribute))

val explode = Explode(udf)

val resolvedGenerator = Generate(explode, true,false, qualifier = None, 
udf.references.toSeq, right)

var newRight = Project(resolvedGenerator.output,resolvedGenerator)

Join(left, newRight , Inner,Option(condition))

}
  }
}

But the problem is that the operation `Generate explode` appears many times in 
the physical plan.


Do you have any other ideas ? Maybe rewriting the code.

Thank you


2018-02-25 23:08 GMT+01:00 tan shai 
>:
Hi,

I need to write a rule to customize the join function using the Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this process:

- Execute a udf on the column called x, this udf returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table :

SELECT EXPLODE(foo(x)) from table2

Where `foo` is a UDF that returns an array of elements.

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

case join@Join(left, right, _, Some(condition)) =>

{

val attr = right.outputSet.find(x => x.toString().contains("x"))

val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), 
Seq(attr.last.toAttribute))

val explode = Explode(udf)

val resolvedGenerator = Generate(explode, true,false, qualifier = None, 
udf.references.toSeq, right)

var newRight = Project(resolvedGenerator.output,resolvedGenerator)

Join(left, newRight , Inner,Option(condition))

}
  }
}

But the problem is that the operation `Generate explode` appears many times in 
the physical plan.


Do you have any other ideas ? Maybe rewriting the code.

Thank you.




Re: Parquet files from spark not readable in Cascading

2017-11-16 Thread Yong Zhang
I don't have experience with Cascading, but we saw a similar issue when importing
data generated by Spark into Hive.


Did you try this setting "spark.sql.parquet.writeLegacyFormat" to true?


https://stackoverflow.com/questions/44279870/why-cant-impala-read-parquet-files-after-spark-sqls-write
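
(For reference, a minimal sketch of applying that setting on the Spark side before
writing; whether it actually fixes the Cascading read is an assumption to test, and the
output path is hypothetical:)

// write parquet in the legacy layout that Hive/Impala-era readers expect for nested types
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
df.write.mode("overwrite").parquet("/path/read/by/cascading")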







From: Vikas Gandham 
Sent: Wednesday, November 15, 2017 2:30 PM
To: user@spark.apache.org
Subject: Parquet files from spark not readable in Cascading


Hi,



When I tried reading parquet data that was generated by Spark in Cascading, it throws
the following error:







Caused by: 
org.apache.parquet.io.ParquetDecodingException: 
Can not read value at 0 in block -1 in file ""

at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)

at 
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)

at 
org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.(DeprecatedParquetInputFormat.java:103)

at 
org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:47)

at 
cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253)

at 
cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248)

at cascading.util.Util.retry(Util.java:1044)

at 
cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247)

at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)

at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)

at 
org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1

at java.util.ArrayList.elementData(ArrayList.java:418)

at java.util.ArrayList.get(ArrayList.java:431)

at 
org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)

at 
org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)

at 
org.apache.parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:83)

at 
org.apache.parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:77)

at 
org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:293)

at 
org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)

at 
org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)

at 
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)

at 
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)

at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)

at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)



This is mostly seen when parquet has nested structures.



I didn't find any solution to this.



I see some JIRA issues like this one:
https://issues.apache.org/jira/browse/SPARK-10434 (parquet compatibility /
interoperability issues), about reading parquet files in Spark 1.4 when the files
were generated by Spark 1.5. This was fixed in later versions, but was it fixed
in Cascading?



Not sure if this is something to do with the Parquet version, or Cascading has a bug,
or Spark is doing something with the Parquet files which Cascading is not accepting.



Note : I am trying to read Parquet with avro schema 

Re: DataFrameReader read from S3 org.apache.spark.sql.AnalysisException: Path does not exist

2017-07-12 Thread Yong Zhang
Can't you just catch that exception and return an empty dataframe?


Yong
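
(Or pre-filter the missing paths instead of catching the exception; a sketch, where
listOfPaths is the variable from Sumona's message and everything else is illustrative:)

import org.apache.hadoop.fs.Path

val hadoopConf = spark.sparkContext.hadoopConfiguration
val existing = listOfPaths.filter { p =>
  val path = new Path(p)
  path.getFileSystem(hadoopConf).exists(path)   // resolves the right FS (s3/hdfs/...) per path
}
val df = if (existing.nonEmpty) spark.read.parquet(existing: _*)
         else spark.emptyDataFrame               // schema-less; supply a schema if needed downstream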



From: Sumona Routh 
Sent: Wednesday, July 12, 2017 4:36 PM
To: user
Subject: DataFrameReader read from S3 org.apache.spark.sql.AnalysisException: 
Path does not exist

Hi there,
I'm trying to read a list of paths from S3 into a dataframe for a window of 
time using the following:

sparkSession.read.parquet(listOfPaths:_*)

In some cases, the path may not be there because there is no data, which is an 
acceptable scenario.
However, Spark throws an AnalysisException: Path does not exist. Is there an 
option I can set to tell it to gracefully return an empty dataframe if a 
particular path is missing? Looking at the spark code, there is an option 
checkFilesExist, but I don't believe that is set in the particular flow of code 
that I'm accessing.

Thanks!
Sumona



Re: about broadcast join of base table in spark sql

2017-07-02 Thread Yong Zhang
Then you need to tell us the spark version, and post the execution plan here, 
so we can help you better.


Yong



From: Paley Louie <paley2...@gmail.com>
Sent: Sunday, July 2, 2017 12:36 AM
To: Yong Zhang
Cc: Bryan Jeffrey; d...@spark.org; user@spark.apache.org
Subject: Re: about broadcast join of base table in spark sql

Thank you for your reply. I have tried to add the broadcast hint to the base table,
but it just cannot be broadcast.
On Jun 30, 2017, at 9:13 PM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

Or since you already use the DataFrame API, instead of SQL, you can add the 
broadcast function to force it.

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html#broadcast(org.apache.spark.sql.DataFrame)

Yong






From: Bryan Jeffrey <bryan.jeff...@gmail.com<mailto:bryan.jeff...@gmail.com>>
Sent: Friday, June 30, 2017 6:57 AM
To: d...@spark.org<mailto:d...@spark.org>; 
user@spark.apache.org<mailto:user@spark.apache.org>; paleyl
Subject: Re: about broadcast join of base table in spark sql

Hello.

If you want to allow broadcast join with larger broadcasts you can set 
spark.sql.autoBroadcastJoinThreshold to a higher value. This will cause the 
plan to allow join despite 'A' being larger than the default threshold.

Get Outlook for Android<https://aka.ms/ghei36>



From: paleyl
Sent: Wednesday, June 28, 10:42 PM
Subject: about broadcast join of base table in spark sql
To: d...@spark.org<mailto:d...@spark.org>, 
user@spark.apache.org<mailto:user@spark.apache.org>


Hi All,


Recently I met a problem with broadcast join: I want to left join tables A and B,
where A is the smaller one and the left table, so I wrote

A = A.join(B, A("key1") === B("key2"), "left")

but I found that A is not broadcast, as the shuffle size is still very
large.

I guess this is a designed mechanism in spark, so could anyone please tell me 
why it is designed like this? I am just very curious.


Best,


Paley



Re: about broadcast join of base table in spark sql

2017-06-30 Thread Yong Zhang
Or since you already use the DataFrame API, instead of SQL, you can add the 
broadcast function to force it.


https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html#broadcast(org.apache.spark.sql.DataFrame)


Yong
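
(A minimal sketch of that hint, using the table names from Paley's message below; note
it is only a hint, and the planner can still decline it, for example when the join type
cannot build its hash table on that side:)

import org.apache.spark.sql.functions.broadcast

// ask Spark to ship A to every executor instead of shuffling both sides
val joined = broadcast(A).join(B, A("key1") === B("key2"), "left")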







From: Bryan Jeffrey 
Sent: Friday, June 30, 2017 6:57 AM
To: d...@spark.org; user@spark.apache.org; paleyl
Subject: Re: about broadcast join of base table in spark sql

Hello.

If you want to allow broadcast join with larger broadcasts you can set 
spark.sql.autoBroadcastJoinThreshold to a higher value. This will cause the 
plan to allow join despite 'A' being larger than the default threshold.
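
(For example, with the Spark 2.x conf API; the value is in bytes, and 100 MB here is
only an illustration, the default being 10 MB:)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)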

Get Outlook for Android



From: paleyl
Sent: Wednesday, June 28, 10:42 PM
Subject: about broadcast join of base table in spark sql
To: d...@spark.org, user@spark.apache.org


Hi All,


Recently I met a problem with broadcast join: I want to left join tables A and B,
where A is the smaller one and the left table, so I wrote

A = A.join(B, A("key1") === B("key2"), "left")

but I found that A is not broadcast, as the shuffle size is still very
large.

I guess this is a designed mechanism in spark, so could anyone please tell me 
why it is designed like this? I am just very curious.


Best,


Paley





Re: SparkSQL to read XML Blob data to create multiple rows

2017-06-29 Thread Yong Zhang
scala>spark.version
res6: String = 2.1.1

scala> val rdd = sc.parallelize(Seq("""<Comments>
  <Comment><Title>Title1.1</Title><Description>Description_1.1</Description></Comment>
  <Comment><Title>Title1.2</Title><Description>Description_1.2</Description></Comment>
  <Comment><Title>Title1.3</Title><Description>Description_1.3</Description></Comment>
</Comments>"""))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> import com.databricks.spark.xml.XmlReader

scala> val df = new XmlReader().xmlRdd(spark.sqlContext, rdd)
df: org.apache.spark.sql.DataFrame = [Comments: struct<Comment: array<struct<Description: string, Title: string>>>]

scala> df.printSchema
root
 |-- Comments: struct (nullable = true)
 |    |-- Comment: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Description: string (nullable = true)
 |    |    |    |-- Title: string (nullable = true)

scala> df.show(false)
+---------------------------------------------------------------------------------------------------+
|Comments                                                                                             |
+---------------------------------------------------------------------------------------------------+
|[WrappedArray([Description_1.1,Title1.1], [Description_1.2,Title1.2], [Description_1.3,Title1.3])]  |
+---------------------------------------------------------------------------------------------------+


scala> df.withColumn("comment", explode(df("Comments.Comment"))).select($"comment.Description", $"comment.Title").show
+---------------+--------+
|    Description|   Title|
+---------------+--------+
|Description_1.1|Title1.1|
|Description_1.2|Title1.2|
|Description_1.3|Title1.3|
+---------------+--------+




From: Talap, Amol 
Sent: Thursday, June 29, 2017 9:38 AM
To: Judit Planas; user@spark.apache.org
Subject: RE: SparkSQL to read XML Blob data to create multiple rows


Thanks Judit, Ayan

Judit,

You almost got it. The explode might help here.

But when I tried I see load() doesn’t like to read from xmlcomment column on 
oracle_data.



scala> val xmlDF = sqlContext.sql("SELECT * FROM oracle_data")

17/06/29 18:31:58 INFO parse.ParseDriver: Parsing command: SELECT * FROM 
oracle_data

17/06/29 18:31:58 INFO parse.ParseDriver: Parse Completed

…

scala> val xmlDF_flattened = 
xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))

:22: error: overloaded method value load with alternatives:

  ()org.apache.spark.sql.DataFrame 

  (path: String)org.apache.spark.sql.DataFrame

cannot be applied to (org.apache.spark.sql.ColumnName)

   val xmlDF_flattened = 
xmlDF.withColumn("xmlcomment",explode(sqlContext.read.format("com.databricks.spark.xml").option("rowTag","book").load($"xmlcomment")))



Ayan,

Output of books_inexp.show was as below
title, author
Midnight Rain,Ralls, Kim
Maeve Ascendant,  Corets, Eva



Regards,

Amol

From: Judit Planas [mailto:judit.pla...@epfl.ch]
Sent: Thursday, June 29, 2017 3:46 AM
To: user@spark.apache.org
Subject: Re: SparkSQL to read XML Blob data to create multiple rows



Hi Amol,

Not sure I understand completely your question, but the SQL function "explode" 
may help you:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.explode





Here you can find a nice example:
https://stackoverflow.com/questions/38210507/explode-in-pyspark




HTH,
Judit

On 29/06/17 09:05, ayan guha wrote:

Hi



Not sure if I follow your issue. Can you please post output of 
books_inexp.show()?



On Thu, Jun 29, 2017 at 2:30 PM, Talap, Amol 
> wrote:

Hi:



We are trying to parse XML data to get the output below from the given input sample.

Can someone suggest a way to pass one DataFrame's output into the load() function, or
any other alternative to get this output?



Input Data from Oracle Table XMLBlob:

SequenceID | Name    | City      | XMLComment
1          | Amol    | Kolhapur  | Title1.1Description_1.1Title1.2Description_1.2Title1.3Description_1.3
2          | Suresh  | Mumbai    | Title2Description_2
3          | Vishal  | Delhi     | Title3Description_3
4          | Swastik | Bangalore | Title4Description_4




Output Data Expected using Spark SQL:

SequenceID | Name | City | Title | Description
1          | Amol |



Re: how to call udf with parameters

2017-06-18 Thread Yong Zhang
What version of Spark are you using? I cannot reproduce your error:


scala> spark.version
res9: String = 2.1.1
scala> val dataset = Seq((0, "hello"), (1, "world")).toDF("id", "text")
dataset: org.apache.spark.sql.DataFrame = [id: int, text: string]
scala> import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.udf

// define a method in similar way like you did
scala> def len = udf { (data: String) => data.length > 0 }
len: org.apache.spark.sql.expressions.UserDefinedFunction

// use it
scala> dataset.select(len($"text").as('length)).show
+------+
|length|
+------+
|  true|
|  true|
+------+


Yong
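
(The usual fix for the call in lk_spark's message below is to wrap the non-Column
arguments in lit(); a sketch using the same names:)

import org.apache.spark.sql.functions.lit

// literal scalars must be passed to a UDF as Columns
val output = input.select(ssplit2($"text", lit(true), lit(true), lit(2)).as('words))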




From: Pralabh Kumar 
Sent: Friday, June 16, 2017 12:19 AM
To: lk_spark
Cc: user.spark
Subject: Re: how to call udf with parameters

sample UDF
val getlength=udf((data:String)=>data.length())
data.select(getlength(data("col1")))

On Fri, Jun 16, 2017 at 9:21 AM, lk_spark 
> wrote:
Hi all,
 I define a UDF with multiple parameters, but I don't know how to call it
with a DataFrame.

UDF:

def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, 
minTermLen: Int) =>
val terms = HanLP.segment(sentence).asScala
.

Call :

scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
 ^
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
  ^
:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
   ^

scala> val output = 
input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input 
columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]

I need help!!


2017-06-16

lk_spark



Re: [Spark Sql/ UDFs] Spark and Hive UDFs parity

2017-06-18 Thread Yong Zhang
I assume you use Scala to implement your UDFs.


In this case, the Scala language itself already provides some options for you.


If you want more control over logic when a UDF is initialized, you can define a Scala
object and define your UDF as part of it; the Scala object will then behave like the
singleton pattern for you.


So the Scala object's constructor logic can be treated as the init/configure
contract, as in Hive. It will be called once per JVM to initialize your Scala
object. That should meet your requirement.


The only tricky part is the context reference for the configure() method, which
allows you to pass some configuration dynamically to your UDF at runtime. Since an
object in Scala is fixed at compile time, you cannot pass any parameters
to its constructor. But there is nothing stopping you from building a Scala
class/companion object that allows parameters to be passed in at constructor/init
time, which can control your UDF's behavior.


If you have a concrete example that you cannot do in Spark Scala UDF, you can 
post here.


Yong
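
(A rough sketch of that pattern; every name here is made up for illustration, and
nothing from Hive's GenericUDF API is being reproduced:)

import org.apache.spark.sql.functions.{col, udf}

// "configure"-style parameters go through the class constructor; the heavy one-time
// init lives in a @transient lazy val so it runs once per executor JVM
class TokenizerUdf(minLen: Int) extends Serializable {
  @transient private lazy val splitter = "\\W+".r   // stand-in for loading a dictionary/model once
  val call = udf { s: String => splitter.split(s).filter(_.length >= minLen) }
}

val tokenize = new TokenizerUdf(minLen = 2)
df.select(tokenize.call(col("text")).as("tokens"))  // df and "text" are illustrative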



From: RD 
Sent: Friday, June 16, 2017 11:37 AM
To: Georg Heiler
Cc: user@spark.apache.org
Subject: Re: [Spark Sql/ UDFs] Spark and Hive UDFs parity

Thanks Georg. But I'm not sure how mapPartitions is relevant here.  Can you 
elaborate?



On Thu, Jun 15, 2017 at 4:18 AM, Georg Heiler 
> wrote:
What about using map partitions instead?

RD > schrieb am Do. 15. Juni 2017 
um 06:52:
Hi Spark folks,

Is there any plan to support the richer UDF API that Hive supports, for
Spark UDFs? Hive supports the GenericUDF API which has, among others, methods
like initialize() and configure() (called once on the cluster), which a lot of
our users use. We now have a lot of UDFs in Hive which make use of these
methods. We plan to move our UDFs to Spark UDFs but are limited by not
having similar lifecycle methods.
   Are there plans to address these? Or do people usually adopt some sort of
workaround?

   If we directly use the Hive UDFs in Spark we pay a performance penalty. I
think Spark does a conversion from InternalRow to Row and back to
InternalRow for native Spark UDFs, and for Hive it does InternalRow to Hive
Object and back to InternalRow, but somehow the conversion in native UDFs is more
performant.

-Best,
R.



Re: Parquet file generated by Spark, but not compatible read by Hive

2017-06-13 Thread Yong Zhang
The issue is caused by the data, and indeed a type mismatch between the Hive
schema and Spark. Now it is fixed.


Without that kind of data, the problem isn't triggered for some brands.


Thanks for taking a look at this problem.


Yong



From: ayan guha <guha.a...@gmail.com>
Sent: Tuesday, June 13, 2017 1:54 AM
To: Angel Francisco Orta
Cc: Yong Zhang; user@spark.apache.org
Subject: Re: Parquet file generated by Spark, but not compatible read by Hive

Try setting following Param:

conf.set("spark.sql.hive.convertMetastoreParquet","false")

On Tue, Jun 13, 2017 at 3:34 PM, Angel Francisco Orta 
<angel.francisco.o...@gmail.com<mailto:angel.francisco.o...@gmail.com>> wrote:
Hello,

Do you use df.write, or do you do it with hivecontext.sql("insert into ...")?

Angel.

On 12 Jun 2017 at 11:07 PM, "Yong Zhang" <java8...@hotmail.com> wrote:

We are using Spark 1.6.2 as ETL to generate parquet file for one dataset, and 
partitioned by "brand" (which is a string to represent brand in this dataset).


After the partition files generated in HDFS like "brand=a" folder, we add the 
partitions in the Hive.


The hive version is 1.2.1 (In fact, we are using HDP 2.5.0).


Now the problem is that for 2 brand partitions, we cannot query the data 
generated in Spark, but it works fine for the rest of partitions.


Below is the error in the Hive CLI and hive.log I got if I query the bad 
partitions like "select * from  tablename where brand='BrandA' limit 3;"


Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.LongWritable


Caused by: java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.LongWritable
at 
org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetStringInspector.getPrimitiveWritableObject(ParquetStringInspector.java:52)
at 
org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:222)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:307)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:262)
at 
org.apache.hadoop.hive.serde2.DelimitedJSONSerDe.serializeField(DelimitedJSONSerDe.java:72)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:246)
at 
org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:50)
at 
org.apache.hadoop.hive.ql.exec.DefaultFetchFormatter.convert(DefaultFetchFormatter.java:71)
at 
org.apache.hadoop.hive.ql.exec.DefaultFetchFormatter.convert(DefaultFetchFormatter.java:40)
at 
org.apache.hadoop.hive.ql.exec.ListSinkOperator.process(ListSinkOperator.java:90)
... 22 more

There is not much I can find by googling this error message, but it points 
to the schema in Hive being different from the one in the parquet file.
But this is a very strange case, as the same schema works fine for the other 
brands (brand is defined as a partition column), and they share the same Hive 
schema as above.

If I query like "select * from tablename where brand='BrandB' limit 3;", 
everything works fine.

So is this really caused by a Hive schema mismatch with the parquet file 
generated by Spark, or by the data within the different partition keys, or is it 
really a compatibility issue between Spark and Hive?

Thanks

Yong





--
Best Regards,
Ayan Guha


Parquet file generated by Spark, but not compatible read by Hive

2017-06-12 Thread Yong Zhang
We are using Spark 1.6.2 as ETL to generate parquet file for one dataset, and 
partitioned by "brand" (which is a string to represent brand in this dataset).


After the partition files are generated in HDFS in folders like "brand=a", we add the 
partitions in Hive.


The hive version is 1.2.1 (In fact, we are using HDP 2.5.0).


Now the problem is that for 2 brand partitions, we cannot query the data 
generated in Spark, but it works fine for the rest of partitions.


Below is the error in the Hive CLI and hive.log I got if I query the bad 
partitions like "select * from  tablename where brand='BrandA' limit 3;"


Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.LongWritable


Caused by: java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.LongWritable
at 
org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetStringInspector.getPrimitiveWritableObject(ParquetStringInspector.java:52)
at 
org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:222)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:307)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:262)
at 
org.apache.hadoop.hive.serde2.DelimitedJSONSerDe.serializeField(DelimitedJSONSerDe.java:72)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:246)
at 
org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:50)
at 
org.apache.hadoop.hive.ql.exec.DefaultFetchFormatter.convert(DefaultFetchFormatter.java:71)
at 
org.apache.hadoop.hive.ql.exec.DefaultFetchFormatter.convert(DefaultFetchFormatter.java:40)
at 
org.apache.hadoop.hive.ql.exec.ListSinkOperator.process(ListSinkOperator.java:90)
... 22 more

There is not much I can find by googling this error message, but it points 
to the schema in Hive being different from the one in the parquet file.
But this is a very strange case, as the same schema works fine for the other 
brands (brand is defined as a partition column), and they share the same Hive 
schema as above.

If I query like "select * from tablename where brand='BrandB' limit 3;", 
everything works fine.

So is this really caused by a Hive schema mismatch with the parquet file 
generated by Spark, or by the data within the different partition keys, or is it 
really a compatibility issue between Spark and Hive?

Thanks

Yong




Re: Why spark.sql.autoBroadcastJoinThreshold not available

2017-05-15 Thread Yong Zhang
You should post the execution plan here, so we can provide more accurate 
support.


Since you are building your feature table with a projection ("where ..."), 
my guess is that the following JIRA 
(SPARK-13383) stops the 
broadcast join. This is fixed in Spark 2.x. Can you try it on Spark 2.0?
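
For example, a minimal sketch of how to capture that plan from the Scala shell 
(the Java API is equivalent; the table names are the ones from your mail, and 
note that the threshold is configured in bytes):

// 100 MB expressed in bytes
hiveContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)
// print the logical and physical plans so we can see which join is chosen
hiveContext.sql("select * from sample join feature").explain(true)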

Yong


From: Jone Zhang 
Sent: Wednesday, May 10, 2017 7:10 AM
To: user@spark.apache.org
Subject: Why spark.sql.autoBroadcastJoinThreshold not available

Now I use Spark 1.6.0 in Java.
I wish the following SQL to be executed as a broadcast join:
select * from sample join feature

These are my steps:
1. set spark.sql.autoBroadcastJoinThreshold=100M
2. HiveContext.sql("cache lazy table feature as select * from src where ..."), 
whose result size is only 100K
3. HiveContext.sql("select * from sample join feature")
Why is the join a SortMergeJoin?

Grateful for any idea!
Thanks.


Re: Reading ASN.1 files in Spark

2017-04-06 Thread Yong Zhang
Spark can read any file, as long as you can provide a Hadoop InputFormat 
implementation for it.
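
For example, a minimal sketch of plugging a custom InputFormat into Spark 
(Asn1InputFormat, the key/value types and the path are hypothetical here, e.g. 
something built along the lines of the blog post below; nothing like it ships 
with Spark itself):

import org.apache.hadoop.io.{LongWritable, BytesWritable}

val records = sc.newAPIHadoopFile(
  "hdfs:///path/to/asn1/files",    // hypothetical input path
  classOf[Asn1InputFormat],        // your custom InputFormat implementation
  classOf[LongWritable],           // key type produced by the InputFormat
  classOf[BytesWritable],          // value type produced by the InputFormat
  sc.hadoopConfiguration)
// records is an RDD[(LongWritable, BytesWritable)] that you can then decode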


Did you try this guy's example?


http://awcoleman.blogspot.com/2014/07/processing-asn1-call-detail-records.html

Processing ASN.1 Call Detail Records with Hadoop (using Bouncy Castle) Part 3 - awcoleman.blogspot.com



Yong




From: vincent gromakowski 
Sent: Thursday, April 6, 2017 5:24 AM
To: Hamza HACHANI
Cc: user@spark.apache.org
Subject: Re: Reading ASN.1 files in Spark

I would also be interested...

2017-04-06 11:09 GMT+02:00 Hamza HACHANI 
>:
Does any body have a spark code example where he is reading ASN.1 files ?
Thx

Best regards
Hamza



Re: Need help for RDD/DF transformation.

2017-03-30 Thread Yong Zhang
Unfortunately, I don't think there is any optimized way to do this. Maybe 
someone else can correct me, but in theory there is no way other than a 
cartesian product of your 2 sides if you cannot change the data.


Think about it: if you want to join on 2 different types (Array and Int in 
your case), Spark cannot do a HashJoin, nor a SortMergeJoin. In the relational DB 
world, you have to do a NestedLoop, which is a cartesian join in the big data world. 
After you create the cartesian product of both, you then check whether the Array column 
contains the other column.


I saw a similar question answered on 
stackoverflow<http://stackoverflow.com/questions/41595099/spark-join-dataframe-column-with-an-array>,
 but the answer is NOT correct for the general case.

It assumes the key never contains any conflicting strings:


scala> sc.version
res1: String = 1.6.3

scala> val df1 = Seq((1, "a"), (3, "a"), (5, "b")).toDF("key", "value")
df1: org.apache.spark.sql.DataFrame = [key: int, value: string]

scala> val df2 = Seq((Array(1,2,3), "other1"),(Array(4,5), 
"other2")).toDF("keys", "other")
df2: org.apache.spark.sql.DataFrame = [keys: array<int>, other: string]

scala> df1.show
+---+-+
|key|value|
+---+-+
|  1|a|
|  3|a|
|  5|b|
+---+-+


scala> df2.show
+-+--+
| keys| other|
+-+--+
|[1, 2, 3]|other1|
|   [4, 5]|other2|
+-+--+


scala> df1.join(df2, 
df2("keys").cast("string").contains(df1("key").cast("string"))).show
+---+-+-+--+
|key|value| keys| other|
+---+-+-+--+
|  1|a|[1, 2, 3]|other1|
|  3|a|[1, 2, 3]|other1|
|  5|b|   [4, 5]|other2|
+---+-+-+--+

This code looks like it works in Spark 1.6.x, but in fact it has a serious issue: 
it assumes that the key will never have any conflicting string in it, which 
won't be true for any serious data.
So [11, 2, 3] contains "1" will return true, which breaks your logic.

And it won't work in Spark 2.x, as the cast("string") logic for Array changes 
in Spark 2.x. The idea behind the whole thing is to cast your Array field to a 
String type, and use the contains method to check whether it contains the other 
field (as a String too). But this cannot match the Array.contains(element) 
logic in most cases.

You need to know your data, and then see if you can find an optimized way 
to avoid the cartesian product. For example, maybe you can guarantee that the "key" 
in DF1 always corresponds to the first element of the Array "keys" in DF2, so you 
can just pick the first element out of the Array "keys" of DF2 to join on. 
Otherwise, I don't see any way to avoid a cartesian join.
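
For example, a minimal sketch of that last idea (only valid under the assumption 
that the DF1 key always equals the first element of the DF2 array):

import org.apache.spark.sql.functions.col

// pick the first array element as an ordinary join key, then do an equi-join
val df2WithKey = df2.withColumn("key", col("keys")(0))
df1.join(df2WithKey, "key").show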

Yong


From: Mungeol Heo <mungeol@gmail.com>
Sent: Thursday, March 30, 2017 3:05 AM
To: ayan guha
Cc: Yong Zhang; user@spark.apache.org
Subject: Re: Need help for RDD/DF transformation.

Hello ayan,

The same key will not exist in different lists.
Which means, if "1" exists in a list, then it will not be present in
another list.

Thank you.

On Thu, Mar 30, 2017 at 3:56 PM, ayan guha <guha.a...@gmail.com> wrote:
> Is it possible for one key in 2 groups in rdd2?
>
> [1,2,3]
> [1,4,5]
>
> ?
>
> On Thu, 30 Mar 2017 at 12:23 pm, Mungeol Heo <mungeol@gmail.com> wrote:
>>
>> Hello Yong,
>>
>> First of all, thank your attention.
>> Note that the values of elements, which have values at RDD/DF1, in the
>> same list will be always same.
>> Therefore, the "1" and "3", which from RDD/DF 1, will always have the
>> same value which is "a".
>>
>> The goal here is assigning same value to elements of the list which
>> does not exist in RDD/DF 1.
>> So, all the elements in the same list can have same value.
>>
>> Or, the final RDD/DF also can be like this,
>>
>> [1, 2, 3], a
>> [4, 5], b
>>
>> Thank you again.
>>
>> - Mungeol
>>
>>
>> On Wed, Mar 29, 2017 at 9:03 PM, Yong Zhang <java8...@hotmail.com> wrote:
>> > What is the desired result for
>> >
>> >
>> > RDD/DF 1
>> >
>> > 1, a
>> > 3, c
>> > 5, b
>> >
>> > RDD/DF 2
>> >
>> > [1, 2, 3]
>> > [4, 5]
>> >
>> >
>> > Yong
>> >
>> > 
>> > From: Mungeol Heo <mungeol@gmail.com>
>> > Sent: Wednesday, March 29, 2017 5:37 AM
>> > To: user@spark.apache.org
>> > Subject: Need help for RDD/DF transformation.
>> >
>> > Hello,
>> >
>> > Suppose, I have two R

Re: Spark SQL, dataframe join questions.

2017-03-29 Thread Yong Zhang
You don't need to repartition your data just for the join. But if either 
side of the join is already partitioned, Spark will use that advantage as 
part of the join optimization.

Whether you should reduceByKey before the join really depends on your join logic. 
ReduceByKey will shuffle, and the following join COULD cause another shuffle, so I 
am not sure it is a smart approach.

Yong


From: shyla deshpande 
Sent: Wednesday, March 29, 2017 12:33 PM
To: user
Subject: Re: Spark SQL, dataframe join questions.



On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande 
> wrote:

Following are my questions. Thank you.

1. When joining dataframes is it a good idea to repartition on the key column 
that is used in the join or
the optimizer is too smart so forget it.

2. In RDD join, wherever possible we do reduceByKey before the join to avoid a 
big shuffle of data. Do we need
to do anything similar with dataframe joins, or the optimizer is too smart so 
forget it.



Re: Secondary Sort using Apache Spark 1.6

2017-03-29 Thread Yong Zhang
The error message indeed is not very clear.


What went wrong is that repartitionAndSortWithinPartitions not only 
requires a PairRDD, but also an ordering on the key. Your case class used as the key is NOT Ordered.


Either extend it from Ordered, or provide a companion object that supplies the 
implicit Ordering.


scala> spark.version
res1: String = 2.1.0

scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)
extends Ordered[DeviceKey] {
 |   import scala.math.Ordered.orderingToOrdered
 |   def compare(that: DeviceKey): Int =
 |  (this.serialNum, this.eventDate, this.EventTs * -1) compare
 |  (that.serialNum, that.eventDate, that.EventTs * -1)
 | }
defined class DeviceKey

scala>

scala> val t = sc.parallelize(List(((DeviceKey("2","100",1),1)),
(DeviceKey("2","100",3),1)), 1)
t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at 
parallelize at :26

scala>

scala> class DeviceKeyPartitioner(partitions: Int) extends 
org.apache.spark.Partitioner {
 | require(partitions >= 0, s"Number of partitions ($partitions) cannot 
be negative.")
 |
 | override def numPartitions: Int = partitions
 |
 | override def getPartition(key: Any): Int = {
 |   val k = key.asInstanceOf[DeviceKey]
 |   k.serialNum.hashCode() % numPartitions
 | }
 | }
defined class DeviceKeyPartitioner

scala>

scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at 
repartitionAndSortWithinPartitions at :30
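
The companion-object alternative mentioned above would be a sketch like this 
(keeping the case class itself free of Ordered):

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)

object DeviceKey {
  // implicit Ordering picked up by repartitionAndSortWithinPartitions
  implicit val ordering: Ordering[DeviceKey] =
    Ordering.by(k => (k.serialNum, k.eventDate, -k.EventTs))
}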


Yong



From: Pariksheet Barapatre 
Sent: Wednesday, March 29, 2017 9:02 AM
To: user
Subject: Secondary Sort using Apache Spark 1.6

Hi,

I am referring to the web link http://codingjunkie.net/spark-secondary-sort/ to 
implement a secondary sort in my spark job.

I have defined my key case class as

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
  implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
   Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
}
}


but when I try to apply the function
t.repartitionAndSortWithinPartitions(partitioner)


# t is an RDD[(DeviceKey, Int)]


I am getting the error:
value repartitionAndSortWithinPartitions is not a member of 
org.apache.spark.rdd.RDD[(DeviceKey, Int)]



Example code available at
http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6


Could somebody help me understand the error?


Many Thanks

Pari

--
Cheers,
Pari


Re: Need help for RDD/DF transformation.

2017-03-29 Thread Yong Zhang
What is the desired result for


RDD/DF 1

1, a
3, c
5, b

RDD/DF 2

[1, 2, 3]
[4, 5]

Yong


From: Mungeol Heo 
Sent: Wednesday, March 29, 2017 5:37 AM
To: user@spark.apache.org
Subject: Need help for RDD/DF transformation.

Hello,

Suppose, I have two RDD or data frame like addressed below.

RDD/DF 1

1, a
3, a
5, b

RDD/DF 2

[1, 2, 3]
[4, 5]

I need to create a new RDD/DF like below from RDD/DF 1 and 2.

1, a
2, a
3, a
4, b
5, b

Is there an efficient way to do this?
Any help will be great.

Thank you.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: how to read object field within json file

2017-03-24 Thread Yong Zhang
I missed the part about passing in a schema to force the "struct" to a Map, and then 
using explode. Good option.
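
A minimal sketch of that approach (Spark 2.x; the exact field list and file path 
here are assumptions for illustration, the key point being that "source" is 
declared as a MapType):

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.explode

val fieldSchema = new StructType()
  .add("id", StringType)
  .add("eId", StringType)
  .add("description", StringType)

val schema = new StructType()
  .add("id", StringType)
  .add("source", MapType(StringType, fieldSchema))   // treat F1..F999 as map keys

val df = spark.read.schema(schema).json("/path/to/input.json.bz2")

df.select(df("id"), explode(df("source")))           // one row per Fx entry
  .select("id", "key", "value.description")
  .show()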


Yong



From: Michael Armbrust <mich...@databricks.com>
Sent: Friday, March 24, 2017 3:02 PM
To: Yong Zhang
Cc: Selvam Raman; user
Subject: Re: how to read object field within json file

I'm not sure you can parse this as an Array, but you can hint to the parser 
that you would like to treat source as a map instead of as a struct.  This is a 
good strategy when you have dynamic columns in your data.

Here is an example of the schema you can use to parse this JSON and also how to 
use explode to turn it into separate 
rows<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/679071429109042/2840265927289860/latest.html>.
  This blog post has more on working with semi-structured data in 
Spark<https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>.

On Thu, Mar 23, 2017 at 2:49 PM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

That's why your "source" should be defined as an Array[Struct] type (which 
makes sense in this case, as it has an undetermined length), so you can explode 
it and get the description easily.

Otherwise, you need to write your own UDF; maybe that can do what you want.

Yong


From: Selvam Raman <sel...@gmail.com<mailto:sel...@gmail.com>>
Sent: Thursday, March 23, 2017 5:03 PM
To: user
Subject: how to read object field within json file

Hi,

{
"id": "test1",
"source": {
"F1": {
  "id": "4970",
  "eId": "F1",
  "description": "test1",
},
"F2": {
  "id": "5070",
  "eId": "F2",
  "description": "test2",
},
"F3": {
  "id": "5170",
  "eId": "F3",
  "description": "test3",
},
"F4":{}
  etc..
  "F999":{}
}

I have bzip2-compressed JSON files in the above format.
Some JSON rows contain two objects within source (like F1 and F2), some contain 
five (F1, F2, F3, F4, F5), etc. So the final schema will contain the combination of all 
objects for the source field.

Now, every row will contain n objects, but only some contain valid 
records.
How can I retrieve the value of "description" in the "source" field?

source.F1.description returns the result, but how can I get all the description 
results for every row (something like "source.*.description")?

--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"



Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Yong Zhang
Not sure if anyone else here can help you. But if I were you, I would adjust 
SPARK_DAEMON_MEMORY to 2g, to bump the worker daemon to 2G. Even though the worker's 
responsibility is very limited, in today's world, who knows. Give 2g a try 
and see if the problem goes away.
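
For example (a sketch only; in conf/spark-env.sh on each worker node, then 
restart the worker daemons):

# memory for the standalone master/worker daemons themselves
export SPARK_DAEMON_MEMORY=2g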


BTW, in our production I set the worker to 2g and have never experienced any OOM 
from the workers. Our cluster has been live for more than 1 year, and we also use Spark 
1.6.2 in production.


Yong



From: Behroz Sikander <behro...@gmail.com>
Sent: Friday, March 24, 2017 9:29 AM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Yea we also didn't find anything related to this online.

Are you aware of any memory leaks in worker in 1.6.2 spark which might be 
causing this ?
Do you know of any documentation which explains all the tasks that a worker is 
performing ? Maybe we can get some clue from there.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:21 PM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

I have never experienced a worker OOM, and very rarely see it reported online. So my 
guess is that you have to generate the heap dump file and analyze it.


Yong



From: Behroz Sikander <behro...@gmail.com<mailto:behro...@gmail.com>>
Sent: Friday, March 24, 2017 9:15 AM
To: Yong Zhang
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Thank you for the response.

Yes, I am sure because the driver was working fine. Only 2 workers went down 
with OOM.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

I am not 100% sure, but normally "dispatcher-event-loop" OOM means the driver 
OOM. Are you sure your workers OOM?


Yong



From: bsikander <behro...@gmail.com<mailto:behro...@gmail.com>>
Sent: Friday, March 24, 2017 5:48 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMS are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with out of memory exception.
java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)

Both of these worker processes were in a hung state. We restarted them to
bring them back to a normal state.

Here is the complete exception
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91




Master's spark-default.conf file:
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d




Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason of our workers crashing due to OutOfMemory ?
How can we avoid that in future.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>





Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Yong Zhang
I have never experienced a worker OOM, and very rarely see it reported online. So my 
guess is that you have to generate the heap dump file and analyze it.


Yong



From: Behroz Sikander <behro...@gmail.com>
Sent: Friday, March 24, 2017 9:15 AM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Thank you for the response.

Yes, I am sure because the driver was working fine. Only 2 workers went down 
with OOM.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

I am not 100% sure, but normally "dispatcher-event-loop" OOM means the driver 
OOM. Are you sure your workers OOM?


Yong



From: bsikander <behro...@gmail.com<mailto:behro...@gmail.com>>
Sent: Friday, March 24, 2017 5:48 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMS are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with out of memory exception.
java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)

Both of these worker processes were in a hung state. We restarted them to
bring them back to a normal state.

Here is the complete exception
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91




Master's spark-default.conf file:
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d




Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason of our workers crashing due to OutOfMemory ?
How can we avoid that in future.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>




Re: spark-submit config via file

2017-03-24 Thread Yong Zhang
Of course it is possible.


You can always set any configuration in your application using the API, instead 
of passing it in through the CLI.


val sparkConf = new SparkConf()
  .setAppName(properties.get("appName"))
  .setMaster(properties.get("master"))
  .set("xxx", properties.get("xxx"))   // any other spark.* setting

Your error is your environment problem.

Yong

From: , Roy 
Sent: Friday, March 24, 2017 7:38 AM
To: user
Subject: spark-submit config via file

Hi,

I am trying to deploy a Spark job using spark-submit, which has a bunch of 
parameters, like

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode 
cluster --executor-memory 3072m --executor-cores 4 --files streaming.conf 
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

I was looking for a way to put all these flags in a file to pass to spark-submit, 
to make my spark-submit command simpler, like this:

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode 
cluster --properties-file properties.conf --files streaming.conf 
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

properties.conf has following contents


spark.executor.memory 3072m

spark.executor.cores 4


But I am getting following error


17/03/24 11:36:26 INFO Client: Use hdfs cache file as spark.yarn.archive for 
HDP, 
hdfsCacheFile:hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

17/03/24 11:36:26 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for 
Delete operation as thread count 0 is <= 1

17/03/24 11:36:26 INFO AzureFileSystemThreadPoolExecutor: Time taken for Delete 
operation is: 1 ms with threads: 0

17/03/24 11:36:27 INFO Client: Deleted staging directory 
wasb://a...@abc.blob.core.windows.net/user/sshuser/.sparkStaging/application_1488402758319_0492

Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: 
hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:154)

at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2791)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)

at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2825)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)

at 
org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:364)

at 
org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:480)

at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:552)

at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:881)

at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:170)

at org.apache.spark.deploy.yarn.Client.run(Client.scala:1218)

at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1277)

at org.apache.spark.deploy.yarn.Client.main(Client.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:745)

at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)

at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

17/03/24 11:36:27 INFO MetricsSystemImpl: Stopping azure-file-system metrics 
system...

Does anyone know if this is even possible?


Thanks...

Roy


Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Yong Zhang
I am not 100% sure, but normally "dispatcher-event-loop" OOM means the driver 
OOM. Are you sure your workers OOM?


Yong



From: bsikander 
Sent: Friday, March 24, 2017 5:48 AM
To: user@spark.apache.org
Subject: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMS are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with out of memory exception.
java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)

Both of these worker processes were in a hung state. We restarted them to
bring them back to a normal state.

Here is the complete exception
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91




Master's spark-default.conf file:
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d




Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason of our workers crashing due to OutOfMemory ?
How can we avoid that in future.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

2017-03-23 Thread Yong Zhang
Change:

val arrayinput = input.getAs[Array[String]](0)

to:

val arrayinput = input.getAs[Seq[String]](0)


Yong



From: shyla deshpande 
Sent: Thursday, March 23, 2017 8:18 PM
To: user
Subject: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

This is my input data. The UDAF needs to aggregate the goals for a team and 
return a map that  gives the count for every goal in the team.
I am getting the following error

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef 
cannot be cast to [Ljava.lang.String;
at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)

+--+--+
|teamid|goals |
+--+--+
|t1|[Goal1, Goal2]|
|t1|[Goal1, Goal3]|
|t2|[Goal1, Goal2]|
|t3|[Goal2, Goal3]|
+--+--+

root
 |-- teamid: string (nullable = true)
 |-- goals: array (nullable = true)
 ||-- element: string (containsNull = true)

/Calling the UDAF//

object TestUDAF {
  def main(args: Array[String]): Unit = {

val spark = SparkSession
  .builder
  .getOrCreate()

val sc: SparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._

val data = Seq(
  ("t1", Seq("Goal1", "Goal2")),
  ("t1", Seq("Goal1", "Goal3")),
  ("t2", Seq("Goal1", "Goal2")),
  ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")

data.show(truncate = false)
data.printSchema()

import spark.implicits._

val sumgoals = new GoalAggregator
val result = data.groupBy("teamid").agg(sumgoals(col("goals")))

result.show(truncate = false)

  }
}

///UDAF/

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class GoalAggregator extends UserDefinedAggregateFunction{

  override def inputSchema: org.apache.spark.sql.types.StructType =
  StructType(StructField("value", ArrayType(StringType)) :: Nil)

  override def bufferSchema: StructType = StructType(
  StructField("combined", MapType(StringType,IntegerType)) :: Nil
  )

  override def dataType: DataType = MapType(StringType,IntegerType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, Map[String, Integer]())
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val mapbuf = buffer.getAs[Map[String, Int]](0)
val arrayinput = input.getAs[Array[String]](0)
val result = mapbuf ++ arrayinput.map(goal => {
  val cnt  = mapbuf.get(goal).getOrElse(0) + 1
  goal -> cnt
})
buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val map1 = buffer1.getAs[Map[String, Int]](0)
val map2 = buffer2.getAs[Map[String, Int]](0)
val result = map1 ++ map2.map { case (k,v) =>
  val cnt = map1.get(k).getOrElse(0) + 1
  k -> cnt
}
buffer1.update(0, result)
  }

  override def evaluate(buffer: Row): Any = {
buffer.getAs[Map[String, Int]](0)
  }
}




Re: how to read object field within json file

2017-03-23 Thread Yong Zhang
That's why your "source" should be defined as an Array[Struct] type (which 
makes sense in this case, as it has an undetermined length), so you can explode 
it and get the description easily.

Otherwise, you need to write your own UDF; maybe that can do what you want.

Yong


From: Selvam Raman 
Sent: Thursday, March 23, 2017 5:03 PM
To: user
Subject: how to read object field within json file

Hi,

{
"id": "test1",
"source": {
"F1": {
  "id": "4970",
  "eId": "F1",
  "description": "test1",
},
"F2": {
  "id": "5070",
  "eId": "F2",
  "description": "test2",
},
"F3": {
  "id": "5170",
  "eId": "F3",
  "description": "test3",
},
"F4":{}
  etc..
  "F999":{}
}

I have bzip2-compressed JSON files in the above format.
Some JSON rows contain two objects within source (like F1 and F2), some contain 
five (F1, F2, F3, F4, F5), etc. So the final schema will contain the combination of all 
objects for the source field.

Now, every row will contain n objects, but only some contain valid 
records.
How can I retrieve the value of "description" in the "source" field?

source.F1.description returns the result, but how can I get all the description 
results for every row (something like "source.*.description")?

--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Converting dataframe to dataset question

2017-03-23 Thread Yong Zhang
Not sure I understand this problem; why can't I reproduce it?


scala> spark.version
res22: String = 2.1.0

scala> case class Teamuser(teamid: String, userid: String, role: String)
defined class Teamuser

scala> val df = Seq(Teamuser("t1", "u1", "role1")).toDF
df: org.apache.spark.sql.DataFrame = [teamid: string, userid: string ... 1 more 
field]

scala> df.show
+--+--+-+
|teamid|userid| role|
+--+--+-+
|t1|u1|role1|
+--+--+-+

scala> df.createOrReplaceTempView("teamuser")

scala> val newDF = spark.sql("select teamid, userid, role from teamuser")
newDF: org.apache.spark.sql.DataFrame = [teamid: string, userid: string ... 1 
more field]

scala> val userDS: Dataset[Teamuser] = newDF.as[Teamuser]
userDS: org.apache.spark.sql.Dataset[Teamuser] = [teamid: string, userid: 
string ... 1 more field]

scala> userDS.show
+--+--+-+
|teamid|userid| role|
+--+--+-+
|t1|u1|role1|
+--+--+-+


scala> userDS.printSchema
root
 |-- teamid: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- role: string (nullable = true)


Am I missing anything?


Yong



From: shyla deshpande 
Sent: Thursday, March 23, 2017 3:49 PM
To: user
Subject: Re: Converting dataframe to dataset question

I realized, my case class was inside the object. It should be defined outside 
the scope of the object. Thanks

On Wed, Mar 22, 2017 at 6:07 PM, shyla deshpande 
> wrote:

Why userDS is Dataset[Any], instead of Dataset[Teamuser]?  Appreciate your 
help. Thanks

val spark = SparkSession
  .builder
  .config("spark.cassandra.connection.host", cassandrahost)
  .appName(getClass.getSimpleName)
  .getOrCreate()

import spark.implicits._
val sqlContext = spark.sqlContext
import sqlContext.implicits._

case class Teamuser(teamid:String, userid:String, role:String)
spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "teamuser"))
  .load
  .createOrReplaceTempView("teamuser")

val userDF = spark.sql("SELECT teamid, userid, role FROM teamuser")

userDF.show()

val userDS:Dataset[Teamuser] = userDF.as[Teamuser]




Re: calculate diff of value and median in a group

2017-03-22 Thread Yong Zhang
He is looking for the median, not the mean/avg.


You have to implement the median logic yourself, as there is no direct 
implementation in Spark. You can use the RDD API if you are using 1.6.x, or the 
Dataset API if you are on 2.x.


The following example gives you an idea of how to calculate the median using the 
Dataset API. You can even change the code to add additional logic to calculate 
the diff of every value from the median.


scala> spark.version
res31: String = 2.1.0

scala> val ds = Seq((100,0.43),(100,0.33),(100,0.73),(101,0.29),(101,0.96),
(101,0.42),(101,0.01)).toDF("id","value").as[(Int, Double)]
ds: org.apache.spark.sql.Dataset[(Int, Double)] = [id: int, value: double]

scala> ds.show
+---+-+
| id|value|
+---+-+
|100| 0.43|
|100| 0.33|
|100| 0.73|
|101| 0.29|
|101| 0.96|
|101| 0.42|
|101| 0.01|
+---+-+

scala> def median(seq: Seq[Double]) = {
 |   val size = seq.size
 |   val sorted = seq.sorted
 |   size match {
 | case even if size % 2 == 0 => (sorted((size-2)/2) + sorted(size/2)) 
/ 2
 | case odd => sorted((size-1)/2)
 |   }
 | }
median: (seq: Seq[Double])Double

scala> ds.groupByKey(_._1).mapGroups((id, iter) => (id, 
median(iter.map(_._2).toSeq))).show
+---+-+
| _1|   _2|
+---+-+
|101|0.355|
|100| 0.43|
+---+-+
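
A minimal sketch of that extension (same ds and median function as above; not a 
transcript of real output), computing each value's diff from its group median:

ds.groupByKey(_._1).flatMapGroups { (id, iter) =>
  val values = iter.map(_._2).toSeq
  val m = median(values)
  values.map(v => (id, v, m, math.abs(v - m)))   // (id, value, median, diff)
}.toDF("id", "value", "median", "diff").show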


Yong




From: ayan guha <guha.a...@gmail.com>
Sent: Wednesday, March 22, 2017 7:23 PM
To: Craig Ching
Cc: Yong Zhang; user@spark.apache.org
Subject: Re: calculate diff of value and median in a group

I would suggest using a window function with partitioning.

select group1, group2, name, value, avg(value) over (partition by group1, group2 order 
by name) m
from t

On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching 
<craigch...@gmail.com<mailto:craigch...@gmail.com>> wrote:

Are the elements count big per group? If not, you can group them and use the 
code to calculate the median and diff.


They're not big, no.  Any pointers on how I might do that?  The part I'm having 
trouble with is the grouping, I can't seem to see how to do the median per 
group.  For mean, we have the agg feature, but not for median (and I understand 
the reasons for that).


Yong


From: Craig Ching <craigch...@gmail.com<mailto:craigch...@gmail.com>>
Sent: Wednesday, March 22, 2017 3:17 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: calculate diff of value and median in a group

Hi,

When using pyspark, I'd like to be able to calculate the difference between 
grouped values and their median for the group.  Is this possible?  Here is some 
code I hacked up that does what I want except that it calculates the grouped 
diff from mean.  Also, please feel free to comment on how I could make this 
better if you feel like being helpful :)

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (
StringType,
LongType,
DoubleType,
StructField,
StructType
)
from pyspark.sql import functions as F


sc = SparkContext(appName='myapp')
spark = SparkSession(sc)

file_name = 'data.csv'

fields = [
StructField(
'group2',
LongType(),
True),
StructField(
'name',
StringType(),
True),
StructField(
'value',
DoubleType(),
True),
StructField(
'group1',
LongType(),
True)
]
schema = StructType(fields)

df = spark.read.csv(
file_name, header=False, mode="DROPMALFORMED", schema=schema
)
df.show()
means = df.select([
'group1',
'group2',
'name',
'value']).groupBy([
'group1',
'group2'
]).agg(
F.mean('value').alias('mean_value')
).orderBy('group1', 'group2')

cond = [df.group1 == means.group1, df.group2 == means.group2]

means.show()
df = df.select([
'group1',
'group2',
'name',
'value']).join(
means,
cond
).drop(
df.group1
).drop(
df.group2
).select('group1',
 'group2',
 'name',
 'value',
 'mean_value')

final = df.withColumn(
'diff',
F.abs(df.value - df.mean_value))
final.show()

sc.stop()

And here is an example dataset I'm playing with:

100,name1,0.43,0
100,name2,0.33,0
100,name3,0.73,0
101,name1,0.29,0
101,name2,0.96,0
101,name3,0.42,0
102,name1,0.01,0
102,name2,0.42,0
102,name3,0.51,0
103,name1,0.55,0
103,name2,0.45,0
103,name3,0.02,0
104,name1,0.93,0
104,name2,0.16,0
104,name3,0.74,0
105,name1,0.41,0
105,name2,0.65,0
105,name3,0.29,0
100,name1,0.51,1
100,name2,0.51,1
100,name3,0.43,1
101,name1,0.59,1
101,name2,0.55,1
101,name3,0.84,1
102,name1,0.01,1
102,name2,0.98,1
102,name3,0.44,1
103,name1,0.47,1
103,name2,0.16,1
103,name3,0.02,1
104,name1,0.83,1
104,name2,0.89,1
104,name3,0.31,1
105,name1,0.59,1
105,name2,0.77,1
105,name3,0.45,1

and here is what I'm trying to produce:

group1,group2,name,value,m

Re: calculate diff of value and median in a group

2017-03-22 Thread Yong Zhang
Is the element count per group big? If not, you can group them and use 
code to calculate the median and diff.


Yong


From: Craig Ching 
Sent: Wednesday, March 22, 2017 3:17 PM
To: user@spark.apache.org
Subject: calculate diff of value and median in a group

Hi,

When using pyspark, I'd like to be able to calculate the difference between 
grouped values and their median for the group.  Is this possible?  Here is some 
code I hacked up that does what I want except that it calculates the grouped 
diff from mean.  Also, please feel free to comment on how I could make this 
better if you feel like being helpful :)

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (
StringType,
LongType,
DoubleType,
StructField,
StructType
)
from pyspark.sql import functions as F


sc = SparkContext(appName='myapp')
spark = SparkSession(sc)

file_name = 'data.csv'

fields = [
StructField(
'group2',
LongType(),
True),
StructField(
'name',
StringType(),
True),
StructField(
'value',
DoubleType(),
True),
StructField(
'group1',
LongType(),
True)
]
schema = StructType(fields)

df = spark.read.csv(
file_name, header=False, mode="DROPMALFORMED", schema=schema
)
df.show()
means = df.select([
'group1',
'group2',
'name',
'value']).groupBy([
'group1',
'group2'
]).agg(
F.mean('value').alias('mean_value')
).orderBy('group1', 'group2')

cond = [df.group1 == means.group1, df.group2 == means.group2]

means.show()
df = df.select([
'group1',
'group2',
'name',
'value']).join(
means,
cond
).drop(
df.group1
).drop(
df.group2
).select('group1',
 'group2',
 'name',
 'value',
 'mean_value')

final = df.withColumn(
'diff',
F.abs(df.value - df.mean_value))
final.show()

sc.stop()

And here is an example dataset I'm playing with:

100,name1,0.43,0
100,name2,0.33,0
100,name3,0.73,0
101,name1,0.29,0
101,name2,0.96,0
101,name3,0.42,0
102,name1,0.01,0
102,name2,0.42,0
102,name3,0.51,0
103,name1,0.55,0
103,name2,0.45,0
103,name3,0.02,0
104,name1,0.93,0
104,name2,0.16,0
104,name3,0.74,0
105,name1,0.41,0
105,name2,0.65,0
105,name3,0.29,0
100,name1,0.51,1
100,name2,0.51,1
100,name3,0.43,1
101,name1,0.59,1
101,name2,0.55,1
101,name3,0.84,1
102,name1,0.01,1
102,name2,0.98,1
102,name3,0.44,1
103,name1,0.47,1
103,name2,0.16,1
103,name3,0.02,1
104,name1,0.83,1
104,name2,0.89,1
104,name3,0.31,1
105,name1,0.59,1
105,name2,0.77,1
105,name3,0.45,1

and here is what I'm trying to produce:

group1,group2,name,value,median,diff
0,100,name1,0.43,0.43,0.0
0,100,name2,0.33,0.43,0.10
0,100,name3,0.73,0.43,0.30
0,101,name1,0.29,0.42,0.13
0,101,name2,0.96,0.42,0.54
0,101,name3,0.42,0.42,0.0
0,102,name1,0.01,0.42,0.41
0,102,name2,0.42,0.42,0.0
0,102,name3,0.51,0.42,0.09
0,103,name1,0.55,0.45,0.10
0,103,name2,0.45,0.45,0.0
0,103,name3,0.02,0.45,0.43
0,104,name1,0.93,0.74,0.19
0,104,name2,0.16,0.74,0.58
0,104,name3,0.74,0.74,0.0
0,105,name1,0.41,0.41,0.0
0,105,name2,0.65,0.41,0.24
0,105,name3,0.29,0.41,0.24
1,100,name1,0.51,0.51,0.0
1,100,name2,0.51,0.51,0.0
1,100,name3,0.43,0.51,0.08
1,101,name1,0.59,0.59,0.0
1,101,name2,0.55,0.59,0.04
1,101,name3,0.84,0.59,0.25
1,102,name1,0.01,0.44,0.43
1,102,name2,0.98,0.44,0.54
1,102,name3,0.44,0.44,0.0
1,103,name1,0.47,0.16,0.31
1,103,name2,0.16,0.16,0.0
1,103,name3,0.02,0.16,0.14
1,104,name1,0.83,0.83,0.0
1,104,name2,0.89,0.83,0.06
1,104,name3,0.31,0.83,0.52
1,105,name1,0.59,0.59,0.0
1,105,name2,0.77,0.59,0.18
1,105,name3,0.45,0.59,0.14

Thanks for any help!

Cheers,
Craig


If TypedColumn is a subclass of Column, why I cannot apply function on it in Dataset?

2017-03-18 Thread Yong Zhang
In the following example, I used "typed.avg" to generate a TypedColumn, 
and I want to apply round on top of it. Why does Spark complain about it? 
Because it doesn't know that it is a TypedColumn?


Thanks


Yong



scala> spark.version

res20: String = 2.1.0

scala> case class Token(name: String, productId: Int, score: Double)

defined class Token

scala> val data = Token("aaa", 100, 0.12) :: Token("aaa", 200, 0.29) :: 
Token("bbb", 200, 0.53) :: Token("bbb", 300, 0.42) :: Nil

data: List[Token] = List(Token(aaa,100,0.12), Token(aaa,200,0.29), 
Token(bbb,200,0.53), Token(bbb,300,0.42))

scala> val dataset = data.toDS

dataset: org.apache.spark.sql.Dataset[Token] = [name: string, productId: int 
... 1 more field]

scala> import org.apache.spark.sql.expressions.scalalang._

import org.apache.spark.sql.expressions.scalalang._

scala> dataset.groupByKey(_.productId).agg(typed.avg[Token](_.score)).show

+-+-+

|value|TypedAverage($line22.$readiwiw$Token)|

+-+-+

| 300| 0.42|

| 100| 0.12|

| 200| 0.41003|

+-+-+

scala> dataset.groupByKey(_.productId).agg(round(typed.avg[Token](_.score)))

:36: error: type mismatch;

found : org.apache.spark.sql.Column

required: org.apache.spark.sql.TypedColumn[Token,?]

dataset.groupByKey(_.productId).agg(round(typed.avg[Token](_.score)))



Re: Spark 2.0.2 - hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()

2017-03-17 Thread Yong Zhang
Starting from Spark 2, this kind of operation is implemented as a left anti 
join, instead of using an RDD operation directly.


The same issue also shows up with sqlContext.


scala> spark.version
res25: String = 2.0.2


spark.sqlContext.emptyDataFrame.except(spark.sqlContext.emptyDataFrame).explain(true)

== Physical Plan ==
*HashAggregate(keys=[], functions=[], output=[])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[], output=[])
  +- BroadcastNestedLoopJoin BuildRight, LeftAnti, false
 :- Scan ExistingRDD[]
 +- BroadcastExchange IdentityBroadcastMode
+- Scan ExistingRDD[]


This is arguably a bug. But my guess is that it is related to the logic of comparing NULL = 
NULL (should it return true or false?), which causes this kind of confusion.

Yong


From: Ravindra 
Sent: Friday, March 17, 2017 4:30 AM
To: user@spark.apache.org
Subject: Spark 2.0.2 - 
hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()

Can someone please explain why

println ( " Empty count " + 
hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()

prints -  Empty count 1

This was not the case in Spark 1.5.2... I am upgrading to spark 2.0.2 and found 
this. This causes my tests to fail. Is there another way to check full equality 
of 2 dataframes.

Thanks,
Ravindra.


Re: Dataset : Issue with Save

2017-03-17 Thread Yong Zhang
Looks like the current fix reduces the accumulator data being sent to the driver, 
but there is still a lot of other statistics data being sent to the driver. It 
is arguable how much data is reasonable for 3.7k tasks.


You can attach your heap dump file to that JIRA and follow it.


Yong


From: Bahubali Jain <bahub...@gmail.com>
Sent: Thursday, March 16, 2017 11:41 PM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: Dataset : Issue with Save

I am using Spark 2.0. There are comments in the ticket since Oct 2016 which 
clearly mention that the issue still persists even in 2.0.
I agree 1G is very small in today's world, and I have already resolved it by 
increasing spark.driver.maxResultSize.
I was more intrigued as to why the data is being sent to the driver during 
save (similar to the collect() action); are there any plans to fix this 
behavior/issue?

Thanks,
Baahu

On Fri, Mar 17, 2017 at 8:17 AM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

Did you read the JIRA ticket? Are you confirming that it is fixed in Spark 2.0, 
or complaining that it still exists in Spark 2.0?


First, you didn't tell us which version of Spark you are using. The JIRA 
clearly said that it is a bug in Spark 1.x and should be fixed in Spark 2.0. 
So help yourself and the community, and confirm whether this is the case.


If you are looking for a workaround, the JIRA ticket clearly shows you how to 
increase your driver heap. 1G in today's world really is kind of small.


Yong



From: Bahubali Jain <bahub...@gmail.com<mailto:bahub...@gmail.com>>
Sent: Thursday, March 16, 2017 10:34 PM
To: Yong Zhang
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Dataset : Issue with Save

Hi,
Was this not yet resolved?
It's a very common requirement to save a dataframe; is there a better way to 
save a dataframe that avoids data being sent to the driver?

"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB) "

Thanks,
Baahu

On Fri, Mar 17, 2017 at 1:19 AM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

You can take a look of https://issues.apache.org/jira/browse/SPARK-12837


Yong

Spark driver requires large memory space for serialized 
...<https://issues.apache.org/jira/browse/SPARK-12837>
issues.apache.org<http://issues.apache.org>
Executing a sql statement with a large number of partitions requires a high 
memory space for the driver even there are no requests to collect data back to 
the driver.





From: Bahubali Jain <bahub...@gmail.com<mailto:bahub...@gmail.com>>
Sent: Thursday, March 16, 2017 1:39 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Dataset : Issue with Save

Hi,
While saving a dataset using mydataset.write().csv("outputlocation"), 
I am running into an exception:

"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB)"

Does it mean that when saving a dataset, the whole of the dataset's contents is being 
sent to the driver, similar to a collect() action?

Thanks,
Baahu



--
Twitter:http://twitter.com/Baahu




--
Twitter:http://twitter.com/Baahu



Re: RDD can not convert to df, thanks

2017-03-17 Thread Yong Zhang
You also need to import the sqlContext implicits:


import sqlContext.implicits._

Yong


From: 萝卜丝炒饭 <1427357...@qq.com>
Sent: Friday, March 17, 2017 1:52 AM
To: user-return-68576-1427357147=qq.com; user
Subject: Re: RDD can not convert to df, thanks

More info: I have imported the implicits of SparkSession.

---Original---
From: 

Re: Dataset : Issue with Save

2017-03-16 Thread Yong Zhang
Did you read the JIRA ticket? Are you confirming that it is fixed in Spark 2.0, 
or complaining that it still exists in Spark 2.0?


First, you didn't tell us which version of Spark you are using. The JIRA 
clearly said that it is a bug in Spark 1.x and should be fixed in Spark 2.0. 
So help yourself and the community, and confirm whether this is the case.


If you are looking for a workaround, the JIRA ticket clearly shows you how to 
increase your driver heap. 1G in today's world really is kind of small.
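
For example, a minimal sketch on the spark-submit side (the class and jar names 
are placeholders; pick sizes that fit your job):

spark-submit --class MyApp \
  --driver-memory 4g \
  --conf spark.driver.maxResultSize=2g \
  myapp.jar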


Yong



From: Bahubali Jain <bahub...@gmail.com>
Sent: Thursday, March 16, 2017 10:34 PM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: Dataset : Issue with Save

Hi,
Was this not yet resolved?
It's a very common requirement to save a dataframe; is there a better way to 
save a dataframe that avoids data being sent to the driver?

"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB) "

Thanks,
Baahu

On Fri, Mar 17, 2017 at 1:19 AM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

You can take a look of https://issues.apache.org/jira/browse/SPARK-12837


Yong

Spark driver requires large memory space for serialized 
...<https://issues.apache.org/jira/browse/SPARK-12837>
issues.apache.org<http://issues.apache.org>
Executing a sql statement with a large number of partitions requires a high 
memory space for the driver even there are no requests to collect data back to 
the driver.





From: Bahubali Jain <bahub...@gmail.com<mailto:bahub...@gmail.com>>
Sent: Thursday, March 16, 2017 1:39 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Dataset : Issue with Save

Hi,
While saving a dataset using mydataset.write().csv("outputlocation"), 
I am running into an exception:

"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB)"

Does it mean that when saving a dataset, the whole of the dataset's contents is being 
sent to the driver, similar to a collect() action?

Thanks,
Baahu



--
Twitter:http://twitter.com/Baahu



Re: spark streaming exectors memory increasing and executor killed by yarn

2017-03-16 Thread Yong Zhang
In this kind of question, you always want to tell us the spark version.


Yong



From: darin 
Sent: Thursday, March 16, 2017 9:59 PM
To: user@spark.apache.org
Subject: spark streaming exectors memory increasing and executor killed by yarn

Hi,
I got this exception after the streaming program had run for some hours.

```
*User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.*
```

I have googled some solutions, like disabling the YARN memory monitor or increasing 
executor memory... I don't think that is the right way.


And this is the submit script:
```
*spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G
--num-executors 6 --executor-cores 3 --executor-memory 3G --conf
"spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" --conf
"spark.kryoserializer.buffer.max=512m" --class
com.dtise.data.streaming.ad.DTStreamingStatistics
hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar*
```

And This is the main codes:

```
val originalStream = ssc.textFileStream(rawDataPath)
originalStream.repartition(10).mapPartitions(parseAdLog).reduceByKey(_
++ _)
  .mapWithState(StateSpec.function(countAdLogWithState
_)).foreachRDD(rdd => {
if (!rdd.isEmpty()) {
  val batchTime = Calendar.getInstance.getTimeInMillis
  val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
  val nameList = rdd.map(_._2).reduce(_ ++ _).toList
  val jedis = RedisUtils.jedis()
  jedis.hmset(joinString("t_ad_dimension_sum", batchTime),
dimensionSumMap)
  jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
  jedis.set(joinString("t_ad", batchTime.toString), "OK")
  jedis.close()

  rdd.flatMap(_._3).foreachPartition(logInfoList => {
val producter = new StringProducter
for (logInfo <- logInfoList) {
  val logInfoArr = logInfo.split("\t", -1)
  val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" +
logInfoArr(logDateIdx)
  producter.send("cookedLog", kafkaKey, logInfo)
}
producter.close()
  })
}
  })
```

These are the JVM heap MAT results (the screenshots were attached to the original post).


Anybody have any advice about this?
Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Dataset : Issue with Save

2017-03-16 Thread Yong Zhang
You can take a look of https://issues.apache.org/jira/browse/SPARK-12837


Yong

Spark driver requires large memory space for serialized 
...
issues.apache.org
Executing a sql statement with a large number of partitions requires a high 
memory space for the driver even there are no requests to collect data back to 
the driver.





From: Bahubali Jain 
Sent: Thursday, March 16, 2017 1:39 PM
To: user@spark.apache.org
Subject: Dataset : Issue with Save

Hi,
While saving a dataset using mydataset.write().csv("outputlocation"),
I am running into an exception:

"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB)"

Does it mean that, for saving a dataset, the whole of the dataset contents is being sent to the
driver, similar to the collect() action?

Thanks,
Baahu


Re: apply UDFs to N columns dynamically in dataframe

2017-03-15 Thread Yong Zhang
Is the answer here good for your case?


http://stackoverflow.com/questions/33151866/spark-udf-with-varargs

scala - Spark UDF with varargs - Stack Overflow (stackoverflow.com): "UDFs don't support varargs*
but you can pass an arbitrary number of columns wrapped using an array function: import
org.apache.spark.sql.functions.{udf, array, lit ..."
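
Roughly sketched in Scala (df and the column names are illustrative, standing in for the user-supplied column list from the question):

```scala
import org.apache.spark.sql.functions.{array, col, udf}

// The UDF receives every selected column as one Seq, so the number of
// columns can be decided at runtime.
val combine = udf((values: Seq[String]) => values.mkString("|"))

val colNames = Seq("col1", "col2", "col3")   // would come from user input
val result = df.withColumn("combined", combine(array(colNames.map(col): _*)))
```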






From: anup ahire 
Sent: Wednesday, March 15, 2017 2:04 AM
To: user@spark.apache.org
Subject: apply UDFs to N columns dynamically in dataframe

Hello,

I have a schema and name of columns to apply UDF to. Name of columns are user 
input and they can differ in numbers for each input.

Is there a way to apply UDFs to N columns in dataframe  ?



Thanks !


Re: Setting Optimal Number of Spark Executor Instances

2017-03-15 Thread Yong Zhang
Not really sure what is the root problem you try to address.


The number of tasks need to be run in Spark depends on the number of partitions 
in your job.


Let's use a simple word count example: if your Spark job reads 128 GB of data from 
HDFS (assuming the default block size of 128 MB), then the mapper stage of your 
Spark job will spawn 1024 tasks (128 GB / 128 MB).


In the reducer stage, by default, spark will spawn 200 tasks (controlled by 
spark.default.parallelism if you are using RDD api or 
spark.sql.shuffle.partitions if you are using DataFrame, and you didn't specify 
the partition number in any of your API call).


In either case, you can change the number of tasks spawned (even in the mapper 
case, though I don't see any reason to under normal circumstances). For huge datasets running 
in Spark, people often increase the task count spawned in the reducing 
stage, so that each task processes a much smaller volume of data, which reduces 
memory pressure and increases performance.


Still in the word count example, if you have 2000 unique words in your dataset, 
then your reducer count could be anywhere from 1 to 2000. 1 is the worst, as only one 
task will process all 2000 unique words, meaning all the data will be sent to 
that one task, and it will be the slowest. But on the other hand, 2000 may not be 
the best either.


Let's say 200 is the best number, so you will have 200 reduce tasks to 
process the 2000 unique words. Setting the number of executors and cores just controls 
how many of these tasks can run concurrently. So if your cluster has 
enough cores and memory available, granting as many cores as possible (up to 200) 
to your Spark job for this reducing stage is obviously best.
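
A concrete sketch of those two knobs (Spark 1.6 style, values purely illustrative):

```scala
// 1) How many reduce tasks exist for DataFrame/SQL shuffles:
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

// 2) How many of those tasks run at the same time is decided at submit time, e.g.
//    --num-executors 50 --executor-cores 4   =>  200 concurrent task slots
```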


You need to be clearer about what problem you are facing when running your 
Spark job here, so we can provide help. Reducing the number of tasks spawned 
is normally a very strange approach.


Yong



From: Kevin Peng 
Sent: Wednesday, March 15, 2017 1:35 PM
To: mohini kalamkar
Cc: user@spark.apache.org
Subject: Re: Setting Optimal Number of Spark Executor Instances

Mohini,

We set that parameter before we went and played with the number of executors 
and that didn't seem to help at all.

Thanks,

KP

On Tue, Mar 14, 2017 at 3:37 PM, mohini kalamkar 
> wrote:
Hi,

try using this parameter --conf spark.sql.shuffle.partitions=1000

Thanks,
Mohini

On Tue, Mar 14, 2017 at 3:30 PM, kpeng1 
> wrote:
Hi All,

I am currently on Spark 1.6 and I was doing a sql join on two tables that
are over 100 million rows each and I noticed that it was spawn 3+ tasks
(this is the progress meter that we are seeing show up).  We tried to
coalesece, repartition and shuffle partitions to drop the number of tasks
down because we were getting time outs due to the number of task being
spawned, but those operations did not seem to reduce the number of tasks.
The solution we came up with was actually to set the num executors to 50
(--num-executors=50) and it looks like it spawned 200 active tasks, but the
total number of tasks remained the same.  Was wondering if anyone knows what
is going on?  Is there an optimal number of executors, I was under the
impression that the default dynamic allocation would pick the optimal number
of executors for us and that this situation wouldn't happen.  Is there
something I am missing?







--
Thanks & Regards,
Mohini Kalamkar
M: +1 310 567 9329



Re: Sorted partition ranges without overlap

2017-03-13 Thread Yong Zhang
You can implement your own partitioner based on your own logic.
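
For example, a minimal sketch of such a partitioner (integer keys and known key bounds are assumptions made purely for illustration; note that Spark's own sortByKey already uses a RangePartitioner that yields exactly this kind of sorted, non-overlapping ranges):

```scala
import org.apache.spark.Partitioner

// Assigns keys to contiguous, non-overlapping ranges: with numParts = 2 and keys 1..6,
// partition 0 gets (1,2,3) and partition 1 gets (4,5,6).
class RangeBlockPartitioner(numParts: Int, minKey: Int, maxKey: Int) extends Partitioner {
  private val span = math.max(1, (maxKey - minKey + numParts) / numParts)
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = key match {
    case k: Int => math.min(numParts - 1, (k - minKey) / span)
    case _      => 0
  }
}

// Usage sketch on an RDD[(Int, V)]:
// rdd.repartitionAndSortWithinPartitions(new RangeBlockPartitioner(2, 1, 6))
```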


Yong



From: Kristoffer Sjögren 
Sent: Monday, March 13, 2017 9:34 AM
To: user
Subject: Sorted partition ranges without overlap

Hi

I have a RDD that needs to be sorted lexicographically and
then processed by partition. The partitions should be split in to
ranged blocks where sorted order is maintained and each partition
containing sequential, non-overlapping keys.

Given keys (1,2,3,4,5,6)

1. Correct
  - 2 partition = (1,2,3),(4,5,6).
  - 3 partition = (1,2),(3,4),(5,6)

2. Incorrect, the ranges overlap even though they're sorted.
  - 2 partitions (1,3,5) (2,4,6)
  - 3 partitions (1,3),(2,5),(4,6)


Is this possible with spark?

Cheers,
-Kristoffer




Re: keep or remove sc.stop() coz of RpcEnv already stopped error

2017-03-13 Thread Yong Zhang
What version of Spark you are using?


Based on SPARK-12967, it is fixed in Spark 2.0 and later. If you are using 
Spark 1.x, you can ignore this warning; it shouldn't affect any functionality.
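
If you do want to keep the explicit stop so the application still gets registered with the YARN history server, a common pattern (a sketch, not something prescribed in this thread) is to call it exactly once, at the very end of the driver:

```scala
try {
  // ... the actual job logic ...
} finally {
  // On Spark 1.x the "RpcEnv already stopped" message logged during shutdown
  // is a benign warning, per SPARK-12967.
  sc.stop()
}
```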


Yong


From: nancy henry 
Sent: Monday, March 13, 2017 7:08 AM
To: user @spark
Subject: keep or remove sc.stop() coz of RpcEnv already stopped error

 Hi Team,

getting this error if we put sc.stop() in application..

can we remove it from application but i read if yu dont explicitly stop using 
sc.stop the yarn application will not get registered in history service.. SO 
what to do ?

 WARN Dispatcher: Message RemoteProcessDisconnected dropped.
java.lang.IllegalStateException: RpcEnv already stopped.
at 
org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159)
at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:109)
at 
org.apache.spark.rpc.netty.NettyRpcHandler.connectionTerminated(NettyRpcEnv.scala:630)
at 
org.apache.spark.network.server.TransportRequestHandler.channelUnregistered(TransportRequestHandler.java:94)




Re: org.apache.spark.SparkException: Task not serializable

2017-03-13 Thread Yong Zhang
In fact, I would suggest a different way to handle the original problem.


The example originally listed comes with a Java Function that doesn't use any 
instance fields or methods, so serializing the whole class is an overkill solution.


Instead, you can/should make the Function static, which will work for the logic 
that function tries to do, and it is a better solution than marking the 
whole class serializable.


The whole issue is that the function is not static, even though it doesn't use any 
instance fields or other methods. When Spark ships the non-static function, 
it has to wrap the whole class containing the function into the closure sent 
over the network, and in this case that requires the whole class to be 
serializable.
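
The same idea expressed in Scala terms (a sketch; logData is assumed to be an RDD[String]): keep the function in a standalone object, so only the function itself travels to the executors, not the enclosing class:

```scala
object LineFilters {
  // No reference to any enclosing, non-serializable class is captured here.
  val keepAll: String => Boolean = _ => true
}

val numLines = logData.filter(LineFilters.keepAll).count()
```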


Yong



From: 颜发才(Yan Facai) 
Sent: Saturday, March 11, 2017 6:48 AM
To: 萝卜丝炒饭
Cc: Mina Aslani; Ankur Srivastava; user@spark.apache.org
Subject: Re: org.apache.spark.SparkException: Task not serializable

For scala,
make your class Serializable, like this
```
class YourClass extends Serializable {
}
```



On Sat, Mar 11, 2017 at 3:51 PM, 萝卜丝炒饭 
<1427357...@qq.com> wrote:
hi mina,

can you paste your new code here pleasel
i meet this issue too but do not get Ankur's idea.

thanks
Robin

---Original---
From: "Mina Aslani">
Date: 2017/3/7 05:32:10
To: "Ankur 
Srivastava">;
Cc: 
"user@spark.apache.org">;
Subject: Re: org.apache.spark.SparkException: Task not serializable

Thank you Ankur for the quick response, really appreciate it! Making the class 
serializable resolved the exception!

Best regards,
Mina

On Mon, Mar 6, 2017 at 4:20 PM, Ankur Srivastava 
> wrote:
The fix for this make your class Serializable. The reason being the closures 
you have defined in the class need to be serialized and copied over to all 
executor nodes.

Hope this helps.

Thanks
Ankur

On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani 
> wrote:

Hi,

I am trying to start with spark and get number of lines of a text file in my 
mac, however I get

org.apache.spark.SparkException: Task not serializable error on

JavaRDD<String> logData = javaCtx.textFile(file);

Please see below for the sample of code and the stackTrace.

Any idea why this error is thrown?

Best regards,

Mina

System.out.println("Creating Spark Configuration");
SparkConf javaConf = new SparkConf();
javaConf.setAppName("My First Spark Java Application");
javaConf.setMaster("PATH to my spark");
System.out.println("Creating Spark Context");
JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
System.out.println("Loading the Dataset and will further process it");
String file = "file:///file.txt";
JavaRDD<String> logData = javaCtx.textFile(file);

long numLines = logData.filter(new Function<String, Boolean>() {
   public Boolean call(String s) {
  return true;
   }
}).count();

System.out.println("Number of Lines in the Dataset "+numLines);

javaCtx.close();


Exception in thread "main" org.apache.spark.SparkException: Task not 
serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)





Re: can spark take advantage of ordered data?

2017-03-10 Thread Yong Zhang
I think it is an interesting requirement, but I am not familiar with Spark 
enough to say it can be done as latest spark version or not.


From my understanding, you are looking for some API from Spark to read the
source directly into a ShuffledRDD, which indeed needs (K, V and a Partitioner
instance).


I don't think Spark provides that directly as now, but in your case, it makes 
sense to create a JIRA for spark to support in the future.


For right now, maybe there are ways to use Spark developerAPI to do what you 
need, and I will leave that to other Spark expert to confirm.


Yong



From: sourabh chaki 
Sent: Friday, March 10, 2017 9:03 AM
To: Imran Rashid
Cc: Jonathan Coveney; user@spark.apache.org
Subject: Re: can spark take advantage of ordered data?

My use case is also quite similar. I have 2 feeds. One 3TB and another 100GB. 
Both the feeds are generated by hadoop reduce operation and partitioned by 
hadoop hashpartitioner. 3TB feed has 10K partitions whereas 100GB file has 200 
partitions.

Now when I do a join between these two feeds using spark, spark shuffles both 
the RDDS and it takes long time to complete. Can we do something so that spark 
can recognise the existing partitions of 3TB feed and shuffles only 200GB feed?
It can be mapside scan for bigger RDD and shuffle read from smaller RDD?

I have looked at spark-sorted project, but that project does not utilise the 
pre-existing partitions in the feed.
Any pointer will be helpful.

Thanks
Sourabh

On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid 
> wrote:
Hi Jonathan,

you might be interested in https://issues.apache.org/jira/browse/SPARK-3655 
(not yet available) and https://github.com/tresata/spark-sorted (not part of 
spark, but it is available right now).  Hopefully thats what you are looking 
for.  To the best of my knowledge that covers what is available now / what is 
being worked on.

Imran

On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney 
> wrote:
Hello all,

I am wondering if spark already has support for optimizations on sorted data 
and/or if such support could be added (I am comfortable dropping to a lower 
level if necessary to implement this, but I'm not sure if it is possible at 
all).

Context: we have a number of data sets which are essentially already sorted on 
a key. With our current systems, we can take advantage of this to do a lot of 
analysis in a very efficient fashion...merges and joins, for example, can be 
done very efficiently, as can folds on a secondary key and so on.

I was wondering if spark would be a fit for implementing these sorts of 
optimizations? Obviously it is sort of a niche case, but would this be 
achievable? Any pointers on where I should look?




Re: Spark failing while persisting sorted columns.

2017-03-09 Thread Yong Zhang
My guess is that your executor has already crashed, due to OOM. You should check 
the executor log; it may tell you more.


Yong



From: Rohit Verma 
Sent: Thursday, March 9, 2017 4:41 AM
To: user
Subject: Spark failing while persisting sorted columns.

Hi all,

Please help me with below scenario.

While writing below query on large dataset (rowCount=100,000,000) using below 
query

// there are other instance of below job submitting to spark in multithreaded 
app.

final Dataset df = spark.read().parquet(tablePath);
// df storage is hdfs is 5.64 GB with 45 blocks.
df.select(col).na().drop().dropDuplicates(col).coalesce(20).sort(df.col(col)).coalesce(1).write().mode(SaveMode.Ignore).csv(path);

Getting below exception.

Task failed while writing rows
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an 
output location for shuffle 2991


Here are spark env details:


  *   Cores in use: 20 Total, 0 Used
  *   Memory in use: 72.2 GB Total, 0.0 B Used

And process configuration are as

"spark.cores.max", “20"
"spark.executor.memory", “3400MB"
“spark.kryoserializer.buffer.max”,”1000MB”

Any leads would be highly appreciated.

Regards
Rohit Verma




Re: finding Spark Master

2017-03-07 Thread Yong Zhang
This website explains it very clearly, if you are using YARN.


https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cdh_ig_running_spark_on_yarn.html

Running Spark Applications on YARN (cloudera.com): "When Spark applications run on a YARN cluster 
manager, resource management, scheduling, and security are controlled by YARN."





From: Adaryl Wakefield 
Sent: Tuesday, March 7, 2017 8:53 PM
To: Koert Kuipers
Cc: user@spark.apache.org
Subject: RE: finding Spark Master


Ah so I see setMaster(‘yarn-client’). Hmm.



What I was ultimately trying to do was develop with Eclipse on my windows box 
and have the code point to my cluster so it executes there instead of my local 
windows machine. Perhaps I’m going about this wrong.



Adaryl "Bob" Wakefield, MBA
Principal
Mass Street Analytics, LLC
913.938.6685

www.massstreet.net

www.linkedin.com/in/bobwakefieldmba
Twitter: @BobLovesData



From: Koert Kuipers [mailto:ko...@tresata.com]
Sent: Tuesday, March 7, 2017 7:47 PM
To: Adaryl Wakefield 
Cc: user@spark.apache.org
Subject: Re: finding Spark Master



Assuming this is running on YARN, there is really no spark-master; every job 
creates its own "master" within a YARN application.



On Tue, Mar 7, 2017 at 6:27 PM, Adaryl Wakefield 
> wrote:

I’m running a three node cluster along with Spark along with Hadoop as part of 
a HDP stack. How do I find my Spark Master? I’m just seeing the clients. I’m 
trying to figure out what goes in setMaster() aside from local[*].



Adaryl "Bob" Wakefield, MBA
Principal
Mass Street Analytics, LLC
913.938.6685

www.massstreet.net

www.linkedin.com/in/bobwakefieldmba
Twitter: @BobLovesData






Re: Spark driver CPU usage

2017-03-01 Thread Yong Zhang
It won't control the cpu usage of Driver.


You should check what the CPUs are doing on your driver side. But I just want 
to make sure you know that full CPU usage on a 4-core Linux box is 
400%, so 100% really just keeps one core busy.


The driver does maintain the application web UI and tracks all kinds of task 
statistics. So even for just a word count program, if the source is huge 
and generates thousands of tasks, the driver will be busy.


Yong



From: Phadnis, Varun 
Sent: Wednesday, March 1, 2017 7:57 AM
To: user@spark.apache.org
Subject: RE: Spark driver CPU usage

Does that configuration parameter affect the CPU usage of the driver? If it 
does, we have that property unchanged from its default value of "1" yet the 
same behaviour as before.

-Original Message-
From: Rohit Verma [mailto:rohit.ve...@rokittech.com]
Sent: 01 March 2017 06:08
To: Phadnis, Varun 
Cc: user@spark.apache.org
Subject: Re: Spark driver CPU usage

Use conf spark.task.cpus to control number of cpus to use in a task.

On Mar 1, 2017, at 5:41 PM, Phadnis, Varun  wrote:
>
> Hello,
>
> Is there a way to control CPU usage for driver when running applications in 
> client mode?
>
> Currently we are observing that the driver occupies all the cores. Launching 
> just 3 instances of driver of WordCount sample application concurrently on 
> the same machine brings the usage of its 4 core CPU to 100%. Is this expected 
> behaviour?
>
> Thanks.





Why Spark cannot get the derived field of case class in Dataset?

2017-02-28 Thread Yong Zhang
In the following example, the "day" value is in the case class, but I cannot 
get it in the Spark Dataset, where I would like to use it at runtime. Any idea? 
Do I have to force it to be present in the case class constructor? I would like to 
derive it automatically and use it in the Dataset or DataFrame.


Thanks


scala> spark.version
res12: String = 2.1.0

scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat

scala> val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
dateFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@f67a0200

scala> case class Test(time: Long) {
 |   val day = dateFormat.format(time)
 | }
defined class Test
scala> val t = Test(1487185076410L)
t: Test = Test(1487185076410)

scala> t.time
res13: Long = 1487185076410

scala> t.day
res14: String = 2017-02-15

scala> val ds = Seq(t).toDS()
ds: org.apache.spark.sql.Dataset[Test] = [time: bigint]

scala> ds.show
+-------------+
|         time|
+-------------+
|1487185076410|
+-------------+
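
Two possible workarounds, sketched below as assumptions on my part rather than an answer from the list: make the derived value a constructor parameter (via a companion factory) so the encoder sees it, or derive it as a column after the fact.

```scala
// 1) Put the derived field in the constructor, computed by a companion factory.
case class Test2(time: Long, day: String)
object Test2 {
  private val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
  def apply(time: Long): Test2 = Test2(time, fmt.format(time))
}
Seq(Test2(1487185076410L)).toDS().show()   // both "time" and "day" are columns now

// 2) Or derive the column at the DataFrame level.
import org.apache.spark.sql.functions.{col, from_unixtime}
val withDay = Seq(Test(1487185076410L)).toDS()
  .withColumn("day", from_unixtime((col("time") / 1000).cast("long"), "yyyy-MM-dd"))
```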



Re: Duplicate Rank for within same partitions

2017-02-24 Thread Yong Zhang
What you described is not clear here.


Do you want to rank your data based on (date, hour, language, item_type, 
time_zone), and sort by score;

or do you want to rank your data based on (date, hour) and sort by language, 
item_type, time_zone and score?


If you mean the first one, then your Spark code looks right, but the 
example you gave didn't include "time_zone", which may be the reason the rank 
starts from 1 again.


In a Spark window specification, partitionBy is for the columns you want to 
group by, and orderBy decides the ordering within each partition. 
Both can take multiple columns.
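
A Scala sketch of that point, mirroring the PySpark window in the quoted question below (the column names come from that code; df is assumed): every column the groups are defined by has to appear in partitionBy, otherwise rows from different groups end up in one rank sequence.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window
  .partitionBy("dt", "hour", "language", "item_type", "time_zone")
  .orderBy(col("score").cast("float").desc)

val ranked = df.withColumn("rank", row_number().over(w))
```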


Yong


From: Dana Ram Meghwal 
Sent: Friday, February 24, 2017 2:08 AM
To: user@spark.apache.org
Subject: Fwd: Duplicate Rank for within same partitions


-- Forwarded message --
From: Dana Ram Meghwal >
Date: Thu, Feb 23, 2017 at 10:40 PM
Subject: Duplicate Rank for within same partitions
To: user-h...@spark.apache.org


Hey Guys,

I am new to Spark. I am trying to write a Spark script which involves finding the 
rank of records over the same data partitions (I will make this clear in a short while).


I have a table which has the following column names, and the example data looks like 
this (records are around 20 million for each combination of date, hour, language and 
item_type):

Id   language   date       hour   item_type   score
1    hindi      20170220   00     song        10
2    hindi      20170220   00     song        12
3    hindi      20170220   00     song        15
.
.
.
till 20 million


4    english    20170220   00     song        9
5    english    20170220   00     song        18
6    english    20170220   00     song        12
.
.
.
till 20 million


Now I want to rank them over language, date, hour, item_type

so finally it will look like this

Id   language   date       hour   item_type   score   rank
1    hindi      20170220   00     song        10      1
2    hindi      20170220   00     song        12      2
3    hindi      20170220   00     song        15      3

4    english    20170220   00     song        9       1
6    english    20170220   00     song        12      2
5    english    20170220   00     song        18      3



to solve this I use rank function in spark

code look like following

1. converting rdd to dataframe

rdd_with_final_score_df  = spark.read.json(rdd_with_final_score).repartition(1);

2. setting window specifications

w = 
Window.partitionBy("dt","hour","language","item_type","time_zone").orderBy(rdd_with_final_score_df.score.cast("float").desc())

3. calculating ranks by repartition to 1  partition

rdd_with_final_score_df_rank_df = 
rdd_with_final_score_df.repartition(1).withColumn('rank', row_number().over(w))

Now the number of rows in "rdd_with_final_score" is so high that this RDD is 
distributed across machines in the cluster.


I am getting results, but I am getting duplicate ranks within each partition,

for e.g.

Id   language   date       hour   item_type   score   rank
1    hindi      20170220   00     song        10      1
2    hindi      20170220   00     song        12      2
3    hindi      20170220   00     song        15      1


Here record 1 and record 3 have the same rank, but it is expected that they should 
have different ranks, or at least the rank should be unique for different score values.

Is it the case that the rank is calculated separately for each partition of the RDD 
and then merged, and because of that multiple rows get the same rank?


It would be a very big help if you guys could help me understand what is 
going on here and how we can solve it. I thought repartition would work, but 
it did not.


I tried to use rowsBetween or rangeBetween, but it was giving an error:

pyspark.sql.utils.AnalysisException: u'Window Frame ROWS BETWEEN 1 PRECEDING 
AND 1 FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW;'








--
Dana Ram Meghwal
Software Engineer
dana...@saavn.com




--
Dana Ram Meghwal
Software Engineer
dana...@saavn.com



Re: Spark SQL : Join operation failure

2017-02-22 Thread Yong Zhang
Your error message is not clear about what really happens.


Is your container killed by YARN, or does it indeed run OOM?

When I run a Spark job with big data, here is what I normally do:

1) Enable GC output (see the sketch after this list). You need to monitor the GC output in the 
executors to understand the GC pressure. If you see frequent full GCs, you know your job is in 
danger.
2) Monitor the statistics of the tasks in the executors under frequent full GC. How many records 
have been processed so far, and what are the spill read/write bytes? Is the OOM only 
happening in one task with much higher statistics than the rest? That normally 
means data skew. If lots of tasks all have GC pressure, then your settings are 
simply not enough for the job.
3) In your case, you first want to know what kind of join Spark is using for 
your outer join. Does it make sense for your data? The wrong join strategy could lead to 
the wrong way of doing the job.
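
For point 1, GC output is usually enabled through the executor JVM options at submit time; an illustrative sketch (the exact flags depend on your JVM):

```
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
```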

Yong


From: jatinpreet 
Sent: Wednesday, February 22, 2017 1:11 AM
To: user@spark.apache.org
Subject: Spark SQL : Join operation failure

Hi,

I am having a hard time running outer join operation on two parquet
datasets. The dataset size is large ~500GB with a lot of culumns in tune of
1000.

As per YARN administer imposed limits in the queue, I can have a total of 20
vcores and 8GB memory per executor.

I specified meory overhead and increased number of shuffle partitions to no
avail. This is how I submitted the job with pyspark,

spark-submit --master yarn-cluster --executor-memory 5500m --num-executors
19 --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2000 --conf
spark.sql.shuffle.partitions=2048 --driver-memory 7g --queue
./

The relevant code is,

cm_go.registerTempTable("x")
ko.registerTempTable("y")
joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
joined_df.write.save("/user/data/output")


I am getting errors like these:

ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
Reason: Container marked as failed:
container_e36_1487531133522_0058_01_06 on host: dn2.bigdatalab.org. Exit
status: 52. Diagnostics: Exception from container-launch.
Container id: container_e36_1487531133522_0058_01_06
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
at org.apache.hadoop.util.Shell.run(Shell.java:844)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1123)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:225)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 52

--

FetchFailed(null, shuffleId=0, mapId=-1, reduceId=508, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:695)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:691)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:691)
at
org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:145)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at

Re: How to query a query with not contain, not start_with, not end_with condition effective?

2017-02-21 Thread Yong Zhang
If you read the source code of SparkStrategies


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L106


If there are no joining keys, join implementations are chosen with the following 
precedence:

  *   BroadcastNestedLoopJoin: if one side of the join could be broadcasted
  *   CartesianProduct: for Inner join
  *   BroadcastNestedLoopJoin


So your case will use BroadcastNestedLoopJoin, as there is no joining keys.


In this case, if there are lots of userIds where url not like '%sell%', then 
Spark has to retrieve them back to the driver (to be broadcast), which explains 
the high CPU usage on the driver side.

So if there are lots of userIds where url not like '%sell%', then you can just 
try a left semi join, for which Spark will use a sort-merge join in this case, I guess.
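
A sketch of the join-based rewrite (my own illustration; Spark 2.0's left anti join keeps only rows that have no match on the right side):

```scala
val sellers  = spark.sql("select distinct user_id from data where url like '%sell%'")
val allUsers = spark.sql("select distinct user_id from data")

// user_ids that never appear with a 'sell' url
val result = allUsers.join(sellers, Seq("user_id"), "left_anti")
```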


Yong


From: Yong Zhang <java8...@hotmail.com>
Sent: Tuesday, February 21, 2017 1:17 PM
To: Sidney Feiner; Chanh Le; user @spark
Subject: Re: How to query a query with not contain, not start_with, not 
end_with condition effective?


Sorry, didn't pay attention to the original requirement.


Did you try the left outer join, or left semi join?

What is the explain plan when you use "not in"? Is it leading to a 
broadcastNestedLoopJoin?


spark.sql("select user_id from data where user_id not in (select user_id from 
data where url like '%sell%')").explain(true)


Yong



From: Sidney Feiner <sidney.fei...@startapp.com>
Sent: Tuesday, February 21, 2017 10:46 AM
To: Yong Zhang; Chanh Le; user @spark
Subject: RE: How to query a query with not contain, not start_with, not 
end_with condition effective?


Chanh wants to return user_id's that don't have any record with a url 
containing "sell". Without a subquery/join, it can only filter per record 
without knowing about the rest of the user_id's record



Sidney Feiner   /  SW Developer

M: +972.528197720  /  Skype: sidney.feiner.startapp






From: Yong Zhang [mailto:java8...@hotmail.com]
Sent: Tuesday, February 21, 2017 4:10 PM
To: Chanh Le <giaosu...@gmail.com>; user @spark <user@spark.apache.org>
Subject: Re: How to query a query with not contain, not start_with, not 
end_with condition effective?



Not sure if I misunderstand your question, but what's wrong with doing it this way?



scala> spark.version

res6: String = 2.0.2

scala> val df = Seq((1,"lao.com/sell"), (2, "lao.com/buy")).toDF("user_id", 
"url")

df: org.apache.spark.sql.DataFrame = [user_id: int, url: string]



scala> df.registerTempTable("data")

warning: there was one deprecation warning; re-run with -deprecation for details



scala> spark.sql("select user_id from data where url not like '%sell%'").show

+-------+
|user_id|
+-------+
|      2|
+-------+



Yong





From: Chanh Le <giaosu...@gmail.com<mailto:giaosu...@gmail.com>>
Sent: Tuesday, February 21, 2017 4:56 AM
To: user @spark
Subject: How to query a query with not contain, not start_with, not end_with 
condition effective?



Hi everyone,



I am working on a dataset like this
user_id url
1  lao.com/buy
2  bao.com/sell
2  cao.com/market
1  lao.com/sell
3  vui.com/sell

I have to find all user_id with url not contain sell. Which means I need to 
query all user_id contains sell and put it into a set then do another query to 
find all user_id not in that set.
SELECT user_id
FROM data
WHERE user_id not in ( SELECT user_id FROM data WHERE url like '%sell%' );

My data is about 20 million records and it’s growing. When I tried in zeppelin 
I need to set spark.sql.crossJoin.enabled = true

Then I ran the query and the driver got extremely high CPU percentage and the 
process get stuck and I need to kill it.

I am running at client mode that submit to a Mesos cluster.



I am using Spark 2.0.2 and my data store in HDFS with parquet format.



Any advices for me in this situation?



Thank you in advance!.



Regards,

Chanh


Re: How to query a query with not contain, not start_with, not end_with condition effective?

2017-02-21 Thread Yong Zhang
Sorry, didn't pay attention to the original requirement.


Did you try the left outer join, or left semi join?

What is the explain plan when you use "not in"? Is it leading to a 
broadcastNestedLoopJoin?


spark.sql("select user_id from data where user_id not in (select user_id from 
data where url like '%sell%')").explain(true)


Yong



From: Sidney Feiner <sidney.fei...@startapp.com>
Sent: Tuesday, February 21, 2017 10:46 AM
To: Yong Zhang; Chanh Le; user @spark
Subject: RE: How to query a query with not contain, not start_with, not 
end_with condition effective?


Chanh wants to return user_id's that don't have any record with a url 
containing "sell". Without a subquery/join, it can only filter per record 
without knowing about the rest of the user_id's record



Sidney Feiner   /  SW Developer

M: +972.528197720  /  Skype: sidney.feiner.startapp






From: Yong Zhang [mailto:java8...@hotmail.com]
Sent: Tuesday, February 21, 2017 4:10 PM
To: Chanh Le <giaosu...@gmail.com>; user @spark <user@spark.apache.org>
Subject: Re: How to query a query with not contain, not start_with, not 
end_with condition effective?



Not sure if I misunderstand your question, but what's wrong with doing it this way?



scala> spark.version

res6: String = 2.0.2

scala> val df = Seq((1,"lao.com/sell"), (2, "lao.com/buy")).toDF("user_id", 
"url")

df: org.apache.spark.sql.DataFrame = [user_id: int, url: string]



scala> df.registerTempTable("data")

warning: there was one deprecation warning; re-run with -deprecation for details



scala> spark.sql("select user_id from data where url not like '%sell%'").show

+-------+
|user_id|
+-------+
|      2|
+-------+



Yong





From: Chanh Le <giaosu...@gmail.com<mailto:giaosu...@gmail.com>>
Sent: Tuesday, February 21, 2017 4:56 AM
To: user @spark
Subject: How to query a query with not contain, not start_with, not end_with 
condition effective?



Hi everyone,



I am working on a dataset like this
user_id url
1  lao.com/buy
2  bao.com/sell
2  cao.com/market
1  lao.com/sell
3  vui.com/sell

I have to find all user_id with url not contain sell. Which means I need to 
query all user_id contains sell and put it into a set then do another query to 
find all user_id not in that set.
SELECT user_id
FROM data
WHERE user_id not in ( SELECT user_id FROM data WHERE url like '%sell%' );

My data is about 20 million records and it’s growing. When I tried in zeppelin 
I need to set spark.sql.crossJoin.enabled = true

Then I ran the query and the driver got extremely high CPU percentage and the 
process get stuck and I need to kill it.

I am running at client mode that submit to a Mesos cluster.



I am using Spark 2.0.2 and my data store in HDFS with parquet format.



Any advices for me in this situation?



Thank you in advance!.



Regards,

Chanh


Re: How to query a query with not contain, not start_with, not end_with condition effective?

2017-02-21 Thread Yong Zhang
Not sure if I misunderstand your question, but what's wrong with doing it this way?


scala> spark.version
res6: String = 2.0.2
scala> val df = Seq((1,"lao.com/sell"), (2, "lao.com/buy")).toDF("user_id", 
"url")
df: org.apache.spark.sql.DataFrame = [user_id: int, url: string]

scala> df.registerTempTable("data")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select user_id from data where url not like '%sell%'").show
+-------+
|user_id|
+-------+
|      2|
+-------+


Yong



From: Chanh Le 
Sent: Tuesday, February 21, 2017 4:56 AM
To: user @spark
Subject: How to query a query with not contain, not start_with, not end_with 
condition effective?

Hi everyone,

I am working on a dataset like this
user_id url
1  lao.com/buy
2  bao.com/sell
2  cao.com/market
1   lao.com/sell
3  vui.com/sell

I have to find all user_id with url not contain sell. Which means I need to 
query all user_id contains sell and put it into a set then do another query to 
find all user_id not in that set.
SELECT user_id
FROM data
WHERE user_id not in ( SELECT user_id FROM data WHERE url like '%sell%' );

My data is about 20 million records and it’s growing. When I tried in zeppelin 
I need to set spark.sql.crossJoin.enabled = true
Then I ran the query and the driver got extremely high CPU percentage and the 
process get stuck and I need to kill it.
I am running at client mode that submit to a Mesos cluster.

I am using Spark 2.0.2 and my data store in HDFS with parquet format.

Any advices for me in this situation?

Thank you in advance!.

Regards,
Chanh


Re: [SparkSQL] pre-check syntex before running spark job?

2017-02-21 Thread Yong Zhang
You can always use the explain method to validate your DF or SQL before any action.
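
A sketch of that idea (using the Spark 2.x SparkSession purely as an illustration; the table name is hypothetical): explain triggers parsing and analysis but no execution, so syntax errors and unresolved columns surface immediately as an AnalysisException.

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT col1, count(*) FROM some_table GROUP BY col1").explain(true)
  println("query parsed and analyzed OK")
} catch {
  case e: AnalysisException => println(s"invalid query: ${e.getMessage}")
}
```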


Yong



From: Jacek Laskowski 
Sent: Tuesday, February 21, 2017 4:34 AM
To: Linyuxin
Cc: user
Subject: Re: [SparkSQL] pre-check syntex before running spark job?

Hi,

Never heard about such a tool before. You could use Antlr to parse SQLs (just 
as Spark SQL does while parsing queries). I think it's a one-hour project.

Jacek

On 21 Feb 2017 4:44 a.m., "Linyuxin" 
> wrote:
Hi All,
Is there any tool/api to check the sql syntax without running spark job 
actually?

Like the siddhiQL on storm here:
SiddhiManagerService. validateExecutionPlan
https://github.com/wso2/siddhi/blob/master/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
it can validate the syntax before running the sql on storm

this is very useful for exposing sql string as a DSL of the platform.





Re: Serialization error - sql UDF related

2017-02-18 Thread Yong Zhang
You define "getNewColumnName" as a method, which requires the class/object 
holding it to be serializable.

From the stack trace, it looks like this method is defined in 
ProductDimensionSFFConverterRealApp, which is not serializable.


In fact, your method only uses String and Boolean, which are serializable by 
default. So you can change the definition to a function, instead of a method, which 
should work.
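
Concretely, a sketch of that change (keeping the original logic; the appendID parameter is kept only to match the original signature):

```scala
import org.apache.spark.sql.functions.udf

// A function value instead of a method: it serializes on its own, without
// dragging the enclosing class into the closure.
val getNewColumnName: (String, Boolean) => String = (oldColName, appendID) =>
  oldColName.replaceAll("\\s", "_").replaceAll("%", "_pct").replaceAllLiterally("#", "No")

val correctColNameUDF = udf((name: String) => getNewColumnName(name, false))
```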


Yong



From: Darshan Pandya 
Sent: Friday, February 17, 2017 10:36 PM
To: user
Subject: Serialization error - sql UDF related

Hello,

I am getting the famous serialization exception on running some code as below,


val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): 
String);
val charReference: DataFrame = thinLong.select("char_name_id", 
"char_name").withColumn("columnNameInDimTable", 
correctColNameUDF(col("char_name"))).withColumn("applicable_dimension", 
lit(dimension).cast(StringType)).distinct();
val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
val tableName: String = charReferenceTableName.toString
charReference.saveAsTable(tableName, saveMode)

I think it has something to do with the UDF, so I am pasting the UDF function 
as well


def getNewColumnName(oldColName: String, appendID: Boolean): String = {
  var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%", 
"_pct").replaceAllLiterally("#", "No")
  return newColName;
}

Exception seen is

Caused by: org.apache.spark.SparkException: Task not serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 73 more
Caused by: java.io.NotSerializableException: 
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
Serialization stack:
- object not serializable (class: 
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, 
value: 
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
- field (class: 
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, 
name: $outer, type: interface 
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
- object (class 
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, 
)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, 
name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, 
)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, 
type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, 
UDF(char_name#3))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, 
type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, 
UDF(char_name#3) AS columnNameInDimTable#304)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 4)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class 
[Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, 
ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS 
columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
- field (class: org.apache.spark.sql.execution.Project, name: projectList, 
type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.Project, Project 
[char_name_id#2,char_name#3,UDF(char_name#3) AS 
columnNameInDimTable#304,PRODUCT AS applicable_dimension#305]



--
Sincerely,
Darshan



Re: Efficient Spark-Sql queries when only nth Column changes

2017-02-18 Thread Yong Zhang
If you only need the group-bys within the same hierarchy, then you can group 
by at the lowest level and cache it, then use the cached DF to derive the 
higher levels, so Spark will only scan the original table once and reuse the 
cache for the rest.


val df_base =  sqlContext.sql("select col1,col2,col3,col4,col5, count(*) from 
table groupby col1,col2,col3,col4,col5").cache

df_base.registerTempTable("df_base")

val df1 = sqlContext.sql("select col1, col2, count(*) from df_base group by 
col1, col2")

val df2 = // similar logic
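
Spelling out the "similar logic" and the final union (a sketch; unionAll is the Spark 1.x DataFrame API and unions by column position):

```scala
val df2 = sqlContext.sql("select col1, col3, count(*) from df_base group by col1, col3")
val df3 = sqlContext.sql("select col1, col4, count(*) from df_base group by col1, col4")
val df4 = sqlContext.sql("select col1, col5, count(*) from df_base group by col1, col5")

// The source table is scanned only once (for df_base); the cached result feeds all four.
val combined = df1.unionAll(df2).unionAll(df3).unionAll(df4)
```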

Yong

From: Patrick 
Sent: Saturday, February 18, 2017 4:23 PM
To: user
Subject: Efficient Spark-Sql queries when only nth Column changes

Hi,

I have read 5 columns from Parquet into a data frame. My queries on the Parquet 
table are of the type below:

val df1 = sqlContext.sql(select col1,col2,count(*) from table group by col1,col2)
val df2 = sqlContext.sql(select col1,col3,count(*) from table group by col1,col3)
val df3 = sqlContext.sql(select col1,col4,count(*) from table group by col1,col4)
val df4 = sqlContext.sql(select col1,col5,count(*) from table group by col1,col5)

And then i require to union the results from df1 to df4 into a single df.


So basically, only the second column is changing. Is there an efficient way to 
write the above queries in Spark SQL instead of writing 4 different queries (or 
a loop) and doing a union to get the result?


Thanks







Re: skewed data in join

2017-02-16 Thread Yong Zhang
Yes. You have to change your key, or in big-data terms, add "salt" to it.
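
A minimal salting sketch (my own illustration of the idea; bigDF, smallDF and the column name "key" are assumptions):

```scala
import org.apache.spark.sql.functions._

val N = 10

// Large, skewed side: append a random salt 0..N-1 to the key.
val saltedBig = bigDF.withColumn("salted_key",
  concat_ws("_", col("key"), floor(rand() * N).cast("string")))

// Small side: replicate each row N times, once per salt value, so every salted key can match.
val saltedSmall = smallDF
  .withColumn("salt", explode(array((0 until N).map(lit): _*)))
  .withColumn("salted_key", concat_ws("_", col("key"), col("salt").cast("string")))

// One hot key is now spread over N reducers instead of one.
val joined = saltedBig.join(saltedSmall, "salted_key")
```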


Yong


From: Gourav Sengupta 
Sent: Thursday, February 16, 2017 11:11 AM
To: user
Subject: skewed data in join

Hi,

Is there a way to do multiple reducers for joining on skewed data?

Regards,
Gourav


Re: How to specify default value for StructField?

2017-02-15 Thread Yong Zhang
If it works under Hive, did you try just creating the DF from the Hive table directly 
in Spark? That should work, right?
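
A sketch of that suggestion (the table name is illustrative): reading through the Hive metastore lets Hive's own schema handling apply, instead of reading the ORC files directly.

```scala
val df = sqlContext.table("mydb.orc_table_with_mixed_schemas")
df.select("f1", "f2").show()
```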


Yong



From: Begar, Veena <veena.be...@hpe.com>
Sent: Wednesday, February 15, 2017 10:16 AM
To: Yong Zhang; smartzjp; user@spark.apache.org
Subject: RE: How to specify default value for StructField?


Thanks Yong.



I know about merging the schema option.

Using Hive we can read Avro files having different schemas, and we can do 
the same in Spark as well.
Similarly, we can read ORC files having different schemas in Hive. But we can't 
do the same in Spark using a DataFrame. How can we do it using a DataFrame?



Thanks.

From: Yong Zhang [mailto:java8...@hotmail.com]
Sent: Tuesday, February 14, 2017 8:31 PM
To: Begar, Veena <veena.be...@hpe.com>; smartzjp <zjp_j...@163.com>; 
user@spark.apache.org
Subject: Re: How to specify default value for StructField?



You are maybe looking for something like "spark.sql.parquet.mergeSchema" for 
ORC. Unfortunately, I don't think it is available, unless someone tells me I am 
wrong.

You can create a JIRA to request this feature, but we all know that Parquet is 
the first-class citizen format :)



Yong





From: Begar, Veena <veena.be...@hpe.com<mailto:veena.be...@hpe.com>>
Sent: Tuesday, February 14, 2017 10:37 AM
To: smartzjp; user@spark.apache.org<mailto:user@spark.apache.org>
Subject: RE: How to specify default value for StructField?



Thanks, it didn't work. Because, the folder has files from 2 different schemas.
It fails with the following exception:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input 
columns: [f1];


-Original Message-
From: smartzjp [mailto:zjp_j...@163.com]
Sent: Tuesday, February 14, 2017 10:32 AM
To: Begar, Veena <veena.be...@hpe.com<mailto:veena.be...@hpe.com>>; 
user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: How to specify default value for StructField?

You can try the below code.

val df = spark.read.format("orc").load("/user/hos/orc_files_test_together")
df.select("f1", "f2").show





On 2017/2/14 at 6:54 AM, "vbegar"

Re: How to specify default value for StructField?

2017-02-14 Thread Yong Zhang
You are maybe looking for something like "spark.sql.parquet.mergeSchema" for 
ORC. Unfortunately, I don't think it is available, unless someone tells me I am 
wrong.

You can create a JIRA to request this feature, but we all know that Parquet is 
the first-class citizen format :)
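
For reference, this is roughly what the Parquet version of that behaviour looks like (a usage sketch; the path is illustrative):

```scala
val merged = sqlContext.read.option("mergeSchema", "true").parquet("/path/to/parquet_root")
```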

Yong


From: Begar, Veena 
Sent: Tuesday, February 14, 2017 10:37 AM
To: smartzjp; user@spark.apache.org
Subject: RE: How to specify default value for StructField?

Thanks, it didn't work. Because, the folder has files from 2 different schemas.
It fails with the following exception:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input 
columns: [f1];


-Original Message-
From: smartzjp [mailto:zjp_j...@163.com]
Sent: Tuesday, February 14, 2017 10:32 AM
To: Begar, Veena ; user@spark.apache.org
Subject: Re: How to specify default value for StructField?

You can try the below code.

val df = spark.read.format("orc").load("/user/hos/orc_files_test_together")
df.select(“f1”,”f2”).show





On 2017/2/14

Re: Spark #cores

2017-01-18 Thread Yong Zhang
Try it first, to see if it indeed changes the parallelism you want to control 
in the PageRank you are running.


Start with the number of cores you want to give to your job, and increase it 
if your job fails due to GC/OOM.


Yong



From: Saliya Ekanayake <esal...@gmail.com>
Sent: Wednesday, January 18, 2017 3:21 PM
To: Yong Zhang
Cc: spline_pal...@yahoo.com; jasbir.s...@accenture.com; User
Subject: Re: Spark #cores

So, I should be using spark.sql.shuffle.partitions to control the parallelism? 
Is there there a guide to how to tune this?

Thank you,
Saliya

On Wed, Jan 18, 2017 at 2:01 PM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

spark.sql.shuffle.partitions controls not only Spark SQL, but also 
any implementation based on the Spark DataFrame API.


If you are using "spark.ml<http://spark.ml>" package, then most ML libraries in 
it are based on DataFrame. So you shouldn't use "spark.default.parallelism", 
instead of "spark.sql.shuffle.partitions".


Yong



From: Saliya Ekanayake <esal...@gmail.com<mailto:esal...@gmail.com>>
Sent: Wednesday, January 18, 2017 12:33 PM
To: spline_pal...@yahoo.com<mailto:spline_pal...@yahoo.com>
Cc: jasbir.s...@accenture.com<mailto:jasbir.s...@accenture.com>; User
Subject: Re: Spark #cores

The Spark version I am using is 2.10. The language is Scala. This is running in 
standalone cluster mode.

Each worker is able to use all physical CPU cores in the cluster as is the 
default case.

I was using the following parameters to spark-submit

--conf spark.executor.cores=1 --conf spark.default.parallelism=32

Later, I read that the term "cores" doesn't mean physical CPU cores but rather 
#tasks that an executor can execute.

Anyway, I don't have a clear idea how to set the number of executors per 
physical node. I see there's an option in the Yarn mode, but it's not available 
for standalone cluster mode.

Thank you,
Saliya

On Wed, Jan 18, 2017 at 12:13 PM, Palash Gupta 
<spline_pal...@yahoo.com<mailto:spline_pal...@yahoo.com>> wrote:
Hi,

Can you please share how you are assigning cpu core & tell us spark version and 
language you are using?

//Palash

Sent from Yahoo Mail on 
Android<https://overview.mail.yahoo.com/mobile/?.src=Android>

On Wed, 18 Jan, 2017 at 10:16 pm, Saliya Ekanayake
<esal...@gmail.com<mailto:esal...@gmail.com>> wrote:
Thank you, for the quick response. No, this is not Spark SQL. I am running the 
built-in PageRank.

On Wed, Jan 18, 2017 at 10:33 AM, <jasbir.s...@accenture.com> wrote:
Are you talking here of Spark SQL ?
If yes, spark.sql.shuffle.partitions needs to be changed.

From: Saliya Ekanayake [mailto:esal...@gmail.com]
Sent: Wednesday, January 18, 2017 8:56 PM
To: User <user@spark.apache.org>
Subject: Spark #cores

Hi,

I am running a Spark application setting the number of executor cores 1 and a 
default parallelism of 32 over 8 physical nodes.

The web UI shows it's running on 200 cores. I can't relate this number to the 
parameters I've used. How can I control the parallelism in a more deterministic 
way?

Thank you,
Saliya

--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg




This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security and assessment of internal compliance with Accenture 
policy.

www.accenture.com<http://www.accenture.com>



--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg




--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg




--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg



Re: Spark #cores

2017-01-18 Thread Yong Zhang
spark.sql.shuffle.partitions controls not only Spark SQL, but also 
any implementation based on the Spark DataFrame API.


If you are using "spark.ml" package, then most ML libraries in it are based on 
DataFrame. So you shouldn't use "spark.default.parallelism", instead of 
"spark.sql.shuffle.partitions".


Yong



From: Saliya Ekanayake 
Sent: Wednesday, January 18, 2017 12:33 PM
To: spline_pal...@yahoo.com
Cc: jasbir.s...@accenture.com; User
Subject: Re: Spark #cores

The Spark version I am using is 2.10. The language is Scala. This is running in 
standalone cluster mode.

Each worker is able to use all physical CPU cores in the cluster as is the 
default case.

I was using the following parameters to spark-submit

--conf spark.executor.cores=1 --conf spark.default.parallelism=32

Later, I read that the term "cores" doesn't mean physical CPU cores but rather 
#tasks that an executor can execute.

Anyway, I don't have a clear idea how to set the number of executors per 
physical node. I see there's an option in the Yarn mode, but it's not available 
for standalone cluster mode.

Thank you,
Saliya

On Wed, Jan 18, 2017 at 12:13 PM, Palash Gupta 
> wrote:
Hi,

Can you please share how you are assigning cpu core & tell us spark version and 
language you are using?

//Palash

Sent from Yahoo Mail on 
Android

On Wed, 18 Jan, 2017 at 10:16 pm, Saliya Ekanayake
> wrote:
Thank you, for the quick response. No, this is not Spark SQL. I am running the 
built-in PageRank.

On Wed, Jan 18, 2017 at 10:33 AM,  wrote:
Are you talking here of Spark SQL ?
If yes, spark.sql.shuffle.partitions needs to be changed.

From: Saliya Ekanayake [mailto:esal...@gmail.com]
Sent: Wednesday, January 18, 2017 8:56 PM
To: User 
Subject: Spark #cores

Hi,

I am running a Spark application setting the number of executor cores 1 and a 
default parallelism of 32 over 8 physical nodes.

The web UI shows it's running on 200 cores. I can't relate this number to the 
parameters I've used. How can I control the parallelism in a more deterministic 
way?

Thank you,
Saliya

--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg




This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security and assessment of internal compliance with Accenture 
policy.

www.accenture.com



--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg




--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg



Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Yong Zhang
What DB are you using for your Hive metastore, and what types are your 
partition columns?


You may want to read the discussion in SPARK-6910, and especially the 
comments in the PR. There are some limitations on partition pruning in 
Hive/Spark; maybe yours is one of them.


Yong



From: Raju Bairishetti 
Sent: Tuesday, January 17, 2017 3:00 AM
To: user @spark
Subject: Re: Spark sql query plan contains all the partitions from hive table 
even though filtering of partitions is provided

I had a high-level look into the code. It seems the getHiveQlPartitions method from 
HiveMetastoreCatalog is getting called irrespective of the 
metastorePartitionPruning conf value.

 It should not fetch all partitions if we set metastorePartitionPruning to true 
(Default value for this is false)

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
  val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
table.getPartitions(predicates)
  } else {
allPartitions
  }

...

def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
  client.getPartitionsByFilter(this, predicates)

lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is getting called even though 
metastorePartitionPruning is set to true.

Am I missing something, or am I looking at the wrong place?

On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti 
> wrote:
Waiting for suggestions/help on this...

On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
> wrote:
Hello,

   Spark SQL is generating the query plan with all partition information even 
though we apply filters on partitions in the query. Due to this, the Spark 
driver/Hive metastore is hitting OOM, as each table has lots of 
partitions.

We can confirm from the Hive audit logs that it tries to fetch all partitions from 
the Hive metastore.

 2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
(HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
cmd=get_partitions : db= tbl=x


Configured the following parameters in the spark conf to fix the above 
issue(source: from spark-jira & github pullreq):
spark.sql.hive.convertMetastoreParquet   false
spark.sql.hive.metastorePartitionPruning   true

   plan:  rdf.explain
   == Physical Plan ==
   HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 28),(hour#317 
= 2),(venture#318 = DEFAULT)]

get_partitions_by_filter method is called and fetching only required 
partitions.

But we are seeing Parquet decode errors in our applications frequently after 
this. It looks like these decoding errors are because the serde changed from 
the Spark built-in one to the Hive serde.

I feel that fixing query plan generation in Spark SQL is the right 
approach instead of forcing users to use the Hive serde.

Is there any workaround/way to fix this issue? I would like to hear more 
thoughts on this :)

--
Thanks,
Raju Bairishetti,
www.lazada.com





Re: Spark SQL 1.6.3 ORDER BY and partitions

2017-01-09 Thread Yong Zhang
I am not sure what you mean by "table" being comprised of 200/1200 partitions.


A partition could mean that the dataset (RDD/DataFrame) is chunked within Spark 
and then processed; or it could mean the partition metadata you define for the 
table in Hive.

If you mean the first one, you control the number of partitions with 
'spark.sql.shuffle.partitions', which has a default value of 200.

I would be surprised that a query works with the default of 200 but fails with the new 
value of 1200. In general, when you increase this value, you force 
more partitions in your DF, which leads to less data per partition. So if you 
set this value too high, it may hurt your performance, but it should not fail a job 
that can run with a smaller configured value.
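
If it is the first meaning, a quick sketch of how that value is usually controlled (Spark 1.6 API):

// number of partitions used for shuffles (joins, aggregations, ORDER BY)
sqlContext.setConf("spark.sql.shuffle.partitions", "1200")
// or at submit time: --conf spark.sql.shuffle.partitions=1200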

Yong


From: Joseph Naegele 
Sent: Friday, January 6, 2017 1:14 PM
To: 'user'
Subject: Spark SQL 1.6.3 ORDER BY and partitions

I have two separate but similar issues that I've narrowed down to a pretty good 
level of detail. I'm using Spark 1.6.3, particularly Spark SQL.

I'm concerned with a single dataset for now, although the details apply to 
other, larger datasets. I'll call it "table". It's around 160 M records, 
average of 78 bytes each, so about 12 GB uncompressed. It's 2 GB compressed in 
HDFS.

First issue:
The following query works if "table" is comprised of 200 partitions (on disk), 
but fails when "table" is 1200 partitions with the "Total size of serialized 
results of 1031 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 
GB)" error:

SELECT * FROM orc.`table` ORDER BY field DESC LIMIT 10;

This is possibly related to the TakeOrderedAndProject step in the execution 
plan, because the following queries do not give me problems:

SELECT * FROM orc.`table`;
SELECT * FROM orc.`table` ORDER BY field DESC;
SELECT * FROM orc.`table` LIMIT 10;

All of which have different execution plans.
My "table" has 1200 partitions because I must use a large value for 
spark.sql.shuffle.partitions to handle joins and window functions on much 
larger DataFrames in my application. Too many partitions may be suboptimal, but 
it shouldn't lead to large serialized results, correct?

Any ideas? I've seen https://issues.apache.org/jira/browse/SPARK-12837, but I 
think my issue is a bit more specific.


Second issue:
The difference between execution when calling .cache() and .count() on the 
following two DataFrames:

A: sqlContext.sql("SELECT * FROM table")
B: sqlContext.sql("SELECT * FROM table ORDER BY field DESC")

Counting the rows of A works as expected. A single Spark job with 2 stages. 
Load from Hadoop, map, aggregate, reduce to a number.

The same can't be said for B, however. The .cache() call spawns a Spark job 
before I even call .count(), loading from HDFS and performing ConvertToSafe and 
Exchange. The .count() call spawns another job, the first task of which appears 
to re-load from HDFS and again perform ConvertToSafe and Exchange, writing 1200 
shuffle partitions. The next stage then proceeds to read the shuffle data 
across only 2 tasks. One of these tasks completes immediately and the other 
runs indefinitely, failing because the partition is too large (the 
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE error).

Does this behavior make sense at all? Obviously it doesn't make sense to sort 
rows if I'm just counting them, but this is a simplified example of a more 
complex application in which caching makes sense. My executors have more than 
enough memory to cache this entire DataFrame.

Thanks for reading

---
Joe Naegele
Grier Forensics






Re: Java to show struct field from a Dataframe

2016-12-17 Thread Yong Zhang
Why don't you just return the struct you defined, instead of an array?


@Override
public Row call(Double x, Double y) throws Exception {
Row row = RowFactory.create(x, y);
return row;
}



From: Richard Xin <richardxin...@yahoo.com>
Sent: Saturday, December 17, 2016 8:53 PM
To: Yong Zhang; zjp_j...@163.com; user
Subject: Re: Java to show struct field from a Dataframe

I tried to transform
root
 |-- latitude: double (nullable = false)
 |-- longitude: double (nullable = false)
 |-- name: string (nullable = true)

to:
root
 |-- name: string (nullable = true)
 |-- location: struct (nullable = true)
 ||-- longitude: double (nullable = true)
 ||-- latitude: double (nullable = true)

Code snippet is as followings:

sqlContext.udf().register("toLocation", new UDF2<Double, Double, Row>() 
{
@Override
public Row call(Double x, Double y) throws Exception {
Row row = RowFactory.create(new double[] { x, y });
return row;
}
}, DataTypes.createStructType(new StructField[] {
new StructField("longitude", DataTypes.DoubleType, true, 
Metadata.empty()),
new StructField("latitude", DataTypes.DoubleType, true, 
Metadata.empty())
}));

DataFrame transformedDf1 = citiesDF.withColumn("location",
callUDF("toLocation", col("longitude"), col("latitude")));

transformedDf1.drop("latitude").drop("longitude").schema().printTreeString();  
// prints schema tree OK as expected

transformedDf.show();  // java.lang.ClassCastException: [D cannot be 
cast to java.lang.Double


It seems to me that the return type of the UDF2 might be the root cause, but I am not 
sure how to correct it.

Thanks,
Richard




On Sunday, December 18, 2016 7:15 AM, Yong Zhang <java8...@hotmail.com> wrote:


"[D" type means a double array type. So this error simple means you have 
double[] data, but Spark needs to cast it to Double, as your schema defined.

The error message clearly indicates the data doesn't match with  the type 
specified in the schema.

I wonder how you are so sure about your data? Do you check it under other tool?

Yong



From: Richard Xin <richardxin...@yahoo.com.INVALID>
Sent: Saturday, December 17, 2016 10:56 AM
To: zjp_j...@163.com; user
Subject: Re: Java to show struct field from a Dataframe

data is good


On Saturday, December 17, 2016 11:50 PM, "zjp_j...@163.com" <zjp_j...@163.com> 
wrote:


I think the cause is your invalid Double data; have you checked your data?


zjp_j...@163.com

From: Richard Xin <richardxin...@yahoo.com.INVALID>
Date: 2016-12-17 23:28
To: User <user@spark.apache.org>
Subject: Java to show struct field from a Dataframe
let's say I have a DataFrame with schema of followings:
root
 |-- name: string (nullable = true)
 |-- location: struct (nullable = true)
 ||-- longitude: double (nullable = true)
 ||-- latitude: double (nullable = true)

df.show(); throws following exception:

java.lang.ClassCastException: [D cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getDouble(rows.scala:44)
at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDouble(rows.scala:221)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)


Any advise?
Thanks in advance.
Richard






Re: Java to show struct field from a Dataframe

2016-12-17 Thread Yong Zhang
"[D" type means a double array type. So this error simple means you have 
double[] data, but Spark needs to cast it to Double, as your schema defined.


The error message clearly indicates the data doesn't match with  the type 
specified in the schema.


I wonder how you are so sure about your data? Do you check it under other tool?


Yong



From: Richard Xin 
Sent: Saturday, December 17, 2016 10:56 AM
To: zjp_j...@163.com; user
Subject: Re: Java to show struct field from a Dataframe

data is good


On Saturday, December 17, 2016 11:50 PM, "zjp_j...@163.com"  
wrote:


I think the cause is your invalid Double data; have you checked your data?


zjp_j...@163.com

From: Richard Xin
Date: 2016-12-17 23:28
To: User
Subject: Java to show struct field from a Dataframe
let's say I have a DataFrame with schema of followings:
root
 |-- name: string (nullable = true)
 |-- location: struct (nullable = true)
 ||-- longitude: double (nullable = true)
 ||-- latitude: double (nullable = true)

df.show(); throws following exception:

java.lang.ClassCastException: [D cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getDouble(rows.scala:44)
at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDouble(rows.scala:221)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)


Any advise?
Thanks in advance.
Richard




Re: null values returned by max() over a window function

2016-11-29 Thread Yong Zhang
This is not a bug, but the intended behavior of window functions.


When you use max + rowsBetween, it is a somewhat unusual requirement.


rowsBetween is more commonly used to calculate a moving sum or average over 
the frame.


But in your case, you want your window frame to be 2 rows before + 2 rows after 
the current row, and then apply the max function. If the current row is 
already at the last row (with the max revenue per category), then it won't have 2 
rows after it, and the frame is not the 5-row window you expect. So in this 
case the max function returns NULL.
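
For what it's worth, a minimal Scala sketch of a centered frame over data like the blog example (note that rowsBetween takes offsets relative to the current row, so "2 before and 2 after" is written with a negative start offset):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// frame of 2 preceding to 2 following rows within each category
val w = Window.partitionBy("category").orderBy("revenue").rowsBetween(-2, 2)
val result = data.withColumn("maxInFrame", max("revenue").over(w))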


Yong



From: Han-Cheol Cho 
Sent: Monday, November 28, 2016 10:57 PM
To: user@spark.apache.org
Subject: null values returned by max() over a window function


Hello,

I am trying to test Spark's SQL window functions in the following blog,
  
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html




, and facing a problem as follows:

# testing rowsBetween()
winSpec2 = 
window.Window.partitionBy(data["category"]).orderBy(data["revenue"]).rowsBetween(2,2)
tmp4 = functions.max(data["revenue"]).over(winSpec2)
data.select(["product","category","revenue", 
tmp4.alias("rowbetween2and2")]).orderBy(["category","revenue"]).show()

+----------+----------+-------+---------------+
|   product|  category|revenue|rowbetween2and2|
+----------+----------+-------+---------------+
|  Bendable|Cell phone|   3000|           5000|
|  Foldable|Cell phone|   3000|           6000|
|Ultra thin|Cell phone|   5000|           6000|
|      Thin|Cell phone|   6000|           null| --> ???
| Very thin|Cell phone|   6000|           null|
|    Normal|    Tablet|   1500|           4500|
|       Big|    Tablet|   2500|           5500|
|       Pro|    Tablet|   4500|           6500|
|      Mini|    Tablet|   5500|           null|
|      Pro2|    Tablet|   6500|           null|
+----------+----------+-------+---------------+

As you can see, the last column calculates the max value among the current row,
the two rows before, and the two rows after, partitioned by the category column.
However, the result for the last two rows in each category partition is null.

Is there something that I missed, or is this a bug?



Han-Cheol Cho Data Laboratory / Data Scientist
Email  hancheol@nhn-techorus.com 




Re: Dataframe broadcast join hint not working

2016-11-28 Thread Yong Zhang
If your query plan has "Project" in it, there is a bug in Spark preventing the 
"broadcast" hint from working in pre-2.0 releases.


https://issues.apache.org/jira/browse/SPARK-13383


Unfortunately, the fix was not back-ported to 1.x.
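
You can double-check whether the hint was honored in the physical plan; a minimal sketch (df1/df2 are placeholders):

import org.apache.spark.sql.functions.broadcast

val joined = df1.join(broadcast(df2), Seq("id"))
joined.explain()   // an honored hint shows BroadcastHashJoin instead of SortMergeJoin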


Yong



From: Anton Okolnychyi 
Sent: Saturday, November 26, 2016 4:05 PM
To: Swapnil Shinde
Cc: Benyi Wang; user@spark.apache.org
Subject: Re: Dataframe broadcast join hint not working

Hi guys,

I also experienced a situation where Spark 1.6.2 ignored my hint to do a 
broadcast join (i.e. broadcast(df)) with a small dataset. However, this 
happened only in 1 of 3 cases. Setting the 
"spark.sql.autoBroadcastJoinThreshold" property did not have any impact 
either. All 3 cases work fine in Spark 2.0.

Is there any chance that Spark can ignore a manually specified broadcast 
operation? In other words, is Spark forced to perform a broadcast if one 
specifies "df1.join(broadcast(df2), ...)"?

Best regards,
Anton



2016-11-26 21:04 GMT+01:00 Swapnil Shinde 
>:
I am using Spark 1.6.3 and below is the real plan (a, b, c above were just for 
illustration purposes):

== Physical Plan ==
Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN 
mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
+- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801], 
[mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
   :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
   :  +- TungstenExchange 
hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
   : +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799 AS 
mr_demoname#3802]
   :+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
   +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
  +- TungstenExchange 
hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
 +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
+- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804 AS 
mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806 AS 
etv_demo_id#3813]
   +- Filter ((map_type#3809 = master_roster_to_etv) && NOT 
(demogroup#3803 = gender_age_id))
  +- Scan 
ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]


Thanks
Swapnil

On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang 
> wrote:
Could you post the result of explain `c.explain`? If it is broadcast join, you 
will see it in explain.

On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde 
> wrote:
Hello,
I am trying a broadcast join on dataframes but it is still doing a 
SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold higher, 
but still no luck.

Related piece of code:
  val c = a.join(broadcast(b), "id")

On a side note, SizeEstimator.estimate(b) returns a really high value 
(460956584 bytes) compared to the data it contains: b has just 85 rows and 
around 4964 bytes.
Help is very much appreciated!!

Thanks
Swapnil







Re: find outliers within data

2016-11-22 Thread Yong Zhang
Spark Dataframe window functions?


https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
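
For illustration, a hedged Scala sketch of the window-function approach for the question quoted below; the ordering column "eventTime" is an assumption, since the original post does not show one:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// compare each rolling total with the previous row for the same movie
val w = Window.partitionBy("Movie").orderBy("eventTime")   // "eventTime" is hypothetical
val flagged = df
  .withColumn("prevLikes", lag("Likes", 1).over(w))
  .withColumn("badLikes", col("Likes") < col("prevLikes"))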







From: anup ahire 
Sent: Tuesday, November 22, 2016 11:00 AM
To: user@spark.apache.org
Subject: find outliers within data


I have a large data set with millions of records which is something like

Movie  Likes  Comments  Shares  Views
A      100    10        20      30
A      102    11        22      35
A      104    12        25      45
A      *103*  13        *24*    50
B      200    10        20      30
B      205    *9*       21      35
B      *203*  12        29      42
B      210    13        *23*    *39*


Likes, comments, etc. are rolling totals and they are supposed to increase. If 
there is a drop in any of these for a movie, then it is bad data that needs to be 
identified.

My initial thought was to group by movie and then sort within the group. I 
am using dataframes in Spark 1.6 for processing, and it does not seem to be 
achievable as there is no sorting within the grouped data in a dataframe.

Building something for outlier detection could be another approach, but because of 
time constraints I have not explored it yet.

Is there any way I can achieve this?

Thanks !!


Re: Will spark cache table once even if I call read/cache on the same table multiple times

2016-11-20 Thread Yong Zhang
If you have 2 different RDDs (with 2 different references and RDD IDs, as shown in your 
example), then YES, Spark will cache exactly the same thing twice in memory.


There is no way that Spark will compare them and know that they have the same 
content. If you define them as 2 RDDs, then they are different RDDs, and they will be 
cached individually.


Yong



From: Taotao.Li <charles.up...@gmail.com>
Sent: Sunday, November 20, 2016 6:18 AM
To: Rabin Banerjee
Cc: Yong Zhang; user; Mich Talebzadeh; Tathagata Das
Subject: Re: Will spark cache table once even if I call read/cache on the same 
table multiple times

hi, you can check my stackoverflow question : 
http://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark/36195812#36195812

On Sat, Nov 19, 2016 at 3:16 AM, Rabin Banerjee 
<dev.rabin.baner...@gmail.com<mailto:dev.rabin.baner...@gmail.com>> wrote:
Hi Yong,

  But every time val tabdf = sqlContext.table(tablename) is called, tabdf.rdd 
has a new id, which can be checked by calling tabdf.rdd.id.
And,
https://github.com/apache/spark/blob/b6de0c98c70960a97b07615b0b08fbd8f900fbe7/core/src/main/scala/org/apache/spark/SparkContext.scala#L268

Spark is maintaining a Map of [RDD_ID, RDD]; as the RDD id is changing, will 
Spark cache the same data again and again?

For example ,

val tabdf = sqlContext.table("employee")
tabdf.cache()
tabdf.someTransformation.someAction
println(tabdf.rdd.id)
val tabdf1 = sqlContext.table("employee")
tabdf1.cache() <= Will spark again go to disk read and load data into memory or 
look into cache ?
tabdf1.someTransformation.someAction
println(tabdf1.rdd.id)

Regards,
R Banerjee




On Fri, Nov 18, 2016 at 9:14 PM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

That's correct, as long as you don't change the StorageLevel.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L166



Yong


From: Rabin Banerjee 
<dev.rabin.baner...@gmail.com<mailto:dev.rabin.baner...@gmail.com>>
Sent: Friday, November 18, 2016 10:36 AM
To: user; Mich Talebzadeh; Tathagata Das
Subject: Will spark cache table once even if I call read/cache on the same 
table multiple times

Hi All ,

  I am working on a project where the code is divided into multiple reusable 
modules. I am not able to understand Spark persist/cache in that context.

My Question is Will spark cache table once even if I call read/cache on the 
same table multiple times ??

 Sample Code ::

  TableReader::

   def getTableDF(tablename: String, persist: Boolean = false): DataFrame = {
     val tabdf = sqlContext.table(tablename)
     if (persist) {
       tabdf.cache()
     }
     tabdf   // return the (possibly cached) DataFrame
   }

 Now
Module1::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Module2::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction



ModuleN::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Will Spark cache the emp table once, or will it cache it every time I call it? 
Shall I maintain a global hashmap to handle that, something like 
Map[String, DataFrame]?

 Regards,
Rabin Banerjee







--
___
Quant | Engineer | Boy
___
blog: http://litaotao.github.io
github: www.github.com/litaotao


Re: Will spark cache table once even if I call read/cache on the same table multiple times

2016-11-18 Thread Yong Zhang
That's correct, as long as you don't change the StorageLevel.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L166



Yong


From: Rabin Banerjee 
Sent: Friday, November 18, 2016 10:36 AM
To: user; Mich Talebzadeh; Tathagata Das
Subject: Will spark cache table once even if I call read/cache on the same 
table multiple times

Hi All ,

  I am working on a project where the code is divided into multiple reusable 
modules. I am not able to understand Spark persist/cache in that context.

My Question is Will spark cache table once even if I call read/cache on the 
same table multiple times ??

 Sample Code ::

  TableReader::

   def getTableDF(tablename: String, persist: Boolean = false): DataFrame = {
     val tabdf = sqlContext.table(tablename)
     if (persist) {
       tabdf.cache()
     }
     tabdf   // return the (possibly cached) DataFrame
   }

 Now
Module1::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Module2::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction



ModuleN::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Will Spark cache the emp table once, or will it cache it every time I call it? 
Shall I maintain a global hashmap to handle that, something like 
Map[String, DataFrame]?

 Regards,
Rabin Banerjee





Re: Long-running job OOMs driver process

2016-11-18 Thread Yong Zhang
Just wondering, is it possible that the memory usage keeps going up due to the web 
UI content?
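
If the UI history turns out to be the culprit, a minimal sketch (assuming the default retention settings are in effect) of bounding how much finished job/stage data the driver keeps:

// keep fewer completed jobs/stages in the driver for the web UI
val conf = new org.apache.spark.SparkConf()
  .set("spark.ui.retainedJobs", "200")
  .set("spark.ui.retainedStages", "500")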


Yong



From: Alexis Seigneurin 
Sent: Friday, November 18, 2016 10:17 AM
To: Nathan Lande
Cc: Keith Bourgoin; Irina Truong; u...@spark.incubator.apache.org
Subject: Re: Long-running job OOMs driver process

+1 for using S3A.

It would also depend on what format you're using. I agree with Steve that 
Parquet, for instance, is a good option. If you're using plain text files, some 
people use GZ files, but they cannot be split, thus putting a lot of 
pressure on the driver. It doesn't look like this is the issue you're running 
into, though, because it would not be a progressive slowdown, but please 
provide as much detail as possible about your app.

The cache could be an issue but the OOM would come from an executor, not from 
the driver.

From what you're saying, Keith, it indeed looks like some memory is not being 
freed. Seeing the code would help. If you can, also send all the logs (with 
Spark at least in INFO level).

Alexis

On Fri, Nov 18, 2016 at 10:08 AM, Nathan Lande 
> wrote:

+1 to not threading.

What does your load look like? If you are loading many files and cacheing them 
in N rdds rather than 1 rdd this could be an issue.

If the above two things don't fix your oom issue, without knowing anything else 
about your job, I would focus on your cacheing strategy as a potential culprit. 
Try running without any cacheing to isolate the issue; bad cacheing strategy is 
the source of oom issues for me most of the time.

On Nov 18, 2016 6:31 AM, "Keith Bourgoin" 
> wrote:
Hi Alexis,

Thanks for the response. I've been working with Irina on trying to sort this 
issue out.

We thread the file processing to amortize the cost of things like getting files 
from S3. It's a pattern we've seen recommended in many places, but I don't have 
any of those links handy.  The problem isn't the threading, per se, but clearly 
some sort of memory leak in the driver itself.  Each file is a self-contained 
unit of work, so once it's done all memory related to it should be freed. 
Nothing in the script itself grows over time, so if it can do 10 concurrently, 
it should be able to run like that forever.

I've hit this same issue working on another Spark app which wasn't threaded, 
but produced tens of thousands of jobs. Eventually, the Spark UI would get 
slow, then unresponsive, and then be killed due to OOM.

I'll try to cook up some examples of this today, threaded and not. We were 
hoping that someone had seen this before and it rung a bell. Maybe there's a 
setting to clean up info from old jobs that we can adjust.

Cheers,

Keith.

On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin 
> wrote:
Hi Irina,

I would question the use of multiple threads in your application. Since Spark 
is going to run the processing of each DataFrame on all the cores of your 
cluster, the processes will be competing for resources. In fact, they would not 
only compete for CPU cores but also for memory.

Spark is designed to run your processes in a sequence, and each process will be 
run in a distributed manner (multiple threads on multiple instances). I would 
suggest to follow this principle.

Feel free to share to code if you can. It's always helpful so that we can give 
better advice.

Alexis

On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong 
> wrote:

We have an application that reads text files, converts them to dataframes, and 
saves them in Parquet format. The application runs fine when processing a few 
files, but we have several thousand produced every day. When running the job 
for all files, we have spark-submit killed on OOM:


#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 27226"...


The job is written in Python. We're running it in Amazon EMR 5.0 (Spark 2.0.0) 
with spark-submit. We're using a cluster with a master c3.2xlarge instance (8 
cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores and 30g of RAM 
each). Spark config settings are as follows:


('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),

('spark.executors.instances', '3'),

('spark.yarn.executor.memoryOverhead', '9g'),

('spark.executor.cores', '15'),

('spark.executor.memory', '12g'),

('spark.scheduler.mode', 'FIFO'),

('spark.cleaner.ttl', '1800'),


The job processes each file in a thread, and we have 10 threads running 
concurrently. The process will OOM after about 4 hours, at which point Spark 
has processed over 20,000 jobs.

It seems like the driver is running out of memory, but each individual job is 
quite small. Are there any known memory leaks for long-running Spark 
applications on Yarn?




Re: How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread Yong Zhang
Read the document on https://github.com/datastax/spark-cassandra-connector
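
A hedged sketch to get started from spark-shell; the package coordinates, host, keyspace, and table names are placeholders and should be taken from the connector docs matching your Spark version:

// spark-shell --packages datastax:spark-cassandra-connector:1.6.0-s_2.10 \
//             --conf spark.cassandra.connection.host=127.0.0.1
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_ks", "table" -> "my_table"))
  .load()
df.registerTempTable("my_table")
sqlContext.sql("SELECT * FROM my_table LIMIT 10").show()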


Yong




From: kant kodali 
Sent: Friday, November 11, 2016 11:04 AM
To: user @spark
Subject: How to use Spark SQL to connect to Cassandra from Spark-Shell?

How to use Spark SQL to connect to Cassandra from Spark-Shell?

Any examples ? I use Java 8.

Thanks!
kant


Re: With spark DataFrame, how to write to existing folder?

2016-09-23 Thread Yong Zhang
df.write.format(source).mode("overwrite").save(path)
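
Applied to the spark-csv example quoted below, a sketch of the same write with overwrite mode:

import org.apache.spark.sql.SaveMode

selectedData.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .mode(SaveMode.Overwrite)   // or .mode("overwrite")
  .save("newcars.csv")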


Yong



From: Dan Bikle 
Sent: Friday, September 23, 2016 6:45 PM
To: user@spark.apache.org
Subject: With spark DataFrame, how to write to existing folder?

spark-world,

I am walking through the example here:

https://github.com/databricks/spark-csv#scala-api

The example complains if I try to write a DataFrame to an existing folder:

val selectedData = df.select("year", "model")
selectedData.write
.format("com.databricks.spark.csv")
.option("header", "true")
.save("newcars.csv")

I used google to look for DataFrame.write() API.

It sent me here:
http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-data-readerwriter-interface

There I found this link:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame@write:DataFrameWriter

And that link is a 404-error.

Question:
How to enhance this call so it overwrites instead of failing:

selectedData.write
.format("com.databricks.spark.csv")
.option("header", "true")
.save("newcars.csv")
??


Re: spark-xml to avro - SchemaParseException: Can't redefine

2016-09-08 Thread Yong Zhang
Did you take a look at this -> 
https://github.com/databricks/spark-avro/issues/54



Yong

(GitHub issue title: "spark-avro fails to save DF with nested records having the same name")





From: Arun Patel 
Sent: Thursday, September 8, 2016 5:31 PM
To: user
Subject: spark-xml to avro - SchemaParseException: Can't redefine

I'm trying to convert XML to Avro, but I am getting a SchemaParseException 
for 'Rules', which exists in two separate containers. Any thoughts?

XML is attached.

 df = 
sqlContext.read.format('com.databricks.spark.xml').options(rowTag='GGLResponse',attributePrefix='').load('GGL.xml')
 df.show()
 +++---++
 | ResponseDataset|  ResponseHeader|ns2|   xmlns|
 +++---++
 |[1,1],[SD2000...|[2016-07-26T16:28...|GGL|http://www..c...|
 +++---++

 >>> df.printSchema()
 root
  |-- ResponseDataset: struct (nullable = true)
  ||-- ResponseFileGGL: struct (nullable = true)
  |||-- OfferSets: struct (nullable = true)
  ||||-- OfferSet: struct (nullable = true)
  |||||-- OfferSetHeader: struct (nullable = true)
  ||||||-- OfferSetIdentifier: long (nullable = true)
  ||||||-- TotalOffersProcessed: long (nullable = true)
  |||||-- Offers: struct (nullable = true)
  ||||||-- Identifier: string (nullable = true)
  ||||||-- Offer: struct (nullable = true)
  |||||||-- Rules: struct (nullable = true)
  ||||||||-- Rule: array (nullable = true)
  |||||||||-- element: struct (containsNull = 
true)
  ||||||||||-- BorrowerIdentifier: long 
(nullable = true)
  ||||||||||-- RuleIdentifier: long 
(nullable = true)
  ||||||-- PartyRoleIdentifier: long (nullable = true)
  ||||||-- SuffixIdentifier: string (nullable = true)
  ||||||-- UCP: string (nullable = true)
  |||||-- Pool: struct (nullable = true)
  ||||||-- Identifier: string (nullable = true)
  ||||||-- PartyRoleIdentifier: long (nullable = true)
  ||||||-- Rules: struct (nullable = true)
  |||||||-- Rule: array (nullable = true)
  ||||||||-- element: struct (containsNull = true)
  |||||||||-- BIdentifier: long (nullable = 
true)
  |||||||||-- RIdentifier: long (nullable = 
true)
  ||||||-- SuffixIdentifier: string (nullable = true)
  ||||||-- UCP: string (nullable = true)
  |||-- ResultHeader: struct (nullable = true)
  ||||-- RequestDateTime: string (nullable = true)
  ||||-- ResultDateTime: string (nullable = true)
  ||-- ResponseFileUUID: string (nullable = true)
  ||-- ResponseFileVersion: double (nullable = true)
  |-- ResponseHeader: struct (nullable = true)
  ||-- ResponseDateTime: string (nullable = true)
  ||-- SessionIdentifier: string (nullable = true)
  |-- ns2: string (nullable = true)
  |-- xmlns: string (nullable = true)


df.write.format('com.databricks.spark.avro').save('ggl_avro')

16/09/08 17:07:20 INFO MemoryStore: Block broadcast_73 stored as values in 
memory (estimated size 233.5 KB, free 772.4 KB)
16/09/08 17:07:20 INFO MemoryStore: Block broadcast_73_piece0 stored as bytes 
in memory (estimated size 28.2 KB, free 800.6 KB)
16/09/08 17:07:20 INFO BlockManagerInfo: Added broadcast_73_piece0 in memory on 
localhost:29785 (size: 28.2 KB, free: 511.4 MB)
16/09/08 17:07:20 INFO SparkContext: Created broadcast 73 from newAPIHadoopFile 
at XmlFile.scala:39
Traceback (most recent call last):
  File "", line 1, in 
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/readwriter.py", line 397, 
in save
self._jwrite.save(path)
  File 
"/usr/hdp/2.4.2.0-258/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", 
line 813, in __call__
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/utils.py", line 45, in 
deco
return f(*a, **kw)
  File 
"/usr/hdp/2.4.2.0-258/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 
308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o426.save.
: 

Re: distribute work (files)

2016-09-07 Thread Yong Zhang
What error do you get? FileNotFoundException?


Please paste the stacktrace here.


Yong



From: Peter Figliozzi 
Sent: Wednesday, September 7, 2016 10:18 AM
To: ayan guha
Cc: Lydia Ickler; user.spark
Subject: Re: distribute work (files)

That's failing for me. Can someone please try this -- is this even supposed to 
work:

  *   create a directory somewhere and add two text files to it
  *   mount that directory on the Spark worker machines with sshfs
  *   read the text files into one data structure using a file URL with a 
wildcard

Thanks,

Pete

On Tue, Sep 6, 2016 at 11:20 PM, ayan guha 
> wrote:
To access local file, try with file:// URI.

On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi 
> wrote:
This is a great question.  Basically you don't have to worry about the 
details-- just give a wildcard in your call to textFile.  See the Programming 
Guide section 
entitled "External Datasets".  The Spark framework will distribute your data 
across the workers.  Note that:

If using a path on the local filesystem, the file must also be accessible at 
the same path on worker nodes. Either copy the file to all workers or use a 
network-mounted shared file system.

In your case this would mean the directory of files.
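
As a concrete Scala sketch (paths are hypothetical) of the two common options, reading every matching file line by line, or keeping each file together so one task can process one file end to end:

// all lines of all matching files, distributed across the cluster
val lines = sc.textFile("file:///mnt/shared/input/*.txt")

// (path, wholeFileContent) pairs: each file stays intact, so per-file work
// runs independently on whichever executor receives it
val files = sc.wholeTextFiles("file:///mnt/shared/input/*.txt")
val sizes = files.map { case (path, content) => (path, content.length) }   // placeholder per-file computation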

Curiously, I cannot get this to work when I mount a directory with sshfs on all 
of my worker nodes.  It says "file not found" even though the file clearly 
exists in the specified path on all workers.   Anyone care to try and comment 
on this?

Thanks,

Pete

On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
> wrote:
Hi,

maybe this is a stupid question:

I have a list of files. Each file I want to take as an input for a 
ML-algorithm. All files are independent from another.
My question now is how do I distribute the work so that each worker takes a 
block of files and just runs the algorithm on them one by one.
I hope somebody can point me in the right direction! :)

Best regards,
Lydia





--
Best Regards,
Ayan Guha



Re: Re[8]: Spark 2.0: SQL runs 5x times slower when adding 29th field to aggregation.

2016-09-06 Thread Yong Zhang
This is an interesting point.


I tested with the original data on the Spark 2.0 release, and I got the same 
statistics as in the original email, like the following:


50 1.77695393562
51 0.695149898529
52 0.638142108917
53 0.647341966629
54 0.663456916809
55 0.629166126251
56 0.644149065018
57 0.661190986633
58 2.6616499424
59 2.6137509346
60 2.71165704727
61 2.63473916054


Then I tested with your suggestion:


spark/bin/pyspark --driver-java-options '-XX:-DontCompileHugeMethods'


Run the same test code, and here is the output:


50 1.77180695534
51 0.679394006729
52 0.629493951797
53 0.62108206749
54 0.637018918991
55 0.640591144562
56 0.649922132492
57 0.652480125427
58 0.636356830597
59 0.667215824127
60 0.643863916397
61 0.669810056686
62 0.664624929428
63 0.682888031006
64 0.691393136978
65 0.690823078156
66 0.70525097847
67 0.724694013596
68 0.737638950348
69 0.749594926834



Yong


From: Davies Liu 
Sent: Tuesday, September 6, 2016 2:27 PM
To: Сергей Романов
Cc: Gavin Yue; Mich Talebzadeh; user
Subject: Re: Re[8]: Spark 2.0: SQL runs 5x times slower when adding 29th field 
to aggregation.

I think the slowness is caused by the generated aggregate method having more
than 8K bytecodes; then it's not JIT compiled and becomes much slower.

Could you try to disable the DontCompileHugeMethods by:

-XX:-DontCompileHugeMethods

On Mon, Sep 5, 2016 at 4:21 AM, Сергей Романов
 wrote:
> Hi, Gavin,
>
> Shuffling is exactly the same in both requests and is minimal. Both requests
> produces one shuffle task. Running time is the only difference I can see in
> metrics:
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 57) ).collect, number=1)
> 0.713730096817
>  {
> "id" : 368,
> "name" : "duration total (min, med, max)",
> "value" : "524"
>   }, {
> "id" : 375,
> "name" : "internal.metrics.executorRunTime",
> "value" : "527"
>   }, {
> "id" : 391,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "244495"
>   }
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 58) ).collect, number=1)
> 2.97951102257
>
>   }, {
> "id" : 469,
> "name" : "duration total (min, med, max)",
> "value" : "2654"
>   }, {
> "id" : 476,
> "name" : "internal.metrics.executorRunTime",
> "value" : "2661"
>   }, {
> "id" : 492,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "371883"
>   }, {
>
> Full metrics in attachment.
>
> Saturday, 3 September 2016, 19:53 +03:00 from Gavin Yue
> :
>
>
> Any shuffling?
>
>
> On Sep 3, 2016, at 5:50 AM, Сергей Романов 
> wrote:
>
> Same problem happens with CSV data file, so it's not parquet-related either.
>
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
 import timeit
 from pyspark.sql.types import *
 schema = StructType([StructField('dd_convs', FloatType(), True)])
 for x in range(50, 70): print x,
 timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
 schema=schema).groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> 50 0.372850894928
> 51 0.376906871796
> 52 0.381325960159
> 53 0.385444164276
> 54 0.38685192
> 55 0.388918161392
> 56 0.397624969482
> 57 0.391713142395
> 58 2.62714004517
> 59 2.68421196938
> 60 2.74627685547
> 61 2.81081581116
> 62 3.43532109261
> 63 3.07742786407
> 64 3.03904604912
> 65 3.01616096497
> 66 3.06293702126
> 67 3.09386610985
> 68 3.27610206604
> 69 3.2041969299
>
> Saturday, 3 September 2016, 15:40 +03:00 from Сергей Романов
> :
>
> Hi,
>
> I had narrowed down my problem to a very simple case. I'm sending 27kb
> parquet in attachment. (file:///data/dump/test2 in example)
>
> Please, can you take a look at it? Why there is performance drop after 57
> sum columns?
>
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
 import timeit
 for x in range(70): print x,
 timeit.timeit(spark.read.parquet('file:///data/dump/test2').groupBy().sum(*(['dd_convs']
 * x) ).collect, number=1)
> ...
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> 0 1.05591607094
> 1 0.200426101685
> 2 0.203800916672
> 3 0.176458120346
> 4 0.184863805771
> 5 0.232321023941
> 6 0.216032981873
> 7 

Great performance improvement of Spark 1.6.2 on our production cluster

2016-08-29 Thread Yong Zhang
Today I deployed Spark 1.6.2 on our production cluster.

There is one huge job we run every day using Spark SQL, and it is the 
biggest Spark job running daily on our cluster. I was impressed by the speed 
improvement.

Here is the history statistics of this daily job:

1) 11 to 12 hours on Hive 0.12 using MR
2) 6 hours on Spark 1.3.1
3) 4.5 hours on Spark 1.5.2

4) 1.6 hours on Spark 1.6.2 with the same resource allocation (we are using 
standalone mode). Very hard to believe.


Looking forward to the coming Spark 2.x release (can you guys really make it 10x 
faster? For this job, 2x would already blow my mind).


Great job, Spark development team! Thank you for such great product.


Yong


Re: Spark join and large temp files

2016-08-08 Thread Yong Zhang
Join requires shuffling. The problem is that you have to shuffle 1.5 TB of data, 
which causes problems with your disk usage. Another way is to broadcast the 1.3 GB 
small dataset, so there is no shuffle requirement for the 1.5 TB dataset. But you 
need to make sure you have enough memory.


Can you try to increase your partition count? That will make each partition of 
your 1.5 TB contain less data, so the disk usage of the spilled data may be 
lower.


But keep in mind that you should always have enough disk space to handle the 
job you plan to run.
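
A hedged sketch of the broadcast variant using the dataframes from the quoted message (this assumes the ~1.3 GB side really fits in each executor's memory after deserialization):

import org.apache.spark.sql.functions.broadcast

// b is shipped to every executor, so the 1.5 TB side is never shuffled
val results = a.join(broadcast(b), Seq("id"), "right_outer")
results.write.parquet("/some/output/path")   // hypothetical output path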


Yong



From: Ashic Mahtab 
Sent: Monday, August 8, 2016 2:53 PM
To: Deepak Sharma
Cc: Apache Spark
Subject: RE: Spark join and large temp files

Hi Deepak,
Thanks for the response.

Registering the temp tables didn't help. Here's what I have:

val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x JOIN b y ON x.id = y.id")

results.write.parquet(...)

Is there something I'm missing?

Cheers,
Ashic.


From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register your dataframes as temp tables and then try the join on the temp tables.
This should resolve your issue.

Thanks
Deepak

On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab 
> wrote:
Hello,
We have two parquet inputs of the following form:

a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well...crashing?

Note, the ids are unique, and there's a one to one mapping between the two 
datasets.

Any help would be appreciated.

-Ashic.







--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Spark SQL and number of task

2016-08-04 Thread Yong Zhang
The 2 plans look similar, but there is a big difference if you also consider 
that your source is in fact a NoSQL DB, like C*.


The OR plan has "Filter ((id#0L = 94) || (id#0L = 2))", which means the filter 
is happening on the Spark side instead of on the C* side. That means that to 
fulfill your query, Spark has to load all the data back from C* (imagine you have 
millions of IDs), filter most of it out, and only keep the data with ids 94 
and 2. The IO is the bottleneck in this case, and a huge amount of data needs to be 
transferred from C* to Spark.


In the other case, the ids are pushed down to C* (and in most cases the id is 
the primary key, or at least the partition key), so C* will find the data for 
these 2 ids very fast and only return the matching data back to Spark, which 
then does the aggregation on a very small amount of data. That is why the 
performance is so different in these 2 cases.


You can argue that the Spark-Cassandra connector should be smarter and handle the 
"OR" case. But in general, OR is not easy to handle, as in most cases "OR" 
will be applied to different columns, instead of only to IDs as in this case.


If your query uses partition keys in C*, always use them with either "=" or 
"in". If not, then you have to wait for the data transfer from C* to Spark. 
Spark + C* lets you run any ad-hoc query, but you need to know the underlying 
price paid.
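
A small sketch of how to compare the two shapes yourself (column names follow the quoted plans; the isin form is the one that showed up under PushedFilters):

import org.apache.spark.sql.functions.col

// IN-style predicate: should appear inside PushedFilters in the plan
df.filter(col("id").isin(94, 2)).groupBy("id").avg("avg").explain()

// OR-style predicate: shows up as a separate Filter step on the Spark side
df.filter(col("id") === 94 || col("id") === 2).groupBy("id").avg("avg").explain()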


Yong



From: Takeshi Yamamuro 
Sent: Thursday, August 4, 2016 8:18 AM
To: Marco Colombo
Cc: user
Subject: Re: Spark SQL and number of task

Seems the performance difference comes from `CassandraSourceRelation`.
I'm not familiar with the implementation though, I guess the filter `IN` is 
pushed down
into the datasource and the other not.

You'd better off checking performance metrics in webUI.

// maropu

On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo 
> wrote:
Ok, thanx.
The 2 plan are very similar

with in condition
+--+--+
|   plan
   |
+--+--+
| == Physical Plan ==   
   |
| TungstenAggregate(key=[id#0L], 
functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#81])
  |
| +- TungstenExchange hashpartitioning(id#0L,10), None  
   |
|+- TungstenAggregate(key=[id#0L], 
functions=[(avg(avg#2),mode=Partial,isDistinct=false)], 
output=[id#0L,sum#85,count#86L])|
|   +- Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] 
PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]  |
+--+--+

with the or condition
+--+--+
|   plan
   |
+--+--+
| == Physical Plan ==   
   |
| TungstenAggregate(key=[id#0L], 
functions=[(avg(avg#2),mode=Final,isDistinct=false)], output=[id#0L,_c1#88])
  |
| +- TungstenExchange hashpartitioning(id#0L,10), None  
   |
|+- TungstenAggregate(key=[id#0L], 
functions=[(avg(avg#2),mode=Partial,isDistinct=false)], 
output=[id#0L,sum#92,count#93L])|
|   +- Filter ((id#0L = 94) || (id#0L = 2)) 
   |
|  +- Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2] 
PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]  |
+--+--+


Filters are pushed down, so I cannot understand why it is processing so much data

Re: Tuning level of Parallelism: Increase or decrease?

2016-08-03 Thread Yong Zhang
Data locality is part of the job/task scheduling responsibility. So both links you 
originally specified are correct: one is for the standalone mode that comes with 
Spark, the other is for YARN. Both have this ability.


But YARN, as a very popular scheduling component, comes with MUCH, MUCH more 
features than the standalone mode. You can research more about it on Google.


Yong



From: Jestin Ma 
Sent: Tuesday, August 2, 2016 7:11 PM
To: Jacek Laskowski
Cc: Nikolay Zhebet; Andrew Ehrlich; user
Subject: Re: Tuning level of Parallelism: Increase or decrease?

Hi Jacek,
I found this page of your book here: 
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-data-locality.html


which says: "It is therefore important to have Spark running on Hadoop YARN cluster 
if the data comes from HDFS. In Spark on YARN Spark tries to place tasks alongside 
HDFS blocks."


So my reasoning was that since Spark takes care of data locality when workers 
load data from HDFS, I can't see why running on YARN is more important.

Hope this makes my question clearer.


On Tue, Aug 2, 2016 at 3:54 PM, Jacek Laskowski 
> wrote:
On Mon, Aug 1, 2016 at 5:56 PM, Jestin Ma 
> wrote:
> Hi Nikolay, I'm looking at data locality improvements for Spark, and I have
> conflicting sources on using YARN for Spark.
>
> Reynold said that Spark workers automatically take care of data locality
> here:
> https://www.quora.com/Does-Apache-Spark-take-care-of-data-locality-when-Spark-workers-load-data-from-HDFS
>
> However, I've read elsewhere
> (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/yarn/)
> that Spark on YARN increases data locality because YARN tries to place tasks
> next to HDFS blocks.
>
> Can anyone verify/support one side or the other?

Hi Jestin,

I'm the author of the latter. I can't seem to find how Reynold
"conflicts" with what I wrote in the notes? Could you elaborate?

I certainly may be wrong.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski



Re: Extracting key word from a textual column

2016-08-02 Thread Yong Zhang
Well, if you still want to use window functions for your logic, then you need 
to derive a new column, like "catalog", and use it as part of the grouping 
logic.


Maybe you can use a regex for deriving this new column. The implementation 
depends on your data in "transactiondescription", and a regex gives you 
the most powerful way to handle it.


This is really not a Spark question, but a question of how you process your logic 
based on the given data.
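
For illustration, a hedged sketch with regexp_extract; the pattern is only a guess at the layout (text before a trailing "CD nnnn" marker) and would need tuning against the real transactiondescription values:

import org.apache.spark.sql.functions.{col, regexp_extract, sum, trim, upper}

// derive a crude retailer key, then total the debit amounts per retailer
val withRetailer = ll_18740868
  .filter(col("transactiontype") === "DEB")
  .withColumn("retailer",
    trim(upper(regexp_extract(col("transactiondescription"), "^(.*?)\\s+CD\\s+\\d+", 1))))

withRetailer.groupBy("retailer")
  .agg(sum("debitamount").as("spent"))
  .orderBy(col("spent").desc)
  .show()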


Yong



From: Mich Talebzadeh 
Sent: Tuesday, August 2, 2016 10:00 AM
To: user @spark
Subject: Extracting key word from a textual column

Hi,

Need some ideas.

Summary:

I am working on a tool to slice and dice the amount of money I have spent so 
far (meaning the whole data sample) at a given retailer, so I have a better idea 
of where I am wasting the money.

Approach

I downloaded my bank statements for a given account in csv format, from inception 
till the end of July, read the data, and stored it in an ORC table.

I am interested in all bills that I paid using a debit card (transactiontype = 
"DEB") that come out of the account directly. Transactiontype is the 
three-character code lookup that I download as well.

scala> ll_18740868.printSchema
root
 |-- transactiondate: date (nullable = true)
 |-- transactiontype: string (nullable = true)
 |-- sortcode: string (nullable = true)
 |-- accountnumber: string (nullable = true)
 |-- transactiondescription: string (nullable = true)
 |-- debitamount: double (nullable = true)
 |-- creditamount: double (nullable = true)
 |-- balance: double (nullable = true)

The important fields are transactiondate, transactiontype, 
transactiondescription and debitamount

So using analytics/windowing I can do all sorts of things. For example, this 
one gives me the last time I spent money at retailer XYZ and the amount:

SELECT *
FROM (
  select transactiondate, transactiondescription, debitamount
  , rank() over (order by transactiondate desc) AS rank
  from accounts.ll_18740868 where transactiondescription like '%XYZ%'
 ) tmp
where rank <= 1

And its equivalent using Windowing in FP

import org.apache.spark.sql.expressions.Window
val wSpec = 
Window.partitionBy("transactiontype").orderBy(desc("transactiondate"))
ll_18740868.filter(col("transactiondescription").contains("XYZ")).select($"transactiondate",$"transactiondescription",
 rank().over(wSpec).as("rank")).filter($"rank"===1).show


+---+--++
|transactiondate|transactiondescription|rank|
+---+--++
| 2015-12-15|  XYZ LTD CD 4636 |   1|
+---+--++

So far so good. But if I want to find all I spent on each retailer, then it 
gets trickier as a retailer appears like below in the column 
transactiondescription:

ll_18740868.where($"transactiondescription".contains("SAINSBURY")).select($"transactiondescription").show(5)
+--+
|transactiondescription|
+--+
|  SAINSBURYS SMKT C...|
|  SACAT SAINSBURYS ...|
|  SAINSBURY'S SMKT ...|
|  SAINSBURYS S/MKT ...|
|  SACAT SAINSBURYS ...|
+--+

If I look at them I know they all belong to SAINSBURYS (a food retailer). I have 
done some crude grouping and it works, somewhat:

//define UDF here to handle substring
val SubstrUDF = udf { (s: String, start: Int, end: Int) => s.substring(start, 
end) }
var cutoff = "CD"  // currently used in the statement
val wSpec2 = 
Window.partitionBy(SubstrUDF($"transactiondescription",lit(0),instr($"transactiondescription",
 cutoff)-1))
ll_18740868.where($"transactiontype" === "DEB" && 
($"transactiondescription").isNotNull).select(SubstrUDF($"transactiondescription",lit(0),instr($"transactiondescription",
 
cutoff)-1).as("Retailer"),sum($"debitamount").over(wSpec2).as("Spent")).distinct.orderBy($"Spent").collect.foreach(println)

However, I really need to extract the "keyword" retailer name from the 
transactiondescription column, and I need some ideas about the best way of doing 
it. Is this possible in Spark?


Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com






Re: UDF returning generic Seq

2016-07-26 Thread Yong Zhang
I don't know if "Any" will work or not, but did you take a look at how the 
"map_values" function is implemented in Spark? It returns the map values as an 
array/seq of arbitrary type.


https://issues.apache.org/jira/browse/SPARK-16279
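
If the element type is known at the call site, one workaround (a sketch, not a fix for the ScalaReflection limit itself) is the udf overload that takes an explicit return DataType, so no schema has to be inferred from Seq[Any]:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{ArrayType, StringType}

// the caller supplies the element type; the body can still be written against Seq
val splitWords = udf((s: String) => s.split("\\s+").toSeq, ArrayType(StringType))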


Yong



From: Chris Beavers 
Sent: Monday, July 25, 2016 10:32 PM
To: user@spark.apache.org
Subject: UDF returning generic Seq

Hey there,

I'm interested in writing a UDF that returns an ArrayType column of unknown 
subtype. My understanding is that this translates, JVM-type-wise, to a Seq of a 
generic templated type: Seq[Any]. I seem to be hitting the constraint at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:657
that basically necessitates a fully qualified schema for the return type (i.e. 
the templated Any hits the default exception-throwing case at the end of 
schemaFor).

Is there any more canonical way have a UDF produce an ArrayType column of 
unknown type? Or is my only alternative here to reduce this to BinaryType and 
use whatever encoding/data structures I want under the covers there and in 
subsequent UDFs?

Thanks,
Chris


Re: Outer Explode needed

2016-07-26 Thread Yong Zhang
The reason for the lack of response is that this feature is not available yet.


You can vote for and follow this JIRA, 
https://issues.apache.org/jira/browse/SPARK-13721, if you really need this 
feature.
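
In the meantime, a sketch of the outer-join workaround mentioned below; df, id, and items are hypothetical names for a frame with a key column and an array column:

import org.apache.spark.sql.functions.{col, explode}

// explode first, then join back so rows whose array was empty survive with a null item
val exploded = df.select(col("id"), explode(col("items")).as("item"))
val withEmpty = df.select("id").join(exploded, Seq("id"), "left_outer")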


Yong



From: Don Drake 
Sent: Monday, July 25, 2016 9:12 PM
To: d...@spark.apache.org
Subject: Fwd: Outer Explode needed

No response on the Users list, I thought I would repost here.

See below.

-Don
-- Forwarded message --
From: Don Drake >
Date: Sun, Jul 24, 2016 at 2:18 PM
Subject: Outer Explode needed
To: user >


I have a nested data structure (an array of structures) that I'm flattening with the 
DSL df.explode() API. However, when the array is empty, I'm not getting the rest 
of the row in my output, as it is skipped.

This is the intended behavior, and Hive supports a SQL "OUTER explode()" to 
generate the row when the explode would not yield any output.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView

Can we get this same outer explode in the DSL?  I have to jump through some 
outer join hoops to get the rows where the array is empty.

Thanks.

-Don

--
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143



--
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143


RE: Processing json document

2016-07-07 Thread Yong Zhang
The problem is for the Hadoop input format to identify the record delimiter. If the 
whole JSON record is on one line, then the natural record delimiter is the newline 
character.
Keep in mind that in a distributed file system, the file split position is most 
likely NOT on a record delimiter. The input format implementation has to go back or 
forward in the byte array looking for the next record delimiter, possibly on another 
node.
Without a reliable record delimiter, you just have to parse the whole file, as the 
file boundary is the only reliable record delimiter you have.
JSON is never a good format to store on a big data platform. If your source JSON 
looks like this, then you have to preprocess it, or write your own implementation to 
handle the record delimiter for your JSON data. But good luck with that; there is no 
perfect generic solution for every kind of JSON data you want to handle.
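
For completeness, a hedged sketch of the custom-delimiter route; the delimiter string here is a made-up example and has to match how records are actually separated in your file:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// split the big file on a custom record separator instead of newlines
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter", "},\n")   // assumption about the layout
val records = sc
  .newAPIHadoopFile("/data/big.json", classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text], hadoopConf)
  .map(_._2.toString)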
Yong

From: ljia...@gmail.com
Date: Thu, 7 Jul 2016 11:57:26 -0500
Subject: Re: Processing json document
To: gurwls...@gmail.com
CC: jornfra...@gmail.com; user@spark.apache.org

Hi, there,
Thank you all for your input. @Hyukjin, as a matter of fact, I had read the 
blog link you posted before asking the question on the forum. As you pointed 
out, the link uses wholeTextFiles(), which is bad in my case, because my JSON 
file can be as large as 20 GB+ and an OOM might occur. I am not sure how to extract 
the values using the textFile call, as it will create an RDD of strings and treat 
each line without ordering; it destroys the JSON context. 
Large multi-line JSON files with a parent node are very common in the real world. 
Take the common employees JSON example below; assuming we have millions of 
employees and it is a super-large JSON document, how can Spark handle this? This 
should be a common pattern, shouldn't it? In the real world, JSON documents do not 
always come as cleanly formatted as the Spark examples require. 

{"employees":[
  {"firstName":"John",  "lastName":"Doe"},
  {"firstName":"Anna",  "lastName":"Smith"},
  {"firstName":"Peter", "lastName":"Jones"}]}


On Thu, Jul 7, 2016 at 1:47 AM, Hyukjin Kwon  wrote:
The link uses wholeTextFiles() API which treats each file as each record.

2016-07-07 15:42 GMT+09:00 Jörn Franke :
This does not necessarily need to be the case: if you look at the Hadoop 
FileInputFormat architecture, then you can even split large multi-line JSONs 
without issues. I would need to have a look at it, but one large file does not 
mean one executor, independent of the underlying format.
On 07 Jul 2016, at 08:12, Hyukjin Kwon  wrote:

There is a good link for this here, 
http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files
If there are a lot of small files, then it would work pretty okay in a 
distributed manner, but I am worried if it is single large file. 
In this case, this would only work in single executor which I think will end up 
with OutOfMemoryException.
The Spark JSON data source does not support multi-line JSON as input due to the 
limitation of TextInputFormat and LineRecordReader. You may have to just extract 
the values after reading it with textFile.

2016-07-07 14:48 GMT+09:00 Lan Jiang :
Hi, there
Spark has provided JSON document processing features for a long time. In most 
examples I see, each line is a JSON object in the sample file. That is the 
easiest case. But how can we process a JSON document that does not conform to 
this standard format (one line per JSON object)? Here is the document I am 
working on. 
First of all, it is multiple lines for one single big JSON object. The real 
file can be as large as 20+ GB. Within that one single JSON object, it contains 
many name/value pairs. The name is some kind of id value. The value is the 
actual JSON object that I would like to be part of a DataFrame. Is there any way 
to do that? Appreciate any input. 

{"id1": {"Title":"title1","Author":"Tom","Source":{
"Date":"20160506","Type":"URL"},"Data":" blah blah"},
"id2": {"Title":"title2","Author":"John","Source":{
"Date":"20150923","Type":"URL"},"Data":"  blah blah "},
"id3: {"Title":"title3","Author":"John","Source":{
"Date":"20150902","Type":"URL"},"Data":" blah blah "}}






  

RE: Possible to broadcast a function?

2016-06-30 Thread Yong Zhang
How about this old discussion related to similar problem as yours.
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-td3203.html
Yong

From: aper...@timerazor.com
Date: Wed, 29 Jun 2016 14:00:07 +
Subject: Possible to broadcast a function?
To: user@spark.apache.org

The user guide describes a broadcast as a way to move a large dataset to each 
node:

"Broadcast variables allow the programmer to keep a read-only variable cached 
on each machine rather
than shipping a copy of it with tasks. They can be used, for example, to give 
every node a copy of a
large input dataset in an efficient manner."

And the broadcast example shows it being used with a variable.

But, is it somehow possible to instead broadcast a function that can be 
executed once, per node?

My use case is the following:

I have a large data structure that I currently create on each executor.  The 
way that I create it is a hack.  That is, when the RDD function is executed on 
the executor, I block, load a bunch of data (~250 GiB) from an external data 
source, create the data structure as a static object in the JVM, and then 
resume execution.  This works, but it ends up costing me a lot of extra memory 
(i.e. a few TiB when I have a lot of executors).
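
A minimal sketch of that "static object in the JVM" pattern: a lazy val inside a Scala 
object is initialized at most once per JVM, i.e. once per executor, the first time a 
task on that executor touches it. The loader here is a placeholder, not the real 
~250 GiB load:

object SharedStructure {
  // Placeholder loader; in the real use case this would block and pull the data
  // from the external source the first time it is referenced on an executor.
  lazy val data: Map[String, Array[Byte]] = Map("example" -> Array[Byte](1, 2, 3))
}

// Inside an RDD or DataFrame operation, every task on the same executor sees the same instance:
// rdd.map(record => SharedStructure.data.size)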

What I'd like to do is use the broadcast mechanism to load the data structure 
once, per node.  But, I can't serialize the data structure from the driver.

Any ideas?

Thanks!

Aaron

  

RE: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

2016-06-20 Thread Yong Zhang
If you are using Spark > 1.5, the best way is to use the DataFrame API directly, 
instead of SQL. With DataFrames, you can specify the broadcast join hint in the 
DataFrame API, which will force the broadcast join.
Yong
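
A minimal sketch of that hint, assuming two DataFrames df1 (large) and df2 (small once 
filtered); broadcast() lives in org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.broadcast

val small  = df2.filter(df2("c2") < 1000)
val joined = df1.join(broadcast(small), df1("c1") === small("c1"))
joined.explain()   // the plan should show a BroadcastHashJoin for the hinted side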

From: mich.talebza...@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin@gmail.com
CC: zhen...@dtdream.com; user@spark.apache.org

What sort of tables are these?
Can you register the result set as a temp table and do a join on that, assuming 
the result set is going to be small?
s.filter(($"c2" < 1000)).registerTempTable("tmp")
and then do a join between tmp and Table2
HTH


Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 20 June 2016 at 12:38, Takeshi Yamamuro  wrote:
It seems it is hard to predict the output size of filters because the current 
Spark has limited statistics about the input data. A few hours ago, Reynold created a 
ticket for a cost-based optimizer framework at 
https://issues.apache.org/jira/browse/SPARK-16026. If you have ideas, questions, 
or suggestions, feel free to join the discussion.
// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247  wrote:


Thanks for your reply. In fact, that is what I just did.
But my question is: can we make the Spark join behavior cleverer, so that it turns 
a SortMergeJoin into a BroadcastHashJoin automatically when it "finds" that an 
output RDD is small enough?

--
From: Takeshi Yamamuro
Sent: Monday, June 20, 2016 19:16
To: 梅西0247
Cc: user
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
Hi,
How about caching the result of `select * from a where a.c2 < 1000`, then 
joining them? You probably need to tune `spark.sql.autoBroadcastJoinThreshold` 
to enable broadcast joins for the result table.
// maropu
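
A sketch of that suggestion with the Spark 1.6-era API; the table names come from the 
thread, and the 100 MB threshold is only illustrative:

val small = sqlContext.sql("SELECT * FROM table1 a WHERE a.c2 < 1000").cache()
small.registerTempTable("small_a")

// Raise the threshold above the (cached) result size so the planner may broadcast it.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)
sqlContext.sql("SELECT * FROM small_a s JOIN table2 b ON s.c1 = b.c1").explain()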

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247  wrote:
Hi everyone, 
I ran a SQL join statement on Spark 1.6.1 like this:
select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
and it took quite a long time because it is a SortMergeJoin and the two tables are big.


In fact, the size of the filter result (select * from a where a.c2 < 1000) is very 
small, and I think a better solution is to use a BroadcastJoin with the filter 
result, but I know the physical plan is static and it won't be changed.
So, can we make the physical plan more adaptive? (In this example, I mean using 
a BroadcastHashJoin instead of a SortMergeJoin automatically.)






-- 
---
Takeshi Yamamuro



-- 
---
Takeshi Yamamuro



  

RE: Not able to write output to local filsystem from Standalone mode.

2016-05-27 Thread Yong Zhang
I am not familiar with that particular piece of code. But Spark's 
concurrency comes from multi-threading. One executor will use multiple threads to 
process tasks, and these tasks share the JVM memory of the executor. So it 
is not surprising that Spark needs some blocking/synchronization for memory in some 
places.
Yong

> Date: Fri, 27 May 2016 20:21:46 +0200
> Subject: Re: Not able to write output to local filsystem from Standalone mode.
> From: ja...@japila.pl
> To: java8...@hotmail.com
> CC: math...@closetwork.org; stutiawas...@hcl.com; user@spark.apache.org
> 
> Hi Yong,
> 
> It makes sense...almost. :) I'm not sure how relevant it is, but just
> today was reviewing BlockInfoManager code with the locks for reading
> and writing, and my understanding of the code shows that Spark is fine
> when there are multiple attempts for writes of new memory blocks
> (pages) with a mere synchronized code block. See
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L324-L325
> 
> With that, it's not that simple to say "that just makes sense".
> 
> p.s. The more I know the less things "just make sense to me".
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Fri, May 27, 2016 at 3:42 AM, Yong Zhang <java8...@hotmail.com> wrote:
> > That just makes sense, doesn't it?
> >
> > The only place will be driver. If not, the executor will be having
> > contention by whom should create the directory in this case.
> >
> > Only the coordinator (driver in this case) is the best place for doing it.
> >
> > Yong
> >
> > 
> > From: math...@closetwork.org
> > Date: Wed, 25 May 2016 18:23:02 +
> > Subject: Re: Not able to write output to local filsystem from Standalone
> > mode.
> > To: ja...@japila.pl
> > CC: stutiawas...@hcl.com; user@spark.apache.org
> >
> >
> > Experience. I don't use Mesos or Yarn or Hadoop, so I don't know.
> >
> >
> > On Wed, May 25, 2016 at 2:51 AM Jacek Laskowski <ja...@japila.pl> wrote:
> >
> > Hi Mathieu,
> >
> > Thanks a lot for the answer! I did *not* know it's the driver to
> > create the directory.
> >
> > You said "standalone mode", is this the case for the other modes -
> > yarn and mesos?
> >
> > p.s. Did you find it in the code or...just experienced before? #curious
> >
> > Pozdrawiam,
> > Jacek Laskowski
> > 
> > https://medium.com/@jaceklaskowski/
> > Mastering Apache Spark http://bit.ly/mastering-apache-spark
> > Follow me at https://twitter.com/jaceklaskowski
> >
> >
> > On Tue, May 24, 2016 at 4:04 PM, Mathieu Longtin <math...@closetwork.org>
> > wrote:
> >> In standalone mode, executor assume they have access to a shared file
> >> system. The driver creates the directory and the executor write files, so
> >> the executors end up not writing anything since there is no local
> >> directory.
> >>
> >> On Tue, May 24, 2016 at 8:01 AM Stuti Awasthi <stutiawas...@hcl.com>
> >> wrote:
> >>>
> >>> hi Jacek,
> >>>
> >>> Parent directory already present, its my home directory. Im using Linux
> >>> (Redhat) machine 64 bit.
> >>> Also I noticed that "test1" folder is created in my master with
> >>> subdirectory as "_temporary" which is empty. but on slaves, no such
> >>> directory is created under /home/stuti.
> >>>
> >>> Thanks
> >>> Stuti
> >>> 
> >>> From: Jacek Laskowski [ja...@japila.pl]
> >>> Sent: Tuesday, May 24, 2016 5:27 PM
> >>> To: Stuti Awasthi
> >>> Cc: user
> >>> Subject: Re: Not able to write output to local filsystem from Standalone
> >>> mode.
> >>>
> >>> Hi,
> >>>
> >>> What happens when you create the parent directory /home/stuti? I think
> >>> the
> >>> failure is due to missing parent directories. What's the OS?
> >>>
> >>> Jacek
> >>>
> >>> On 24 May 2016 11:27 a.m., "Stuti Awasthi" <stutiawas...@hcl.com> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> I have 3 nodes Spark 1.6 Standalone mode cluster w

RE: Not able to write output to local filsystem from Standalone mode.

2016-05-26 Thread Yong Zhang
That just makes sense, doesn't it?
The only place to do it is the driver. If not, the executors would have contention 
over who should create the directory in this case.
The coordinator (the driver in this case) is the best place for doing it.
Yong

From: math...@closetwork.org
Date: Wed, 25 May 2016 18:23:02 +
Subject: Re: Not able to write output to local filsystem from Standalone mode.
To: ja...@japila.pl
CC: stutiawas...@hcl.com; user@spark.apache.org

Experience. I don't use Mesos or Yarn or Hadoop, so I don't know.

On Wed, May 25, 2016 at 2:51 AM Jacek Laskowski  wrote:
Hi Mathieu,



Thanks a lot for the answer! I did *not* know it's the driver to

create the directory.



You said "standalone mode", is this the case for the other modes -

yarn and mesos?



p.s. Did you find it in the code or...just experienced before? #curious



Pozdrawiam,

Jacek Laskowski



https://medium.com/@jaceklaskowski/

Mastering Apache Spark http://bit.ly/mastering-apache-spark

Follow me at https://twitter.com/jaceklaskowski





On Tue, May 24, 2016 at 4:04 PM, Mathieu Longtin  wrote:

> In standalone mode, executor assume they have access to a shared file

> system. The driver creates the directory and the executor write files, so

> the executors end up not writing anything since there is no local directory.

>

> On Tue, May 24, 2016 at 8:01 AM Stuti Awasthi  wrote:

>>

>> hi Jacek,

>>

>> Parent directory already present, its my home directory. Im using Linux

>> (Redhat) machine 64 bit.

>> Also I noticed that "test1" folder is created in my master with

>> subdirectory as "_temporary" which is empty. but on slaves, no such

>> directory is created under /home/stuti.

>>

>> Thanks

>> Stuti

>> 

>> From: Jacek Laskowski [ja...@japila.pl]

>> Sent: Tuesday, May 24, 2016 5:27 PM

>> To: Stuti Awasthi

>> Cc: user

>> Subject: Re: Not able to write output to local filsystem from Standalone

>> mode.

>>

>> Hi,

>>

>> What happens when you create the parent directory /home/stuti? I think the

>> failure is due to missing parent directories. What's the OS?

>>

>> Jacek

>>

>> On 24 May 2016 11:27 a.m., "Stuti Awasthi"  wrote:

>>

>> Hi All,

>>

>> I have 3 nodes Spark 1.6 Standalone mode cluster with 1 Master and 2

>> Slaves. Also Im not having Hadoop as filesystem . Now, Im able to launch

>> shell , read the input file from local filesystem and perform transformation

>> successfully. When I try to write my output in local filesystem path then I

>> receive below error .

>>

>>

>>

>> I tried to search on web and found similar Jira :

>> https://issues.apache.org/jira/browse/SPARK-2984 . Even though it shows

>> resolved for Spark 1.3+ but already people have posted the same issue still

>> persists in latest versions.

>>

>>

>>

>> ERROR

>>

>> scala> data.saveAsTextFile("/home/stuti/test1")

>>

>> 16/05/24 05:03:42 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2,

>> server1): java.io.IOException: The temporary job-output directory

>> file:/home/stuti/test1/_temporary doesn't exist!

>>

>> at

>> org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)

>>

>> at

>> org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)

>>

>> at

>> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)

>>

>> at

>> org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)

>>

>> at

>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)

>>

>> at

>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)

>>

>> at

>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

>>

>> at org.apache.spark.scheduler.Task.run(Task.scala:89)

>>

>> at

>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

>>

>> at

>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

>>

>> at

>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

>>

>> at java.lang.Thread.run(Thread.java:745)

>>

>>

>>

>> What is the best way to resolve this issue if suppose I don’t want to have

>> Hadoop installed OR is it mandatory to have Hadoop to write the output from

>> Standalone cluster mode.

>>

>>

>>

>> Please suggest.

>>

>>

>>

>> Thanks 

>>

>> Stuti Awasthi

>>

>>

>>

>>

>>

>> ::DISCLAIMER::

>>

>> 

>>

>> The contents of this e-mail and any attachment(s) are confidential and

>> intended for the named recipient(s) only.

>> 

RE: SQLContext and HiveContext parse a query string differently ?

2016-05-12 Thread Yong Zhang
Not sure what you mean. You want to have exactly one query running fine in 
both SQLContext and HiveContext? The query parsers are different; why do you 
want to have this feature? Do I understand your question correctly?
Yong

Date: Thu, 12 May 2016 13:09:34 +0200
Subject: SQLContext and HiveContext parse a query string differently ?
From: inv...@gmail.com
To: user@spark.apache.org

HI,
I just want to figure out why the two contexts behave differently even on a 
simple query. In a nutshell, I have a query in which there is a String 
containing a single quote and a cast to Array/Map. I have tried all the 
combinations of the different types of SQL context and query-call APIs (sql, df.select, 
df.selectExpr). I can't find one that rules them all.
Here is the code for reproducing the 
problem:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Test extends App {

  val sc  = new SparkContext("local[2]", "test", new SparkConf)
  val hiveContext = new HiveContext(sc)
  val sqlContext  = new SQLContext(sc)

  val context = hiveContext
  //  val context = sqlContext

  import context.implicits._

  val df = Seq((Seq(1, 2), 2)).toDF("a", "b")
  df.registerTempTable("tbl")
  df.printSchema()

  // case 1
  context.sql("select cast(a as array) from tbl").show()
  // HiveContext => org.apache.spark.sql.AnalysisException: cannot recognize 
input near 'array' '<' 'string' in primitive type specification; line 1 pos 17
  // SQLContext => OK

  // case 2
  context.sql("select 'a\\'b'").show()
  // HiveContext => OK
  // SQLContext => failure: ``union'' expected but ErrorToken(unclosed string 
literal) found

  // case 3
  df.selectExpr("cast(a as array)").show() // OK with HiveContext and 
SQLContext

  // case 4
  df.selectExpr("'a\\'b'").show() // HiveContext, SQLContext => failure: end of 
input expected
}
Any clarification / workaround is highly appreciated.
-- 
Hao Ren
Data Engineer @ leboncoin
Paris, France
  

RE: Weird results with Spark SQL Outer joins

2016-05-02 Thread Yong Zhang
We are still not sure what the problem is, if you cannot show us some 
example data.
For dps with 42632 rows and swig with 42034 rows: dps full outer join with 
swig on 3 columns, with additional filters, gets the same result-set row count as 
dps left outer join with swig on 3 columns with additional filters, which again 
gets the same result-set row count as dps right outer join with swig on 3 
columns with the same additional filters.
Without knowing your data, I cannot see a reason why this has to be a bug in 
Spark.
Am I misunderstanding your bug?
Yong
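
As a hedged aside, not a confirmed diagnosis of this particular data set: with queries 
shaped like the ones discussed here, a WHERE predicate that references the null-extended 
side discards exactly the null-extended rows, so a full, left, or right outer join 
filtered this way collapses to the inner-join result; keeping such predicates in the ON 
clause preserves the outer behavior. A sketch with the thread's table and column names:

// Rows without a match get NULLs for the other side, so "d.date >= '2016-01-03'" in WHERE
// removes them; this count therefore matches the INNER JOIN count, whatever the join type.
sqlContext.sql("""
  SELECT s.date, s.account, d.spend_in_dollar
  FROM swig_pin_promo_lt s
  FULL OUTER JOIN dps_pin_promo_lt d
    ON s.date = d.date AND s.account = d.account AND s.ad = d.ad
  WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'
""").count()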

From: kpe...@gmail.com
Date: Mon, 2 May 2016 12:11:18 -0700
Subject: Re: Weird results with Spark SQL Outer joins
To: gourav.sengu...@gmail.com
CC: user@spark.apache.org

Gourav,
I wish that was the case, but I have done a select count on each of the two tables 
individually and they return different numbers of rows:









dps.registerTempTable("dps_pin_promo_lt")

swig.registerTempTable("swig_pin_promo_lt")




dps.count()

RESULT: 42632




swig.count()

RESULT: 42034

On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta  
wrote:
This shows that both tables have matching records and no mismatches. 
Therefore you obviously have the same results irrespective of whether you use a 
right or a left join. 
I think that there is no problem here, unless I am missing something.
Regards,
Gourav 
On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
Also, the results of the inner query produced the same results:

sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account AS

d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,

d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN

dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =

d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()

RESULT:23747







--

View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.



-

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org






  

RE: Java exception when showing join

2016-04-25 Thread Yong Zhang
Sorry, I didn't pay attention that you are using PySpark, so ignore my reply, as I 
only use the Scala version.
Yong

From: java8...@hotmail.com
To: webe...@aim.com; user@spark.apache.org
Subject: RE: Java exception when showing join
Date: Mon, 25 Apr 2016 09:41:18 -0400




dispute_df.join(comments_df, $"dispute_df.COMMENTID" === 
$"comments_df.COMMENTID").first()
If you are using the DataFrame API, where some things are tricky for a first-time user, 
my suggestion is to always refer to the unit tests. That is in fact the way I 
figured out how to do it in lots of cases.
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Yong

> Subject: Re: Java exception when showing join
> From: webe...@aim.com
> To: java8...@hotmail.com; user@spark.apache.org
> Date: Mon, 25 Apr 2016 07:45:12 -0500
> 
> I get an invalid syntax error when I do that.
> 
> On Fri, 2016-04-22 at 20:06 -0400, Yong Zhang wrote:
> > use "dispute_df.join(comments_df, dispute_df.COMMENTID ===
> > comments_df.COMMENTID).first()" instead.
> > 
> > Yong
> > 
> > Date: Fri, 22 Apr 2016 17:42:26 -0400
> > From: webe...@aim.com
> > To: user@spark.apache.org
> > Subject: Java exception when showing join
> > 
> > I am using pyspark with netezza.  I am getting a java exception when
> > trying to show the first row of a join.  I can show the first row for
> > of the two dataframes separately but not the result of a join.  I get
> > the same error for any action I take(first, collect, show).  Am I
> > doing something wrong?
> > 
> > from pyspark.sql import SQLContext
> > sqlContext = SQLContext(sc)
> > dispute_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table1',
> > driver='com.ibm.spark.netezza').load()
> > dispute_df.printSchema()
> > comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table2',
> > driver='com.ibm.spark.netezza').load()
> > comments_df.printSchema()
> > dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > 
> > root
> >  |-- COMMENTID: string (nullable = true)
> >  |-- EXPORTDATETIME: timestamp (nullable = true)
> >  |-- ARTAGS: string (nullable = true)
> >  |-- POTAGS: string (nullable = true)
> >  |-- INVTAG: string (nullable = true)
> >  |-- ACTIONTAG: string (nullable = true)
> >  |-- DISPUTEFLAG: string (nullable = true)
> >  |-- ACTIONFLAG: string (nullable = true)
> >  |-- CUSTOMFLAG1: string (nullable = true)
> >  |-- CUSTOMFLAG2: string (nullable = true)
> > 
> > root
> >  |-- COUNTRY: string (nullable = true)
> >  |-- CUSTOMER: string (nullable = true)
> >  |-- INVNUMBER: string (nullable = true)
> >  |-- INVSEQNUMBER: string (nullable = true)
> >  |-- LEDGERCODE: string (nullable = true)
> >  |-- COMMENTTEXT: string (nullable = true)
> >  |-- COMMENTTIMESTAMP: timestamp (nullable = true)
> >  |-- COMMENTLENGTH: long (nullable = true)
> >  |-- FREEINDEX: long (nullable = true)
> >  |-- COMPLETEDFLAG: long (nullable = true)
> >  |-- ACTIONFLAG: long (nullable = true)
> >  |-- FREETEXT: string (nullable = true)
> >  |-- USERNAME: string (nullable = true)
> >  |-- ACTION: string (nullable = true)
> >  |-- COMMENTID: string (nullable = true)
> > 
> > ---
> > 
> > Py4JJavaError Traceback (most recent call
> > last)
> >  in ()
> >   5 comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://dstbld-pda02.bld.dst.ibm.com:5480/BACC_DEV_CSP_NBAAR',
> > user='rnahar', password='Sfeb2016',
> > dbtable='UK_METRICS.EU_COMMENTS2',
> > driver='com.ibm.spark.netezza').load()
> >   6 comments_df.printSchema()
> > > 7 dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in first(self)
> > 802 Row(age=2, name=u'Alice')
> > 803 """
> > --> 804 return self.head()
> > 805 
> > 806 @ignore_unicode_prefix
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
> > 790 """
> &g

RE: Java exception when showing join

2016-04-25 Thread Yong Zhang
dispute_df.join(comments_df, $"dispute_df.COMMENTID" === 
$"comments_df.COMMENTID").first()
If you are using the DataFrame API, where some things are tricky for a first-time user, 
my suggestion is to always refer to the unit tests. That is in fact the way I 
figured out how to do it in lots of cases.
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Yong

> Subject: Re: Java exception when showing join
> From: webe...@aim.com
> To: java8...@hotmail.com; user@spark.apache.org
> Date: Mon, 25 Apr 2016 07:45:12 -0500
> 
> I get an invalid syntax error when I do that.
> 
> On Fri, 2016-04-22 at 20:06 -0400, Yong Zhang wrote:
> > use "dispute_df.join(comments_df, dispute_df.COMMENTID ===
> > comments_df.COMMENTID).first()" instead.
> > 
> > Yong
> > 
> > Date: Fri, 22 Apr 2016 17:42:26 -0400
> > From: webe...@aim.com
> > To: user@spark.apache.org
> > Subject: Java exception when showing join
> > 
> > I am using pyspark with netezza.  I am getting a java exception when
> > trying to show the first row of a join.  I can show the first row for
> > of the two dataframes separately but not the result of a join.  I get
> > the same error for any action I take(first, collect, show).  Am I
> > doing something wrong?
> > 
> > from pyspark.sql import SQLContext
> > sqlContext = SQLContext(sc)
> > dispute_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table1',
> > driver='com.ibm.spark.netezza').load()
> > dispute_df.printSchema()
> > comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table2',
> > driver='com.ibm.spark.netezza').load()
> > comments_df.printSchema()
> > dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > 
> > root
> >  |-- COMMENTID: string (nullable = true)
> >  |-- EXPORTDATETIME: timestamp (nullable = true)
> >  |-- ARTAGS: string (nullable = true)
> >  |-- POTAGS: string (nullable = true)
> >  |-- INVTAG: string (nullable = true)
> >  |-- ACTIONTAG: string (nullable = true)
> >  |-- DISPUTEFLAG: string (nullable = true)
> >  |-- ACTIONFLAG: string (nullable = true)
> >  |-- CUSTOMFLAG1: string (nullable = true)
> >  |-- CUSTOMFLAG2: string (nullable = true)
> > 
> > root
> >  |-- COUNTRY: string (nullable = true)
> >  |-- CUSTOMER: string (nullable = true)
> >  |-- INVNUMBER: string (nullable = true)
> >  |-- INVSEQNUMBER: string (nullable = true)
> >  |-- LEDGERCODE: string (nullable = true)
> >  |-- COMMENTTEXT: string (nullable = true)
> >  |-- COMMENTTIMESTAMP: timestamp (nullable = true)
> >  |-- COMMENTLENGTH: long (nullable = true)
> >  |-- FREEINDEX: long (nullable = true)
> >  |-- COMPLETEDFLAG: long (nullable = true)
> >  |-- ACTIONFLAG: long (nullable = true)
> >  |-- FREETEXT: string (nullable = true)
> >  |-- USERNAME: string (nullable = true)
> >  |-- ACTION: string (nullable = true)
> >  |-- COMMENTID: string (nullable = true)
> > 
> > ---
> > 
> > Py4JJavaError Traceback (most recent call
> > last)
> >  in ()
> >   5 comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://dstbld-pda02.bld.dst.ibm.com:5480/BACC_DEV_CSP_NBAAR',
> > user='rnahar', password='Sfeb2016',
> > dbtable='UK_METRICS.EU_COMMENTS2',
> > driver='com.ibm.spark.netezza').load()
> >   6 comments_df.printSchema()
> > > 7 dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in first(self)
> > 802 Row(age=2, name=u'Alice')
> > 803 """
> > --> 804 return self.head()
> > 805 
> > 806 @ignore_unicode_prefix
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
> > 790 """
> > 791 if n is None:
> > --> 792 rs = self.head(1)
> > 793 return rs[0] if rs else None
> > 794 return self.take(n)
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> &

RE: How this unit test passed on master trunk?

2016-04-24 Thread Yong Zhang
So in that case the result will be the following:
[1,[1,1]] [3,[3,1]] [2,[2,1]]
Thanks for explaining the meaning of it. But the 
question is: how can first() be [3,[1,1]]? In fact, if there were any 
ordering in the final result, it would be [1,[1,1]] instead of [3,[1,1]], 
correct? 
Yong
Subject: Re: How this unit test passed on master trunk?
From: zzh...@hortonworks.com
To: java8...@hotmail.com; gatorsm...@gmail.com
CC: user@spark.apache.org
Date: Sun, 24 Apr 2016 04:37:11 +






There are multiple records for the DF




scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).show
+---+-----------------------------+
|  a|min(struct(unresolvedstar()))|
+---+-----------------------------+
|  1|                        [1,1]|
|  3|                        [3,1]|
|  2|                        [2,1]|
+---+-----------------------------+



The meaning of .groupBy($"a").agg(min(struct($"record.*"))) is to get the min 
for all the records with the same $"a".



For example, for TestData2(1,1) :: TestData2(1,2), the result would be (1, (1, 1)), 
since struct(1, 1) is less than struct(1, 2). Please check how the ordering is 
implemented in InterpretedOrdering.



The output itself does not have any ordering. I am not sure why the unit test 
and the real environment behave differently.



Xiao,



I do see the difference between unit test and local cluster run. Do you know 
the reason?



Thanks.



Zhan Zhang









 

On Apr 22, 2016, at 11:23 AM, Yong Zhang <java8...@hotmail.com> wrote:



Hi,



I was trying to find out why this unit test can pass in Spark code.



in
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala



for this unit test:

  test("Star Expansion - CreateStruct and CreateArray") {
val structDf = testData2.select("a", "b").as("record")
// CreateStruct and CreateArray in aggregateExpressions
assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == 
Row(3, Row(3, 1)))
assert(structDf.groupBy($"a").agg(min(array($"record.*"))).first() == 
Row(3, Seq(3, 1)))

// CreateStruct and CreateArray in project list (unresolved alias)
assert(structDf.select(struct($"record.*")).first() == Row(Row(1, 1)))
assert(structDf.select(array($"record.*")).first().getAs[Seq[Int]](0) === 
Seq(1, 1))

// CreateStruct and CreateArray in project list (alias)
assert(structDf.select(struct($"record.*").as("a")).first() == Row(Row(1, 
1)))

assert(structDf.select(array($"record.*").as("a")).first().getAs[Seq[Int]](0) 
=== Seq(1, 1))
  }
From my understanding, the data returned in this case should be Row(1, Row(1, 
1)), as that will be the min of the struct.
In fact, if I run the spark-shell on my laptop, I get the result I expected:


./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class TestData2(a: Int, b: Int)
defined class TestData2
scala> val testData2DF = sqlContext.sparkContext.parallelize(TestData2(1,1) :: 
TestData2(1,2) :: TestData2(2,1) :: TestData2(2,2) :: TestData2(3,1) :: 
TestData2(3,2) :: Nil, 2).toDF()
scala> val structDF = testData2DF.select("a","b").as("record")
scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).first()
res0: org.apache.spark.sql.Row = [1,[1,1]]

scala> structDF.show
+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  1|  2|
|  2|  1|
|  2|  2|
|  3|  1|
|  3|  2|
+---+---+
So from my Spark, which I built from master, I cannot get Row[3,[1,1]] back 
in this case. Why does the unit test assert that Row[3,[1,1]] should be first, 
and why does it pass, when I cannot reproduce that in my spark-shell? I am trying to 
understand how to interpret the meaning of "agg(min(struct($"record.*")))".


Thanks
Yong 
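
As a hedged aside, continuing the spark-shell session above: first() after a groupBy has 
no guaranteed ordering, so which group comes back first depends on the number of 
partitions and the shuffle layout, which is one way a unit-test environment and a local 
shell can disagree. Sorting before first() makes the check deterministic:

// min and struct are already imported in spark-shell via org.apache.spark.sql.functions._
structDF.groupBy($"a").agg(min(struct($"record.*")).as("m")).orderBy($"a").first()
// expected: [1,[1,1]] regardless of partitioning, because the rows are sorted before first()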







  

RE: Java exception when showing join

2016-04-22 Thread Yong Zhang
use "dispute_df.join(comments_df, dispute_df.COMMENTID === 
comments_df.COMMENTID).first()" instead.
Yong

Date: Fri, 22 Apr 2016 17:42:26 -0400
From: webe...@aim.com
To: user@spark.apache.org
Subject: Java exception when showing join

I am using pyspark with netezza.  I am getting a java exception when trying to 
show the first row of a join.  I can show the first row of each of the two 
dataframes separately but not the result of a join.  I get the same error for 
any action I take (first, collect, show).  Am I doing something wrong?



from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

dispute_df = 
sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:netezza://***:5480/db',
 user='***', password='***', dbtable='table1', 
driver='com.ibm.spark.netezza').load()

dispute_df.printSchema()

comments_df = 
sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:netezza://***:5480/db',
 user='***', password='***', dbtable='table2', 
driver='com.ibm.spark.netezza').load()

comments_df.printSchema()

dispute_df.join(comments_df, dispute_df.COMMENTID == 
comments_df.COMMENTID).first()





root

 |-- COMMENTID: string (nullable = true)

 |-- EXPORTDATETIME: timestamp (nullable = true)

 |-- ARTAGS: string (nullable = true)

 |-- POTAGS: string (nullable = true)

 |-- INVTAG: string (nullable = true)

 |-- ACTIONTAG: string (nullable = true)

 |-- DISPUTEFLAG: string (nullable = true)

 |-- ACTIONFLAG: string (nullable = true)

 |-- CUSTOMFLAG1: string (nullable = true)

 |-- CUSTOMFLAG2: string (nullable = true)



root

 |-- COUNTRY: string (nullable = true)

 |-- CUSTOMER: string (nullable = true)

 |-- INVNUMBER: string (nullable = true)

 |-- INVSEQNUMBER: string (nullable = true)

 |-- LEDGERCODE: string (nullable = true)

 |-- COMMENTTEXT: string (nullable = true)

 |-- COMMENTTIMESTAMP: timestamp (nullable = true)

 |-- COMMENTLENGTH: long (nullable = true)

 |-- FREEINDEX: long (nullable = true)

 |-- COMPLETEDFLAG: long (nullable = true)

 |-- ACTIONFLAG: long (nullable = true)

 |-- FREETEXT: string (nullable = true)

 |-- USERNAME: string (nullable = true)

 |-- ACTION: string (nullable = true)

 |-- COMMENTID: string (nullable = true)



---

Py4JJavaError Traceback (most recent call last)

 in ()

  5 comments_df = 
sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:netezza://dstbld-pda02.bld.dst.ibm.com:5480/BACC_DEV_CSP_NBAAR',
 user='rnahar', password='Sfeb2016', dbtable='UK_METRICS.EU_COMMENTS2', 
driver='com.ibm.spark.netezza').load()

  6 comments_df.printSchema()

> 7 dispute_df.join(comments_df, dispute_df.COMMENTID == 
comments_df.COMMENTID).first()



/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in first(self)

802 Row(age=2, name=u'Alice')

803 """

--> 804 return self.head()

805 

806 @ignore_unicode_prefix



/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in head(self, n)

790 """

791 if n is None:

--> 792 rs = self.head(1)

793 return rs[0] if rs else None

794 return self.take(n)



/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in head(self, n)

792 rs = self.head(1)

793 return rs[0] if rs else None

--> 794 return self.take(n)

795 

796 @ignore_unicode_prefix



/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in take(self, num)

304 with SCCallSiteSync(self._sc) as css:

305 port = 
self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(

--> 306 self._jdf, num)

307 return list(_load_from_socket(port, 
BatchedSerializer(PickleSerializer(

308 



/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
 in __call__(self, *args)

811 answer = self.gateway_client.send_command(command)

812 return_value = get_return_value(

--> 813 answer, self.gateway_client, self.target_id, self.name)

814 

815 for temp_arg in temp_args:



/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.pyc in 
deco(*a, **kw)

 43 def deco(*a, **kw):

 44 try:

---> 45 return f(*a, **kw)

 46 except py4j.protocol.Py4JJavaError as e:

 47 s = e.java_exception.toString()



/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)

306 raise Py4JJavaError(

307 "An error occurred while calling {0}{1}{2}.\n".

--> 308 format(target_id, ".", name), value)

