[jira] [Commented] (SPARK-21892) status code is 200 OK when kill application fail via spark master rest api

2017-09-01 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150171#comment-16150171
 ] 

Sean Owen commented on SPARK-21892:
---

The request succeeded at the HTTP level. The application action failed. That 
doesn't necessarily mean the transport layer should report a failure too.

> status code is 200 OK  when kill application fail via spark master rest api
> ---
>
> Key: SPARK-21892
> URL: https://issues.apache.org/jira/browse/SPARK-21892
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Zhuang Xueyin
>Priority: Minor
>
> Sent a post request to spark master restapi, eg:
> http://:6066/v1/submissions/kill/driver-xxx
> Request body:
> {
> "action" : "KillSubmissionRequest",
> "clientSparkVersion" : "2.1.0",
> }
> Response body:
> {
>   "action" : "KillSubmissionResponse",
>   "message" : "Driver driver-xxx has already finished or does not exist",
>   "serverSparkVersion" : "2.1.0",
>   "submissionId" : "driver-xxx",
>   "success" : false
> }
> Response headers:
> *Status Code: 200 OK*
> Content-Length: 203
> Content-Type: application/json; charset=UTF-8
> Date: Fri, 01 Sep 2017 05:56:04 GMT
> Server: Jetty(9.2.z-SNAPSHOT)
> Result:
> status code is 200 OK  when kill application fail via spark master rest api. 
> While the response body indicates that the update is not successfully, this 
> is not rest api standard, suggest to improve it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21893) Put Kafka 0.8 behind a profile

2017-09-01 Thread Sean Owen (JIRA)
Sean Owen created SPARK-21893:
-

 Summary: Put Kafka 0.8 behind a profile
 Key: SPARK-21893
 URL: https://issues.apache.org/jira/browse/SPARK-21893
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.2.0
Reporter: Sean Owen


Kafka does not support 0.8.x for Scala 2.12. This code will have to, at least, 
be optionally enabled by a profile, which could be enabled by default for 2.11. 
Or outright removed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21882) OutputMetrics doesn't count written bytes correctly in the saveAsHadoopDataset function

2017-09-01 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150260#comment-16150260
 ] 

Saisai Shao commented on SPARK-21882:
-

Please submit the patch to Github Apache Spark repo.

> OutputMetrics doesn't count written bytes correctly in the 
> saveAsHadoopDataset function
> ---
>
> Key: SPARK-21882
> URL: https://issues.apache.org/jira/browse/SPARK-21882
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1, 2.2.0
>Reporter: linxiaojun
>Priority: Minor
> Attachments: SPARK-21882.patch
>
>
> The first job called from saveAsHadoopDataset, running in each executor, does 
> not calculate the writtenBytes of OutputMetrics correctly (writtenBytes is 
> 0). The reason is that we did not initialize the callback function called to 
> find bytes written in the right way. As usual, statisticsTable which records 
> statistics in a FileSystem must be initialized at the beginning (this will be 
> triggered when open SparkHadoopWriter). The solution for this issue is to 
> adjust the order of callback function initialization. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21888) Cannot add stuff to Client Classpath for Yarn Cluster Mode

2017-09-01 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150258#comment-16150258
 ] 

Saisai Shao commented on SPARK-21888:
-

Jars added by "--jars" will be added to client classpath in yarn-cluster mode. 

In your case the only problem is about hbase-site.xml, normally we will put 
this file in SPARK_CONF_DIR as well as hive-site.xml, doesn't it work for your?

> Cannot add stuff to Client Classpath for Yarn Cluster Mode
> --
>
> Key: SPARK-21888
> URL: https://issues.apache.org/jira/browse/SPARK-21888
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Parth Gandhi
>Priority: Minor
>
> While running Spark on Yarn in cluster mode, currently there is no way to add 
> any config files, jars etc. to Client classpath. An example for this is that 
> suppose you want to run an application that uses hbase. Then, unless and 
> until we do not copy the necessary config files required by hbase to Spark 
> Config folder, we cannot specify or set their exact locations in classpath on 
> Client end which we could do so earlier by setting the environment variable 
> "SPARK_CLASSPATH".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19976) DirectStream API throws OffsetOutOfRange Exception

2017-09-01 Thread Taukir (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Taukir updated SPARK-19976:
---
Description: 
I am using following code. While data on kafka topic get deleted/retention 
period is over, it throws Exception and application crash
def functionToCreateContext(sc:SparkContext):StreamingContext = {

val kafkaParams = new mutable.HashMap[String, Object]()
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerGrp)
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[StringDeserializer])
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[StringDeserializer])
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

   val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topic, 
kafkaParams)


val kafkaStream  = 
KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,consumerStrategy)
}


spark throws error and crash once OffsetOutOf RangeException  is thrown
WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 : 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: {test-2=127287}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:98)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)


  was:
I am using following code. While data on kafka topic get deleted/retention 
period is over, it throws Exception and application crash
def functionToCreateContext(sc:SparkContext):StreamingContext = {

val kafkaParams = new mutable.HashMap[String, Object]()
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerGrp)
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[StringDeserializer])
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[StringDeserializer])
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

   val consumerStrategy = ConsumerStrategies.Subscribe[String, 
String](topic.split(",").map(_.trim).filter(!_.isEmpty).toSet, kafkaParams)


val kafkaStream  = 
KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,consumerStrategy)
}


spark throws error and crash once OffsetOutOf RangeException  is thrown
WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 : 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: {test-2=127287}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:98)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)



> DirectStream API throws OffsetOutOfRange Exception
> --
>
> Key: SPARK-19976
> URL: https://issues.apache.org/jira/browse/SPARK-19976
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Taukir
>
> I am using following code. While data on kafka topic get deleted/retention 
> period is over, it throws Exception and application crash
> def functionToCreateContext(sc:SparkContext):StreamingContext = {
>

[jira] [Commented] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide commented on SPARK-21861:
--

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR


PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked highly.* 
PageRank works by computing number and quality of links to a node to estimate 
the importance of a node.* 
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). *Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.*
*Graphx also supports Personalized PageRank (PRR), which is more general 
version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object. 
GraphOpsallows calling these algorithms directly as methods on Graph. *

 import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
*println(ranksByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*
*val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result*
*println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*

> Add more details to PageRank illustration
> -
>
> Key: SPARK-21861
> URL: https://issues.apache.org/jira/browse/SPARK-21861
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: Nikhil Bhide
>Priority: Trivial
>  Labels: documentation
>
> Add more details to PageRank illustration on 
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#pagerank]
> Adding details of page rank algorithm parameters such as dumping factor would 
> be pretty much effective. Also, adding more action on result such as sorting 
> based on weight would be more helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:43 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object. 
GraphOpsallows calling these algorithms directly as methods on Graph. {color}

 import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
*println(ranksByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*
*val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result*
*println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*


was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). *Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges betw

[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:43 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). *Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.*
*Graphx also supports Personalized PageRank (PRR), which is more general 
version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object. 
GraphOpsallows calling these algorithms directly as methods on Graph. *

 import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
*println(ranksByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*
*val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result*
*println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*


was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked highly. 
*PageRank works by computing number and quality of links to a node to estimate 
the importance of a node. *
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). *Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is 

[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:44 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

 import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
*println(ranksByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*
*val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result*
*println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*


was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ran

[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:45 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in data/graphx/users.txt, and a set of 
relationships between users is given in data/graphx/followers.txt. We compute 
the PageRank of each user as follows:

 import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
*println(ranksByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*
*val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result*
*println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*


was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterat

[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:42 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked highly. 
*PageRank works by computing number and quality of links to a node to estimate 
the importance of a node. *
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). *Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.*
*Graphx also supports Personalized PageRank (PRR), which is more general 
version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object. 
GraphOpsallows calling these algorithms directly as methods on Graph. *

 import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
*println(ranksByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*
*val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result*
*println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n"))
*


was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR


PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked highly.* 
PageRank works by computing number and quality of links to a node to estimate 
the importance of a node.* 
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). *Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and ran

[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:48 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in data/graphx/users.txt, and a set of 
relationships between users is given in data/graphx/followers.txt. We compute 
the PageRank of each user as follows:

import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
{color:red}println(ranksByUsername.sortBy({ case (username, rank) => rank 
}, false).collect().mkString("\n"))

//Run Personalized PageRank Algorithm on first vertex as a source vertex
val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n")){color}




was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas stat

[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:47 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in data/graphx/users.txt, and a set of 
relationships between users is given in data/graphx/followers.txt. We compute 
the PageRank of each user as follows:

 import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
{color:red}println(ranksByUsername.sortBy({ case (username, rank) => rank 
}, false).collect().mkString("\n"))

//Run Personalized PageRank Algorithm on first vertex as a source vertex
val ranksPRR = graph.personalizedPageRank(graph.vertices.first._1, 
0.0001).vertices
val ranksPRRByUsername = users.join(ranksPRR).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksPRRByUsername.sortBy({ case (username, rank) => rank }, 
false).collect().mkString("\n")){color}



was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas sta

[jira] [Updated] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikhil Bhide updated SPARK-21861:
-
Attachment: (was: PageRankExample.scala)

> Add more details to PageRank illustration
> -
>
> Key: SPARK-21861
> URL: https://issues.apache.org/jira/browse/SPARK-21861
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: Nikhil Bhide
>Priority: Trivial
>  Labels: documentation
> Attachments: PageRankExample.scala
>
>
> Add more details to PageRank illustration on 
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#pagerank]
> Adding details of page rank algorithm parameters such as dumping factor would 
> be pretty much effective. Also, adding more action on result such as sorting 
> based on weight would be more helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikhil Bhide updated SPARK-21861:
-
Attachment: PageRankExample.scala

> Add more details to PageRank illustration
> -
>
> Key: SPARK-21861
> URL: https://issues.apache.org/jira/browse/SPARK-21861
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: Nikhil Bhide
>Priority: Trivial
>  Labels: documentation
> Attachments: PageRankExample.scala
>
>
> Add more details to PageRank illustration on 
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#pagerank]
> Adding details of page rank algorithm parameters such as dumping factor would 
> be pretty much effective. Also, adding more action on result such as sorting 
> based on weight would be more helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikhil Bhide updated SPARK-21861:
-
Comment: was deleted

(was: PageRankExample.scala
)

> Add more details to PageRank illustration
> -
>
> Key: SPARK-21861
> URL: https://issues.apache.org/jira/browse/SPARK-21861
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: Nikhil Bhide
>Priority: Trivial
>  Labels: documentation
> Attachments: PageRankExample.scala
>
>
> Add more details to PageRank illustration on 
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#pagerank]
> Adding details of page rank algorithm parameters such as dumping factor would 
> be pretty much effective. Also, adding more action on result such as sorting 
> based on weight would be more helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:53 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in data/graphx/users.txt, and a set of 
relationships between users is given in data/graphx/followers.txt. We compute 
the PageRank of each user as follows:

Code changes - PageRankExample.scala



was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given i

[jira] [Updated] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikhil Bhide updated SPARK-21861:
-
Attachment: PageRankExample.scala

Modified PageRankExample

> Add more details to PageRank illustration
> -
>
> Key: SPARK-21861
> URL: https://issues.apache.org/jira/browse/SPARK-21861
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: Nikhil Bhide
>Priority: Trivial
>  Labels: documentation
> Attachments: PageRankExample.scala
>
>
> Add more details to PageRank illustration on 
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#pagerank]
> Adding details of page rank algorithm parameters such as dumping factor would 
> be pretty much effective. Also, adding more action on result such as sorting 
> based on weight would be more helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:54 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} PageRank works by computing number and quality of links to a 
node to estimate the importance of a node.{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in data/graphx/users.txt, and a set of 
relationships between users is given in data/graphx/followers.txt. We compute 
the PageRank of each user as follows:

Code changes - PageRankExample.scala



was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} *PageRank works by computing number and quality of links to 
a node to estimate the importance of a node. *{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in d

[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150405#comment-16150405
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:54 AM:
---

PageRankExample.scala



was (Author: nikbhi15):
Modified PageRankExample

> Add more details to PageRank illustration
> -
>
> Key: SPARK-21861
> URL: https://issues.apache.org/jira/browse/SPARK-21861
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: Nikhil Bhide
>Priority: Trivial
>  Labels: documentation
> Attachments: PageRankExample.scala
>
>
> Add more details to PageRank illustration on 
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#pagerank]
> Adding details of page rank algorithm parameters such as dumping factor would 
> be pretty much effective. Also, adding more action on result such as sorting 
> based on weight would be more helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21861) Add more details to PageRank illustration

2017-09-01 Thread Nikhil Bhide (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150389#comment-16150389
 ] 

Nikhil Bhide edited comment on SPARK-21861 at 9/1/17 11:56 AM:
---

Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} PageRank works by computing number and quality of links to a 
node to estimate the importance of a node.{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in data/graphx/users.txt, and a set of 
relationships between users is given in data/graphx/followers.txt. We compute 
the PageRank of each user as follows:

Code changes - [^PageRankExample.scala]



was (Author: nikbhi15):
Hi Sean,
Please find additional contents as follows. I have added few comments in the 
description section (highlighted), and I have slightly modified the example 
(highlighted).
Just to summarize :
1. Added details about damping factor & reset probability
2. Added details of Personalized Page Rank Algo supported in Graphx
3. Modified example 
- Sorted results in descending order by weights (ranks)
- Added example of PRR



PageRank measures the importance of each vertex in a graph, assuming an edge 
from u to v represents an endorsement of v’s importance by u. For example, if a 
Twitter user is followed by many others, the user will be ranked 
highly.{color:red} PageRank works by computing number and quality of links to a 
node to estimate the importance of a node.{color}
GraphX comes with static and dynamic implementations of PageRank as methods on 
the PageRank object. Static PageRank runs for a fixed number of iterations, 
while dynamic PageRank runs until the ranks converge (i.e., stop changing by 
more than a specified tolerance). {color:red}Dynamic version of page rank 
PageRank$pageRank takes in two parameters tolerance factor and reset 
probability, whereas static version of page rank PageRank$staticPageRank takes 
in 2 parameters, number of iterations and reset probability. Reset probability 
is associated with damping factor, which is click through probability. Page 
rank is based on random surfer model, and damping factor is factor by which 
surfer would continue visiting different links. Damping factor ranges between 0 
and 1. By default, damping factor value is set to 0.85 and random probability 
is calculated as 1 – damping factor.{color}
{color:red}GraphX also supports Personalized PageRank (PRR), which is more 
general version of page rank. PRR is widely used in recommendation systems. For 
example, Twitter uses PRR to present users with other accounts that they may 
wish to follow. GraphX provides static and dynamic implementations of 
Personalized PageRank methods on PageRank object.{color}
GraphOpsallows calling these algorithms directly as methods on Graph.

GraphX also includes an example social network dataset that we can run PageRank 
on. A set of users is given in d

[jira] [Comment Edited] (SPARK-21885) HiveMetastoreCatalog.InferIfNeeded too slow when caseSensitiveInference enabled

2017-09-01 Thread liupengcheng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149997#comment-16149997
 ] 

liupengcheng edited comment on SPARK-21885 at 9/1/17 1:25 PM:
--

[~viirya] I think it's necessary, consider this senario, you have a timer job, 
and your schema may varies with time, you need to read the history data with 
old schema, but you are not expected to use  `INFER_AND_SAVE` to change the 
current schema. 

What's more, event if use `INFER_AND_SAVE`, it seems like that it will still 
infer schema. although there is some cache, but i think it's not enough, the 
first execution of any query for each session would be very slow.

{code:java}
private def inferIfNeeded(
  relation: MetastoreRelation,
  options: Map[String, String],
  fileFormat: FileFormat,
  fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
val inferenceMode = 
sparkSession.sessionState.conf.caseSensitiveInferenceMode
bq. val shouldInfer = (inferenceMode != {quote}NEVER_INFER{quote}) && 
!relation.catalogTable.schemaPreservesCase
val tableName = relation.catalogTable.identifier.unquotedString
if (shouldInfer) {
  logInfo(s"Inferring case-sensitive schema for table $tableName (inference 
mode: " +
s"$inferenceMode)")
  val fileIndex = fileIndexOpt.getOrElse {
val rootPath = new Path(relation.catalogTable.location)
new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
  }

  val inferredSchema = fileFormat
.inferSchema(
  sparkSession,
  options,
  fileIndex.listFiles(Nil).flatMap(_.files))
.map(mergeWithMetastoreSchema(relation.catalogTable.schema, _))

  inferredSchema match {
case Some(schema) =>
  if (inferenceMode == INFER_AND_SAVE) {
updateCatalogSchema(relation.catalogTable.identifier, schema)
  }
  (schema, relation.catalogTable.copy(schema = schema))
case None =>
  logWarning(s"Unable to infer schema for table $tableName from file 
format " +
s"$fileFormat (inference mode: $inferenceMode). Using metastore 
schema.")
  (relation.catalogTable.schema, relation.catalogTable)
  }
} else {
  (relation.catalogTable.schema, relation.catalogTable)
}
  }
{code}



was (Author: liupengcheng):
[~viirya] I think it's necessary, consider this senario, you have a timer job, 
and your schema may varies with time, you need to read the history data with 
old schema, but you are not expected to use  `INFER_AND_SAVE` to change the 
current schema. 

What's more, event if use `INFER_AND_SAVE`, it seems like that it will still 
infer schema. although there is some cache, but i think it's not enough, the 
first execution of any query for each session would be very slow.

{code:java}
private def inferIfNeeded(
  relation: MetastoreRelation,
  options: Map[String, String],
  fileFormat: FileFormat,
  fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
val inferenceMode = 
sparkSession.sessionState.conf.caseSensitiveInferenceMode
val shouldInfer = (inferenceMode != {color:red}NEVER_INFER{color}) && 
!relation.catalogTable.schemaPreservesCase
val tableName = relation.catalogTable.identifier.unquotedString
if (shouldInfer) {
  logInfo(s"Inferring case-sensitive schema for table $tableName (inference 
mode: " +
s"$inferenceMode)")
  val fileIndex = fileIndexOpt.getOrElse {
val rootPath = new Path(relation.catalogTable.location)
new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
  }

  val inferredSchema = fileFormat
.inferSchema(
  sparkSession,
  options,
  fileIndex.listFiles(Nil).flatMap(_.files))
.map(mergeWithMetastoreSchema(relation.catalogTable.schema, _))

  inferredSchema match {
case Some(schema) =>
  if (inferenceMode == INFER_AND_SAVE) {
updateCatalogSchema(relation.catalogTable.identifier, schema)
  }
  (schema, relation.catalogTable.copy(schema = schema))
case None =>
  logWarning(s"Unable to infer schema for table $tableName from file 
format " +
s"$fileFormat (inference mode: $inferenceMode). Using metastore 
schema.")
  (relation.catalogTable.schema, relation.catalogTable)
  }
} else {
  (relation.catalogTable.schema, relation.catalogTable)
}
  }
{code}


> HiveMetastoreCatalog.InferIfNeeded too slow when caseSensitiveInference 
> enabled
> ---
>
> Key: SPARK-21885
> URL: https://issues.apache.org/jira/browse/SPARK-21885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Aff

[jira] [Updated] (SPARK-21888) Cannot add stuff to Client Classpath for Yarn Cluster Mode

2017-09-01 Thread Thomas Graves (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-21888:
--
Issue Type: Improvement  (was: Bug)

> Cannot add stuff to Client Classpath for Yarn Cluster Mode
> --
>
> Key: SPARK-21888
> URL: https://issues.apache.org/jira/browse/SPARK-21888
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Parth Gandhi
>Priority: Minor
>
> While running Spark on Yarn in cluster mode, currently there is no way to add 
> any config files, jars etc. to Client classpath. An example for this is that 
> suppose you want to run an application that uses hbase. Then, unless and 
> until we do not copy the necessary config files required by hbase to Spark 
> Config folder, we cannot specify or set their exact locations in classpath on 
> Client end which we could do so earlier by setting the environment variable 
> "SPARK_CLASSPATH".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21888) Cannot add stuff to Client Classpath for Yarn Cluster Mode

2017-09-01 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150530#comment-16150530
 ] 

Thomas Graves commented on SPARK-21888:
---

Putting things into SPARK_CONF_DIR will work, the question is more about 
convenience for users.  In hosted/multitenant environments there is probably a 
generic SPARK_CONF_DIR shared by everyone (at least this is how our env works), 
for the user to add hbase-site.xml they would have to copy, add files  and then 
export SPARK_CONF_DIR.  If that user continues to use the copied version they 
might miss changes to the cluster version, etc.  Previously they didn't have to 
do this, they just had to set SPARK_CLASSPATH, of course even that doesn't 
always work if your cluster env (spark_env.sh) had SPARK_CLASSPATH set in it.

So the question is more of what we think about this for convenience for users.  
Personally I think it would be nice to have a config that would allow users to 
set an extra classpath on the client side without having to modify the 
SPARK_CONF_DIR? 
 
I think we can move this to an improvement jira, if other people here don't 
agree or see the usefulness then we can just close.



> Cannot add stuff to Client Classpath for Yarn Cluster Mode
> --
>
> Key: SPARK-21888
> URL: https://issues.apache.org/jira/browse/SPARK-21888
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Parth Gandhi
>Priority: Minor
>
> While running Spark on Yarn in cluster mode, currently there is no way to add 
> any config files, jars etc. to Client classpath. An example for this is that 
> suppose you want to run an application that uses hbase. Then, unless and 
> until we do not copy the necessary config files required by hbase to Spark 
> Config folder, we cannot specify or set their exact locations in classpath on 
> Client end which we could do so earlier by setting the environment variable 
> "SPARK_CLASSPATH".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21888) Cannot add stuff to Client Classpath for Yarn Cluster Mode

2017-09-01 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150530#comment-16150530
 ] 

Thomas Graves edited comment on SPARK-21888 at 9/1/17 1:37 PM:
---

Putting things into SPARK_CONF_DIR will work, the question is more about 
convenience for users.  In hosted/multitenant environments there is probably a 
generic SPARK_CONF_DIR shared by everyone (at least this is how our env works), 
for the user to add hbase-site.xml they would have to copy, add files  and then 
export SPARK_CONF_DIR.  If that user continues to use the copied version they 
might miss changes to the cluster version, etc.  Previously they didn't have to 
do this, they just had to set SPARK_CLASSPATH, of course even that doesn't 
always work if your cluster env (spark_env.sh) had SPARK_CLASSPATH set in it.

So the question is more of what we think about this for convenience for users.  
Personally I think it would be nice to have a config that would allow users to 
set an extra classpath on the client side without having to modify the 
SPARK_CONF_DIR. thoughts from others?
 
I think we can move this to an improvement jira, if other people here don't 
agree or see the usefulness then we can just close.




was (Author: tgraves):
Putting things into SPARK_CONF_DIR will work, the question is more about 
convenience for users.  In hosted/multitenant environments there is probably a 
generic SPARK_CONF_DIR shared by everyone (at least this is how our env works), 
for the user to add hbase-site.xml they would have to copy, add files  and then 
export SPARK_CONF_DIR.  If that user continues to use the copied version they 
might miss changes to the cluster version, etc.  Previously they didn't have to 
do this, they just had to set SPARK_CLASSPATH, of course even that doesn't 
always work if your cluster env (spark_env.sh) had SPARK_CLASSPATH set in it.

So the question is more of what we think about this for convenience for users.  
Personally I think it would be nice to have a config that would allow users to 
set an extra classpath on the client side without having to modify the 
SPARK_CONF_DIR? 
 
I think we can move this to an improvement jira, if other people here don't 
agree or see the usefulness then we can just close.



> Cannot add stuff to Client Classpath for Yarn Cluster Mode
> --
>
> Key: SPARK-21888
> URL: https://issues.apache.org/jira/browse/SPARK-21888
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Parth Gandhi
>Priority: Minor
>
> While running Spark on Yarn in cluster mode, currently there is no way to add 
> any config files, jars etc. to Client classpath. An example for this is that 
> suppose you want to run an application that uses hbase. Then, unless and 
> until we do not copy the necessary config files required by hbase to Spark 
> Config folder, we cannot specify or set their exact locations in classpath on 
> Client end which we could do so earlier by setting the environment variable 
> "SPARK_CLASSPATH".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21890) ObtainCredentials does not pass creds to addDelegationTokens

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21890:


Assignee: (was: Apache Spark)

> ObtainCredentials does not pass creds to addDelegationTokens
> 
>
> Key: SPARK-21890
> URL: https://issues.apache.org/jira/browse/SPARK-21890
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Sanket Reddy
>
> I observed this while running a oozie job trying to connect to hbase via 
> spark.
> It look like the creds are not being passed in 
> thehttps://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53
>  for 2.2 release.
> Stack trace:
> Warning: Skip remote jar 
> hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020/user/schintap/spark_oozie/apps/lib/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar.
> Failing Oozie Launcher, Main class 
> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, 
> Delegation Token can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token 
> can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1471)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1408)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>   at com.sun.proxy.$Proxy10.getDelegationToken(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:933)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy11.getDelegationToken(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1038)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getDelegat

[jira] [Assigned] (SPARK-21890) ObtainCredentials does not pass creds to addDelegationTokens

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21890:


Assignee: Apache Spark

> ObtainCredentials does not pass creds to addDelegationTokens
> 
>
> Key: SPARK-21890
> URL: https://issues.apache.org/jira/browse/SPARK-21890
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Sanket Reddy
>Assignee: Apache Spark
>
> I observed this while running a oozie job trying to connect to hbase via 
> spark.
> It look like the creds are not being passed in 
> thehttps://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53
>  for 2.2 release.
> Stack trace:
> Warning: Skip remote jar 
> hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020/user/schintap/spark_oozie/apps/lib/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar.
> Failing Oozie Launcher, Main class 
> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, 
> Delegation Token can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token 
> can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1471)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1408)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>   at com.sun.proxy.$Proxy10.getDelegationToken(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:933)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy11.getDelegationToken(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1038)
>   at 
> org.apache.hadoop.hdfs.Distrib

[jira] [Commented] (SPARK-21890) ObtainCredentials does not pass creds to addDelegationTokens

2017-09-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150589#comment-16150589
 ] 

Apache Spark commented on SPARK-21890:
--

User 'redsanket' has created a pull request for this issue:
https://github.com/apache/spark/pull/19103

> ObtainCredentials does not pass creds to addDelegationTokens
> 
>
> Key: SPARK-21890
> URL: https://issues.apache.org/jira/browse/SPARK-21890
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Sanket Reddy
>
> I observed this while running a oozie job trying to connect to hbase via 
> spark.
> It look like the creds are not being passed in 
> thehttps://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53
>  for 2.2 release.
> Stack trace:
> Warning: Skip remote jar 
> hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020/user/schintap/spark_oozie/apps/lib/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar.
> Failing Oozie Launcher, Main class 
> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, 
> Delegation Token can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token 
> can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1471)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1408)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>   at com.sun.proxy.$Proxy10.getDelegationToken(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:933)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy11.getDelegationToken(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFS

[jira] [Commented] (SPARK-21727) Operating on an ArrayType in a SparkR DataFrame throws error

2017-09-01 Thread Yanbo Liang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150657#comment-16150657
 ] 

Yanbo Liang commented on SPARK-21727:
-

I can run successfully with minor change:
{code}
indices <- 1:4
myDf <- data.frame(indices)
myDf$data <- list(as.list(rep(0, 20)))
mySparkDf <- as.DataFrame(myDf)
collect(mySparkDf)
{code}
This is because rep(0, 20) is not type of list, we should convert it to list 
explicitly.
{code}
> class(rep(0, 20))
[1] "numeric"
> class(as.list(rep(0, 20)))
[1] "list"
{code}

> Operating on an ArrayType in a SparkR DataFrame throws error
> 
>
> Key: SPARK-21727
> URL: https://issues.apache.org/jira/browse/SPARK-21727
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.2.0
>Reporter: Neil McQuarrie
>
> Previously 
> [posted|https://stackoverflow.com/questions/45056973/sparkr-dataframe-with-r-lists-as-elements]
>  this as a stack overflow question but it seems to be a bug.
> If I have an R data.frame where one of the column data types is an integer 
> *list* -- i.e., each of the elements in the column embeds an entire R list of 
> integers -- then it seems I can convert this data.frame to a SparkR DataFrame 
> just fine... SparkR treats the column as ArrayType(Double). 
> However, any subsequent operation on this SparkR DataFrame appears to throw 
> an error.
> Create an example R data.frame:
> {code}
> indices <- 1:4
> myDf <- data.frame(indices)
> myDf$data <- list(rep(0, 20))}}
> {code}
> Examine it to make sure it looks okay:
> {code}
> > str(myDf) 
> 'data.frame':   4 obs. of  2 variables:  
>  $ indices: int  1 2 3 4  
>  $ data   :List of 4
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
> > head(myDf)   
>   indices   data 
> 1   1 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 
> 2   2 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 
> 3   3 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 
> 4   4 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
> {code}
> Convert it to a SparkR DataFrame:
> {code}
> library(SparkR, lib.loc=paste0(Sys.getenv("SPARK_HOME"),"/R/lib"))
> sparkR.session(master = "local[*]")
> mySparkDf <- as.DataFrame(myDf)
> {code}
> Examine the SparkR DataFrame schema; notice that the list column was 
> successfully converted to ArrayType:
> {code}
> > schema(mySparkDf)
> StructType
> |-name = "indices", type = "IntegerType", nullable = TRUE
> |-name = "data", type = "ArrayType(DoubleType,true)", nullable = TRUE
> {code}
> However, operating on the SparkR DataFrame throws an error:
> {code}
> > collect(mySparkDf)
> 17/07/13 17:23:00 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 
> (TID 1)
> java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
> java.lang.Double is not a valid external type for schema of array
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null 
> else validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
> org.apache.spark.sql.Row, true]), 0, indices), IntegerType) AS indices#0
> ... long stack trace ...
> {code}
> Using Spark 2.2.0, R 3.4.0, Java 1.8.0_131, Windows 10.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21890) ObtainCredentials does not pass creds to addDelegationTokens

2017-09-01 Thread Sanket Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sanket Reddy updated SPARK-21890:
-
Description: 
I observed this while running a oozie job trying to connect to hbase via spark.
It look like the creds are not being passed in 
thehttps://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53
 for 2.2 release.

More Info as to why it fails on secure grid,
Oozie launches the mapreduce job with a designated TGT to retrieve the tokens 
from the Namenode.
In this case in order to talk to hbase it gets a hbase token.
After which the the spark client is launched by oozie launcher which talks to 
hbase via tokens.
In the current scenario it uses new creds to talk to hbase which will be 
missing the tokens that have already been acquired and hence we
see the following stack trace exception.

Stack trace:
Warning: Skip remote jar 
hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020/user/schintap/spark_oozie/apps/lib/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar.
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], 
main() threw exception, Delegation Token can be issued only with kerberos or 
web authentication
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)

org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token 
can be issued only with kerberos or web authentication
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)

at org.apache.hadoop.ipc.Client.call(Client.java:1471)
at org.apache.hadoop.ipc.Client.call(Client.java:1408)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy10.getDelegationToken(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:933)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy11.getDelegationToken(Unknown Source)
at 
org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1038)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getDelegation

[jira] [Updated] (SPARK-21890) ObtainCredentials does not pass creds to addDelegationTokens

2017-09-01 Thread Thomas Graves (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-21890:
--
Description: 
I observed this while running a oozie job trying to connect to hbase via spark.
It look like the creds are not being passed in 
thehttps://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53
 for 2.2 release.

More Info as to why it fails on secure grid:
Oozie client gets the necessary tokens the application needs before launching.  
It passes those tokens along to the oozie launcher job (MR job) which will then 
actually call the Spark client to launch the spark app and pass the tokens 
along.
The oozie launcher job cannot get anymore tokens because all it has is tokens ( 
you can't get tokens with tokens, you need tgt or keytab).  
The error here is because the launcher job runs the Spark Client to submit the 
spark job but the spark client doesn't see that it already has the hdfs tokens 
so it tries to get more, which ends with the exception.
There was  a change with SPARK-19021 to generalize the hdfs credentials 
provider that changed it so we don't pass the existing credentials into the 
call to get tokens so it doesn't realize it already has the necessary tokens.


Stack trace:
Warning: Skip remote jar 
hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020/user/schintap/spark_oozie/apps/lib/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar.
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], 
main() threw exception, Delegation Token can be issued only with kerberos or 
web authentication
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)

org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token 
can be issued only with kerberos or web authentication
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)

at org.apache.hadoop.ipc.Client.call(Client.java:1471)
at org.apache.hadoop.ipc.Client.call(Client.java:1408)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy10.getDelegationToken(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:933)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.ret

[jira] [Commented] (SPARK-21890) ObtainCredentials does not pass creds to addDelegationTokens

2017-09-01 Thread Sanket Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150762#comment-16150762
 ] 

Sanket Reddy commented on SPARK-21890:
--

Will put up a PR for master too thanks

> ObtainCredentials does not pass creds to addDelegationTokens
> 
>
> Key: SPARK-21890
> URL: https://issues.apache.org/jira/browse/SPARK-21890
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Sanket Reddy
>
> I observed this while running a oozie job trying to connect to hbase via 
> spark.
> It look like the creds are not being passed in 
> thehttps://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53
>  for 2.2 release.
> More Info as to why it fails on secure grid:
> Oozie client gets the necessary tokens the application needs before 
> launching.  It passes those tokens along to the oozie launcher job (MR job) 
> which will then actually call the Spark client to launch the spark app and 
> pass the tokens along.
> The oozie launcher job cannot get anymore tokens because all it has is tokens 
> ( you can't get tokens with tokens, you need tgt or keytab).  
> The error here is because the launcher job runs the Spark Client to submit 
> the spark job but the spark client doesn't see that it already has the hdfs 
> tokens so it tries to get more, which ends with the exception.
> There was  a change with SPARK-19021 to generalize the hdfs credentials 
> provider that changed it so we don't pass the existing credentials into the 
> call to get tokens so it doesn't realize it already has the necessary tokens.
> Stack trace:
> Warning: Skip remote jar 
> hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020/user/schintap/spark_oozie/apps/lib/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar.
> Failing Oozie Launcher, Main class 
> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, 
> Delegation Token can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token 
> can be issued only with kerberos or web authentication
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:5858)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:687)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1003)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:448)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:999)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:881)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:810)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2523)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1471)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1408)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>   at com.sun.proxy

[jira] [Created] (SPARK-21894) Some Netty errors do not propagate to the top level driver

2017-09-01 Thread Charles Allen (JIRA)
Charles Allen created SPARK-21894:
-

 Summary: Some Netty errors do not propagate to the top level driver
 Key: SPARK-21894
 URL: https://issues.apache.org/jira/browse/SPARK-21894
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: Charles Allen


We have an environment with Netty 4.1 ( 
https://issues.apache.org/jira/browse/SPARK-19552 for some context) and the 
following error occurs. The reason THIS issue is being filed is because this 
error leaves the Spark workload in a bad state where it does not make any 
progress, and does not shut down.

The expected behavior is that the spark job would throw an exception that can 
be caught by the driving application.

{code}
017-09-01T16:13:32,175 ERROR [shuffle-server-3-2] 
org.apache.spark.network.server.TransportRequestHandler - Error sending result 
StreamResponse{streamId=/jars/lz4-1.3.0.jar, byteCount=236880, 
body=FileSegmentManagedBuffer{file=/Users/charlesallen/.m2/repository/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar,
 offset=0, length=236880}} to /192.168.59.3:56703; closing connection
java.lang.AbstractMethodError
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73) 
~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107) 
~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:305) 
~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:801)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1032)
 ~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:296) 
~[netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:194)
 [spark-network-common_2.11-2.1.0-mmx9.jar:2.1.0-mmx9]
at 
org.apache.spark.network.server.TransportRequestHandler.processStreamRequest(TransportRequestHandler.java:150)
 [spark-network-common_2.11-2.1.0-mmx9.jar:2.1.0-mmx9]
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
 [spark-network-common_2.11-2.1.0-mmx9.jar:2.1.0-mmx9]
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:119)
 [spark-network-common_2.11-2.1.0-mmx9.jar:2.1.0-mmx9]
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
 [spark-network-common_2.11-2.1.0-mmx9.jar:2.1.0-mmx9]
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 [netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
 [netty-all-4.1.11.Final.jar:4.1.11.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandler

[jira] [Commented] (SPARK-21770) ProbabilisticClassificationModel: Improve normalization of all-zero raw predictions

2017-09-01 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150846#comment-16150846
 ] 

Joseph K. Bradley commented on SPARK-21770:
---

Linear models are the most likely to hit this case; if the algorithm has done 0 
iterations, then all coefficients will be 0.  But I agree it's just fixing a 
corner case which few people would ever hit.  OK to fix though IMO.

> ProbabilisticClassificationModel: Improve normalization of all-zero raw 
> predictions
> ---
>
> Key: SPARK-21770
> URL: https://issues.apache.org/jira/browse/SPARK-21770
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Siddharth Murching
>Priority: Minor
>
> Given an n-element raw prediction vector of all-zeros, 
> ProbabilisticClassifierModel.normalizeToProbabilitiesInPlace() should output 
> a probability vector of all-equal 1/n entries



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21728) Allow SparkSubmit to use logging

2017-09-01 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-21728.

Resolution: Fixed

> Allow SparkSubmit to use logging
> 
>
> Key: SPARK-21728
> URL: https://issues.apache.org/jira/browse/SPARK-21728
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
>Priority: Minor
> Fix For: 2.3.0
>
> Attachments: logging.patch, sparksubmit.patch
>
>
> Currently, code in {{SparkSubmit}} cannot call classes or methods that 
> initialize the Spark {{Logging}} framework. That is because at that time 
> {{SparkSubmit}} doesn't yet know which application will run, and logging is 
> initialized differently for certain special applications (notably, the 
> shells).
> It would be better if either {{SparkSubmit}} did logging initialization 
> earlier based on the application to be run, or did it in a way that could be 
> overridden later when the app initializes.
> Without this, there are currently a few parts of {{SparkSubmit}} that 
> duplicates code from other parts of Spark just to avoid logging. For example:
> * 
> [downloadFiles|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L860]
>  replicates code from Utils.scala
> * 
> [createTempDir|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala#L54]
>  replicates code from Utils.scala and installs its own shutdown hook
> * a few parts of the code could use {{SparkConf}} but can't right now because 
> of the logging issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21880) [spark UI]In the SQL table page, modify jobs trace information

2017-09-01 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu resolved SPARK-21880.
--
   Resolution: Fixed
 Assignee: he.qiao
Fix Version/s: 2.3.0

> [spark UI]In the SQL table page, modify jobs trace information
> --
>
> Key: SPARK-21880
> URL: https://issues.apache.org/jira/browse/SPARK-21880
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.2.0
>Reporter: he.qiao
>Assignee: he.qiao
>Priority: Minor
> Fix For: 2.3.0
>
>
>  I think it makes sense for "jobs" to change to "job id" in the SQL table 
> page. Because when job 5 fails, it's easy to misunderstand that five jobs 
> have failed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21190) SPIP: Vectorized UDFs in Python

2017-09-01 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150924#comment-16150924
 ] 

Bryan Cutler commented on SPARK-21190:
--

I'm good with the API summary proposed by [~ueshin], but I'm also not crazy 
about the 0-parameter UDF.  My only other idea would be to allow any 
`pandas_udf` to optionally define `**kwargs` as the last argument in the UDF.  
In the python worker, it would be easy to inspect the UDF to check if it 
accepts kwargs and then provide the `size` hint (could add other metadata 
also).  Of course isn't the most intuitive also, but at least would be 
consistent across all `pandas_udf`s.  So a 0-parameter example would look like:

{code}
@pandas_udf(LongType())
def f0(**kwargs):
return pd.Series(1).repeat(kwargs['size'])

df.select(f0())
{code}

It's not perfect since it would require the user to know that size is in the 
kwargs, but just thought I'd throw it out as an alternative.


> SPIP: Vectorized UDFs in Python
> ---
>
> Key: SPARK-21190
> URL: https://issues.apache.org/jira/browse/SPARK-21190
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>  Labels: SPIP
> Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> A few things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
> 3. How do we handle null values, since Pandas doesn't have the concept of 
> nulls?
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A Pandas data frame.
>   """
>   input[c] = input[a] + input[b]
>   Input[d] = input[a] - input[b]
>   return input
>  
> spark.range(1000).selectExpr("id a", "id / 2 b")
>   .mapBatches(my_func_on_entire_df)
> {code}
>  
> Use case 2. A function that defines only one column (similar to existing 
> UDFs):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A numpy array
>   """
>   return input[a] + input[b]
>  
> my_func = udf(my_func_that_returns_one_column)
>  
> df = spark.range(1000).selectExpr("id a", "id / 2 b")
> df.withColumn("c", my_func(df.a, df.b))
> {code}
>  
>  
>  
> *Optional Desig

[jira] [Comment Edited] (SPARK-21190) SPIP: Vectorized UDFs in Python

2017-09-01 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150924#comment-16150924
 ] 

Bryan Cutler edited comment on SPARK-21190 at 9/1/17 5:56 PM:
--

I'm good with the API summary proposed by [~ueshin], but I'm also not crazy 
about the 0-parameter UDF.  My only other idea would be to allow any 
{{pandas_udf}} to optionally define `**kwargs` as the last argument in the UDF. 
 In the python worker, it would be easy to inspect the UDF to check if it 
accepts kwargs and then provide the {{size}} hint (could add other metadata 
also).  Of course isn't the most intuitive also, but at least would be 
consistent across all {{pandas_udf}}s.  So a 0-parameter example would look 
like:

{code}
@pandas_udf(LongType())
def f0(**kwargs):
return pd.Series(1).repeat(kwargs['size'])

df.select(f0())
{code}

It's not perfect since it would require the user to know that size is in the 
kwargs, but just thought I'd throw it out as an alternative.



was (Author: bryanc):
I'm good with the API summary proposed by [~ueshin], but I'm also not crazy 
about the 0-parameter UDF.  My only other idea would be to allow any 
`pandas_udf` to optionally define `**kwargs` as the last argument in the UDF.  
In the python worker, it would be easy to inspect the UDF to check if it 
accepts kwargs and then provide the `size` hint (could add other metadata 
also).  Of course isn't the most intuitive also, but at least would be 
consistent across all `pandas_udf`s.  So a 0-parameter example would look like:

{code}
@pandas_udf(LongType())
def f0(**kwargs):
return pd.Series(1).repeat(kwargs['size'])

df.select(f0())
{code}

It's not perfect since it would require the user to know that size is in the 
kwargs, but just thought I'd throw it out as an alternative.


> SPIP: Vectorized UDFs in Python
> ---
>
> Key: SPARK-21190
> URL: https://issues.apache.org/jira/browse/SPARK-21190
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>  Labels: SPIP
> Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> A few things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
> 3. How do we handle null values, since Pandas doesn't have the concept of 
> nulls?
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :pa

[jira] [Commented] (SPARK-21617) ALTER TABLE...ADD COLUMNS broken in Hive 2.1 for DS tables

2017-09-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150940#comment-16150940
 ] 

Apache Spark commented on SPARK-21617:
--

User 'vanzin' has created a pull request for this issue:
https://github.com/apache/spark/pull/18849

> ALTER TABLE...ADD COLUMNS broken in Hive 2.1 for DS tables
> --
>
> Key: SPARK-21617
> URL: https://issues.apache.org/jira/browse/SPARK-21617
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
> Fix For: 2.2.1, 2.3.0
>
>
> When you have a data source table and you run a "ALTER TABLE...ADD COLUMNS" 
> query, Spark will save invalid metadata to the Hive metastore.
> Namely, it will overwrite the table's schema with the data frame's schema; 
> that is not desired for data source tables (where the schema is stored in a 
> table property instead).
> Moreover, if you use a newer metastore client where 
> METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES is on by default, you 
> actually get an exception:
> {noformat}
> InvalidOperationException(message:The following columns have types 
> incompatible with the existing columns in their respective positions :
> c1)
>   at 
> org.apache.hadoop.hive.metastore.MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange(MetaStoreUtils.java:615)
>   at 
> org.apache.hadoop.hive.metastore.HiveAlterHandler.alterTable(HiveAlterHandler.java:133)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.alter_table_core(HiveMetaStore.java:3704)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.alter_table_with_environment_context(HiveMetaStore.java:3675)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:140)
>   at 
> org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:99)
>   at com.sun.proxy.$Proxy26.alter_table_with_environment_context(Unknown 
> Source)
>   at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table_with_environmentContext(HiveMetaStoreClient.java:402)
>   at 
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.alter_table_with_environmentContext(SessionHiveMetaStoreClient.java:309)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:154)
>   at com.sun.proxy.$Proxy27.alter_table_with_environmentContext(Unknown 
> Source)
>   at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:601)
> {noformat}
> That exception is handled by Spark in an odd way (see code in 
> {{HiveExternalCatalog.scala}}) which still stores invalid metadata.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17742) Spark Launcher does not get failed state in Listener

2017-09-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150941#comment-16150941
 ] 

Apache Spark commented on SPARK-17742:
--

User 'vanzin' has created a pull request for this issue:
https://github.com/apache/spark/pull/19012

> Spark Launcher does not get failed state in Listener 
> -
>
> Key: SPARK-17742
> URL: https://issues.apache.org/jira/browse/SPARK-17742
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 2.0.0
>Reporter: Aseem Bansal
>Assignee: Marcelo Vanzin
> Fix For: 2.3.0
>
>
> I tried to launch an application using the below code. This is dummy code to 
> reproduce the problem. I tried exiting spark with status -1, throwing an 
> exception etc. but in no case did the listener give me failed status. But if 
> a spark job returns -1 or throws an exception from the main method it should 
> be considered as a failure. 
> {code}
> package com.example;
> import org.apache.spark.launcher.SparkAppHandle;
> import org.apache.spark.launcher.SparkLauncher;
> import java.io.IOException;
> public class Main2 {
> public static void main(String[] args) throws IOException, 
> InterruptedException {
> SparkLauncher launcher = new SparkLauncher()
> .setSparkHome("/opt/spark2")
> 
> .setAppResource("/home/aseem/projects/testsparkjob/build/libs/testsparkjob-1.0-SNAPSHOT.jar")
> .setMainClass("com.example.Main")
> .setMaster("local[2]");
> launcher.startApplication(new MyListener());
> Thread.sleep(1000 * 60);
> }
> }
> class MyListener implements SparkAppHandle.Listener {
> @Override
> public void stateChanged(SparkAppHandle handle) {
> System.out.println("state changed " + handle.getState());
> }
> @Override
> public void infoChanged(SparkAppHandle handle) {
> System.out.println("info changed " + handle.getState());
> }
> }
> {code}
> The spark job is 
> {code}
> package com.example;
> import org.apache.spark.sql.SparkSession;
> import java.io.IOException;
> public class Main {
> public static void main(String[] args) throws IOException {
> SparkSession sparkSession = SparkSession
> .builder()
> .appName("" + System.currentTimeMillis())
> .getOrCreate();
> try {
> for (int i = 0; i < 15; i++) {
> Thread.sleep(1000);
> System.out.println("sleeping 1");
> }
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> //sparkSession.stop();
> System.exit(-1);
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21858) Make Spark grouping_id() compatible with Hive grouping__id

2017-09-01 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150955#comment-16150955
 ] 

Dongjoon Hyun commented on SPARK-21858:
---

Hi, [~_Yann_].
Thank you for investigating this and nice descriptions.
It seems there is a related issue, HIVE-12833, about this. I'm wondering about 
how you think about that.

> Make Spark grouping_id() compatible with Hive grouping__id
> --
>
> Key: SPARK-21858
> URL: https://issues.apache.org/jira/browse/SPARK-21858
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yann Byron
>
> If you want to migrate some ETLs using `grouping__id` in Hive to Spark and 
> use Spark `grouping_id()` instead of Hive `grouping__id`, you will find 
> difference between their evaluations.
> Here is an example.
> {code:java}
> select A, B, grouping__id/grouping_id() from t group by A, B grouping 
> sets((), (A), (B), (A,B))
> {code}
> Running it on Hive and Spark separately, you'll find this: (the selected 
> attribute in selected grouping set is represented by (/) and  otherwise by 
> (x))
> ||A B||Binary Expression in Spark||Spark||Hive||Binary Expression in Hive||B 
> A||
> |(x) (x)|11|3|0|00|(x) (x)|
> |(x) (/)|10|2|2|10|(/) (x)|
> |(/) (x)|01|1|1|01|(x) (/)|
> |(/) (/)|00|0|3|11|(/) (/)|
> As shown above,In Hive, (/) set to 0, (x) set to 1, and in Spark it's 
> opposite.
> Moreover, attributes in `group by` will reverse firstly in Hive. In Spark 
> it'll be evaluated directly.
> In my opinion, I suggest that modifying the behavior of `grouping_id()` make 
> it compatible with Hive `grouping__id`.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21858) Make Spark grouping_id() compatible with Hive grouping__id

2017-09-01 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150958#comment-16150958
 ] 

Dongjoon Hyun commented on SPARK-21858:
---

I'm adding SPARK-21055, too. IIUC, SPARK-21055 is implementing the syntax and 
this issue is suggesting the semantics.

> Make Spark grouping_id() compatible with Hive grouping__id
> --
>
> Key: SPARK-21858
> URL: https://issues.apache.org/jira/browse/SPARK-21858
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yann Byron
>
> If you want to migrate some ETLs using `grouping__id` in Hive to Spark and 
> use Spark `grouping_id()` instead of Hive `grouping__id`, you will find 
> difference between their evaluations.
> Here is an example.
> {code:java}
> select A, B, grouping__id/grouping_id() from t group by A, B grouping 
> sets((), (A), (B), (A,B))
> {code}
> Running it on Hive and Spark separately, you'll find this: (the selected 
> attribute in selected grouping set is represented by (/) and  otherwise by 
> (x))
> ||A B||Binary Expression in Spark||Spark||Hive||Binary Expression in Hive||B 
> A||
> |(x) (x)|11|3|0|00|(x) (x)|
> |(x) (/)|10|2|2|10|(/) (x)|
> |(/) (x)|01|1|1|01|(x) (/)|
> |(/) (/)|00|0|3|11|(/) (/)|
> As shown above,In Hive, (/) set to 0, (x) set to 1, and in Spark it's 
> opposite.
> Moreover, attributes in `group by` will reverse firstly in Hive. In Spark 
> it'll be evaluated directly.
> In my opinion, I suggest that modifying the behavior of `grouping_id()` make 
> it compatible with Hive `grouping__id`.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-14280) Update change-version.sh and pom.xml to add Scala 2.12 profiles

2017-09-01 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-14280.
---
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 18645
[https://github.com/apache/spark/pull/18645]

> Update change-version.sh and pom.xml to add Scala 2.12 profiles
> ---
>
> Key: SPARK-14280
> URL: https://issues.apache.org/jira/browse/SPARK-14280
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build, Project Infra
>Reporter: Josh Rosen
>Assignee: Sean Owen
> Fix For: 2.3.0
>
>
> The following instructions will be kept quasi-up-to-date and are the best 
> starting point for building a Spark snapshot with Scala 2.12.0-M4:
> * Check out https://github.com/JoshRosen/spark/tree/build-for-2.12.
> * Install dependencies:
> ** chill: check out https://github.com/twitter/chill/pull/253 and run 
> {{sbt ++2.12.0-M4 publishLocal}}
> * Run {{./dev/change-scala-version.sh 2.12.0-M4}}
> * To compile Spark, run {{build/sbt -Dscala-2.12}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18085) SPIP: Better History Server scalability for many / large applications

2017-09-01 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148093#comment-16148093
 ] 

Marcelo Vanzin edited comment on SPARK-18085 at 9/1/17 6:37 PM:


[~jincheng] do you have code that can reproduce that? The code in the exception 
hasn't really changed, so this is probably an artifact of how the new code is 
recording data from the application. From your description (when we clicking 
stages with no tasks successful) I haven't been able to reproduce this; a stage 
that has only failed tasks still renders fine with the code in my branch.


was (Author: vanzin):
[~jincheng] do you have code that can reproduce that? The code in the exception 
hasn't really changed, so this is probably an artifact of how the new code is 
recording data from the application. For your description (when we clicking 
stages with no tasks successful) I haven't been able to reproduce this; a stage 
that has only failed tasks still renders fine with the code in my branch.

> SPIP: Better History Server scalability for many / large applications
> -
>
> Key: SPARK-18085
> URL: https://issues.apache.org/jira/browse/SPARK-18085
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core, Web UI
>Affects Versions: 2.0.0
>Reporter: Marcelo Vanzin
>  Labels: SPIP
> Attachments: spark_hs_next_gen.pdf
>
>
> It's a known fact that the History Server currently has some annoying issues 
> when serving lots of applications, and when serving large applications.
> I'm filing this umbrella to track work related to addressing those issues. 
> I'll be attaching a document shortly describing the issues and suggesting a 
> path to how to solve them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21895) Support changing database in HiveClient

2017-09-01 Thread Xiao Li (JIRA)
Xiao Li created SPARK-21895:
---

 Summary: Support changing database in HiveClient
 Key: SPARK-21895
 URL: https://issues.apache.org/jira/browse/SPARK-21895
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Xiao Li
Assignee: Xiao Li






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21895) Support changing database in HiveClient

2017-09-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151006#comment-16151006
 ] 

Apache Spark commented on SPARK-21895:
--

User 'gatorsmile' has created a pull request for this issue:
https://github.com/apache/spark/pull/19104

> Support changing database in HiveClient
> ---
>
> Key: SPARK-21895
> URL: https://issues.apache.org/jira/browse/SPARK-21895
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21895) Support changing database in HiveClient

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21895:


Assignee: Apache Spark  (was: Xiao Li)

> Support changing database in HiveClient
> ---
>
> Key: SPARK-21895
> URL: https://issues.apache.org/jira/browse/SPARK-21895
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Apache Spark
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21895) Support changing database in HiveClient

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21895:


Assignee: Xiao Li  (was: Apache Spark)

> Support changing database in HiveClient
> ---
>
> Key: SPARK-21895
> URL: https://issues.apache.org/jira/browse/SPARK-21895
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21864) Spark 2.0.1 - SaveMode.Overwrite does not work while saving data to memsql

2017-09-01 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-21864.
---
Resolution: Not A Problem

> Spark 2.0.1 - SaveMode.Overwrite does not work while saving data to memsql
> --
>
> Key: SPARK-21864
> URL: https://issues.apache.org/jira/browse/SPARK-21864
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
>Reporter: Vidya
>
> We are writing dataset and dataframes to Memsql via memsql connector but the 
> SaveMode.Overwrite does not work. Basically its appending the data to the 
> table.
> {code:java}
> schemaEthnicities.write.mode(SaveMode.Overwrite).format("com.memsql.spark.connector").saveAsTable("CD_ETHNICITY_SPARK")
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21895) Support changing database in HiveClient

2017-09-01 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-21895.
-
   Resolution: Fixed
 Assignee: Xiao Li  (was: Apache Spark)
Fix Version/s: 2.3.0

> Support changing database in HiveClient
> ---
>
> Key: SPARK-21895
> URL: https://issues.apache.org/jira/browse/SPARK-21895
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Xiao Li
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21895) Support changing database in HiveClient

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21895:


Assignee: Apache Spark  (was: Xiao Li)

> Support changing database in HiveClient
> ---
>
> Key: SPARK-21895
> URL: https://issues.apache.org/jira/browse/SPARK-21895
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Apache Spark
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21888) Cannot add stuff to Client Classpath for Yarn Cluster Mode

2017-09-01 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151128#comment-16151128
 ] 

Marco Gaido commented on SPARK-21888:
-

It is enough to add {{hbase-site.xml}} using {{--files}} in cluster mode to 
have it interpreted. The problem is in client mode: in this case it should be 
added to the Spark conf dir to be added to the classpath.

> Cannot add stuff to Client Classpath for Yarn Cluster Mode
> --
>
> Key: SPARK-21888
> URL: https://issues.apache.org/jira/browse/SPARK-21888
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Parth Gandhi
>Priority: Minor
>
> While running Spark on Yarn in cluster mode, currently there is no way to add 
> any config files, jars etc. to Client classpath. An example for this is that 
> suppose you want to run an application that uses hbase. Then, unless and 
> until we do not copy the necessary config files required by hbase to Spark 
> Config folder, we cannot specify or set their exact locations in classpath on 
> Client end which we could do so earlier by setting the environment variable 
> "SPARK_CLASSPATH".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21477) Mark LocalTableScanExec's input data transient

2017-09-01 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-21477:

Fix Version/s: 2.2.1

> Mark LocalTableScanExec's input data transient
> --
>
> Key: SPARK-21477
> URL: https://issues.apache.org/jira/browse/SPARK-21477
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Xiao Li
>Assignee: Xiao Li
> Fix For: 2.2.1, 2.3.0
>
>
> Mark the parameter rows and unsafeRow of LocalTableScanExec transient. It can 
> avoid serializing the unneeded objects.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20761) Union uses column order rather than schema

2017-09-01 Thread Munesh Bandaru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151181#comment-16151181
 ] 

Munesh Bandaru commented on SPARK-20761:


As the ticket was closed as 'Not a Problem', a workaround is to use the 
'select' to change the order of the columns of one of the dataframe.
But if we have a large number of columns, it doesn't look good to specify all 
the columns.

So we can use the columns of one dataframe to arrange the other dataframe in 
the order as below.

comb_df = df1.unionAll(df2.select(df1.columns))

> Union uses column order rather than schema
> --
>
> Key: SPARK-20761
> URL: https://issues.apache.org/jira/browse/SPARK-20761
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Nakul Jeirath
>Priority: Minor
>
> I believe there is an issue when using union to combine two dataframes when 
> the order of columns differ between the left and right side of the union:
> {code}
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{BooleanType, StringType, StructField, 
> StructType}
> val schema = StructType(Seq(
>   StructField("id", StringType, false),
>   StructField("flag_one", BooleanType, false),
>   StructField("flag_two", BooleanType, false),
>   StructField("flag_three", BooleanType, false)
> ))
> val rowRdd = spark.sparkContext.parallelize(Seq(
>   Row("1", true, false, false),
>   Row("2", false, true, false),
>   Row("3", false, false, true)
> ))
> spark.createDataFrame(rowRdd, schema).createOrReplaceTempView("temp_flags")
> val emptyData = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], 
> schema)
> //Select columns out of order with respect to the emptyData schema
> val data = emptyData.union(spark.sql("select id, flag_two, flag_three, 
> flag_one from temp_flags"))
> {code}
> Selecting the data from the "temp_flags" table results in:
> {noformat}
> spark.sql("select * from temp_flags").show
> +---+++--+
> | id|flag_one|flag_two|flag_three|
> +---+++--+
> |  1|true|   false| false|
> |  2|   false|true| false|
> |  3|   false|   false|  true|
> +---+++--+
> {noformat}
> Which is the data we'd expect but when inspecting "data" we get:
> {noformat}
> data.show()
> +---+++--+
> | id|flag_one|flag_two|flag_three|
> +---+++--+
> |  1|   false|   false|  true|
> |  2|true|   false| false|
> |  3|   false|true| false|
> +---+++--+
> {noformat}
> Having a non-empty dataframe on the left side of the union doesn't seem to 
> make a difference either:
> {noformat}
> spark.sql("select * from temp_flags").union(spark.sql("select id, flag_two, 
> flag_three, flag_one from temp_flags")).show
> +---+++--+
> | id|flag_one|flag_two|flag_three|
> +---+++--+
> |  1|true|   false| false|
> |  2|   false|true| false|
> |  3|   false|   false|  true|
> |  1|   false|   false|  true|
> |  2|true|   false| false|
> |  3|   false|true| false|
> +---+++--+
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20761) Union uses column order rather than schema

2017-09-01 Thread Munesh Bandaru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151181#comment-16151181
 ] 

Munesh Bandaru edited comment on SPARK-20761 at 9/1/17 9:31 PM:


As the ticket was closed as 'Not a Problem', a workaround is to use the 
'select' to change the order of the columns of one of the dataframe.
But if we have a large number of columns, it doesn't look good to specify all 
the columns.

So we can use the columns of one dataframe to arrange the other dataframe in 
the order as below.

 {{comb_df = df1.unionAll(df2.select(df1.columns))}}


was (Author: munesh):
As the ticket was closed as 'Not a Problem', a workaround is to use the 
'select' to change the order of the columns of one of the dataframe.
But if we have a large number of columns, it doesn't look good to specify all 
the columns.

So we can use the columns of one dataframe to arrange the other dataframe in 
the order as below.

comb_df = df1.unionAll(df2.select(df1.columns))

> Union uses column order rather than schema
> --
>
> Key: SPARK-20761
> URL: https://issues.apache.org/jira/browse/SPARK-20761
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Nakul Jeirath
>Priority: Minor
>
> I believe there is an issue when using union to combine two dataframes when 
> the order of columns differ between the left and right side of the union:
> {code}
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{BooleanType, StringType, StructField, 
> StructType}
> val schema = StructType(Seq(
>   StructField("id", StringType, false),
>   StructField("flag_one", BooleanType, false),
>   StructField("flag_two", BooleanType, false),
>   StructField("flag_three", BooleanType, false)
> ))
> val rowRdd = spark.sparkContext.parallelize(Seq(
>   Row("1", true, false, false),
>   Row("2", false, true, false),
>   Row("3", false, false, true)
> ))
> spark.createDataFrame(rowRdd, schema).createOrReplaceTempView("temp_flags")
> val emptyData = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], 
> schema)
> //Select columns out of order with respect to the emptyData schema
> val data = emptyData.union(spark.sql("select id, flag_two, flag_three, 
> flag_one from temp_flags"))
> {code}
> Selecting the data from the "temp_flags" table results in:
> {noformat}
> spark.sql("select * from temp_flags").show
> +---+++--+
> | id|flag_one|flag_two|flag_three|
> +---+++--+
> |  1|true|   false| false|
> |  2|   false|true| false|
> |  3|   false|   false|  true|
> +---+++--+
> {noformat}
> Which is the data we'd expect but when inspecting "data" we get:
> {noformat}
> data.show()
> +---+++--+
> | id|flag_one|flag_two|flag_three|
> +---+++--+
> |  1|   false|   false|  true|
> |  2|true|   false| false|
> |  3|   false|true| false|
> +---+++--+
> {noformat}
> Having a non-empty dataframe on the left side of the union doesn't seem to 
> make a difference either:
> {noformat}
> spark.sql("select * from temp_flags").union(spark.sql("select id, flag_two, 
> flag_three, flag_one from temp_flags")).show
> +---+++--+
> | id|flag_one|flag_two|flag_three|
> +---+++--+
> |  1|true|   false| false|
> |  2|   false|true| false|
> |  3|   false|   false|  true|
> |  1|   false|   false|  true|
> |  2|true|   false| false|
> |  3|   false|true| false|
> +---+++--+
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15918) unionAll returns wrong result when two dataframes has schema in different order

2017-09-01 Thread Munesh Bandaru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151185#comment-16151185
 ] 

Munesh Bandaru commented on SPARK-15918:


As the ticket was closed as 'Not a Problem', a workaround is to use the 
'select' to change the order of the columns of one of the dataframe.
But if we have a large number of columns, it doesn't look good to specify all 
the columns.
So we can use the columns of one dataframe to arrange the other dataframe in 
the order as below.
{{comb_df = df1.unionAll(df2.select(df1.columns))}}

> unionAll returns wrong result when two dataframes has schema in different 
> order
> ---
>
> Key: SPARK-15918
> URL: https://issues.apache.org/jira/browse/SPARK-15918
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
> Environment: CentOS
>Reporter: Prabhu Joseph
>
> On applying unionAll operation between A and B dataframes, they both has same 
> schema but in different order and hence the result has column value mapping 
> changed.
> Repro:
> {code}
> A.show()
> +---++---+--+--+-++---+--+---+---+-+
> |tag|year_day|tm_hour|tm_min|tm_sec|dtype|time|tm_mday|tm_mon|tm_yday|tm_year|value|
> +---++---+--+--+-++---+--+---+---+-+
> +---++---+--+--+-++---+--+---+---+-+
> B.show()
> +-+---+--+---+---+--+--+--+---+---+--++
> |dtype|tag|  
> time|tm_hour|tm_mday|tm_min|tm_mon|tm_sec|tm_yday|tm_year| value|year_day|
> +-+---+--+---+---+--+--+--+---+---+--++
> |F|C_FNHXUT701Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUDP713.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUT718.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUT703Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUR716A.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUT803Z.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUT728.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUR806.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> +-+---+--+---+---+--+--+--+---+---+--++
> A = A.unionAll(B)
> A.show()
> +---+---+--+--+--+-++---+--+---+---+-+
> |tag|   year_day|   
> tm_hour|tm_min|tm_sec|dtype|time|tm_mday|tm_mon|tm_yday|tm_year|value|
> +---+---+--+--+--+-++---+--+---+---+-+
> |  F|C_FNHXUT701Z.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUDP713.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUT718.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUT703Z.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUR716A.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUT803Z.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUT728.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUR806.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> +---+---+--+--+--+-++---+--+---+---+-+
> {code}
> On changing the schema of A according to B and doing unionAll works fine
> {code}
> C = 
> A.select("dtype","tag","time","tm_hour","tm_mday","tm_min",”tm_mon”,"tm_sec","tm_yday","tm_year","value","year_day")
> A = C.unionAll(B)
> A.show()
> +-+---+--+---+---+--+--+--+---+---+--++
> |dtype|tag|  
> time|tm_hour|tm_mday|tm_min|tm_mon|tm_sec|tm_yday|tm_year| value|year_day|
> +-+---+--+---+---+--+--+--+---+---+--++
> |F|C_FNHXUT701Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|  

[jira] [Created] (SPARK-21896) Stack Overflow when window function nested inside aggregate function

2017-09-01 Thread Luyao Yang (JIRA)
Luyao Yang created SPARK-21896:
--

 Summary: Stack Overflow when window function nested inside 
aggregate function
 Key: SPARK-21896
 URL: https://issues.apache.org/jira/browse/SPARK-21896
 Project: Spark
  Issue Type: Bug
  Components: PySpark, SQL
Affects Versions: 2.2.0, 2.1.0
Reporter: Luyao Yang
Priority: Minor


A minimal example: with the following simple test data

{noformat}
>>> df = spark.createDataFrame([(1, 2), (1, 3), (2, 4)], ['a', 'b'])
>>> df.show()
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  1|  3|
|  2|  4|
+---+---+
{noformat}

This works: 

{noformat}
>>> w = Window().orderBy('b')
>>> result = (df.select(F.rank().over(w).alias('rk'))
....groupby()
....agg(F.max('rk'))
...  )
>>> result.show()
+---+
|max(rk)|
+---+
|  3|
+---+
{noformat}

But this equivalent gives an error. Note that the error is thrown right when 
the operation is defined, not when an action is called later:

{noformat}
>>> result = (df.groupby()
....agg(F.max(F.rank().over(w)))
...  )

Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 
2885, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
  File "", line 2, in 
.agg(F.max(F.rank().over(w)))
  File "/usr/lib/spark/python/pyspark/sql/group.py", line 91, in agg
_to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", 
line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 
319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o10789.agg.
: java.lang.StackOverflowError
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:55)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:400)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$71.apply(Analyzer.scala:1688)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$71.apply(Analyzer.scala:1724)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$extract(Analyzer.scala:1687)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$26.applyOrElse(Analyzer.scala:1825)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$26.applyOrElse(Analyzer.scala:1800)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$2.apply(TreeNode.scala:295)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$2.apply(TreeNode.scala:295)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
 

[jira] [Commented] (SPARK-14864) [MLLIB] Implement Doc2Vec

2017-09-01 Thread Li Ping Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151229#comment-16151229
 ] 

Li Ping Zhang commented on SPARK-14864:
---

Agree. I think it would extend spark user a lot if doc2vec is implemented in 
spark, and we do have several real use scenarios about running doc2vec with 
large scale data.

> [MLLIB] Implement Doc2Vec
> -
>
> Key: SPARK-14864
> URL: https://issues.apache.org/jira/browse/SPARK-14864
> Project: Spark
>  Issue Type: New Feature
>  Components: MLlib
>Reporter: Peter Mountanos
>Priority: Minor
>
> It would be useful to implement Doc2Vec, as described in the paper 
> [Distributed Representations of Sentences and 
> Documents|https://cs.stanford.edu/~quocle/paragraph_vector.pdf]. Gensim has 
> an implementation [Deep learning with 
> paragraph2vec|https://radimrehurek.com/gensim/models/doc2vec.html]. 
> Le & Mikolov show that when aggregating Word2Vec vector representations for a 
> paragraph/document, it does not perform well for prediction tasks. Instead, 
> they propose the Paragraph Vector implementation, which provides 
> state-of-the-art results on several text classification and sentiment 
> analysis tasks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-21729) Generic test for ProbabilisticClassifier to ensure consistent output columns

2017-09-01 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-21729:
--
Comment: was deleted

(was: User 'WeichenXu123' has created a pull request for this issue:
https://github.com/apache/spark/pull/19065)

> Generic test for ProbabilisticClassifier to ensure consistent output columns
> 
>
> Key: SPARK-21729
> URL: https://issues.apache.org/jira/browse/SPARK-21729
> Project: Spark
>  Issue Type: Test
>  Components: ML
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>
> One challenge with the ProbabilisticClassifier abstraction is that it 
> introduces different code paths for predictions depending on which output 
> columns are turned on or off: probability, rawPrediction, prediction.  We ran 
> into a bug in MLOR with this.
> This task is for adding a generic test usable in all test suites for 
> ProbabilisticClassifier types which does the following:
> * Take a dataset + Estimator
> * Fit the Estimator
> * Test prediction using the model with all combinations of output columns 
> turned on/off.
> * Make sure the output column values match, presumably by comparing vs. the 
> case with all 3 output columns turned on
> CC [~WeichenXu123] since this came up in 
> https://github.com/apache/spark/pull/17373



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21729) Generic test for ProbabilisticClassifier to ensure consistent output columns

2017-09-01 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley reassigned SPARK-21729:
-

Assignee: Weichen Xu

> Generic test for ProbabilisticClassifier to ensure consistent output columns
> 
>
> Key: SPARK-21729
> URL: https://issues.apache.org/jira/browse/SPARK-21729
> Project: Spark
>  Issue Type: Test
>  Components: ML
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>Assignee: Weichen Xu
>
> One challenge with the ProbabilisticClassifier abstraction is that it 
> introduces different code paths for predictions depending on which output 
> columns are turned on or off: probability, rawPrediction, prediction.  We ran 
> into a bug in MLOR with this.
> This task is for adding a generic test usable in all test suites for 
> ProbabilisticClassifier types which does the following:
> * Take a dataset + Estimator
> * Fit the Estimator
> * Test prediction using the model with all combinations of output columns 
> turned on/off.
> * Make sure the output column values match, presumably by comparing vs. the 
> case with all 3 output columns turned on
> CC [~WeichenXu123] since this came up in 
> https://github.com/apache/spark/pull/17373



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21190) SPIP: Vectorized UDFs in Python

2017-09-01 Thread Leif Walsh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151261#comment-16151261
 ] 

Leif Walsh commented on SPARK-21190:


I'm not 100% sure this is legal pandas but I think it might be. If no columns 
are passed in, we might be able to pass in a zero-column dataframe with an 
integer index from 0 to size-1. Then {{of.Series(data=1, index=D.C.index)}} 
might do the right thing, without magic extra parameters. I think this would be 
conceptually correct too. I will verify and provide sample code this weekend if 
I find the time. 

> SPIP: Vectorized UDFs in Python
> ---
>
> Key: SPARK-21190
> URL: https://issues.apache.org/jira/browse/SPARK-21190
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>  Labels: SPIP
> Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> A few things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
> 3. How do we handle null values, since Pandas doesn't have the concept of 
> nulls?
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A Pandas data frame.
>   """
>   input[c] = input[a] + input[b]
>   Input[d] = input[a] - input[b]
>   return input
>  
> spark.range(1000).selectExpr("id a", "id / 2 b")
>   .mapBatches(my_func_on_entire_df)
> {code}
>  
> Use case 2. A function that defines only one column (similar to existing 
> UDFs):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A numpy array
>   """
>   return input[a] + input[b]
>  
> my_func = udf(my_func_that_returns_one_column)
>  
> df = spark.range(1000).selectExpr("id a", "id / 2 b")
> df.withColumn("c", my_func(df.a, df.b))
> {code}
>  
>  
>  
> *Optional Design Sketch*
> I’m more concerned about getting proper feedback for API design. The 
> implementation should be pretty straightforward and is not a huge concern at 
> this point. We can leverage the same implementation for faster toPandas 
> (using Arrow).
>  
>  
> *Optional Rejected Designs*
> See above.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

[jira] [Comment Edited] (SPARK-21190) SPIP: Vectorized UDFs in Python

2017-09-01 Thread Leif Walsh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151261#comment-16151261
 ] 

Leif Walsh edited comment on SPARK-21190 at 9/1/17 10:55 PM:
-

I'm not 100% sure this is legal pandas but I think it might be. If no columns 
are passed in, we might be able to pass in a zero-column dataframe with an 
integer index from 0 to size-1. Then {{pd.Series(data=1, index=df.index)}} 
might do the right thing, without magic extra parameters. I think this would be 
conceptually correct too. I will verify and provide sample code this weekend if 
I find the time. 


was (Author: leif):
I'm not 100% sure this is legal pandas but I think it might be. If no columns 
are passed in, we might be able to pass in a zero-column dataframe with an 
integer index from 0 to size-1. Then {{of.Series(data=1, index=D.C.index)}} 
might do the right thing, without magic extra parameters. I think this would be 
conceptually correct too. I will verify and provide sample code this weekend if 
I find the time. 

> SPIP: Vectorized UDFs in Python
> ---
>
> Key: SPARK-21190
> URL: https://issues.apache.org/jira/browse/SPARK-21190
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>  Labels: SPIP
> Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> A few things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
> 3. How do we handle null values, since Pandas doesn't have the concept of 
> nulls?
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A Pandas data frame.
>   """
>   input[c] = input[a] + input[b]
>   Input[d] = input[a] - input[b]
>   return input
>  
> spark.range(1000).selectExpr("id a", "id / 2 b")
>   .mapBatches(my_func_on_entire_df)
> {code}
>  
> Use case 2. A function that defines only one column (similar to existing 
> UDFs):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A numpy array
>   """
>   return input[a] + input[b]
>  
> my_func = udf(my_func_that_returns_one_column)
>  
> df = spark.range(1000).selectExpr(

[jira] [Commented] (SPARK-15918) unionAll returns wrong result when two dataframes has schema in different order

2017-09-01 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151279#comment-16151279
 ] 

Hyukjin Kwon commented on SPARK-15918:
--

[~Munesh], No need to leave a duplicated comment in a duplicated JIRA. Please 
checkout SPARK-21043.

> unionAll returns wrong result when two dataframes has schema in different 
> order
> ---
>
> Key: SPARK-15918
> URL: https://issues.apache.org/jira/browse/SPARK-15918
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
> Environment: CentOS
>Reporter: Prabhu Joseph
>
> On applying unionAll operation between A and B dataframes, they both has same 
> schema but in different order and hence the result has column value mapping 
> changed.
> Repro:
> {code}
> A.show()
> +---++---+--+--+-++---+--+---+---+-+
> |tag|year_day|tm_hour|tm_min|tm_sec|dtype|time|tm_mday|tm_mon|tm_yday|tm_year|value|
> +---++---+--+--+-++---+--+---+---+-+
> +---++---+--+--+-++---+--+---+---+-+
> B.show()
> +-+---+--+---+---+--+--+--+---+---+--++
> |dtype|tag|  
> time|tm_hour|tm_mday|tm_min|tm_mon|tm_sec|tm_yday|tm_year| value|year_day|
> +-+---+--+---+---+--+--+--+---+---+--++
> |F|C_FNHXUT701Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUDP713.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUT718.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUT703Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUR716A.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUT803Z.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUT728.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUR806.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> +-+---+--+---+---+--+--+--+---+---+--++
> A = A.unionAll(B)
> A.show()
> +---+---+--+--+--+-++---+--+---+---+-+
> |tag|   year_day|   
> tm_hour|tm_min|tm_sec|dtype|time|tm_mday|tm_mon|tm_yday|tm_year|value|
> +---+---+--+--+--+-++---+--+---+---+-+
> |  F|C_FNHXUT701Z.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUDP713.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUT718.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUT703Z.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUR716A.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUT803Z.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUT728.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUR806.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> +---+---+--+--+--+-++---+--+---+---+-+
> {code}
> On changing the schema of A according to B and doing unionAll works fine
> {code}
> C = 
> A.select("dtype","tag","time","tm_hour","tm_mday","tm_min",”tm_mon”,"tm_sec","tm_yday","tm_year","value","year_day")
> A = C.unionAll(B)
> A.show()
> +-+---+--+---+---+--+--+--+---+---+--++
> |dtype|tag|  
> time|tm_hour|tm_mday|tm_min|tm_mon|tm_sec|tm_yday|tm_year| value|year_day|
> +-+---+--+---+---+--+--+--+---+---+--++
> |F|C_FNHXUT701Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUDP713.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUT718.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUT703Z.CNSTLO|144379080

[jira] [Commented] (SPARK-15918) unionAll returns wrong result when two dataframes has schema in different order

2017-09-01 Thread Munesh Bandaru (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151294#comment-16151294
 ] 

Munesh Bandaru commented on SPARK-15918:


[~hyukjin.kwon] Thank you Hyukjin for providing the reference for SPARK-21043. 
I'm looking for an alternate solution like this. 
The ticket is closed as 'Not a problem' but no alternate solution is provided. 
That is the reason I have added the workaround which might help others.

> unionAll returns wrong result when two dataframes has schema in different 
> order
> ---
>
> Key: SPARK-15918
> URL: https://issues.apache.org/jira/browse/SPARK-15918
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
> Environment: CentOS
>Reporter: Prabhu Joseph
>
> On applying unionAll operation between A and B dataframes, they both has same 
> schema but in different order and hence the result has column value mapping 
> changed.
> Repro:
> {code}
> A.show()
> +---++---+--+--+-++---+--+---+---+-+
> |tag|year_day|tm_hour|tm_min|tm_sec|dtype|time|tm_mday|tm_mon|tm_yday|tm_year|value|
> +---++---+--+--+-++---+--+---+---+-+
> +---++---+--+--+-++---+--+---+---+-+
> B.show()
> +-+---+--+---+---+--+--+--+---+---+--++
> |dtype|tag|  
> time|tm_hour|tm_mday|tm_min|tm_mon|tm_sec|tm_yday|tm_year| value|year_day|
> +-+---+--+---+---+--+--+--+---+---+--++
> |F|C_FNHXUT701Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUDP713.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUT718.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUT703Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUR716A.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUT803Z.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUT728.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F| C_FNHXUR806.CNSTHI|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> +-+---+--+---+---+--+--+--+---+---+--++
> A = A.unionAll(B)
> A.show()
> +---+---+--+--+--+-++---+--+---+---+-+
> |tag|   year_day|   
> tm_hour|tm_min|tm_sec|dtype|time|tm_mday|tm_mon|tm_yday|tm_year|value|
> +---+---+--+--+--+-++---+--+---+---+-+
> |  F|C_FNHXUT701Z.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUDP713.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUT718.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUT703Z.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUR716A.CNSTLO|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F|C_FNHXUT803Z.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUT728.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> |  F| C_FNHXUR806.CNSTHI|1443790800|13| 2|0|  10|  0|   275|  
>  2015| 1.2345|2015275.0|
> +---+---+--+--+--+-++---+--+---+---+-+
> {code}
> On changing the schema of A according to B and doing unionAll works fine
> {code}
> C = 
> A.select("dtype","tag","time","tm_hour","tm_mday","tm_min",”tm_mon”,"tm_sec","tm_yday","tm_year","value","year_day")
> A = C.unionAll(B)
> A.show()
> +-+---+--+---+---+--+--+--+---+---+--++
> |dtype|tag|  
> time|tm_hour|tm_mday|tm_min|tm_mon|tm_sec|tm_yday|tm_year| value|year_day|
> +-+---+--+---+---+--+--+--+---+---+--++
> |F|C_FNHXUT701Z.CNSTLO|1443790800| 13|  2| 0|10| 0|   
>  275|   2015|1.2345| 2015275|
> |F|C_FNHXUDP713.CNSTHI|1443790800| 13|  2| 0|10| 0|  

[jira] [Commented] (SPARK-12157) Support numpy types as return values of Python UDFs

2017-09-01 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151336#comment-16151336
 ] 

Felix Cheung commented on SPARK-12157:
--

any more thought on this?
I think we should at least document this if this is won't fix.

> Support numpy types as return values of Python UDFs
> ---
>
> Key: SPARK-12157
> URL: https://issues.apache.org/jira/browse/SPARK-12157
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 1.5.2
>Reporter: Justin Uang
>
> Currently, if I have a python UDF
> {code}
> import pyspark.sql.types as T
> import pyspark.sql.functions as F
> from pyspark.sql import Row
> import numpy as np
> argmax = F.udf(lambda x: np.argmax(x), T.IntegerType())
> df = sqlContext.createDataFrame([Row(array=[1,2,3])])
> df.select(argmax("array")).count()
> {code}
> I get an exception that is fairly opaque:
> {code}
> Caused by: net.razorvine.pickle.PickleException: expected zero arguments for 
> construction of ClassDict (for numpy.dtype)
> at 
> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:701)
> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:171)
> at net.razorvine.pickle.Unpickler.load(Unpickler.java:85)
> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:98)
> at 
> org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1$$anonfun$apply$3.apply(python.scala:404)
> at 
> org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1$$anonfun$apply$3.apply(python.scala:403)
> {code}
> Numpy types like np.int and np.float64 should automatically be cast to the 
> proper dtypes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21729) Generic test for ProbabilisticClassifier to ensure consistent output columns

2017-09-01 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley resolved SPARK-21729.
---
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 19065
[https://github.com/apache/spark/pull/19065]

> Generic test for ProbabilisticClassifier to ensure consistent output columns
> 
>
> Key: SPARK-21729
> URL: https://issues.apache.org/jira/browse/SPARK-21729
> Project: Spark
>  Issue Type: Test
>  Components: ML
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>Assignee: Weichen Xu
> Fix For: 2.3.0
>
>
> One challenge with the ProbabilisticClassifier abstraction is that it 
> introduces different code paths for predictions depending on which output 
> columns are turned on or off: probability, rawPrediction, prediction.  We ran 
> into a bug in MLOR with this.
> This task is for adding a generic test usable in all test suites for 
> ProbabilisticClassifier types which does the following:
> * Take a dataset + Estimator
> * Fit the Estimator
> * Test prediction using the model with all combinations of output columns 
> turned on/off.
> * Make sure the output column values match, presumably by comparing vs. the 
> case with all 3 output columns turned on
> CC [~WeichenXu123] since this came up in 
> https://github.com/apache/spark/pull/17373



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21897) Add unionByName API to DataFrame in Python and R

2017-09-01 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created SPARK-21897:


 Summary: Add unionByName API to DataFrame in Python and R
 Key: SPARK-21897
 URL: https://issues.apache.org/jira/browse/SPARK-21897
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, SparkR
Affects Versions: 2.3.0
Reporter: Hyukjin Kwon


It would be nicer if we have {{unionByName}} added in 
https://issues.apache.org/jira/browse/SPARK-21043, in Python and R too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21897) Add unionByName API to DataFrame in Python and R

2017-09-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151368#comment-16151368
 ] 

Apache Spark commented on SPARK-21897:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/19105

> Add unionByName API to DataFrame in Python and R
> 
>
> Key: SPARK-21897
> URL: https://issues.apache.org/jira/browse/SPARK-21897
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SparkR
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>
> It would be nicer if we have {{unionByName}} added in 
> https://issues.apache.org/jira/browse/SPARK-21043, in Python and R too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21897) Add unionByName API to DataFrame in Python and R

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21897:


Assignee: Apache Spark

> Add unionByName API to DataFrame in Python and R
> 
>
> Key: SPARK-21897
> URL: https://issues.apache.org/jira/browse/SPARK-21897
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SparkR
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Assignee: Apache Spark
>
> It would be nicer if we have {{unionByName}} added in 
> https://issues.apache.org/jira/browse/SPARK-21043, in Python and R too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21897) Add unionByName API to DataFrame in Python and R

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21897:


Assignee: (was: Apache Spark)

> Add unionByName API to DataFrame in Python and R
> 
>
> Key: SPARK-21897
> URL: https://issues.apache.org/jira/browse/SPARK-21897
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SparkR
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>
> It would be nicer if we have {{unionByName}} added in 
> https://issues.apache.org/jira/browse/SPARK-21043, in Python and R too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21190) SPIP: Vectorized UDFs in Python

2017-09-01 Thread Leif Walsh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151376#comment-16151376
 ] 

Leif Walsh commented on SPARK-21190:


Yep, that's totally a thing:

{noformat}In [1]: import pandas as pd

In [2]: pd.DataFrame(index=list(range(100)))
Out[2]: 
Empty DataFrame
Columns: []
Index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 
40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 
60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 
80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

[100 rows x 0 columns]

In [3]: df = Out[2]

In [4]: pd.Series(data=1, index=df.index)
Out[4]: 
0 1
1 1
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1
101
111
121
131
141
151
161
171
181
191
201
211
221
231
241
251
261
271
281
291
 ..
701
711
721
731
741
751
761
771
781
791
801
811
821
831
841
851
861
871
881
891
901
911
921
931
941
951
961
971
981
991
Length: 100, dtype: int64

{noformat}

So, how about:

{noformat}
@pandas_udf(LongType())
def f0(df):
return pd.Series(data=1, index=df.index)

df.select(f0())
{noformat}

> SPIP: Vectorized UDFs in Python
> ---
>
> Key: SPARK-21190
> URL: https://issues.apache.org/jira/browse/SPARK-21190
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>  Labels: SPIP
> Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> A few things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
> 3. How do we handle null values, since Pandas doesn't have the concept of 
> nulls?
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A Pandas data frame.
>   """
>   input[c] = input[a] + input[b]
>   Input[d] = input[a] - input[b]
>   return input
>  
> spark.range(1000).selectExpr("id a", "id / 2 b")
>   .mapBatches(my_func_on_entire_df)
> {code}
>  
> Use case 2. A function that defines only one co

[jira] [Commented] (SPARK-21190) SPIP: Vectorized UDFs in Python

2017-09-01 Thread Leif Walsh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151377#comment-16151377
 ] 

Leif Walsh commented on SPARK-21190:


You can also make a Series with no content and an index, but this becomes a 
float64 Series full of NaNs, which seems less good: it's indistinguishable from 
a column which is actually full of NaNs:

{noformat}
In [5]: pd.Series(index=list(range(100)))
Out[5]: 
0NaN
1NaN
2NaN
3NaN
4NaN
5NaN
6NaN
7NaN
8NaN
9NaN
10   NaN
11   NaN
12   NaN
13   NaN
14   NaN
15   NaN
16   NaN
17   NaN
18   NaN
19   NaN
20   NaN
21   NaN
22   NaN
23   NaN
24   NaN
25   NaN
26   NaN
27   NaN
28   NaN
29   NaN
  ..
70   NaN
71   NaN
72   NaN
73   NaN
74   NaN
75   NaN
76   NaN
77   NaN
78   NaN
79   NaN
80   NaN
81   NaN
82   NaN
83   NaN
84   NaN
85   NaN
86   NaN
87   NaN
88   NaN
89   NaN
90   NaN
91   NaN
92   NaN
93   NaN
94   NaN
95   NaN
96   NaN
97   NaN
98   NaN
99   NaN
Length: 100, dtype: float64

{noformat}

> SPIP: Vectorized UDFs in Python
> ---
>
> Key: SPARK-21190
> URL: https://issues.apache.org/jira/browse/SPARK-21190
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>  Labels: SPIP
> Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> A few things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
> 3. How do we handle null values, since Pandas doesn't have the concept of 
> nulls?
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A Pandas data frame.
>   """
>   input[c] = input[a] + input[b]
>   Input[d] = input[a] - input[b]
>   return input
>  
> spark.range(1000).selectExpr("id a", "id / 2 b")
>   .mapBatches(my_func_on_entire_df)
> {code}
>  
> Use case 2. A function that defines only one column (similar to existing 
> UDFs):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A numpy array
>   """
>   return input[a] + input[b]
>  
> my_func = udf(my_func_that_returns_one_column)
>  
> df = spark.range(1000).selectExpr("id a", "id / 2 b")
> df.withColumn("c", my

[jira] [Assigned] (SPARK-21770) ProbabilisticClassificationModel: Improve normalization of all-zero raw predictions

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21770:


Assignee: Apache Spark

> ProbabilisticClassificationModel: Improve normalization of all-zero raw 
> predictions
> ---
>
> Key: SPARK-21770
> URL: https://issues.apache.org/jira/browse/SPARK-21770
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Siddharth Murching
>Assignee: Apache Spark
>Priority: Minor
>
> Given an n-element raw prediction vector of all-zeros, 
> ProbabilisticClassifierModel.normalizeToProbabilitiesInPlace() should output 
> a probability vector of all-equal 1/n entries



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21770) ProbabilisticClassificationModel: Improve normalization of all-zero raw predictions

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21770:


Assignee: (was: Apache Spark)

> ProbabilisticClassificationModel: Improve normalization of all-zero raw 
> predictions
> ---
>
> Key: SPARK-21770
> URL: https://issues.apache.org/jira/browse/SPARK-21770
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Siddharth Murching
>Priority: Minor
>
> Given an n-element raw prediction vector of all-zeros, 
> ProbabilisticClassifierModel.normalizeToProbabilitiesInPlace() should output 
> a probability vector of all-equal 1/n entries



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21770) ProbabilisticClassificationModel: Improve normalization of all-zero raw predictions

2017-09-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151404#comment-16151404
 ] 

Apache Spark commented on SPARK-21770:
--

User 'WeichenXu123' has created a pull request for this issue:
https://github.com/apache/spark/pull/19106

> ProbabilisticClassificationModel: Improve normalization of all-zero raw 
> predictions
> ---
>
> Key: SPARK-21770
> URL: https://issues.apache.org/jira/browse/SPARK-21770
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Siddharth Murching
>Priority: Minor
>
> Given an n-element raw prediction vector of all-zeros, 
> ProbabilisticClassifierModel.normalizeToProbabilitiesInPlace() should output 
> a probability vector of all-equal 1/n entries



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21799) KMeans performance regression (5-6x slowdown) in Spark 2.2

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21799:


Assignee: (was: Apache Spark)

> KMeans performance regression (5-6x slowdown) in Spark 2.2
> --
>
> Key: SPARK-21799
> URL: https://issues.apache.org/jira/browse/SPARK-21799
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.2.0
>Reporter: Siddharth Murching
>
> I've been running KMeans performance tests using 
> [spark-sql-perf|https://github.com/databricks/spark-sql-perf/] and have 
> noticed a regression (slowdowns of 5-6x) when running tests on large datasets 
> in Spark 2.2 vs 2.1.
> The test params are:
> * Cluster: 510 GB RAM, 16 workers
> * Data: 100 examples, 1 features
> After talking to [~josephkb], the issue seems related to the changes in 
> [SPARK-18356|https://issues.apache.org/jira/browse/SPARK-18356] introduced in 
> [this PR|https://github.com/apache/spark/pull/16295].
> It seems `df.cache()` doesn't set the storageLevel of `df.rdd`, so 
> `handlePersistence` is true even when KMeans is run on a cached DataFrame. 
> This unnecessarily causes another copy of the input dataset to be persisted.
> As of Spark 2.1 ([JIRA 
> link|https://issues.apache.org/jira/browse/SPARK-16063]) `df.storageLevel` 
> returns the correct result after calling `df.cache()`, so I'd suggest 
> replacing instances of `df.rdd.getStorageLevel` with df.storageLevel` in 
> MLlib algorithms (the same pattern shows up in LogisticRegression, 
> LinearRegression, and others). I've verified this behavior in [this 
> notebook|https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5211178207246023/950505630032626/7788830288800223/latest.html]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21799) KMeans performance regression (5-6x slowdown) in Spark 2.2

2017-09-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151407#comment-16151407
 ] 

Apache Spark commented on SPARK-21799:
--

User 'WeichenXu123' has created a pull request for this issue:
https://github.com/apache/spark/pull/19107

> KMeans performance regression (5-6x slowdown) in Spark 2.2
> --
>
> Key: SPARK-21799
> URL: https://issues.apache.org/jira/browse/SPARK-21799
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.2.0
>Reporter: Siddharth Murching
>
> I've been running KMeans performance tests using 
> [spark-sql-perf|https://github.com/databricks/spark-sql-perf/] and have 
> noticed a regression (slowdowns of 5-6x) when running tests on large datasets 
> in Spark 2.2 vs 2.1.
> The test params are:
> * Cluster: 510 GB RAM, 16 workers
> * Data: 100 examples, 1 features
> After talking to [~josephkb], the issue seems related to the changes in 
> [SPARK-18356|https://issues.apache.org/jira/browse/SPARK-18356] introduced in 
> [this PR|https://github.com/apache/spark/pull/16295].
> It seems `df.cache()` doesn't set the storageLevel of `df.rdd`, so 
> `handlePersistence` is true even when KMeans is run on a cached DataFrame. 
> This unnecessarily causes another copy of the input dataset to be persisted.
> As of Spark 2.1 ([JIRA 
> link|https://issues.apache.org/jira/browse/SPARK-16063]) `df.storageLevel` 
> returns the correct result after calling `df.cache()`, so I'd suggest 
> replacing instances of `df.rdd.getStorageLevel` with df.storageLevel` in 
> MLlib algorithms (the same pattern shows up in LogisticRegression, 
> LinearRegression, and others). I've verified this behavior in [this 
> notebook|https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5211178207246023/950505630032626/7788830288800223/latest.html]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21799) KMeans performance regression (5-6x slowdown) in Spark 2.2

2017-09-01 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21799:


Assignee: Apache Spark

> KMeans performance regression (5-6x slowdown) in Spark 2.2
> --
>
> Key: SPARK-21799
> URL: https://issues.apache.org/jira/browse/SPARK-21799
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.2.0
>Reporter: Siddharth Murching
>Assignee: Apache Spark
>
> I've been running KMeans performance tests using 
> [spark-sql-perf|https://github.com/databricks/spark-sql-perf/] and have 
> noticed a regression (slowdowns of 5-6x) when running tests on large datasets 
> in Spark 2.2 vs 2.1.
> The test params are:
> * Cluster: 510 GB RAM, 16 workers
> * Data: 100 examples, 1 features
> After talking to [~josephkb], the issue seems related to the changes in 
> [SPARK-18356|https://issues.apache.org/jira/browse/SPARK-18356] introduced in 
> [this PR|https://github.com/apache/spark/pull/16295].
> It seems `df.cache()` doesn't set the storageLevel of `df.rdd`, so 
> `handlePersistence` is true even when KMeans is run on a cached DataFrame. 
> This unnecessarily causes another copy of the input dataset to be persisted.
> As of Spark 2.1 ([JIRA 
> link|https://issues.apache.org/jira/browse/SPARK-16063]) `df.storageLevel` 
> returns the correct result after calling `df.cache()`, so I'd suggest 
> replacing instances of `df.rdd.getStorageLevel` with df.storageLevel` in 
> MLlib algorithms (the same pattern shows up in LogisticRegression, 
> LinearRegression, and others). I've verified this behavior in [this 
> notebook|https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5211178207246023/950505630032626/7788830288800223/latest.html]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org