[jira] [Commented] (SPARK-21015) Check field name is not null and empty in GenericRowWithSchema
[ https://issues.apache.org/jira/browse/SPARK-21015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042272#comment-16042272 ] Apache Spark commented on SPARK-21015: -- User 'darionyaphet' has created a pull request for this issue: https://github.com/apache/spark/pull/18236 > Check field name is not null and empty in GenericRowWithSchema > -- > > Key: SPARK-21015 > URL: https://issues.apache.org/jira/browse/SPARK-21015 > Project: Spark > Issue Type: New Feature > Components: SQL > Affects Versions: 2.1.1 > Reporter: darion yaphet > Priority: Minor > > When we get a field index from a row with a schema, we should make sure the field name is not null or empty. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-21015) Check field name is not null and empty in GenericRowWithSchema
[ https://issues.apache.org/jira/browse/SPARK-21015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21015: Assignee: (was: Apache Spark)
[jira] [Assigned] (SPARK-21015) Check field name is not null and empty in GenericRowWithSchema
[ https://issues.apache.org/jira/browse/SPARK-21015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21015: Assignee: Apache Spark
[jira] [Created] (SPARK-21015) Check field name is not null and empty in GenericRowWithSchema
darion yaphet created SPARK-21015: - Summary: Check field name is not null and empty in GenericRowWithSchema Key: SPARK-21015 URL: https://issues.apache.org/jira/browse/SPARK-21015 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.1.1 Reporter: darion yaphet Priority: Minor When we get a field index from a row with a schema, we should make sure the field name is not null or empty.
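The proposed check can be sketched outside Spark as follows. `RowLike` and its error messages are hypothetical stand-ins for `GenericRowWithSchema`; only the null/empty guard on the field name is illustrated, not Spark's actual implementation:

```scala
// Minimal stand-in for a row with a schema; illustrates only the
// name-validation logic proposed in this ticket.
case class RowLike(schema: Seq[String], values: Seq[Any]) {
  def fieldIndex(name: String): Int = {
    // The proposed guard: reject null or empty field names early with a
    // clear error instead of a confusing "field not found" downstream.
    require(name != null && name.nonEmpty, "field name must not be null or empty")
    val idx = schema.indexOf(name)
    if (idx < 0) throw new IllegalArgumentException(s"field $name does not exist")
    idx
  }
}

val row = RowLike(Seq("id", "name"), Seq(1, "darion"))
println(row.fieldIndex("name")) // → 1
```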
[jira] [Created] (SPARK-21014) Support get fields with schema name
darion yaphet created SPARK-21014: - Summary: Support get fields with schema name Key: SPARK-21014 URL: https://issues.apache.org/jira/browse/SPARK-21014 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.1.1 Reporter: darion yaphet Priority: Minor Currently, when we want to get a field from a row, we have to fetch it by index. For a row that carries a schema, being able to read a field by its name would be very useful.
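Spark's `Row` does expose `getAs[T](fieldName)` for rows that carry a schema; a self-contained sketch of the lookup such a helper performs (`NamedRow` is a hypothetical stand-in, not the Spark class):

```scala
// Sketch of name-based field access on a schema-carrying row:
// resolve the name to an index via the schema, then fetch by index.
case class NamedRow(fieldNames: Seq[String], values: Seq[Any]) {
  def get(i: Int): Any = values(i)

  // Hypothetical helper analogous to Row.getAs[T](fieldName).
  def getAs[T](name: String): T = {
    val i = fieldNames.indexOf(name)
    if (i < 0) throw new IllegalArgumentException(s"field $name does not exist")
    values(i).asInstanceOf[T]
  }
}

val row = NamedRow(Seq("id", "name"), Seq(42, "darion"))
println(row.getAs[String]("name")) // → darion
```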
[jira] [Comment Edited] (SPARK-19547) KafkaUtil throw 'No current assignment for partition' Exception
[ https://issues.apache.org/jira/browse/SPARK-19547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042241#comment-16042241 ] Utkarsh edited comment on SPARK-19547 at 6/8/17 6:00 AM: - Hi, I'm using Kafka 0.10.0 and kafka-clients 0.10.0.0, spark-streaming_2.11 version 2.1.1 and am facing the same issue. Can anyone kindly point me to the right direction here. Thanks. was (Author: utkarshkajaria): Hi, I'm using Kafka 0.10.0 and kafka-clients 0.10.0.0 and am facing the same issue. Can anyone kindly point me to the right direction here. Thanks. > KafkaUtil throw 'No current assignment for partition' Exception > --- > > Key: SPARK-19547 > URL: https://issues.apache.org/jira/browse/SPARK-19547 > Project: Spark > Issue Type: Question > Components: DStreams >Affects Versions: 1.6.1 >Reporter: wuchang > > Below is my scala code to create spark kafka stream: > val kafkaParams = Map[String, Object]( > "bootstrap.servers" -> "server110:2181,server110:9092", > "zookeeper" -> "server110:2181", > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> classOf[StringDeserializer], > "group.id" -> "example", > "auto.offset.reset" -> "latest", > "enable.auto.commit" -> (false: java.lang.Boolean) > ) > val topics = Array("ABTest") > val stream = KafkaUtils.createDirectStream[String, String]( > ssc, > PreferConsistent, > Subscribe[String, String](topics, kafkaParams) > ) > But after run for 10 hours, it throws exceptions: > 2017-02-10 10:56:20,000 INFO [JobGenerator] internals.ConsumerCoordinator: > Revoking previously assigned partitions [ABTest-0, ABTest-1] for group example > 2017-02-10 10:56:20,000 INFO [JobGenerator] internals.AbstractCoordinator: > (Re-)joining group example > 2017-02-10 10:56:20,011 INFO [JobGenerator] internals.AbstractCoordinator: > (Re-)joining group example > 2017-02-10 10:56:40,057 INFO [JobGenerator] internals.AbstractCoordinator: > Successfully joined group example with generation 5 > 
2017-02-10 10:56:40,058 INFO [JobGenerator] internals.ConsumerCoordinator: > Setting newly assigned partitions [ABTest-1] for group example > 2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error > generating jobs for time 148669538 ms > java.lang.IllegalStateException: No current assignment for partition ABTest-0 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169) > at > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179) > at > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) > at scala.Option.orElse(Option.scala:289) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) > at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) > at > 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > scala.collection.TraversableLike$class.fla
[jira] [Commented] (SPARK-19547) KafkaUtil throw 'No current assignment for partition' Exception
[ https://issues.apache.org/jira/browse/SPARK-19547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042241#comment-16042241 ] Utkarsh commented on SPARK-19547: - Hi, I'm using Kafka 0.10.0 and kafka-clients 0.10.0.0 and am facing the same issue. Can anyone kindly point me to the right direction here. Thanks. > KafkaUtil throw 'No current assignment for partition' Exception > --- > > Key: SPARK-19547 > URL: https://issues.apache.org/jira/browse/SPARK-19547 > Project: Spark > Issue Type: Question > Components: DStreams >Affects Versions: 1.6.1 >Reporter: wuchang > > Below is my scala code to create spark kafka stream: > val kafkaParams = Map[String, Object]( > "bootstrap.servers" -> "server110:2181,server110:9092", > "zookeeper" -> "server110:2181", > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> classOf[StringDeserializer], > "group.id" -> "example", > "auto.offset.reset" -> "latest", > "enable.auto.commit" -> (false: java.lang.Boolean) > ) > val topics = Array("ABTest") > val stream = KafkaUtils.createDirectStream[String, String]( > ssc, > PreferConsistent, > Subscribe[String, String](topics, kafkaParams) > ) > But after run for 10 hours, it throws exceptions: > 2017-02-10 10:56:20,000 INFO [JobGenerator] internals.ConsumerCoordinator: > Revoking previously assigned partitions [ABTest-0, ABTest-1] for group example > 2017-02-10 10:56:20,000 INFO [JobGenerator] internals.AbstractCoordinator: > (Re-)joining group example > 2017-02-10 10:56:20,011 INFO [JobGenerator] internals.AbstractCoordinator: > (Re-)joining group example > 2017-02-10 10:56:40,057 INFO [JobGenerator] internals.AbstractCoordinator: > Successfully joined group example with generation 5 > 2017-02-10 10:56:40,058 INFO [JobGenerator] internals.ConsumerCoordinator: > Setting newly assigned partitions [ABTest-1] for group example > 2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error > generating jobs for time 
148669538 ms > java.lang.IllegalStateException: No current assignment for partition ABTest-0 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169) > at > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179) > at > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) > at scala.Option.orElse(Option.scala:289) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) > at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) > at > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) > at > org.apache.spark.streaming.scheduler.JobGene
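The stack trace above shows `seekToEnd` being called on partition ABTest-0 after it was revoked in a rebalance (only ABTest-1 was reassigned). A pure-Scala sketch of the defensive pattern, filtering to the current assignment before seeking; `ToyConsumer` and its state are hypothetical stand-ins, not the kafka-clients API or Spark's internal fix:

```scala
// Toy stand-in for a consumer's subscription state. Seeking a partition
// outside the current assignment throws, mirroring the
// "No current assignment for partition" IllegalStateException.
class ToyConsumer(var assignment: Set[String]) {
  def seekToEnd(partition: String): Unit =
    if (!assignment.contains(partition))
      throw new IllegalStateException(s"No current assignment for partition $partition")

  // Defensive variant: only seek partitions still owned after a rebalance,
  // and report which ones were actually sought.
  def seekToEndAssigned(partitions: Set[String]): Set[String] = {
    val owned = partitions.intersect(assignment)
    owned.foreach(seekToEnd)
    owned
  }
}

val c = new ToyConsumer(Set("ABTest-1")) // ABTest-0 was revoked
println(c.seekToEndAssigned(Set("ABTest-0", "ABTest-1"))) // → Set(ABTest-1)
```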
[jira] [Updated] (SPARK-21012) Support glob path for resources adding to Spark
[ https://issues.apache.org/jira/browse/SPARK-21012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saisai Shao updated SPARK-21012: Description: Current "\-\-jars (spark.jars)", "\-\-files (spark.files)", "\-\-py-files (spark.submit.pyFiles)" and "\-\-archives (spark.yarn.dist.archives)" only support non-glob paths. This is OK for most cases, but when a user needs to add many jars or files to Spark, it is too verbose to list them one by one. So here we propose to support glob paths for adding resources. Also improve the code for downloading resources. was: Current "\-\-jars (spark.jars)", "\-\-files (spark.files)", "\-\-py-files (spark.submit.pyFiles)" and "\-\-archives (spark.yarn.dist.archives)" only support non-glob path. This is OK for most of the cases, but when user requires to add more jars, files into Spark, it is too verbose to list one by one. Also improving the code of downloading resources.
[jira] [Comment Edited] (SPARK-21001) Staging folders from Hive table are not being cleared.
[ https://issues.apache.org/jira/browse/SPARK-21001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040341#comment-16040341 ] Liang-Chi Hsieh edited comment on SPARK-21001 at 6/8/17 5:20 AM: - I found a PR which backports the related fix to the 2.0 branch: https://github.com/apache/spark/pull/16399 But it looks like it was backported after the 2.0.2 release, so the 2.0.2 release doesn't include this fix. You may try the latest 2.0 branch if you can. was (Author: viirya): I found a PR which backports the related fix to 2.0 branch: https://github.com/apache/spark/pull/16399 But looks like it is backported after 2.0.2 release. You may try the latest 2.0 branch if you could. > Staging folders from Hive table are not being cleared. > -- > > Key: SPARK-21001 > URL: https://issues.apache.org/jira/browse/SPARK-21001 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.0.2 > Reporter: Ajay Cherukuri > > Staging folders that were created as part of loading data into a Hive table by a Spark job are not cleared. > Staging folders remain in the Hive external table's folder even after the Spark job has completed. > This is the same issue mentioned in the ticket: https://issues.apache.org/jira/browse/SPARK-18372 > That ticket says the issue was resolved in 1.6.4, but I have now found that it still exists in 2.0.2.
[jira] [Closed] (SPARK-19057) Instance weights must be non-negative
[ https://issues.apache.org/jira/browse/SPARK-19057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengruifeng closed SPARK-19057. Resolution: Won't Fix > Instance weights must be non-negative > - > > Key: SPARK-19057 > URL: https://issues.apache.org/jira/browse/SPARK-19057 > Project: Spark > Issue Type: Improvement > Components: ML > Reporter: zhengruifeng > Priority: Trivial > > It looks like negative-weighted instances make no sense and can cause divergence in ML algorithms.
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042177#comment-16042177 ] Jose Soltren commented on SPARK-20760: -- Hi Binzi: 1. If this is so then there is some error in my understanding. 2. I don't have a good answer here yet, sorry. 3. Though these RDD operations appear serially in your test program, they are all asynchronous behind the scenes. In particular, unpersisting RDDs really amounts to the driver sending the executors a bunch of messages to do the unpersisting. Since they are asynchronous, decoupled operations, it is possible for one to be faster than another. I'm not entirely sure precisely what changed since Spark 1.6 here. My investigation was focused on where all the memory was going, not on digesting all the changes across a major version of Spark. Cheers. > Memory Leak of RDD blocks > -- > > Key: SPARK-20760 > URL: https://issues.apache.org/jira/browse/SPARK-20760 > Project: Spark > Issue Type: Bug > Components: Block Manager > Affects Versions: 2.1.0 > Environment: Spark 2.1.0 > Reporter: Binzi Cao > Attachments: RDD blocks in spark 2.1.1.png, RDD Blocks .png, Storage in spark 2.1.1.png > > > Memory leak of RDD blocks in a long-running RDD process. > We have a long-running application which does computations on RDDs, and we found that the RDD blocks keep increasing on the Spark UI page. > The RDD blocks and memory usage do not match the cached RDDs and memory. It looks like Spark keeps old RDDs in memory and never releases them, or never gets a chance to release them. The job will eventually die of out-of-memory errors. > In addition, I'm not seeing this issue in Spark 1.6. We are seeing the same issue in YARN cluster mode in both Kafka streaming and batch applications. The issue in streaming is similar; however, the RDD blocks seem to grow a bit more slowly than in batch jobs.
> The below is the sample code, and it is reproducible by just running it in local mode. > Scala file: > {code} > import scala.concurrent.duration.Duration > import scala.util.{Try, Failure, Success} > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.rdd.RDD > import scala.concurrent._ > import ExecutionContext.Implicits.global > case class Person(id: String, name: String) > object RDDApp { > def run(sc: SparkContext) = { > while (true) { > val r = scala.util.Random > val data = (1 to r.nextInt(100)).toList.map { a => > Person(a.toString, a.toString) > } > val rdd = sc.parallelize(data) > rdd.cache > println("running") > val a = (1 to 100).toList.map { x => > Future(rdd.filter(_.id == x.toString).collect) > } > a.foreach { f => > println(Await.ready(f, Duration.Inf).value.get) > } > rdd.unpersist() > } > } > def main(args: Array[String]): Unit = { > val conf = new SparkConf().setAppName("test") > val sc = new SparkContext(conf) > run(sc) > } > } > {code} > build.sbt file: > {code} > name := "RDDTest" > version := "0.1.1" > scalaVersion := "2.11.5" > libraryDependencies ++= Seq ( > "org.scalaz" %% "scalaz-core" % "7.2.0", > "org.scalaz" %% "scalaz-concurrent" % "7.2.0", > "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided", > "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided" > ) > addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1") > mainClass in assembly := Some("RDDApp") > test in assembly := {} > {code} > To reproduce it: > Just > {code} > spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \ > --executor-memory 4G \ > --executor-cores 1 \ > --num-executors 1 \ > --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar > {code}
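Given the asynchronous unpersist behavior described in the comment above, one mitigation worth testing against this reproduction: `RDD.unpersist` accepts a `blocking` flag, and forcing the synchronous path stops the driver loop from racing ahead of the executors' block removal. Whether this fully closes the leak reported here is unverified:

```scala
// In the loop above, replace the fire-and-forget unpersist with the
// blocking variant so each iteration waits for block removal to finish
// before caching the next RDD.
rdd.unpersist(blocking = true)
```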
[jira] [Created] (SPARK-21013) Spark History Server does not show the logs of completed Yarn Jobs
HARIKRISHNAN CHENEPERTH KUNHUMVEETTIL created SPARK-21013: - Summary: Spark History Server does not show the logs of completed Yarn Jobs Key: SPARK-21013 URL: https://issues.apache.org/jira/browse/SPARK-21013 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 2.1.0, 2.0.1, 1.6.1 Reporter: HARIKRISHNAN CHENEPERTH KUNHUMVEETTIL I am facing an issue when accessing the container logs of a completed Spark (YARN) application from the History Server. Repro steps: 1) Run the spark-shell in yarn client mode, or run a Pi job in YARN mode. 2) Once the job is completed (in the case of spark-shell, exit after doing some simple operations), try to access the STDOUT or STDERR logs of the application from the Executors tab in the Spark History Server UI. 3) If YARN log aggregation is enabled, the logs won't be available in the NodeManager's log location. But the History Server tries to access the logs from the NodeManager's log location anyway, giving the below error in the UI: Failed redirect for container_e31_1496881617682_0003_01_02 ResourceManager RM Home NodeManager Tools Failed while trying to construct the redirect url to the log server. Log Server url may not be configured java.lang.Exception: Unknown container. Container either has not started or has already completed or doesn't belong to this node at all.
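The "Log Server url may not be configured" message in step 3 points at the NodeManager's redirect configuration. A yarn-site.xml sketch of the settings involved; the hostname and port are placeholders, and whether this resolves the reporter's case is an assumption:

```xml
<!-- yarn-site.xml: with log aggregation enabled, completed containers'
     log pages redirect to the log server named here. Host and port
     below are placeholders for the actual JobHistory server. -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://historyserver.example.com:19888/jobhistory/logs</value>
</property>
```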
[jira] [Commented] (SPARK-21012) Support glob path for resources adding to Spark
[ https://issues.apache.org/jira/browse/SPARK-21012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042126#comment-16042126 ] Apache Spark commented on SPARK-21012: -- User 'jerryshao' has created a pull request for this issue: https://github.com/apache/spark/pull/18235
[jira] [Assigned] (SPARK-21012) Support glob path for resources adding to Spark
[ https://issues.apache.org/jira/browse/SPARK-21012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21012: Assignee: Apache Spark
[jira] [Assigned] (SPARK-21012) Support glob path for resources adding to Spark
[ https://issues.apache.org/jira/browse/SPARK-21012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21012: Assignee: (was: Apache Spark)
[jira] [Updated] (SPARK-21012) Support glob path for resources adding to Spark
[ https://issues.apache.org/jira/browse/SPARK-21012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saisai Shao updated SPARK-21012: Issue Type: Improvement (was: Bug)
[jira] [Updated] (SPARK-21012) Support glob path for resources adding to Spark
[ https://issues.apache.org/jira/browse/SPARK-21012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saisai Shao updated SPARK-21012: Priority: Minor (was: Major)
[jira] [Created] (SPARK-21012) Support glob path for resources adding to Spark
Saisai Shao created SPARK-21012: --- Summary: Support glob path for resources adding to Spark Key: SPARK-21012 URL: https://issues.apache.org/jira/browse/SPARK-21012 Project: Spark Issue Type: Bug Components: Spark Submit Affects Versions: 2.2.0 Reporter: Saisai Shao Current "\-\-jars (spark.jars)", "\-\-files (spark.files)", "\-\-py-files (spark.submit.pyFiles)" and "\-\-archives (spark.yarn.dist.archives)" only support non-glob paths. This is OK for most cases, but when a user needs to add many jars or files to Spark, it is too verbose to list them one by one. Also improve the code for downloading resources.
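With the proposed change, a submission could reference whole directories of dependencies with one pattern instead of a comma-separated list. A hypothetical invocation; the class name, paths, and jar names are placeholders, not from the ticket:

```shell
# Hypothetical spark-submit once glob paths are supported: one glob per
# option replaces an explicit comma-separated list of jars/files.
spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --jars "hdfs:///user/me/deps/*.jar" \
  --files "/etc/myapp/conf/*.properties" \
  my-app.jar
```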
[jira] [Commented] (SPARK-21010) Spark-Sql Can't Handle char() type Well
[ https://issues.apache.org/jira/browse/SPARK-21010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042107#comment-16042107 ] fengchaoge commented on SPARK-21010: thank you > Spark-Sql Can't Handle char() type Well > > > Key: SPARK-21010 > URL: https://issues.apache.org/jira/browse/SPARK-21010 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.6.1, 2.1.0, 2.1.1 > Environment: spark1.6.1 hadoop-2.6.0-cdh5.4.2 > Reporter: fengchaoge > > We create a table in spark-sql like this: > 1. create table cid_test (name string,id char(20)) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' stored as textfile; > 2. LOAD DATA LOCAL INPATH '/home/hadoop/id.txt' OVERWRITE INTO TABLE cid_test; > content for id.txt: > fengchaoge 41302219990808 > 3. select * from cid_test where id='41302219990808'; > 4. select * from cid_test where id='41302219990808 '; > In the third step we got nothing, but in the fourth step we got the right result. We must append two spaces to the id if we want the right value.
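The behavior in steps 3 and 4 is consistent with `CHAR(n)` comparison semantics, where stored values are treated as blank-padded to the declared length and padding handling differs across Hive and Spark versions. Two workarounds that sidestep the padding; these are general suggestions, not a fix confirmed in this ticket:

```sql
-- id is CHAR(20): trim the column before comparing so trailing
-- pad spaces cannot affect the match.
SELECT * FROM cid_test WHERE rtrim(id) = '41302219990808';

-- Or avoid padding entirely by declaring the column as STRING:
-- CREATE TABLE cid_test (name STRING, id STRING)
--   ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS TEXTFILE;
```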
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042009#comment-16042009 ] Binzi Cao commented on SPARK-20760: --- Hi Jose, Thanks very much for your detailed explanation. I have some questions, as below: 1. If the RDD blocks in the UI just reflect how many RDDs have been created, why does the number of RDD blocks go up and down during the test? 2. How about the Storage page? I can see there is a list of RDDs on that page; does that page also show something wrong? 3. In this case, the code flow is: create RDD, cache RDD, do computations, and unpersist, and these steps run serially. I don't quite understand how the RDD blocks are created faster than they can be unpersisted. In addition, the issue did not happen in Spark 1.6. Regards, Binzi
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
>
> case class Person(id: String, name: String)
>
> object RDDApp {
>   def run(sc: SparkContext) = {
>     while (true) {
>       val r = scala.util.Random
>       val data = (1 to r.nextInt(100)).toList.map { a =>
>         Person(a.toString, a.toString)
>       }
>       val rdd = sc.parallelize(data)
>       rdd.cache
>       println("running")
>       val a = (1 to 100).toList.map { x =>
>         Future(rdd.filter(_.id == x.toString).collect)
>       }
>       a.foreach { f =>
>         println(Await.ready(f, Duration.Inf).value.get)
>       }
>       rdd.unpersist()
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("test")
>     val sc = new SparkContext(conf)
>     run(sc)
>   }
> }
> {code}
> Build sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
>   "org.scalaz" %% "scalaz-core" % "7.2.0",
>   "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
>   "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
>   "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
> )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it, just run:
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
>   --executor-memory 4G \
>   --executor-cores 1 \
>   --num-executors 1 \
>   --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}
-- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21008) Streaming applications read stale credentials file when recovering from checkpoint.
[ https://issues.apache.org/jira/browse/SPARK-21008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-21008: - Component/s: (was: Structured Streaming) DStreams > Streaming applications read stale credentials file when recovering from > checkpoint. > --- > > Key: SPARK-21008 > URL: https://issues.apache.org/jira/browse/SPARK-21008 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.6.3, 2.0.2, 2.1.1, 2.2.0 >Reporter: Xing Shi > > On a security (Kerberos) enabled cluster, streaming applications renew HDFS > delegation tokens periodically and save them in the > {{/.sparkStaging//}} directory on HDFS. > The path of the credentials file is written into the checkpoint, and is reloaded > with the *old applicationId* when the application restarts, although the > application has changed to a new id. > This issue can be reproduced by restarting a checkpoint-enabled streaming > application on a kerberized cluster. > The application runs well - but with thousands of > {{java.io.FileNotFoundException}}s logged - and finally fails due to token > expiration. > The log file is something like this: > {code:title=the_first_run.log} > 17/06/07 14:52:06 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > 17/06/07 14:52:06 INFO security.CredentialUpdater: Scheduling credentials > refresh from HDFS in 92263 ms. > {code} > {code:title=after_restart.log} > 17/06/07 15:11:24 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > ...
> 17/06/07 15:12:24 WARN yarn.YarnSparkHadoopUtil: Error while attempting to list files from application staging dir
> java.io.FileNotFoundException: File hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035 does not exist.
> at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697)
> at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
> at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
> at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:257)
> at org.apache.spark.deploy.yarn.security.CredentialUpdater.org$apache$spark$deploy$yarn$security$CredentialUpdater$$updateCredentialsIfRequired(CredentialUpdater.scala:72)
> at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(CredentialUpdater.scala:53)
> at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53)
> at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962)
> at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1.run(CredentialUpdater.scala:53)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Notice that the applicationId after restart is application_1496384469444_{color:red}0036{color}, but the application still attempts to read credentials from 0035's directory.
> Recently I used Spark 1.6 in my cluster, and tested this issue with Spark 1.6.3 and 2.1.1. But it should affect all versions from 1.5.x to the current master (2.3.x).
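The mechanism behind the stale reads can be illustrated without Spark or HDFS. The sketch below is hypothetical Python (the `creds_dir` helper and the path layout are invented to mirror the report): because the staging directory embeds the application id, a path restored verbatim from a checkpoint keeps pointing at the previous application's directory after restart. Recomputing the path from the current application id at recovery time, rather than restoring it from the checkpoint, would avoid the stale read.

```python
# Hypothetical illustration: the credentials path embeds the application id,
# so a path restored verbatim from a checkpoint goes stale after restart.
def creds_dir(user: str, app_id: str) -> str:
    # Mirrors the /user/<user>/.sparkStaging/<app_id> layout from the logs.
    return f"/user/{user}/.sparkStaging/{app_id}"

# Path captured in the checkpoint during the first run:
checkpointed = creds_dir("xxx", "application_1496384469444_0035")

# After restart YARN assigns a new id, so the live staging dir moves:
current = creds_dir("xxx", "application_1496384469444_0036")

# Reading the checkpointed path now targets a directory that no longer
# exists - the FileNotFoundException seen in after_restart.log.
print(checkpointed)  # /user/xxx/.sparkStaging/application_1496384469444_0035
print(current)       # /user/xxx/.sparkStaging/application_1496384469444_0036
```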
[jira] [Commented] (SPARK-16784) Configurable log4j settings
[ https://issues.apache.org/jira/browse/SPARK-16784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041856#comment-16041856 ] Irina Truong commented on SPARK-16784: -- In 2.1.0, setting "spark.driver.extraJavaOptions" to "-Dlog4j.configuration=file:/home/hadoop/log4j.properties" in SparkConfig seemed to work. In 2.1.1, it does not work anymore, but setting it via "--driver-java-options" still works. Is this a bug in 2.1.1? > Configurable log4j settings > --- > > Key: SPARK-16784 > URL: https://issues.apache.org/jira/browse/SPARK-16784 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0, 2.1.0 >Reporter: Michael Gummelt > > I often want to change the logging configuration on a single spark job. This > is easy in client mode. I just modify log4j.properties. It's difficult in > cluster mode, because I need to modify the log4j.properties in the > distribution in which the driver runs. I'd like a way of setting this > dynamically, such as a java system property. Some brief searching showed > that log4j doesn't seem to accept such a property, but I'd like to open up > this idea for further comment. Maybe we can find a solution.
[jira] [Commented] (SPARK-19700) Design an API for pluggable scheduler implementations
[ https://issues.apache.org/jira/browse/SPARK-19700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041803#comment-16041803 ] Andrew Ash commented on SPARK-19700: Found another potential implementation: Nomad by [~barnardb] at SPARK-20992 > Design an API for pluggable scheduler implementations > - > > Key: SPARK-19700 > URL: https://issues.apache.org/jira/browse/SPARK-19700 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Matt Cheah > > One point that was brought up in discussing SPARK-18278 was that schedulers > cannot easily be added to Spark without forking the whole project. The main > reason is that much of the scheduler's behavior fundamentally depends on the > CoarseGrainedSchedulerBackend class, which is not part of the public API of > Spark and is in fact quite a complex module. As resource management and > allocation continues to evolve, Spark will need to be integrated with more > cluster managers, but maintaining support for all possible allocators in the > Spark project would be untenable. Furthermore, it would be impossible for > Spark to support proprietary frameworks that are developed by specific users > for their own particular use cases. > Therefore, this ticket proposes making scheduler implementations fully > pluggable. The idea is that Spark will provide a Java/Scala interface that is > to be implemented by a scheduler that is backed by the cluster manager of > interest. The user can compile their scheduler's code into a JAR that is > placed on the driver's classpath. Finally, as is the case in the current > world, the scheduler implementation is selected and dynamically loaded > depending on the user's provided master URL. > Determining the correct API is the most challenging problem.
The current > CoarseGrainedSchedulerBackend handles many responsibilities, some of which > will be common across all cluster managers, and some which will be specific > to a particular cluster manager. For example, the particular mechanism for > creating the executor processes will differ between YARN and Mesos, but, once > these executors have started running, the means to submit tasks to them over > the Netty RPC is identical across the board. > We must also consider a plugin model and interface for submitting the > application, because different cluster managers support different > configuration options, and thus the driver must be bootstrapped accordingly. > For example, in YARN mode the application and Hadoop configuration must be > packaged and shipped to the distributed cache prior to launching the job. A > prototype of a Kubernetes implementation starts a Kubernetes pod that runs > the driver in cluster mode.
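The selection mechanism the ticket describes - an external scheduler compiled into a JAR and chosen by the scheme of the master URL - can be sketched in miniature. This is an illustrative, hypothetical Python sketch, not Spark's actual API; the registry, `register_backend`, and the backend classes are all invented names:

```python
# Illustrative sketch of master-URL-based scheduler selection.
# All names here are hypothetical; Spark's real mechanism lives in
# SparkContext / CoarseGrainedSchedulerBackend and is JVM-based.
SCHEDULER_BACKENDS = {}

def register_backend(scheme):
    """Decorator: associate a backend class with a master-URL scheme."""
    def wrap(cls):
        SCHEDULER_BACKENDS[scheme] = cls
        return cls
    return wrap

@register_backend("yarn")
class YarnBackend:
    def start_executors(self):
        return "allocating containers via the ResourceManager"

@register_backend("mesos")
class MesosBackend:
    def start_executors(self):
        return "launching tasks via the Mesos master"

def backend_for(master_url):
    # "yarn", "mesos://host:5050", ... -> take the scheme prefix
    scheme = master_url.split("://", 1)[0]
    try:
        return SCHEDULER_BACKENDS[scheme]()
    except KeyError:
        raise ValueError(f"no scheduler backend registered for {scheme!r}")
```

A pluggable version would populate the registry from implementations discovered on the driver's classpath instead of hard-coding the table, which is essentially what the ticket proposes.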
[jira] [Comment Edited] (SPARK-12559) Cluster mode doesn't work with --packages
[ https://issues.apache.org/jira/browse/SPARK-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041622#comment-16041622 ] Stavros Kontopoulos edited comment on SPARK-12559 at 6/7/17 9:15 PM: - Is anyone working on this? Some people may want to add dependencies dynamically so they can keep their jars lighter when testing; it is also useful when you resolve dependencies from private repositories. I think people often use the --packages option: https://stackoverflow.com/questions/36676395/how-to-resolve-external-packages-with-spark-shell-when-behind-a-corporate-proxy. https://github.com/databricks/spark-csv etc... was (Author: skonto): Is anyone working on this? Some people may want to dynamically add deps having lighter jars when testing, also it is useful when you resolve deps from private repositories check here: https://stackoverflow.com/questions/36676395/how-to-resolve-external-packages-with-spark-shell-when-behind-a-corporate-proxy. > Cluster mode doesn't work with --packages > - > > Key: SPARK-12559 > URL: https://issues.apache.org/jira/browse/SPARK-12559 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 1.3.0 >Reporter: Andrew Or > > From the mailing list: > {quote} > Another problem I ran into that you also might is that --packages doesn't > work with --deploy-mode cluster. It downloads the packages to a temporary > location on the node running spark-submit, then passes those paths to the > node that is running the Driver, but since that isn't the same machine, it > can't find anything and fails. The driver process *should* be the one > doing the downloading, but it isn't. I ended up having to create a fat JAR > with all of the dependencies to get around that one. > {quote} > The problem is that we currently don't upload jars to the cluster. It seems > to fix this we either (1) do upload jars, or (2) just run the packages code > on the driver side. I slightly prefer (2) because it's simpler.
[jira] [Commented] (SPARK-12559) Cluster mode doesn't work with --packages
[ https://issues.apache.org/jira/browse/SPARK-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041622#comment-16041622 ] Stavros Kontopoulos commented on SPARK-12559: - Is anyone working on this? Some people may want to add dependencies dynamically so they can keep their jars lighter when testing; it is also useful when you resolve dependencies from private repositories - check here: https://stackoverflow.com/questions/36676395/how-to-resolve-external-packages-with-spark-shell-when-behind-a-corporate-proxy. > Cluster mode doesn't work with --packages > - > > Key: SPARK-12559 > URL: https://issues.apache.org/jira/browse/SPARK-12559 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 1.3.0 >Reporter: Andrew Or > > From the mailing list: > {quote} > Another problem I ran into that you also might is that --packages doesn't > work with --deploy-mode cluster. It downloads the packages to a temporary > location on the node running spark-submit, then passes those paths to the > node that is running the Driver, but since that isn't the same machine, it > can't find anything and fails. The driver process *should* be the one > doing the downloading, but it isn't. I ended up having to create a fat JAR > with all of the dependencies to get around that one. > {quote} > The problem is that we currently don't upload jars to the cluster. It seems > to fix this we either (1) do upload jars, or (2) just run the packages code > on the driver side. I slightly prefer (2) because it's simpler.
[jira] [Commented] (SPARK-2868) Support named accumulators in Python
[ https://issues.apache.org/jira/browse/SPARK-2868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041557#comment-16041557 ] Kyle Heath commented on SPARK-2868: --- @[~holdenk]: I would love to better understand the scope of the work if you have time to sketch it out for me. > Support named accumulators in Python > > > Key: SPARK-2868 > URL: https://issues.apache.org/jira/browse/SPARK-2868 > Project: Spark > Issue Type: New Feature > Components: PySpark >Reporter: Patrick Wendell > > SPARK-2380 added this for Java/Scala. To allow this in Python we'll need to > make some additional changes. One potential path is to have a 1:1 > correspondence with Scala accumulators (instead of a one-to-many). A > challenge is exposing the stringified values of the accumulators to the Scala > code.
[jira] [Updated] (SPARK-21011) RDD filter can combine/corrupt columns
[ https://issues.apache.org/jira/browse/SPARK-21011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Landes updated SPARK-21011: -- Description: I used PySpark to read in some CSV files (actually separated by backspace, might be relevant). The resulting dataframe.show() gives me good data - all my columns are there, everything's great. df = spark.read.option('delimiter', '\b').csv('') df.show() # all is good here Now, I want to filter this bad boy... but I want to use RDD's filters because they're just nicer to use. my_rdd = df.rdd my_rdd.take(5) #all my columns are still here filtered_rdd = my_rdd.filter() filtered_rdd.take(5) My filtered_rdd is missing a column. Specifically, _c2 has been mashed in to _c1. Here's a relevant record (anonymized) from the df.show(): |3 |Text Field |12345||150.00|UserName|2012-08-14 00:50:00|2015-02-24 01:23:45|2017-02-34 13:02:33|true|false| ...and the return from the filtered_rdd.take() Row(_c0=u'3', _c1=u'"Text Field"\x08"12345"', _c2=u'|', _c3=u'150.00', _c4=u'UserName', _c5=u'2012-08-14 00:50:00', _c6=u'2015-02-24 01:23:45', _c7=u'2017-02-34 13:02:33', _c8=u'true', _c9=u'false', _c10=None) Look at _c1 there - it's been mishmashed together with what was formerly _c2 (with an ascii backspace - \x08 - in there)... and poor old _c10 is left without a value. was: I used PySpark to read in some CSV files (actually separated by backspace, might be relevant). The resulting dataframe.show() gives me good data - all my columns are there, everything's great. df = spark.read.option('delimiter', '\b').csv('') df.show() # all is good here Now, I want to filter this bad boy... but I want to use RDD's filters because they're just nicer to use. my_rdd = df.rdd my_rdd.take(5) #all my columns are still here filtered_rdd = my_rdd.filter() filtered_rdd.take(5) My filtered_rdd is missing a column. Specifically, _c2 has been mashed in to _c1. 
Here's a relevant record (anonymized) from the df.show(): |3 |Text Field |12345||150.00|UserName|2012-08-14 00:50:00|2015-02-24 01:23:45|2017-02-34 13:02:33|true|false| ...and the return from the filtered_rdd.take() Row(_c0=u'3', _c1=u'"Text Field"\x08"12345"', _c2=u'|', _c3=u'150.00', _c4=u'UserName', _c5=u'2012-08-14 00:50:00', _c6=u'2015-02-24 01:23:45', _c7=u'2017-02-34 13:02:33', _c8=u'true', _c9=u'false', _c10=None) Look at _c1 there - it's been mishmashed together with what was formerly _c2... and poor old _c10 is left without a value. > RDD filter can combine/corrupt columns > -- > > Key: SPARK-21011 > URL: https://issues.apache.org/jira/browse/SPARK-21011 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: Steven Landes > > I used PySpark to read in some CSV files (actually separated by backspace, > might be relevant). The resulting dataframe.show() gives me good data - all > my columns are there, everything's great. > df = spark.read.option('delimiter', '\b').csv('') > df.show() # all is good here > Now, I want to filter this bad boy... but I want to use RDD's filters > because they're just nicer to use. > my_rdd = df.rdd > my_rdd.take(5) #all my columns are still here > filtered_rdd = my_rdd.filter() > filtered_rdd.take(5) > My filtered_rdd is missing a column. Specifically, _c2 has been mashed in to > _c1. > Here's a relevant record (anonymized) from the df.show(): > |3 |Text Field |12345| here>|150.00|UserName|2012-08-14 00:50:00|2015-02-24 01:23:45|2017-02-34 > 13:02:33|true|false| > ...and the return from the filtered_rdd.take() > Row(_c0=u'3', _c1=u'"Text Field"\x08"12345"', _c2=u'| mess here>', _c3=u'150.00', _c4=u'UserName', _c5=u'2012-08-14 00:50:00', > _c6=u'2015-02-24 01:23:45', _c7=u'2017-02-34 13:02:33', _c8=u'true', > _c9=u'false', _c10=None) > Look at _c1 there - it's been mishmashed together with what was formerly _c2 > (with an ascii backspace - \x08 - in there)... 
and poor old _c10 is left > without a value.
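The column-merging behaviour described above is consistent with two different splitting strategies being applied to the same data. As a pure-Python illustration (no Spark involved; the record below is made up to mirror the "Text Field"/12345 case in the report), a quote-aware CSV parser keeps an embedded \x08 inside a quoted field, while a naive split on the delimiter yields a different column layout:

```python
import csv
import io

# Hypothetical backspace-delimited record in which one quoted field
# itself contains the \x08 delimiter.
line = '3\x08"Text Field\x0812345"\x08150.00\x08true'

# Quote-aware parsing (what a CSV reader does): the embedded delimiter
# stays inside the quoted field, giving 4 columns.
quoted = next(csv.reader(io.StringIO(line), delimiter='\x08'))

# Naive splitting on the delimiter ignores quoting, giving 5 columns
# and shifting every later value one position to the right.
naive = line.split('\x08')

print(len(quoted), len(naive))  # 4 5
```

If the DataFrame reader and the RDD conversion path disagree in this way on data containing embedded delimiters, it would produce exactly the shifted/merged columns shown in the report.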
[jira] [Created] (SPARK-21011) RDD filter can combine/corrupt columns
Steven Landes created SPARK-21011: - Summary: RDD filter can combine/corrupt columns Key: SPARK-21011 URL: https://issues.apache.org/jira/browse/SPARK-21011 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.1.0 Reporter: Steven Landes I used PySpark to read in some CSV files (actually separated by backspace, might be relevant). The resulting dataframe.show() gives me good data - all my columns are there, everything's great. df = spark.read.option('delimiter', '\b').csv('') df.show() # all is good here Now, I want to filter this bad boy... but I want to use RDD's filters because they're just nicer to use. my_rdd = df.rdd my_rdd.take(5) #all my columns are still here filtered_rdd = my_rdd.filter() filtered_rdd.take(5) My filtered_rdd is missing a column. Specifically, _c2 has been mashed in to _c1. Here's a relevant record (anonymized) from the df.show(): |3 |Text Field |12345||150.00|UserName|2012-08-14 00:50:00|2015-02-24 01:23:45|2017-02-34 13:02:33|true|false| ...and the return from the filtered_rdd.take() Row(_c0=u'3', _c1=u'"Text Field"\x08"12345"', _c2=u'|', _c3=u'150.00', _c4=u'UserName', _c5=u'2012-08-14 00:50:00', _c6=u'2015-02-24 01:23:45', _c7=u'2017-02-34 13:02:33', _c8=u'true', _c9=u'false', _c10=None) Look at _c1 there - it's been mishmashed together with what was formerly _c2... and poor old _c10 is left without a value.
[jira] [Commented] (SPARK-18085) Better History Server scalability for many / large applications
[ https://issues.apache.org/jira/browse/SPARK-18085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041498#comment-16041498 ] Marcelo Vanzin commented on SPARK-18085: BTW for the last issue I had already filed SPARK-20656 for the approach I had in mind. > Better History Server scalability for many / large applications > --- > > Key: SPARK-18085 > URL: https://issues.apache.org/jira/browse/SPARK-18085 > Project: Spark > Issue Type: Umbrella > Components: Spark Core, Web UI >Affects Versions: 2.0.0 >Reporter: Marcelo Vanzin > Attachments: spark_hs_next_gen.pdf > > > It's a known fact that the History Server currently has some annoying issues > when serving lots of applications, and when serving large applications. > I'm filing this umbrella to track work related to addressing those issues. > I'll be attaching a document shortly describing the issues and suggesting a > path to how to solve them.
[jira] [Comment Edited] (SPARK-17237) DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException
[ https://issues.apache.org/jira/browse/SPARK-17237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041434#comment-16041434 ] Takeshi Yamamuro edited comment on SPARK-17237 at 6/7/17 7:04 PM: -- Thanks for the report. I think there are two points you suggested: a qualifier and back-ticks. Yea, you're right, and it seems my pr above wrongly drops the qualifier from aggregated column names (then, it changed the behaviour).
{code}
// Spark-v2.1
scala> Seq((1, 2)).toDF("id", "v1").createOrReplaceTempView("s")
scala> Seq((1, 2)).toDF("id", "v2").createOrReplaceTempView("t")
scala> val df1 = sql("SELECT * FROM s")
df1: org.apache.spark.sql.DataFrame = [id: int, v1: int]
scala> val df2 = sql("SELECT * FROM t")
df2: org.apache.spark.sql.DataFrame = [id: int, v2: int]
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+-------------+-------------+
| id|1_max(s.`v1`)|1_max(t.`v2`)|
+---+-------------+-------------+
|  1|            2|            2|
+---+-------------+-------------+

// Master
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+---------+---------+
| id|1_max(v1)|1_max(v2)|
+---+---------+---------+
|  1|        2|        2|
+---+---------+---------+
{code}
We could easily fix this, but I'm not 100% sure that we need to fix this. WDYT? cc: [~smilegator]
{code}
// Master with a patch (https://github.com/apache/spark/compare/master...maropu:SPARK-17237-4)
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+-----------+-----------+
| id|1_max(s.v1)|1_max(t.v2)|
+---+-----------+-----------+
|  1|          2|          2|
+---+-----------+-----------+
{code}
On the other hand, IIUC back-ticks are not allowed in column names because they have special meaning in Spark.

was (Author: maropu): Thanks for the report. I think there are two points you pointed out: a qualifier and buck-ticks. Yea, you're right and it seems my pr above wrongly drop a qualifier for aggregated column names(then, it changed the behaviour).
{code}
// Spark-v2.1
scala> Seq((1, 2)).toDF("id", "v1").createOrReplaceTempView("s")
scala> Seq((1, 2)).toDF("id", "v2").createOrReplaceTempView("t")
scala> val df1 = sql("SELECT * FROM s")
df1: org.apache.spark.sql.DataFrame = [id: int, v1: int]
scala> val df2 = sql("SELECT * FROM t")
df2: org.apache.spark.sql.DataFrame = [id: int, v2: int]
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+-------------+-------------+
| id|1_max(s.`v1`)|1_max(t.`v2`)|
+---+-------------+-------------+
|  1|            2|            2|
+---+-------------+-------------+

// Master
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+---------+---------+
| id|1_max(v1)|1_max(v2)|
+---+---------+---------+
|  1|        2|        2|
+---+---------+---------+
{code}
We could easily fix this, but I'm not 100% sure that we need to fix this. WDYT? cc: [~smilegator]
{code}
// Master with a patch (https://github.com/apache/spark/compare/master...maropu:SPARK-17237-4)
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+-----------+-----------+
| id|1_max(s.v1)|1_max(t.v2)|
+---+-----------+-----------+
|  1|          2|          2|
+---+-----------+-----------+
{code}
On the other hand, IIUC back-ticks are not allowed in column names cuz they have special meaning in Spark.
> DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException > - > > Key: SPARK-17237 > URL: https://issues.apache.org/jira/browse/SPARK-17237 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Jiang Qiqi >Assignee: Takeshi Yamamuro > Labels: newbie > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > I am trying to run a pivot transformation which I ran on a spark1.6 cluster, > namely > sc.parallelize(Seq((2,3,4), (3,4,5))).toDF("a", "b", "c") > res1: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int] > scala> res1.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0) > res2: org.apache.spark.sql.DataFrame = [a: int, 3_count(c): bigint, 3_avg(c): > double, 4_count(c): bigint, 4_avg(c): double] > scala> res1.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0).show > +---+--++--++ > | a|3_count(c)|3_avg(c)|4_count(c)|4_avg(c)| > +---+
[jira] [Commented] (SPARK-17237) DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException
[ https://issues.apache.org/jira/browse/SPARK-17237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041434#comment-16041434 ] Takeshi Yamamuro commented on SPARK-17237: -- Thanks for the report. I think there are two points you pointed out: a qualifier and back-ticks. Yea, you're right, and it seems my pr above wrongly drops the qualifier from aggregated column names (then, it changed the behaviour).
{code}
// Spark-v2.1
scala> Seq((1, 2)).toDF("id", "v1").createOrReplaceTempView("s")
scala> Seq((1, 2)).toDF("id", "v2").createOrReplaceTempView("t")
scala> val df1 = sql("SELECT * FROM s")
df1: org.apache.spark.sql.DataFrame = [id: int, v1: int]
scala> val df2 = sql("SELECT * FROM t")
df2: org.apache.spark.sql.DataFrame = [id: int, v2: int]
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+-------------+-------------+
| id|1_max(s.`v1`)|1_max(t.`v2`)|
+---+-------------+-------------+
|  1|            2|            2|
+---+-------------+-------------+

// Master
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+---------+---------+
| id|1_max(v1)|1_max(v2)|
+---+---------+---------+
|  1|        2|        2|
+---+---------+---------+
{code}
We could easily fix this, but I'm not 100% sure that we need to fix this. WDYT? cc: [~smilegator]
{code}
// Master with a patch (https://github.com/apache/spark/compare/master...maropu:SPARK-17237-4)
scala> df1.join(df2, "id" :: Nil).groupBy("id").pivot("id").max("v1", "v2").show
+---+-----------+-----------+
| id|1_max(s.v1)|1_max(t.v2)|
+---+-----------+-----------+
|  1|          2|          2|
+---+-----------+-----------+
{code}
On the other hand, IIUC back-ticks are not allowed in column names because they have special meaning in Spark.
> DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException > - > > Key: SPARK-17237 > URL: https://issues.apache.org/jira/browse/SPARK-17237 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Jiang Qiqi >Assignee: Takeshi Yamamuro > Labels: newbie > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > I am trying to run a pivot transformation which I ran on a spark1.6 cluster, > namely > sc.parallelize(Seq((2,3,4), (3,4,5))).toDF("a", "b", "c") > res1: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int] > scala> res1.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0) > res2: org.apache.spark.sql.DataFrame = [a: int, 3_count(c): bigint, 3_avg(c): > double, 4_count(c): bigint, 4_avg(c): double] > scala> res1.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0).show > +---+--++--++ > | a|3_count(c)|3_avg(c)|4_count(c)|4_avg(c)| > +---+--++--++ > | 2| 1| 4.0| 0| 0.0| > | 3| 0| 0.0| 1| 5.0| > +---+--++--++ > after upgrade the environment to spark2.0, got an error while executing > .na.fill method > scala> sc.parallelize(Seq((2,3,4), (3,4,5))).toDF("a", "b", "c") > res3: org.apache.spark.sql.DataFrame = [a: int, b: int ... 
1 more field] > scala> res3.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0) > org.apache.spark.sql.AnalysisException: syntax error in attribute name: > `3_count(`c`)`; > at > org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:103) > at > org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:113) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:168) > at org.apache.spark.sql.Dataset.resolve(Dataset.scala:218) > at org.apache.spark.sql.Dataset.col(Dataset.scala:921) > at > org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$fillCol(DataFrameNaFunctions.scala:411) > at > org.apache.spark.sql.DataFrameNaFunctions$$anonfun$2.apply(DataFrameNaFunctions.scala:162) > at > org.apache.spark.sql.DataFrameNaFunctions$$anonfun$2.apply(DataFrameNaFunctions.scala:159) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.s
[jira] [Resolved] (SPARK-19800) Implement one kind of streaming sampling - reservoir sampling
[ https://issues.apache.org/jira/browse/SPARK-19800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-19800. --- Resolution: Won't Fix > Implement one kind of streaming sampling - reservoir sampling > - > > Key: SPARK-19800 > URL: https://issues.apache.org/jira/browse/SPARK-19800 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >
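For reference, the algorithm the ticket names - reservoir sampling - maintains a uniform random sample of k items over a stream of unknown length in O(k) memory, which is what makes it attractive for streaming. A minimal sketch of Algorithm R in Python (illustrative only; this is not the proposed Structured Streaming API):

```python
import random

def reservoir_sample(stream, k, rng=random):
    """Algorithm R: uniform sample of up to k items from an iterable
    of unknown length, using O(k) memory and a single pass."""
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)  # fill the reservoir first
        else:
            # Item i survives with probability k/(i+1); if chosen, it
            # evicts a uniformly random current occupant.
            j = rng.randrange(i + 1)
            if j < k:
                reservoir[j] = item
    return reservoir

sample = reservoir_sample(range(10_000), k=10)
print(len(sample))  # 10
```

An induction argument shows that after n items each element of the stream sits in the reservoir with equal probability k/n, which is the uniformity guarantee a streaming sampler would need.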
[jira] [Commented] (SPARK-19185) ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
[ https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041271#comment-16041271 ] Apache Spark commented on SPARK-19185: -- User 'markgrover' has created a pull request for this issue: https://github.com/apache/spark/pull/18234 > ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing > - > > Key: SPARK-19185 > URL: https://issues.apache.org/jira/browse/SPARK-19185 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.2 > Environment: Spark 2.0.2 > Spark Streaming Kafka 010 > Mesos 0.28.0 - client mode > spark.executor.cores 1 > spark.mesos.extra.cores 1 >Reporter: Kalvin Chau > Labels: streaming, windowing > > We've been running into ConcurrentModificationExceptions "KafkaConsumer is > not safe for multi-threaded access" with the CachedKafkaConsumer. I've been > working through debugging this issue and after looking through some of the > spark source code I think this is a bug. > Our setup is: > Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using > Spark-Streaming-Kafka-010 > spark.executor.cores 1 > spark.mesos.extra.cores 1 > Batch interval: 10s, window interval: 180s, and slide interval: 30s > We would see the exception when in one executor there are two task worker > threads assigned the same Topic+Partition, but a different set of offsets. > They would both get the same CachedKafkaConsumer, and whichever task thread > went first would seek and poll for all the records, and at the same time the > second thread would try to seek to its offset but fail because it is unable > to acquire the lock. 
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y > Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z > Time1 E0 Task0 - Seeks and starts to poll > Time1 E0 Task1 - Attempts to seek, but fails > Here are some relevant logs: > {code} > 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing > topic test-topic, partition 2 offsets 4394204414 -> 4394238058 > 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing > topic test-topic, partition 2 offsets 4394238058 -> 4394257712 > 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: > Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested > 4394204414 > 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: > Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested > 4394238058 > 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: > Initial fetch for spark-executor-consumer test-topic 2 4394238058 > 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: > Seeking to test-topic-2 4394238058 > 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting > block rdd_199_2 failed due to an exception > 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block > rdd_199_2 could not be removed as it was not found on disk or in memory > 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in > task 49.0 in stage 45.0 (TID 3201) > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132) > at > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95) > at > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69) > at > 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) > at > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926) > at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(
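The failure mode described above — two task threads on one executor sharing the cached consumer for the same Topic+Partition — can be reproduced outside Kafka with a minimal sketch. `FakeConsumer` below is an illustrative stand-in, not the Kafka client API: like `KafkaConsumer.acquire()`, it refuses access while a different thread holds the consumer.

```python
import threading

class FakeConsumer:
    """Illustrative stand-in for KafkaConsumer's single-owner guard
    (not the real Kafka client API)."""
    def __init__(self):
        self._owner = None
        self._guard = threading.Lock()

    def acquire(self):
        with self._guard:
            me = threading.get_ident()
            if self._owner is not None and self._owner != me:
                raise RuntimeError(
                    "KafkaConsumer is not safe for multi-threaded access")
            self._owner = me

    def release(self):
        with self._guard:
            self._owner = None

# Pre-fix cache shape: one consumer per (topic, partition), so both task
# threads for ("abc", 0) receive the very same object.
cache = {}
def get_consumer(topic, partition):
    return cache.setdefault((topic, partition), FakeConsumer())

errors = []
first_has_it = threading.Event()
second_done = threading.Event()

def task0():
    c = get_consumer("abc", 0)
    c.acquire()              # "seeks and starts to poll"
    first_has_it.set()
    second_done.wait()       # hold the consumer while task1 tries
    c.release()

def task1():
    first_has_it.wait()
    c = get_consumer("abc", 0)
    try:
        c.acquire()          # "attempts to seek, but fails"
    except RuntimeError as e:
        errors.append(str(e))
    second_done.set()

t0 = threading.Thread(target=task0)
t1 = threading.Thread(target=task1)
t0.start(); t1.start(); t0.join(); t1.join()
print(errors)  # the second task hits the multi-threaded access error
```

Keying the cache only by Topic+Partition is exactly what makes the collision possible; any scheme that gives concurrent tasks distinct consumer instances (or serializes access per consumer) avoids it.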
[jira] [Assigned] (SPARK-20342) DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
[ https://issues.apache.org/jira/browse/SPARK-20342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20342: Assignee: Apache Spark > DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators > --- > > Key: SPARK-20342 > URL: https://issues.apache.org/jira/browse/SPARK-20342 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Marcelo Vanzin >Assignee: Apache Spark > > Hit this on 2.2, but probably has been there forever. This is similar in > spirit to SPARK-20205. > Event is sent here, around L1154: > {code} > listenerBus.post(SparkListenerTaskEnd( >stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, > taskMetrics)) > {code} > Accumulators are updated later, around L1173: > {code} > val stage = stageIdToStage(task.stageId) > event.reason match { > case Success => > task match { > case rt: ResultTask[_, _] => > // Cast to ResultStage here because it's part of the ResultTask > // TODO Refactor this out to a function that accepts a ResultStage > val resultStage = stage.asInstanceOf[ResultStage] > resultStage.activeJob match { > case Some(job) => > if (!job.finished(rt.outputId)) { > updateAccumulators(event) > {code} > Same thing applies here; UI shows correct info because it's pointing at the > mutable {{TaskInfo}} structure. But the event log, for example, may record > the wrong information. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20342) DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
[ https://issues.apache.org/jira/browse/SPARK-20342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20342: Assignee: (was: Apache Spark) > DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators > --- > > Key: SPARK-20342 > URL: https://issues.apache.org/jira/browse/SPARK-20342 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Marcelo Vanzin > > Hit this on 2.2, but probably has been there forever. This is similar in > spirit to SPARK-20205. > Event is sent here, around L1154: > {code} > listenerBus.post(SparkListenerTaskEnd( >stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, > taskMetrics)) > {code} > Accumulators are updated later, around L1173: > {code} > val stage = stageIdToStage(task.stageId) > event.reason match { > case Success => > task match { > case rt: ResultTask[_, _] => > // Cast to ResultStage here because it's part of the ResultTask > // TODO Refactor this out to a function that accepts a ResultStage > val resultStage = stage.asInstanceOf[ResultStage] > resultStage.activeJob match { > case Some(job) => > if (!job.finished(rt.outputId)) { > updateAccumulators(event) > {code} > Same thing applies here; UI shows correct info because it's pointing at the > mutable {{TaskInfo}} structure. But the event log, for example, may record > the wrong information. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20342) DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
[ https://issues.apache.org/jira/browse/SPARK-20342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041256#comment-16041256 ] Apache Spark commented on SPARK-20342: -- User 'vanzin' has created a pull request for this issue: https://github.com/apache/spark/pull/18233 > DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators > --- > > Key: SPARK-20342 > URL: https://issues.apache.org/jira/browse/SPARK-20342 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Marcelo Vanzin > > Hit this on 2.2, but probably has been there forever. This is similar in > spirit to SPARK-20205. > Event is sent here, around L1154: > {code} > listenerBus.post(SparkListenerTaskEnd( >stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, > taskMetrics)) > {code} > Accumulators are updated later, around L1173: > {code} > val stage = stageIdToStage(task.stageId) > event.reason match { > case Success => > task match { > case rt: ResultTask[_, _] => > // Cast to ResultStage here because it's part of the ResultTask > // TODO Refactor this out to a function that accepts a ResultStage > val resultStage = stage.asInstanceOf[ResultStage] > resultStage.activeJob match { > case Some(job) => > if (!job.finished(rt.outputId)) { > updateAccumulators(event) > {code} > Same thing applies here; UI shows correct info because it's pointing at the > mutable {{TaskInfo}} structure. But the event log, for example, may record > the wrong information. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20342) DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
[ https://issues.apache.org/jira/browse/SPARK-20342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041198#comment-16041198 ] Marcelo Vanzin commented on SPARK-20342: I have a fix for this, might as well use a variant of your test code for it... > DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators > --- > > Key: SPARK-20342 > URL: https://issues.apache.org/jira/browse/SPARK-20342 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Marcelo Vanzin > > Hit this on 2.2, but probably has been there forever. This is similar in > spirit to SPARK-20205. > Event is sent here, around L1154: > {code} > listenerBus.post(SparkListenerTaskEnd( >stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, > taskMetrics)) > {code} > Accumulators are updated later, around L1173: > {code} > val stage = stageIdToStage(task.stageId) > event.reason match { > case Success => > task match { > case rt: ResultTask[_, _] => > // Cast to ResultStage here because it's part of the ResultTask > // TODO Refactor this out to a function that accepts a ResultStage > val resultStage = stage.asInstanceOf[ResultStage] > resultStage.activeJob match { > case Some(job) => > if (!job.finished(rt.outputId)) { > updateAccumulators(event) > {code} > Same thing applies here; UI shows correct info because it's pointing at the > mutable {{TaskInfo}} structure. But the event log, for example, may record > the wrong information. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18085) Better History Server scalability for many / large applications
[ https://issues.apache.org/jira/browse/SPARK-18085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041153#comment-16041153 ] Marcelo Vanzin commented on SPARK-18085: bq. the procedure of loading the history summary page is still a little long That's only the first time. That's tracked by SPARK-6951 and is somewhat orthogonal to this work, although I have ideas about how to make it better. > Better History Server scalability for many / large applications > --- > > Key: SPARK-18085 > URL: https://issues.apache.org/jira/browse/SPARK-18085 > Project: Spark > Issue Type: Umbrella > Components: Spark Core, Web UI >Affects Versions: 2.0.0 >Reporter: Marcelo Vanzin > Attachments: spark_hs_next_gen.pdf > > > It's a known fact that the History Server currently has some annoying issues > when serving lots of applications, and when serving large applications. > I'm filing this umbrella to track work related to addressing those issues. > I'll be attaching a document shortly describing the issues and suggesting a > path to how to solve them. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21009) SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bogdan Raducanu resolved SPARK-21009. - Resolution: Duplicate > SparkListenerTaskEnd.taskInfo.accumulables might not be accurate > > > Key: SPARK-21009 > URL: https://issues.apache.org/jira/browse/SPARK-21009 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Bogdan Raducanu > > The following code reproduces it: > {code} > test("test") { > val foundMetrics = mutable.Set.empty[String] > spark.sparkContext.addSparkListener(new SparkListener { > override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { > taskEnd.taskInfo.accumulables.foreach { a => > if (a.name.isDefined) { > foundMetrics.add(a.name.get) > } > } > } > }) > for (iter <- 0 until 100) { > foundMetrics.clear() > println(s"iter = $iter") > spark.range(10).groupBy().agg("id" -> "sum").collect > spark.sparkContext.listenerBus.waitUntilEmpty(3000) > assert(foundMetrics.size > 0) > } > } > {code} > The problem comes from DAGScheduler.handleTaskCompletion. > The SparkListenerTaskEnd event is sent before updateAccumulators is called, > so it might not be up to date. > The code there looks like it needs refactoring. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21009) SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041055#comment-16041055 ] Bogdan Raducanu commented on SPARK-21009: - Yes, looks like duplicate. I posted the repro code in that one. I'll close this one. > SparkListenerTaskEnd.taskInfo.accumulables might not be accurate > > > Key: SPARK-21009 > URL: https://issues.apache.org/jira/browse/SPARK-21009 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Bogdan Raducanu > > The following code reproduces it: > {code} > test("test") { > val foundMetrics = mutable.Set.empty[String] > spark.sparkContext.addSparkListener(new SparkListener { > override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { > taskEnd.taskInfo.accumulables.foreach { a => > if (a.name.isDefined) { > foundMetrics.add(a.name.get) > } > } > } > }) > for (iter <- 0 until 100) { > foundMetrics.clear() > println(s"iter = $iter") > spark.range(10).groupBy().agg("id" -> "sum").collect > spark.sparkContext.listenerBus.waitUntilEmpty(3000) > assert(foundMetrics.size > 0) > } > } > {code} > The problem comes from DAGScheduler.handleTaskCompletion. > The SparkListenerTaskEnd event is sent before updateAccumulators is called, > so it might not be up to date. > The code there looks like it needs refactoring. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20342) DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
[ https://issues.apache.org/jira/browse/SPARK-20342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041054#comment-16041054 ] Bogdan Raducanu commented on SPARK-20342: - This code fails because of this issue: {code} test("test") { val foundMetrics = mutable.Set.empty[String] spark.sparkContext.addSparkListener(new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEnd.taskInfo.accumulables.foreach { a => if (a.name.isDefined) { foundMetrics.add(a.name.get) } } } }) for (iter <- 0 until 100) { foundMetrics.clear() println(s"iter = $iter") spark.range(10).groupBy().agg("id" -> "sum").collect spark.sparkContext.listenerBus.waitUntilEmpty(3000) assert(foundMetrics.size > 0) } } {code} > DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators > --- > > Key: SPARK-20342 > URL: https://issues.apache.org/jira/browse/SPARK-20342 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Marcelo Vanzin > > Hit this on 2.2, but probably has been there forever. This is similar in > spirit to SPARK-20205. > Event is sent here, around L1154: > {code} > listenerBus.post(SparkListenerTaskEnd( >stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, > taskMetrics)) > {code} > Accumulators are updated later, around L1173: > {code} > val stage = stageIdToStage(task.stageId) > event.reason match { > case Success => > task match { > case rt: ResultTask[_, _] => > // Cast to ResultStage here because it's part of the ResultTask > // TODO Refactor this out to a function that accepts a ResultStage > val resultStage = stage.asInstanceOf[ResultStage] > resultStage.activeJob match { > case Some(job) => > if (!job.finished(rt.outputId)) { > updateAccumulators(event) > {code} > Same thing applies here; UI shows correct info because it's pointing at the > mutable {{TaskInfo}} structure. But the event log, for example, may record > the wrong information. 
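The ordering problem is easy to model without Spark. In the toy sketch below (hypothetical names, not the DAGScheduler API), the "buggy" handler snapshots the completion event before merging the task's accumulator updates — which is what a serializing listener such as the event-log writer observes — while the "fixed" handler merges first:

```python
class MiniScheduler:
    """Toy model of the event/accumulator ordering bug; names are made up."""
    def __init__(self):
        self.accumulators = {}
        self.event_log = []

    def _merge(self, updates):
        for name, value in updates.items():
            self.accumulators[name] = self.accumulators.get(name, 0) + value

    def on_task_end_buggy(self, updates):
        # Pre-fix order: the event snapshot is taken before the merge,
        # so whatever serializes the event records stale values.
        self.event_log.append(dict(self.accumulators))
        self._merge(updates)

    def on_task_end_fixed(self, updates):
        # Post-fix order: merge the task's updates first, then post.
        self._merge(updates)
        self.event_log.append(dict(self.accumulators))

buggy, fixed = MiniScheduler(), MiniScheduler()
buggy.on_task_end_buggy({"records.read": 10})
fixed.on_task_end_fixed({"records.read": 10})
print(buggy.event_log[0])  # {} -- accumulator values missing from the event
print(fixed.event_log[0])  # {'records.read': 10}
```

This also shows why the live UI escapes the bug: it keeps a reference to the mutable `TaskInfo` and reads it later, after the merge, whereas anything that serializes the event at post time sees the stale state.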
[jira] [Updated] (SPARK-19360) Spark 2.X does not support stored by clause
[ https://issues.apache.org/jira/browse/SPARK-19360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artur Sukhenko updated SPARK-19360: --- Summary: Spark 2.X does not support stored by clause (was: Spark 2.X does not support stored by cluase) > Spark 2.X does not support stored by clause > --- > > Key: SPARK-19360 > URL: https://issues.apache.org/jira/browse/SPARK-19360 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Ran Haim >Priority: Minor > > Spark 1.6 and below versions support HiveContext which supports Hive storage > handler with "stored by" clause. However, Spark 2.x does not support "stored > by". -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19307) SPARK-17387 caused ignorance of conf object passed to SparkContext:
[ https://issues.apache.org/jira/browse/SPARK-19307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041040#comment-16041040 ] Irina Truong commented on SPARK-19307: -- Is this available in 2.1.1? I could not find it in release notes. > SPARK-17387 caused ignorance of conf object passed to SparkContext: > --- > > Key: SPARK-19307 > URL: https://issues.apache.org/jira/browse/SPARK-19307 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: yuriy_hupalo >Assignee: Marcelo Vanzin > Attachments: SPARK-19307.patch > > > after patch SPARK-17387 was applied -- Sparkconf object is ignored when > launching SparkContext programmatically via python from spark-submit: > https://github.com/apache/spark/blob/master/python/pyspark/context.py#L128: > in case when we are running python SparkContext(conf=xxx) from spark-submit: > conf is set, conf._jconf is None () > passed as arg conf object is ignored (and used only when we are > launching java_gateway). > how to fix: > python/pyspark/context.py:132 > {code:title=python/pyspark/context.py:132} > if conf is not None and conf._jconf is not None: > # conf has been initialized in JVM properly, so use conf > directly. This represent the > # scenario that JVM has been launched before SparkConf is created > (e.g. SparkContext is > # created and then stopped, and we create a new SparkConf and new > SparkContext again) > self._conf = conf > else: > self._conf = SparkConf(_jvm=SparkContext._jvm) > + if conf: > + for key, value in conf.getAll(): > + self._conf.set(key,value) > + print(key,value) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21009) SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041039#comment-16041039 ] Yuming Wang commented on SPARK-21009: - This could be duplicate of [SPARK-20342|https://issues.apache.org/jira/browse/SPARK-20342]. > SparkListenerTaskEnd.taskInfo.accumulables might not be accurate > > > Key: SPARK-21009 > URL: https://issues.apache.org/jira/browse/SPARK-21009 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Bogdan Raducanu > > The following code reproduces it: > {code} > test("test") { > val foundMetrics = mutable.Set.empty[String] > spark.sparkContext.addSparkListener(new SparkListener { > override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { > taskEnd.taskInfo.accumulables.foreach { a => > if (a.name.isDefined) { > foundMetrics.add(a.name.get) > } > } > } > }) > for (iter <- 0 until 100) { > foundMetrics.clear() > println(s"iter = $iter") > spark.range(10).groupBy().agg("id" -> "sum").collect > spark.sparkContext.listenerBus.waitUntilEmpty(3000) > assert(foundMetrics.size > 0) > } > } > {code} > The problem comes from DAGScheduler.handleTaskCompletion. > The SparkListenerTaskEnd event is sent before updateAccumulators is called, > so it might not be up to date. > The code there looks like it needs refactoring. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041027#comment-16041027 ] Jose Soltren commented on SPARK-20760: -- Hi - I've been researching this issue, and I agree with David above. I'm nearly certain SPARK-18991 fixes this issue. I ran the application until it was killed with OOM, inspected the heap dump, and saw that the ConcurrentLinkedQueue in ContextCleaner was using several GB of RAM. This suggested to me that RDDs were being persisted faster than they could be unpersisted. I wouldn't call this a memory leak so much as a bottleneck. I'm running this again with SPARK-18991 and will have a look at a heap dump in a bit. Binzi, the Spark UI's reporting of RDD Blocks is misleading. This reported number is non-decreasing, and does not accurately reflect the number of RDDs that are currently active, only how many have been created. See executorspage.js. This is eventually backed by numBlocks in StorageUtils.scala, which, AFAICT, can never be negative, so this quantity in the UI is strictly monotonically increasing. I don't think you can use UI reporting of RDD Blocks as an indicator of correct behavior. Bounded memory usage and not being killed by YARN or by an OOM is a better way to validate this. It would be good if there were a unit test for SPARK-18991 validating that RDD blocks cannot be created faster than they can be unpersisted. > Memory Leak of RDD blocks > -- > > Key: SPARK-20760 > URL: https://issues.apache.org/jira/browse/SPARK-20760 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 2.1.0 > Environment: Spark 2.1.0 >Reporter: Binzi Cao > Attachments: RDD blocks in spark 2.1.1.png, RDD Blocks .png, Storage > in spark 2.1.1.png > > > Memory leak for RDD blocks for a long time running rdd process. > We have a long term running application, which is doing computations of > RDDs. 
and we found the RDD blocks keep increasing in the spark ui page. > The rdd blocks and memory usage do not match the cached rdds and memory. It > looks like spark keeps old rdds in memory and never releases them, or never gets a > chance to release them. The job will eventually die of out of memory. > In addition, I'm not seeing this issue in spark 1.6. We are seeing the same > issue in Yarn Cluster mode both in kafka streaming and batch applications. > The issue in streaming is similar; however, the rdd blocks seem to grow a > bit slower than in batch jobs. > The below is the sample code and it is reproducible by just running it in > local mode. > Scala file: > {code} > import scala.concurrent.duration.Duration > import scala.util.{Try, Failure, Success} > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.rdd.RDD > import scala.concurrent._ > import ExecutionContext.Implicits.global > case class Person(id: String, name: String) > object RDDApp { > def run(sc: SparkContext) = { > while (true) { > val r = scala.util.Random > val data = (1 to r.nextInt(100)).toList.map { a => > Person(a.toString, a.toString) > } > val rdd = sc.parallelize(data) > rdd.cache > println("running") > val a = (1 to 100).toList.map { x => > Future(rdd.filter(_.id == x.toString).collect) > } > a.foreach { f => > println(Await.ready(f, Duration.Inf).value.get) > } > rdd.unpersist() > } > } > def main(args: Array[String]): Unit = { >val conf = new SparkConf().setAppName("test") > val sc = new SparkContext(conf) > run(sc) > } > } > {code} > build sbt file: > {code} > name := "RDDTest" > version := "0.1.1" > scalaVersion := "2.11.5" > libraryDependencies ++= Seq ( > "org.scalaz" %% "scalaz-core" % "7.2.0", > "org.scalaz" %% "scalaz-concurrent" % "7.2.0", > "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided", > "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided" > ) > addCompilerPlugin("org.spire-math" %% "kind-projector" % 
"0.7.1") > mainClass in assembly := Some("RDDApp") > test in assembly := {} > {code} > To reproduce it: > Just > {code} > spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \ > --executor-memory 4G \ > --executor-cores 1 \ > --num-executors 1 \ > --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
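The "bottleneck, not a leak" reading in the comment above can be sketched as a plain rate mismatch (the per-tick rates below are made up for illustration; this is not ContextCleaner's code): if cleanup references enter the cleaner's queue faster than they are drained, the queue grows without bound even though every entry would eventually be unpersisted.

```python
from collections import deque

# Hypothetical rates: the driver caches 3 RDDs per tick while the
# cleaner unpersists only 1 per tick.
queue = deque()
for tick in range(100):
    for _ in range(3):
        queue.append(object())   # persist: enqueue a cleanup reference
    if queue:
        queue.popleft()          # cleaner drains one reference
print(len(queue))  # net +2 per tick -> 200 after 100 ticks
```

Bounded memory over a long run therefore requires either throttling the producer side or speeding up the cleaner — which is why the commenter checks heap usage rather than the UI's non-decreasing "RDD Blocks" counter.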
[jira] [Resolved] (SPARK-21010) Spark-Sql Can't Handle char() type Well
[ https://issues.apache.org/jira/browse/SPARK-21010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21010. --- Resolution: Not A Problem Fix Version/s: (was: 2.1.1) Target Version/s: (was: 2.1.1) > Spark-Sql Can't Handle char() type Well > > > Key: SPARK-21010 > URL: https://issues.apache.org/jira/browse/SPARK-21010 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1, 2.1.0, 2.1.1 > Environment: spark1.6.1 hadoop-2.6.0-cdh5.4.2 >Reporter: fengchaoge > > We create a table in spark-sql like this: > 1. create table cid_test (name string,id char(20)) ROW FORMAT DELIMITED > FIELDS TERMINATED BY ' ' stored as textfile; > 2. LOAD DATA LOCAL INPATH '/home/hadoop/id.txt' OVERWRITE INTO TABLE > cid_test; > content for id.txt: > fengchaoge 41302219990808 > 3. select * from cid_test where id='41302219990808'; > 4. select * from cid_test where id='41302219990808 '; > In the third step we got nothing, but in the fourth step we got the right > result. We must add two spaces to the id to get the right value. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21010) Spark-Sql Can't Handle char() type Well
[ https://issues.apache.org/jira/browse/SPARK-21010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041021#comment-16041021 ] Yuming Wang commented on SPARK-21010: - The problem is that you defined {{char(20)}}, and {{length('41302219990808')}} is 18, so 2 spaces are automatically appended; you can try {{varchar}} instead. > Spark-Sql Can't Handle char() type Well > > > Key: SPARK-21010 > URL: https://issues.apache.org/jira/browse/SPARK-21010 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1, 2.1.0, 2.1.1 > Environment: spark1.6.1 hadoop-2.6.0-cdh5.4.2 >Reporter: fengchaoge > Fix For: 2.1.1 > > > We create a table in spark-sql like this: > 1. create table cid_test (name string,id char(20)) ROW FORMAT DELIMITED > FIELDS TERMINATED BY ' ' stored as textfile; > 2. LOAD DATA LOCAL INPATH '/home/hadoop/id.txt' OVERWRITE INTO TABLE > cid_test; > content for id.txt: > fengchaoge 41302219990808 > 3. select * from cid_test where id='41302219990808'; > 4. select * from cid_test where id='41302219990808 '; > In the third step we got nothing, but in the fourth step we got the right > result. We must add two spaces to the id to get the right value. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
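The behavior blamed on Spark here is standard CHAR(n) blank-padding semantics, easy to demonstrate outside SQL. The 18-character id below is hypothetical, chosen only to match the length the comment mentions:

```python
# CHAR(20) stores a value right-padded with spaces to 20 characters, so an
# equality predicate only matches the padded form.
n = 20
value = "413022199908081234"     # hypothetical 18-character id
stored = value.ljust(n)          # what char(20) effectively stores
assert len(stored) == 20
assert stored != value           # query 3: unpadded literal, finds no rows
assert stored == value + "  "    # query 4: literal with 2 trailing spaces
print("padded with", n - len(value), "spaces")
```

Declaring the column `varchar` (no padding) or `string` makes the unpadded predicate match as the reporter expected.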
[jira] [Created] (SPARK-21010) Spark-Sql Can't Handle char() type Well
fengchaoge created SPARK-21010: -- Summary: Spark-Sql Can't Handle char() type Well Key: SPARK-21010 URL: https://issues.apache.org/jira/browse/SPARK-21010 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.1, 2.1.0, 1.6.1 Environment: spark1.6.1 hadoop-2.6.0-cdh5.4.2 Reporter: fengchaoge Fix For: 2.1.1 We create a table in spark-sql like this: 1. create table cid_test (name string,id char(20)) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' stored as textfile; 2. LOAD DATA LOCAL INPATH '/home/hadoop/id.txt' OVERWRITE INTO TABLE cid_test; content for id.txt: fengchaoge 41302219990808 3. select * from cid_test where id='41302219990808'; 4. select * from cid_test where id='41302219990808 '; In the third step we got nothing, but in the fourth step we got the right result. We must add two spaces to the id to get the right value. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20998) BroadcastHashJoin producing wrong results
[ https://issues.apache.org/jira/browse/SPARK-20998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040687#comment-16040687 ] Mohit edited comment on SPARK-20998 at 6/7/17 12:34 PM: [~viirya] Attached an orc file which is giving this error. I used simply val df = spark.read.orc("file:///mnt/mohit/data/*") to read it. was (Author: mohitgargk): [~viirya] Attached an orc file which is giving this error. I used simply val df = spark.read.orc("file:///mnt/mohit/data") to read it. > BroadcastHashJoin producing wrong results > - > > Key: SPARK-20998 > URL: https://issues.apache.org/jira/browse/SPARK-20998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Mohit > Attachments: > part-r-0-e071fc92-9f0f-4ac9-acd6-75fe74d8b175.snappy.orc > > > I have a hive table : _DistributionAttributes_, with > *Schema*: > root > |-- distributionstatus: string (nullable = true) > |-- enabledforselectionflag: boolean (nullable = true) > |-- sourcedistributionid: integer (nullable = true) > |-- rowstartdate: date (nullable = true) > |-- rowenddate: date (nullable = true) > |-- rowiscurrent: string (nullable = true) > |-- dwcreatedate: timestamp (nullable = true) > |-- dwlastupdatedate: timestamp (nullable = true) > |-- appid: integer (nullable = true) > |-- siteid: integer (nullable = true) > |-- brandid: integer (nullable = true) > *DataFrame* > val df = spark.sql("SELECT s.sourcedistributionid as sid, > t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid > as sbrand, t.brandid as tbrand FROM DistributionAttributes t INNER JOIN > DistributionAttributes s ON t.sourcedistributionid=s.sourcedistributionid > AND t.appid=s.appid AND t.brandid=s.brandid"). 
> *Without BroadCastJoin* ( spark-shell --conf > "spark.sql.autoBroadcastJoinThreshold=-1") : > df.explain > == Physical Plan == > *Project [sourcedistributionid#71 AS sid#0, sourcedistributionid#60 AS tid#1, > appid#77 AS sapp#2, appid#66 AS tapp#3, brandid#79 AS sbrand#4, brandid#68 AS > tbrand#5] > +- *SortMergeJoin [sourcedistributionid#60, appid#66, brandid#68], > [sourcedistributionid#71, appid#77, brandid#79], Inner >:- *Sort [sourcedistributionid#60 ASC, appid#66 ASC, brandid#68 ASC], > false, 0 >: +- Exchange hashpartitioning(sourcedistributionid#60, appid#66, > brandid#68, 200) >: +- *Filter ((isnotnull(sourcedistributionid#60) && > isnotnull(brandid#68)) && isnotnull(appid#66)) >:+- HiveTableScan [sourcedistributionid#60, appid#66, brandid#68], > MetastoreRelation distributionattributes, t >+- *Sort [sourcedistributionid#71 ASC, appid#77 ASC, brandid#79 ASC], > false, 0 > +- Exchange hashpartitioning(sourcedistributionid#71, appid#77, > brandid#79, 200) > +- *Filter ((isnotnull(sourcedistributionid#71) && > isnotnull(appid#77)) && isnotnull(brandid#79)) > +- HiveTableScan [sourcedistributionid#71, appid#77, brandid#79], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 22| 22| 61| 61| 614| 614| > | 29| 29| 65| 65| 0| 0| > | 30| 30| 12| 12| 121| 121| > | 10| 10| 73| 73| 731| 731| > | 24| 24| 61| 61| 611| 611| > | 35| 35| 65| 65| 0| 0| > *With BroadCastJoin* ( spark-shell ) > df.explain > == Physical Plan == > *Project [sourcedistributionid#136 AS sid#65, sourcedistributionid#125 AS > tid#66, appid#142 AS sapp#67, appid#131 AS tapp#68, brandid#144 AS sbrand#69, > brandid#133 AS tbrand#70] > +- *BroadcastHashJoin [sourcedistributionid#125, appid#131, brandid#133], > [sourcedistributionid#136, appid#142, brandid#144], Inner, BuildRight >:- *Filter ((isnotnull(brandid#133) && isnotnull(appid#131)) && > isnotnull(sourcedistributionid#125)) >: +- HiveTableScan [sourcedistributionid#125, appid#131, brandid#133], > 
MetastoreRelation distributionattributes, t >+- BroadcastExchange > HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, > false] as bigint), 32) | (cast(input[1, int, false] as bigint) & > 4294967295)), 32) | (cast(input[2, int, false] as bigint) & 4294967295 > +- *Filter ((isnotnull(brandid#144) && > isnotnull(sourcedistributionid#136)) && isnotnull(appid#142)) > +- HiveTableScan [sourcedistributionid#136, appid#142, brandid#144], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 15| 22| 61| 61| 614| 614| > | 13| 22| 61| 61| 614| 614| > | 10| 22| 61| 61| 614| 614| > | 7| 22| 61| 61| 614| 614| > | 9| 22| 61| 61| 614| 614| > | 16| 22| 61| 61| 614| 614|
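The BroadcastExchange in the quoted plan packs the three join keys into a single bigint with two 32-bit shifts, which cannot hold three 32-bit values: the first key is shifted past bit 63 and lost. A plain-Python sketch of that expression (assuming Spark's 64-bit wrap-around for bigint arithmetic) shows how rows that differ only in sourcedistributionid collide on the packed key, which is consistent with the wrong output above where many sid values pair with tid=22:

```python
MASK32 = 0xFFFFFFFF
MASK64 = (1 << 64) - 1

def pack_keys(a: int, b: int, c: int) -> int:
    """Replicate shiftleft(shiftleft(a, 32) | (b & MASK32), 32) | (c & MASK32)
    with 64-bit wrap-around: `a` is shifted entirely out of the word."""
    v = ((((a << 32) | (b & MASK32)) << 32) | (c & MASK32)) & MASK64
    return v - (1 << 64) if v >= (1 << 63) else v  # reinterpret as signed bigint

# Two rows that differ only in the first key collide on the packed key,
# so the broadcast hash join treats them as equal join keys.
assert pack_keys(15, 61, 614) == pack_keys(22, 61, 614)
```

This matches the shape of SPARK-17806 mentioned later in the thread: the sort-merge plan compares the three key columns individually and stays correct, while the packed broadcast key does not.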
[jira] [Updated] (SPARK-21009) SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bogdan Raducanu updated SPARK-21009: Description: The following code reproduces it: {code} test("test") { val foundMetrics = mutable.Set.empty[String] spark.sparkContext.addSparkListener(new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEnd.taskInfo.accumulables.foreach { a => if (a.name.isDefined) { foundMetrics.add(a.name.get) } } } }) for (iter <- 0 until 100) { foundMetrics.clear() println(s"iter = $iter") spark.range(10).groupBy().agg("id" -> "sum").collect spark.sparkContext.listenerBus.waitUntilEmpty(3000) assert(foundMetrics.size > 0) } } {code} The problem comes from DAGScheduler.handleTaskCompletion. The SparkListenerTaskEnd event is sent before updateAccumulators is called, so it might not be up to date. The code there looks like it needs refactoring. was: The following code reproduces it: {code} test("test") { val foundMetrics = mutable.Set.empty[String] spark.sparkContext.addSparkListener(new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEnd.taskInfo.accumulables.foreach { a => if (a.name.isDefined) { foundMetrics.add(a.name.get) } } } }) for (iter <- 0 until 100) { foundMetrics.clear() println(s"iter = $iter") spark.range(10).groupBy().agg("id" -> "sum").collect spark.sparkContext.listenerBus.waitUntilEmpty(3000) assert(foundMetrics.size > 0) } } {code} > SparkListenerTaskEnd.taskInfo.accumulables might not be accurate > > > Key: SPARK-21009 > URL: https://issues.apache.org/jira/browse/SPARK-21009 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Bogdan Raducanu > > The following code reproduces it: > {code} > test("test") { > val foundMetrics = mutable.Set.empty[String] > spark.sparkContext.addSparkListener(new SparkListener { > override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { > 
taskEnd.taskInfo.accumulables.foreach { a => > if (a.name.isDefined) { > foundMetrics.add(a.name.get) > } > } > } > }) > for (iter <- 0 until 100) { > foundMetrics.clear() > println(s"iter = $iter") > spark.range(10).groupBy().agg("id" -> "sum").collect > spark.sparkContext.listenerBus.waitUntilEmpty(3000) > assert(foundMetrics.size > 0) > } > } > {code} > The problem comes from DAGScheduler.handleTaskCompletion. > The SparkListenerTaskEnd event is sent before updateAccumulators is called, > so it might not be up to date. > The code there looks like it needs refactoring.
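The report describes an event-ordering bug: DAGScheduler.handleTaskCompletion posts SparkListenerTaskEnd before updateAccumulators merges the task's accumulator updates. A minimal non-Spark Python sketch of that hazard (names are illustrative, not Spark's API):

```python
class TaskInfo:
    def __init__(self):
        self.accumulables = []   # filled in by the accumulator-merge step

def handle_task_completion(task_info, accumulator_updates, listeners):
    # Bug shape described in the report: the event is delivered first ...
    for on_task_end in listeners:
        on_task_end(task_info)
    # ... and accumulators are merged only afterwards, too late for any
    # listener that already read task_info.accumulables.
    task_info.accumulables.extend(accumulator_updates)

seen = []
info = TaskInfo()
handle_task_completion(info, ["number of output rows"],
                       [lambda ti: seen.extend(ti.accumulables)])

assert seen == []                                      # listener saw nothing
assert info.accumulables == ["number of output rows"]  # merged after the event
```

Reversing the two steps inside handle_task_completion is the obvious fix, which is presumably why the reporter says the code needs refactoring.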
[jira] [Created] (SPARK-21009) SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
Bogdan Raducanu created SPARK-21009: --- Summary: SparkListenerTaskEnd.taskInfo.accumulables might not be accurate Key: SPARK-21009 URL: https://issues.apache.org/jira/browse/SPARK-21009 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0 Reporter: Bogdan Raducanu The following code reproduces it: {code} test("test") { val foundMetrics = mutable.Set.empty[String] spark.sparkContext.addSparkListener(new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEnd.taskInfo.accumulables.foreach { a => if (a.name.isDefined) { foundMetrics.add(a.name.get) } } } }) for (iter <- 0 until 100) { foundMetrics.clear() println(s"iter = $iter") spark.range(10).groupBy().agg("id" -> "sum").collect spark.sparkContext.listenerBus.waitUntilEmpty(3000) assert(foundMetrics.size > 0) } } {code}
[jira] [Comment Edited] (SPARK-17237) DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException
[ https://issues.apache.org/jira/browse/SPARK-17237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040716#comment-16040716 ] Alberto Fernández edited comment on SPARK-17237 at 6/7/17 11:52 AM: Hi there, I think this change introduced a breaking change in the way the "withColumnRenamed" method works. I can reproduce this breaking change with the following example: {code} dataframe = sql("SELECT * FROM db.table") another_dataframe = sql("SELECT * FROM db.another_table") dataframe .join(another_dataframe, on=[...]) .pivot("column_name", values=[0, 1]) .max("column1", "column2") .withColumnRenamed("0_max(another_table.`column1`)", "name1") .withColumnRenamed("1_max(another_table.`column2`)", "name2") {code} With Spark 2.1.0, the behaviour is as expected: columns get renamed. With Spark 2.1.1, and if this issue was resolved, you wouldn't need to change anything for the renames to work. However, the columns don't get renamed at all, because now you would need to use the following renames: {code} dataframe = sql("SELECT * FROM db.table") another_dataframe = sql("SELECT * FROM db.another_table") dataframe .join(another_dataframe, on=[...]) .pivot("column_name", values=[0, 1]) .max("column1", "column2") .withColumnRenamed("0_max(column1)", "name1") .withColumnRenamed("1_max(column2)", "name2") {code} As you can see, it seems that this PR somehow managed to remove the table name from the join context and also removed the backticks, thus introducing a breaking change. I should also note that the original issue didn't happen when using JSON as the output format. It only happens because Parquet doesn't support () characters in column names, but in JSON they work just fine. Here is an example of the error thrown by Parquet after upgrading to Spark 2.1.1 without modifying your code: {code} Attribute name "0_max(column1)" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it. 
{code} I think the original issue was that parseAttributeName cannot handle the "table.column" notation, and as I understand it this PR still doesn't fix that issue, right? As a workaround, you can change your column renames to accommodate the new format. Any ideas? Am I missing something? was (Author: albertofem): Hi there, I think this change introduced a breaking change in the way the "withColumnRenamed" method works. I can reproduce this breaking change with the following example: {code} dataframe = sql("SELECT * FROM db.table") another_dataframe = sql("SELECT * FROM db.another_table") dataframe .join(another_dataframe, on=[...]) .pivot("column_name", values=[0, 1]) .max("column1", "column2") .withColumnRenamed("0_max(another_table.`column1`)", "name1") .withColumnRenamed("0_max(another_table.`column2`)", "name2") {code} With Spark 2.1.0, the behaviour is as expected (buggy, but expected): columns don't get renamed. With Spark 2.1.1, and if this issue was resolved, you wouldn't need to change anything for the renames to work. However, the columns don't get renamed at all, because now you would need to use the following renames: {code} dataframe = sql("SELECT * FROM db.table") another_dataframe = sql("SELECT * FROM db.another_table") dataframe .join(another_dataframe, on=[...]) .pivot("column_name", values=[0, 1]) .max("column1", "column2") .withColumnRenamed("0_max(column1)", "name1") .withColumnRenamed("1_max(column2)", "name2") {code} As you can see, it seems that this PR somehow managed to remove the table name from the join context and also removed the backticks, thus introducing a breaking change. I should also note that the original issue didn't happen when using JSON as the output format. It only happens because Parquet doesn't support () characters in column names, but in JSON they work just fine. Here is an example of the error thrown by Parquet after upgrading to Spark 2.1.1 without modifying your code: 
{code} Attribute name "0_max(column1)" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it. {code} I think the original issue was that parseAttributeName cannot handle the "table.column" notation, and as I understand it this PR still doesn't fix that issue, right? As a workaround, you can change your column renames to accommodate the new format. Any ideas? Am I missing something? > DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException > - > > Key: SPARK-17237 > URL: https://issues.apache.org/jira/browse/SPARK-17237 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >
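A hedged sketch of the workaround the commenter describes: from 2.1.1 the pivoted column names drop the table qualifier and the backticks inside max(...), so pre-2.1.1 rename targets can be mapped mechanically to the new form. The regex helper below is an illustration, not code from the PR:

```python
import re

def migrate_rename_target(name: str) -> str:
    """Map a pre-2.1.1 pivoted column name such as
    0_max(another_table.`column1`) to the 2.1.1+ form 0_max(column1),
    stripping the table qualifier and backticks inside max(...)."""
    return re.sub(r"max\((?:[^.()`]+\.)?`?([^`()]+)`?\)", r"max(\1)", name)

assert migrate_rename_target("0_max(another_table.`column1`)") == "0_max(column1)"
assert migrate_rename_target("1_max(column2)") == "1_max(column2)"  # already new form
```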
[jira] [Commented] (SPARK-17237) DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException
[ https://issues.apache.org/jira/browse/SPARK-17237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040716#comment-16040716 ] Alberto Fernández commented on SPARK-17237: --- Hi there, I think this change introduced a breaking change in the way the "withColumnRenamed" method works. I can reproduce this breaking change with the following example: {code} dataframe = sql("SELECT * FROM db.table") another_dataframe = sql("SELECT * FROM db.another_table") dataframe .join(another_dataframe, on=[...]) .pivot("column_name", values=[0, 1]) .max("column1", "column2") .withColumnRenamed("0_max(another_table.`column1`)", "name1") .withColumnRenamed("0_max(another_table.`column2`)", "name2") {code} With Spark 2.1.0, the behaviour is as expected (buggy, but expected): columns don't get renamed. With Spark 2.1.1, and if this issue was resolved, you wouldn't need to change anything for the renames to work. However, the columns don't get renamed at all, because now you would need to use the following renames: {code} dataframe = sql("SELECT * FROM db.table") another_dataframe = sql("SELECT * FROM db.another_table") dataframe .join(another_dataframe, on=[...]) .pivot("column_name", values=[0, 1]) .max("column1", "column2") .withColumnRenamed("0_max(column1)", "name1") .withColumnRenamed("1_max(column2)", "name2") {code} As you can see, it seems that this PR somehow managed to remove the table name from the join context and also removed the backticks, thus introducing a breaking change. I should also note that the original issue didn't happen when using JSON as the output format. It only happens because Parquet doesn't support () characters in column names, but in JSON they work just fine. Here is an example of the error thrown by Parquet after upgrading to Spark 2.1.1 without modifying your code: {code} Attribute name "0_max(column1)" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it. 
{code} I think the original issue was that parseAttributeName cannot handle the "table.column" notation, and as I understand it this PR still doesn't fix that issue, right? As a workaround, you can change your column renames to accommodate the new format. Any ideas? Am I missing something? > DataFrame fill after pivot causing org.apache.spark.sql.AnalysisException > - > > Key: SPARK-17237 > URL: https://issues.apache.org/jira/browse/SPARK-17237 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Jiang Qiqi >Assignee: Takeshi Yamamuro > Labels: newbie > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > I am trying to run a pivot transformation which I ran on a spark1.6 cluster, > namely > sc.parallelize(Seq((2,3,4), (3,4,5))).toDF("a", "b", "c") > res1: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int] > scala> res1.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0) > res2: org.apache.spark.sql.DataFrame = [a: int, 3_count(c): bigint, 3_avg(c): > double, 4_count(c): bigint, 4_avg(c): double] > scala> res1.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0).show > +---+----------+--------+----------+--------+ > |  a|3_count(c)|3_avg(c)|4_count(c)|4_avg(c)| > +---+----------+--------+----------+--------+ > |  2|         1|     4.0|         0|     0.0| > |  3|         0|     0.0|         1|     5.0| > +---+----------+--------+----------+--------+ > after upgrading the environment to spark2.0, we got an error while executing > the .na.fill method > scala> sc.parallelize(Seq((2,3,4), (3,4,5))).toDF("a", "b", "c") > res3: org.apache.spark.sql.DataFrame = [a: int, b: int ... 
1 more field] > scala> res3.groupBy("a").pivot("b").agg(count("c"), avg("c")).na.fill(0) > org.apache.spark.sql.AnalysisException: syntax error in attribute name: > `3_count(`c`)`; > at > org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:103) > at > org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:113) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:168) > at org.apache.spark.sql.Dataset.resolve(Dataset.scala:218) > at org.apache.spark.sql.Dataset.col(Dataset.scala:921) > at > org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$fillCol(DataFrameNaFunctions.scala:411) > at > org.apache.spark.sql.DataFrameNaFunctions$$anonfun$2.apply(DataFrameNaFunctions.scala:162) > at > org.apache.spark.sql.DataFrameNaFunctions$$anonfun$2.apply(DataFrameNaFunctions.scala:159) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > a
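The AnalysisException in the quoted trace comes from a generated column literally named 3_count(`c`): parseAttributeName treats the embedded backticks as quoting syntax. A hedged sketch, assuming the identifier-quoting convention that a backtick inside a quoted name is escaped by doubling it, of how such a name can be made addressable as one opaque identifier:

```python
def quote_identifier(name: str) -> str:
    """Wrap a column name in backticks, doubling any embedded backticks
    so the whole string parses as a single identifier."""
    return "`" + name.replace("`", "``") + "`"

# The generated pivot column name from the trace:
assert quote_identifier("3_count(`c`)") == "`3_count(``c``)`"
```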
[jira] [Comment Edited] (SPARK-20998) BroadcastHashJoin producing wrong results
[ https://issues.apache.org/jira/browse/SPARK-20998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040687#comment-16040687 ] Mohit edited comment on SPARK-20998 at 6/7/17 10:41 AM: [~viirya] Attached an orc file which is giving this error. I used simply val df = spark.read.orc("file:///mnt/mohit/data") to read it. was (Author: mohitgargk): [~viirya] Attached. > BroadcastHashJoin producing wrong results > - > > Key: SPARK-20998 > URL: https://issues.apache.org/jira/browse/SPARK-20998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Mohit > Attachments: > part-r-0-e071fc92-9f0f-4ac9-acd6-75fe74d8b175.snappy.orc > > > I have a hive table : _DistributionAttributes_, with > *Schema*: > root > |-- distributionstatus: string (nullable = true) > |-- enabledforselectionflag: boolean (nullable = true) > |-- sourcedistributionid: integer (nullable = true) > |-- rowstartdate: date (nullable = true) > |-- rowenddate: date (nullable = true) > |-- rowiscurrent: string (nullable = true) > |-- dwcreatedate: timestamp (nullable = true) > |-- dwlastupdatedate: timestamp (nullable = true) > |-- appid: integer (nullable = true) > |-- siteid: integer (nullable = true) > |-- brandid: integer (nullable = true) > *DataFrame* > val df = spark.sql("SELECT s.sourcedistributionid as sid, > t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid > as sbrand, t.brandid as tbrand FROM DistributionAttributes t INNER JOIN > DistributionAttributes s ON t.sourcedistributionid=s.sourcedistributionid > AND t.appid=s.appid AND t.brandid=s.brandid"). 
> *Without BroadCastJoin* ( spark-shell --conf > "spark.sql.autoBroadcastJoinThreshold=-1") : > df.explain > == Physical Plan == > *Project [sourcedistributionid#71 AS sid#0, sourcedistributionid#60 AS tid#1, > appid#77 AS sapp#2, appid#66 AS tapp#3, brandid#79 AS sbrand#4, brandid#68 AS > tbrand#5] > +- *SortMergeJoin [sourcedistributionid#60, appid#66, brandid#68], > [sourcedistributionid#71, appid#77, brandid#79], Inner >:- *Sort [sourcedistributionid#60 ASC, appid#66 ASC, brandid#68 ASC], > false, 0 >: +- Exchange hashpartitioning(sourcedistributionid#60, appid#66, > brandid#68, 200) >: +- *Filter ((isnotnull(sourcedistributionid#60) && > isnotnull(brandid#68)) && isnotnull(appid#66)) >:+- HiveTableScan [sourcedistributionid#60, appid#66, brandid#68], > MetastoreRelation distributionattributes, t >+- *Sort [sourcedistributionid#71 ASC, appid#77 ASC, brandid#79 ASC], > false, 0 > +- Exchange hashpartitioning(sourcedistributionid#71, appid#77, > brandid#79, 200) > +- *Filter ((isnotnull(sourcedistributionid#71) && > isnotnull(appid#77)) && isnotnull(brandid#79)) > +- HiveTableScan [sourcedistributionid#71, appid#77, brandid#79], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 22| 22| 61| 61| 614| 614| > | 29| 29| 65| 65| 0| 0| > | 30| 30| 12| 12| 121| 121| > | 10| 10| 73| 73| 731| 731| > | 24| 24| 61| 61| 611| 611| > | 35| 35| 65| 65| 0| 0| > *With BroadCastJoin* ( spark-shell ) > df.explain > == Physical Plan == > *Project [sourcedistributionid#136 AS sid#65, sourcedistributionid#125 AS > tid#66, appid#142 AS sapp#67, appid#131 AS tapp#68, brandid#144 AS sbrand#69, > brandid#133 AS tbrand#70] > +- *BroadcastHashJoin [sourcedistributionid#125, appid#131, brandid#133], > [sourcedistributionid#136, appid#142, brandid#144], Inner, BuildRight >:- *Filter ((isnotnull(brandid#133) && isnotnull(appid#131)) && > isnotnull(sourcedistributionid#125)) >: +- HiveTableScan [sourcedistributionid#125, appid#131, brandid#133], > 
MetastoreRelation distributionattributes, t >+- BroadcastExchange > HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, > false] as bigint), 32) | (cast(input[1, int, false] as bigint) & > 4294967295)), 32) | (cast(input[2, int, false] as bigint) & 4294967295 > +- *Filter ((isnotnull(brandid#144) && > isnotnull(sourcedistributionid#136)) && isnotnull(appid#142)) > +- HiveTableScan [sourcedistributionid#136, appid#142, brandid#144], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 15| 22| 61| 61| 614| 614| > | 13| 22| 61| 61| 614| 614| > | 10| 22| 61| 61| 614| 614| > | 7| 22| 61| 61| 614| 614| > | 9| 22| 61| 61| 614| 614| > | 16| 22| 61| 61| 614| 614|
[jira] [Commented] (SPARK-20998) BroadcastHashJoin producing wrong results
[ https://issues.apache.org/jira/browse/SPARK-20998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040687#comment-16040687 ] Mohit commented on SPARK-20998: --- [~viirya] Attached. > BroadcastHashJoin producing wrong results > - > > Key: SPARK-20998 > URL: https://issues.apache.org/jira/browse/SPARK-20998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Mohit > Attachments: > part-r-0-e071fc92-9f0f-4ac9-acd6-75fe74d8b175.snappy.orc > > > I have a hive table : _DistributionAttributes_, with > *Schema*: > root > |-- distributionstatus: string (nullable = true) > |-- enabledforselectionflag: boolean (nullable = true) > |-- sourcedistributionid: integer (nullable = true) > |-- rowstartdate: date (nullable = true) > |-- rowenddate: date (nullable = true) > |-- rowiscurrent: string (nullable = true) > |-- dwcreatedate: timestamp (nullable = true) > |-- dwlastupdatedate: timestamp (nullable = true) > |-- appid: integer (nullable = true) > |-- siteid: integer (nullable = true) > |-- brandid: integer (nullable = true) > *DataFrame* > val df = spark.sql("SELECT s.sourcedistributionid as sid, > t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid > as sbrand, t.brandid as tbrand FROM DistributionAttributes t INNER JOIN > DistributionAttributes s ON t.sourcedistributionid=s.sourcedistributionid > AND t.appid=s.appid AND t.brandid=s.brandid"). 
> *Without BroadCastJoin* ( spark-shell --conf > "spark.sql.autoBroadcastJoinThreshold=-1") : > df.explain > == Physical Plan == > *Project [sourcedistributionid#71 AS sid#0, sourcedistributionid#60 AS tid#1, > appid#77 AS sapp#2, appid#66 AS tapp#3, brandid#79 AS sbrand#4, brandid#68 AS > tbrand#5] > +- *SortMergeJoin [sourcedistributionid#60, appid#66, brandid#68], > [sourcedistributionid#71, appid#77, brandid#79], Inner >:- *Sort [sourcedistributionid#60 ASC, appid#66 ASC, brandid#68 ASC], > false, 0 >: +- Exchange hashpartitioning(sourcedistributionid#60, appid#66, > brandid#68, 200) >: +- *Filter ((isnotnull(sourcedistributionid#60) && > isnotnull(brandid#68)) && isnotnull(appid#66)) >:+- HiveTableScan [sourcedistributionid#60, appid#66, brandid#68], > MetastoreRelation distributionattributes, t >+- *Sort [sourcedistributionid#71 ASC, appid#77 ASC, brandid#79 ASC], > false, 0 > +- Exchange hashpartitioning(sourcedistributionid#71, appid#77, > brandid#79, 200) > +- *Filter ((isnotnull(sourcedistributionid#71) && > isnotnull(appid#77)) && isnotnull(brandid#79)) > +- HiveTableScan [sourcedistributionid#71, appid#77, brandid#79], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 22| 22| 61| 61| 614| 614| > | 29| 29| 65| 65| 0| 0| > | 30| 30| 12| 12| 121| 121| > | 10| 10| 73| 73| 731| 731| > | 24| 24| 61| 61| 611| 611| > | 35| 35| 65| 65| 0| 0| > *With BroadCastJoin* ( spark-shell ) > df.explain > == Physical Plan == > *Project [sourcedistributionid#136 AS sid#65, sourcedistributionid#125 AS > tid#66, appid#142 AS sapp#67, appid#131 AS tapp#68, brandid#144 AS sbrand#69, > brandid#133 AS tbrand#70] > +- *BroadcastHashJoin [sourcedistributionid#125, appid#131, brandid#133], > [sourcedistributionid#136, appid#142, brandid#144], Inner, BuildRight >:- *Filter ((isnotnull(brandid#133) && isnotnull(appid#131)) && > isnotnull(sourcedistributionid#125)) >: +- HiveTableScan [sourcedistributionid#125, appid#131, brandid#133], > 
MetastoreRelation distributionattributes, t >+- BroadcastExchange > HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, > false] as bigint), 32) | (cast(input[1, int, false] as bigint) & > 4294967295)), 32) | (cast(input[2, int, false] as bigint) & 4294967295 > +- *Filter ((isnotnull(brandid#144) && > isnotnull(sourcedistributionid#136)) && isnotnull(appid#142)) > +- HiveTableScan [sourcedistributionid#136, appid#142, brandid#144], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 15| 22| 61| 61| 614| 614| > | 13| 22| 61| 61| 614| 614| > | 10| 22| 61| 61| 614| 614| > | 7| 22| 61| 61| 614| 614| > | 9| 22| 61| 61| 614| 614| > | 16| 22| 61| 61| 614| 614|
[jira] [Updated] (SPARK-20998) BroadcastHashJoin producing wrong results
[ https://issues.apache.org/jira/browse/SPARK-20998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mohit updated SPARK-20998: -- Attachment: part-r-0-e071fc92-9f0f-4ac9-acd6-75fe74d8b175.snappy.orc > BroadcastHashJoin producing wrong results > - > > Key: SPARK-20998 > URL: https://issues.apache.org/jira/browse/SPARK-20998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Mohit > Attachments: > part-r-0-e071fc92-9f0f-4ac9-acd6-75fe74d8b175.snappy.orc > > > I have a hive table : _DistributionAttributes_, with > *Schema*: > root > |-- distributionstatus: string (nullable = true) > |-- enabledforselectionflag: boolean (nullable = true) > |-- sourcedistributionid: integer (nullable = true) > |-- rowstartdate: date (nullable = true) > |-- rowenddate: date (nullable = true) > |-- rowiscurrent: string (nullable = true) > |-- dwcreatedate: timestamp (nullable = true) > |-- dwlastupdatedate: timestamp (nullable = true) > |-- appid: integer (nullable = true) > |-- siteid: integer (nullable = true) > |-- brandid: integer (nullable = true) > *DataFrame* > val df = spark.sql("SELECT s.sourcedistributionid as sid, > t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid > as sbrand, t.brandid as tbrand FROM DistributionAttributes t INNER JOIN > DistributionAttributes s ON t.sourcedistributionid=s.sourcedistributionid > AND t.appid=s.appid AND t.brandid=s.brandid"). 
> *Without BroadCastJoin* ( spark-shell --conf > "spark.sql.autoBroadcastJoinThreshold=-1") : > df.explain > == Physical Plan == > *Project [sourcedistributionid#71 AS sid#0, sourcedistributionid#60 AS tid#1, > appid#77 AS sapp#2, appid#66 AS tapp#3, brandid#79 AS sbrand#4, brandid#68 AS > tbrand#5] > +- *SortMergeJoin [sourcedistributionid#60, appid#66, brandid#68], > [sourcedistributionid#71, appid#77, brandid#79], Inner >:- *Sort [sourcedistributionid#60 ASC, appid#66 ASC, brandid#68 ASC], > false, 0 >: +- Exchange hashpartitioning(sourcedistributionid#60, appid#66, > brandid#68, 200) >: +- *Filter ((isnotnull(sourcedistributionid#60) && > isnotnull(brandid#68)) && isnotnull(appid#66)) >:+- HiveTableScan [sourcedistributionid#60, appid#66, brandid#68], > MetastoreRelation distributionattributes, t >+- *Sort [sourcedistributionid#71 ASC, appid#77 ASC, brandid#79 ASC], > false, 0 > +- Exchange hashpartitioning(sourcedistributionid#71, appid#77, > brandid#79, 200) > +- *Filter ((isnotnull(sourcedistributionid#71) && > isnotnull(appid#77)) && isnotnull(brandid#79)) > +- HiveTableScan [sourcedistributionid#71, appid#77, brandid#79], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 22| 22| 61| 61| 614| 614| > | 29| 29| 65| 65| 0| 0| > | 30| 30| 12| 12| 121| 121| > | 10| 10| 73| 73| 731| 731| > | 24| 24| 61| 61| 611| 611| > | 35| 35| 65| 65| 0| 0| > *With BroadCastJoin* ( spark-shell ) > df.explain > == Physical Plan == > *Project [sourcedistributionid#136 AS sid#65, sourcedistributionid#125 AS > tid#66, appid#142 AS sapp#67, appid#131 AS tapp#68, brandid#144 AS sbrand#69, > brandid#133 AS tbrand#70] > +- *BroadcastHashJoin [sourcedistributionid#125, appid#131, brandid#133], > [sourcedistributionid#136, appid#142, brandid#144], Inner, BuildRight >:- *Filter ((isnotnull(brandid#133) && isnotnull(appid#131)) && > isnotnull(sourcedistributionid#125)) >: +- HiveTableScan [sourcedistributionid#125, appid#131, brandid#133], > 
MetastoreRelation distributionattributes, t >+- BroadcastExchange > HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, > false] as bigint), 32) | (cast(input[1, int, false] as bigint) & > 4294967295)), 32) | (cast(input[2, int, false] as bigint) & 4294967295 > +- *Filter ((isnotnull(brandid#144) && > isnotnull(sourcedistributionid#136)) && isnotnull(appid#142)) > +- HiveTableScan [sourcedistributionid#136, appid#142, brandid#144], > MetastoreRelation distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 15| 22| 61| 61| 614| 614| > | 13| 22| 61| 61| 614| 614| > | 10| 22| 61| 61| 614| 614| > | 7| 22| 61| 61| 614| 614| > | 9| 22| 61| 61| 614| 614| > | 16| 22| 61| 61| 614| 614|
[jira] [Updated] (SPARK-20998) BroadcastHashJoin producing wrong results
[ https://issues.apache.org/jira/browse/SPARK-20998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mohit updated SPARK-20998: -- Description: I have a hive table : _DistributionAttributes_, with *Schema*: root |-- distributionstatus: string (nullable = true) |-- enabledforselectionflag: boolean (nullable = true) |-- sourcedistributionid: integer (nullable = true) |-- rowstartdate: date (nullable = true) |-- rowenddate: date (nullable = true) |-- rowiscurrent: string (nullable = true) |-- dwcreatedate: timestamp (nullable = true) |-- dwlastupdatedate: timestamp (nullable = true) |-- appid: integer (nullable = true) |-- siteid: integer (nullable = true) |-- brandid: integer (nullable = true) *DataFrame* val df = spark.sql("SELECT s.sourcedistributionid as sid, t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid as sbrand, t.brandid as tbrand FROM DistributionAttributes t INNER JOIN DistributionAttributes s ON t.sourcedistributionid=s.sourcedistributionid AND t.appid=s.appid AND t.brandid=s.brandid"). 
*Without BroadCastJoin* ( spark-shell --conf "spark.sql.autoBroadcastJoinThreshold=-1") : df.explain == Physical Plan == *Project [sourcedistributionid#71 AS sid#0, sourcedistributionid#60 AS tid#1, appid#77 AS sapp#2, appid#66 AS tapp#3, brandid#79 AS sbrand#4, brandid#68 AS tbrand#5] +- *SortMergeJoin [sourcedistributionid#60, appid#66, brandid#68], [sourcedistributionid#71, appid#77, brandid#79], Inner :- *Sort [sourcedistributionid#60 ASC, appid#66 ASC, brandid#68 ASC], false, 0 : +- Exchange hashpartitioning(sourcedistributionid#60, appid#66, brandid#68, 200) : +- *Filter ((isnotnull(sourcedistributionid#60) && isnotnull(brandid#68)) && isnotnull(appid#66)) :+- HiveTableScan [sourcedistributionid#60, appid#66, brandid#68], MetastoreRelation distributionattributes, t +- *Sort [sourcedistributionid#71 ASC, appid#77 ASC, brandid#79 ASC], false, 0 +- Exchange hashpartitioning(sourcedistributionid#71, appid#77, brandid#79, 200) +- *Filter ((isnotnull(sourcedistributionid#71) && isnotnull(appid#77)) && isnotnull(brandid#79)) +- HiveTableScan [sourcedistributionid#71, appid#77, brandid#79], MetastoreRelation distributionattributes, s df.show |sid|tid|sapp|tapp|sbrand|tbrand| | 22| 22| 61| 61| 614| 614| | 29| 29| 65| 65| 0| 0| | 30| 30| 12| 12| 121| 121| | 10| 10| 73| 73| 731| 731| | 24| 24| 61| 61| 611| 611| | 35| 35| 65| 65| 0| 0| *With BroadCastJoin* ( spark-shell ) df.explain == Physical Plan == *Project [sourcedistributionid#136 AS sid#65, sourcedistributionid#125 AS tid#66, appid#142 AS sapp#67, appid#131 AS tapp#68, brandid#144 AS sbrand#69, brandid#133 AS tbrand#70] +- *BroadcastHashJoin [sourcedistributionid#125, appid#131, brandid#133], [sourcedistributionid#136, appid#142, brandid#144], Inner, BuildRight :- *Filter ((isnotnull(brandid#133) && isnotnull(appid#131)) && isnotnull(sourcedistributionid#125)) : +- HiveTableScan [sourcedistributionid#125, appid#131, brandid#133], MetastoreRelation distributionattributes, t +- BroadcastExchange 
HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295)), 32) | (cast(input[2, int, false] as bigint) & 4294967295 +- *Filter ((isnotnull(brandid#144) && isnotnull(sourcedistributionid#136)) && isnotnull(appid#142)) +- HiveTableScan [sourcedistributionid#136, appid#142, brandid#144], MetastoreRelation distributionattributes, s df.show |sid|tid|sapp|tapp|sbrand|tbrand| | 15| 22| 61| 61| 614| 614| | 13| 22| 61| 61| 614| 614| | 10| 22| 61| 61| 614| 614| | 7| 22| 61| 61| 614| 614| | 9| 22| 61| 61| 614| 614| | 16| 22| 61| 61| 614| 614| was: I have a hive table : _eagle_edw_batch.DistributionAttributes_, with *Schema*: root |-- distributionstatus: string (nullable = true) |-- enabledforselectionflag: boolean (nullable = true) |-- sourcedistributionid: integer (nullable = true) |-- rowstartdate: date (nullable = true) |-- rowenddate: date (nullable = true) |-- rowiscurrent: string (nullable = true) |-- dwcreatedate: timestamp (nullable = true) |-- dwlastupdatedate: timestamp (nullable = true) |-- appid: integer (nullable = true) |-- siteid: integer (nullable = true) |-- brandid: integer (nullable = true) *DataFrame* val df = spark.sql("SELECT s.sourcedistributionid as sid, t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid as sbrand, t.brandid as tbrand FROM eagle_edw_batch.DistributionAttributes t INNER JOIN eagle_edw_batch.DistributionAttributes s ON t.sourcedistributionid=s.sourcedistributionid AND t.appid=s.appid AND t.brandid=s.brandid"). *Without BroadCastJoin* ( spark-shell --conf "spark.sql.autoBroadcastJoinThreshold=
[jira] [Commented] (SPARK-20998) BroadcastHashJoin producing wrong results
[ https://issues.apache.org/jira/browse/SPARK-20998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040651#comment-16040651 ] Anuj commented on SPARK-20998: -- This could be duplicate of https://issues.apache.org/jira/browse/SPARK-17806 > BroadcastHashJoin producing wrong results > - > > Key: SPARK-20998 > URL: https://issues.apache.org/jira/browse/SPARK-20998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Mohit > > I have a hive table : _eagle_edw_batch.DistributionAttributes_, with > *Schema*: > root > |-- distributionstatus: string (nullable = true) > |-- enabledforselectionflag: boolean (nullable = true) > |-- sourcedistributionid: integer (nullable = true) > |-- rowstartdate: date (nullable = true) > |-- rowenddate: date (nullable = true) > |-- rowiscurrent: string (nullable = true) > |-- dwcreatedate: timestamp (nullable = true) > |-- dwlastupdatedate: timestamp (nullable = true) > |-- appid: integer (nullable = true) > |-- siteid: integer (nullable = true) > |-- brandid: integer (nullable = true) > *DataFrame* > val df = spark.sql("SELECT s.sourcedistributionid as sid, > t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid > as sbrand, t.brandid as tbrand FROM eagle_edw_batch.DistributionAttributes t > INNER JOIN eagle_edw_batch.DistributionAttributes s ON > t.sourcedistributionid=s.sourcedistributionid AND t.appid=s.appid AND > t.brandid=s.brandid"). 
> *Without BroadCastJoin* ( spark-shell --conf > "spark.sql.autoBroadcastJoinThreshold=-1") : > df.explain > == Physical Plan == > *Project [sourcedistributionid#71 AS sid#0, sourcedistributionid#60 AS tid#1, > appid#77 AS sapp#2, appid#66 AS tapp#3, brandid#79 AS sbrand#4, brandid#68 AS > tbrand#5] > +- *SortMergeJoin [sourcedistributionid#60, appid#66, brandid#68], > [sourcedistributionid#71, appid#77, brandid#79], Inner >:- *Sort [sourcedistributionid#60 ASC, appid#66 ASC, brandid#68 ASC], > false, 0 >: +- Exchange hashpartitioning(sourcedistributionid#60, appid#66, > brandid#68, 200) >: +- *Filter ((isnotnull(sourcedistributionid#60) && > isnotnull(brandid#68)) && isnotnull(appid#66)) >:+- HiveTableScan [sourcedistributionid#60, appid#66, brandid#68], > MetastoreRelation eagle_edw_batch, distributionattributes, t >+- *Sort [sourcedistributionid#71 ASC, appid#77 ASC, brandid#79 ASC], > false, 0 > +- Exchange hashpartitioning(sourcedistributionid#71, appid#77, > brandid#79, 200) > +- *Filter ((isnotnull(sourcedistributionid#71) && > isnotnull(appid#77)) && isnotnull(brandid#79)) > +- HiveTableScan [sourcedistributionid#71, appid#77, brandid#79], > MetastoreRelation eagle_edw_batch, distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 22| 22| 61| 61| 614| 614| > | 29| 29| 65| 65| 0| 0| > | 30| 30| 12| 12| 121| 121| > | 10| 10| 73| 73| 731| 731| > | 24| 24| 61| 61| 611| 611| > | 35| 35| 65| 65| 0| 0| > *With BroadCastJoin* ( spark-shell ) > df.explain > == Physical Plan == > *Project [sourcedistributionid#136 AS sid#65, sourcedistributionid#125 AS > tid#66, appid#142 AS sapp#67, appid#131 AS tapp#68, brandid#144 AS sbrand#69, > brandid#133 AS tbrand#70] > +- *BroadcastHashJoin [sourcedistributionid#125, appid#131, brandid#133], > [sourcedistributionid#136, appid#142, brandid#144], Inner, BuildRight >:- *Filter ((isnotnull(brandid#133) && isnotnull(appid#131)) && > isnotnull(sourcedistributionid#125)) >: +- HiveTableScan 
[sourcedistributionid#125, appid#131, brandid#133], > MetastoreRelation eagle_edw_batch, distributionattributes, t >+- BroadcastExchange > HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, > false] as bigint), 32) | (cast(input[1, int, false] as bigint) & > 4294967295)), 32) | (cast(input[2, int, false] as bigint) & 4294967295 > +- *Filter ((isnotnull(brandid#144) && > isnotnull(sourcedistributionid#136)) && isnotnull(appid#142)) > +- HiveTableScan [sourcedistributionid#136, appid#142, brandid#144], > MetastoreRelation eagle_edw_batch, distributionattributes, s > df.show > |sid|tid|sapp|tapp|sbrand|tbrand| > | 15| 22| 61| 61| 614| 614| > | 13| 22| 61| 61| 614| 614| > | 10| 22| 61| 61| 614| 614| > | 7| 22| 61| 61| 614| 614| > | 9| 22| 61| 61| 614| 614| > | 16| 22| 61| 61| 614| 614| -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
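The `HashedRelationBroadcastMode` expression visible in the broadcast plan above packs the three integer join keys into a single value with nested `shiftleft` calls. A minimal sketch (plain Python, assuming JVM-style 64-bit wraparound; this is a hypothetical reconstruction of the plan's expression, not Spark's actual code path) shows why such packing cannot distinguish rows that differ only in the first key, which matches the wrong results above where many different `sid` values join to `tid = 22`:

```python
MASK32 = 0xFFFFFFFF
MASK64 = 0xFFFFFFFFFFFFFFFF

def pack_keys(a, b, c):
    """Mimic shiftleft(shiftleft(a, 32) | (b & mask), 32) | (c & mask)
    with 64-bit overflow, as suggested by the plan's key expression."""
    v = ((a << 32) | (b & MASK32)) & MASK64
    return ((v << 32) | (c & MASK32)) & MASK64

# Rows (sid=15, app=61, brand=614) and (sid=22, app=61, brand=614)
# collapse to the same packed key: the first int is shifted off the top.
print(pack_keys(15, 61, 614) == pack_keys(22, 61, 614))  # True
```

If the broadcast hash table really keys on this expression, only the last two of the three join columns are effectively compared, which would explain the suspected duplication with SPARK-17806 mentioned in the comment above.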
[jira] [Commented] (SPARK-18085) Better History Server scalability for many / large applications
[ https://issues.apache.org/jira/browse/SPARK-18085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040633#comment-16040633 ] DjvuLee commented on SPARK-18085: - [~vanzin] the procedure of loading the history summary page is still a little long. Is there any further plan to solve this? > Better History Server scalability for many / large applications > --- > > Key: SPARK-18085 > URL: https://issues.apache.org/jira/browse/SPARK-18085 > Project: Spark > Issue Type: Umbrella > Components: Spark Core, Web UI >Affects Versions: 2.0.0 >Reporter: Marcelo Vanzin > Attachments: spark_hs_next_gen.pdf > > > It's a known fact that the History Server currently has some annoying issues > when serving lots of applications, and when serving large applications. > I'm filing this umbrella to track work related to addressing those issues. > I'll be attaching a document shortly describing the issues and suggesting a > path to how to solve them. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20994) Alleviate memory pressure in StreamManager
[ https://issues.apache.org/jira/browse/SPARK-20994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040612#comment-16040612 ] Apache Spark commented on SPARK-20994: -- User 'jinxing64' has created a pull request for this issue: https://github.com/apache/spark/pull/18231 > Alleviate memory pressure in StreamManager > -- > > Key: SPARK-20994 > URL: https://issues.apache.org/jira/browse/SPARK-20994 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: jin xing > > In my cluster, we are suffering from OOM of shuffle-service. > We found that a lot of executors are fetching blocks from a single > shuffle-service. Analyzing the memory, we found that the > blockIds({{shuffle_shuffleId_mapId_reduceId}}) take about 1.5GBytes. > In current code, chunks are fetched from shuffle service in two steps: > Step-1. Send {{OpenBlocks}}, which contains the list of blocks to fetch; > Step-2. Fetch the consecutive chunks from shuffle-service by {{streamId}} and > {{chunkIndex}} > Thus memory cost can be improved for step-1. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20997) spark-submit's --driver-cores marked as "YARN-only" but listed under "Spark standalone with cluster deploy mode only"
[ https://issues.apache.org/jira/browse/SPARK-20997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040594#comment-16040594 ] Jacek Laskowski commented on SPARK-20997: - Go ahead! Thanks [~guoxiaolongzte]! > spark-submit's --driver-cores marked as "YARN-only" but listed under "Spark > standalone with cluster deploy mode only" > - > > Key: SPARK-20997 > URL: https://issues.apache.org/jira/browse/SPARK-20997 > Project: Spark > Issue Type: Bug > Components: Documentation, Spark Submit >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski >Priority: Trivial > > Just noticed that {{spark-submit}} describes {{--driver-cores}} under: > * Spark standalone with cluster deploy mode only > * YARN-only > While I can understand "only" in "Spark standalone with cluster deploy mode > only" to refer to cluster deploy mode (not the default client mode), but > YARN-only baffles me which I think deserves a fix. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20963) Support column aliases for aliased relation in FROM clause
[ https://issues.apache.org/jira/browse/SPARK-20963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20963: -- Fix Version/s: (was: 2.3.0) > Support column aliases for aliased relation in FROM clause > -- > > Key: SPARK-20963 > URL: https://issues.apache.org/jira/browse/SPARK-20963 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.1.1 >Reporter: Takeshi Yamamuro > > Currently, we do not support column aliases for aliased relation; > {code} > scala> Seq((1, 2), (2, 0)).toDF("id", "value").createOrReplaceTempView("t1") > scala> Seq((1, 2), (2, 0)).toDF("id", "value").createOrReplaceTempView("t2") > scala> sql("SELECT * FROM (t1 JOIN t2)") > scala> sql("SELECT * FROM (t1 INNER JOIN t2 ON t1.id = t2.id) AS t(a, b, c, > d)").show > org.apache.spark.sql.catalyst.parser.ParseException: > mismatched input '(' expecting {, ',', 'WHERE', 'GROUP', 'ORDER', > 'HAVING', 'LIMIT', 'JOIN', 'CROSS', 'INNER', 'LEFT', 'RIGHT', 'FULL', > 'NATURAL', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', > 'SORT', 'CLUSTER', 'DISTRIBUTE', 'ANTI'}(line 1, pos 54) > == SQL == > SELECT * FROM (t1 INNER JOIN t2 ON t1.id = t2.id) AS t(a, b, c, d) > --^^^ > at > org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114) > at org.apache.spark.sql.execution.SparkSqlParser.parse(Spa > {code} > We could support this by referring; > http://docs.aws.amazon.com/redshift/latest/dg/r_FROM_clause30.html -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20994) Alleviate memory pressure in StreamManager
[ https://issues.apache.org/jira/browse/SPARK-20994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jin xing updated SPARK-20994: - Description: In my cluster, we are suffering from OOM of shuffle-service. We found that a lot of executors are fetching blocks from a single shuffle-service. Analyzing the memory, we found that the blockIds({{shuffle_shuffleId_mapId_reduceId}}) take about 1.5GBytes. In current code, chunks are fetched from shuffle service in two steps: Step-1. Send {{OpenBlocks}}, which contains the list of blocks to fetch; Step-2. Fetch the consecutive chunks from shuffle-service by {{streamId}} and {{chunkIndex}} Thus memory cost can be improved for step-1. was: In my cluster, we are suffering from OOM of shuffle-service. We found that a lot of executors are fetching blocks from a single shuffle-service. Analyzing the memory, we found that the blockIds({{shuffle_shuffleId_mapId_reduceId}}) take about 1.5GBytes. In current code, chunks are fetched from shuffle service in two steps: Step-1. Send {{OpenBlocks}}, which contains the list of blocks to fetch; Step-2. Fetch the consecutive chunks from shuffle-service by {{streamId}} and {{chunkIndex}} Conceptually, there is no need to send the blocks list in step-1. Client can send the blockId in Step-2. Receiving {{ChunkFetchRequest}}, server can check if the chunkId is in local block manager and send back response. Thus memory cost can be improved. > Alleviate memory pressure in StreamManager > -- > > Key: SPARK-20994 > URL: https://issues.apache.org/jira/browse/SPARK-20994 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: jin xing > > In my cluster, we are suffering from OOM of shuffle-service. > We found that a lot of executors are fetching blocks from a single > shuffle-service. Analyzing the memory, we found that the > blockIds({{shuffle_shuffleId_mapId_reduceId}}) take about 1.5GBytes. 
> In current code, chunks are fetched from shuffle service in two steps: > Step-1. Send {{OpenBlocks}}, which contains the list of blocks to fetch; > Step-2. Fetch the consecutive chunks from shuffle-service by {{streamId}} and > {{chunkIndex}} > Thus memory cost can be improved for step-1. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
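The reported 1.5 GB of block-id strings is plausible from the id format alone. A hedged back-of-envelope sketch (plain Python; the ~40-byte per-string JVM overhead is an assumption, not a measured figure):

```python
def block_id_footprint(shuffle_id, map_id, reduce_id, jvm_overhead=40):
    # Each id is the literal string "shuffle_<shuffleId>_<mapId>_<reduceId>";
    # jvm_overhead approximates per-String object/char[] headers (assumed).
    return len(f"shuffle_{shuffle_id}_{map_id}_{reduce_id}") + jvm_overhead

# A single shuffle with 1000 map tasks x 1000 reducers already costs
# tens of MB in ids alone, before duplication across concurrent
# OpenBlocks requests from many executors.
total = sum(block_id_footprint(1, m, r)
            for m in range(1000) for r in range(1000))
print(total // 1_000_000, "MB-ish of block-id strings")
```

Sending the id lazily with each chunk fetch, as the proposal suggests, means the shuffle service never has to hold all of these strings at once.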
[jira] [Updated] (SPARK-17495) Hive hash implementation
[ https://issues.apache.org/jira/browse/SPARK-17495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-17495: -- Fix Version/s: (was: 2.2.0) > Hive hash implementation > > > Key: SPARK-17495 > URL: https://issues.apache.org/jira/browse/SPARK-17495 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Tejas Patil >Assignee: Tejas Patil >Priority: Minor > > Spark internally uses Murmur3Hash for partitioning. This is different from > the one used by Hive. For queries which use bucketing this leads to different > results if one tries the same query on both engines. For us, we want users to > have backward compatibility to that one can switch parts of applications > across the engines without observing regressions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
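A toy illustration of why differing hash functions break bucketing across engines (plain Python with stand-in hashes, not the real Murmur3 or Hive algorithms): when two engines assign buckets with different hash functions, the same key lands in different buckets, so bucket-based pruning or joins disagree between them.

```python
import zlib

NUM_BUCKETS = 8

def bucket_engine_a(key):
    # Stand-in for Spark's Murmur3-based bucketing (illustrative only).
    return zlib.crc32(str(key).encode()) % NUM_BUCKETS

def bucket_engine_b(key):
    # Hive buckets an int column by the int value itself (Integer.hashCode).
    return key % NUM_BUCKETS

# Most keys land in different buckets under the two schemes.
mismatched = [k for k in range(100)
              if bucket_engine_a(k) != bucket_engine_b(k)]
print(len(mismatched), "of 100 keys placed in different buckets")
```

This is why a Hive-compatible hash implementation is needed before Spark-written buckets can be read as Hive buckets, and vice versa.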
[jira] [Updated] (SPARK-20894) Error while checkpointing to HDFS
[ https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20894: -- Fix Version/s: (was: 2.3.0) > Error while checkpointing to HDFS > - > > Key: SPARK-20894 > URL: https://issues.apache.org/jira/browse/SPARK-20894 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.1 > Environment: Ubuntu, Spark 2.1.1, hadoop 2.7 >Reporter: kant kodali >Assignee: Shixiong Zhu > Attachments: driver_info_log, executor1_log, executor2_log > > > Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 > hours", "24 hours"), df1.col("AppName")).count(); > StreamingQuery query = df2.writeStream().foreach(new > KafkaSink()).option("checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update").start(); > query.awaitTermination(); > This for some reason fails with the Error > ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) > java.lang.IllegalStateException: Error reading delta file > /usr/local/hadoop/checkpoint/state/0/0/1.delta of HDFSStateStoreProvider[id = > (op=0, part=0), dir = /usr/local/hadoop/checkpoint/state/0/0]: > /usr/local/hadoop/checkpoint/state/0/0/1.delta does not exist > I did clear all the checkpoint data in /usr/local/hadoop/checkpoint/ and all > consumer offsets in Kafka from all brokers prior to running and yet this > error still persists. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20841) Support table column aliases in FROM clause
[ https://issues.apache.org/jira/browse/SPARK-20841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20841: -- Fix Version/s: (was: 2.3.0) > Support table column aliases in FROM clause > --- > > Key: SPARK-20841 > URL: https://issues.apache.org/jira/browse/SPARK-20841 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0 >Reporter: Josh Rosen >Assignee: Takeshi Yamamuro >Priority: Minor > > Some SQL dialects support a relatively obscure "table column aliases" feature > where you can rename columns when aliasing a relation in a {{FROM}} clause. > For example: > {code} > SELECT * FROM onecolumn AS a(x) JOIN onecolumn AS b(y) ON a.x = b.y > {code} > Spark does not currently support this. I would like to add support for this > in order to allow me to run a corpus of existing queries which depend on this > syntax. > There's a good writeup on this at > http://modern-sql.com/feature/table-column-aliases, which has additional > examples and describes other databases' degrees of support for this feature. > One tricky thing to figure out will be whether FROM clause column aliases > take precedence over aliases in the SELECT clause. When adding support for > this, we should make sure to add sufficient testing of several corner-cases, > including: > * Aliasing in both the SELECT and FROM clause > * Aliasing columns in the FROM clause both with and without an explicit AS. > * Aliasing the wrong number of columns in the FROM clause, both greater and > fewer columns than were selected in the SELECT clause. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20962) Support subquery column aliases in FROM clause
[ https://issues.apache.org/jira/browse/SPARK-20962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20962: -- Fix Version/s: (was: 2.3.0) > Support subquery column aliases in FROM clause > -- > > Key: SPARK-20962 > URL: https://issues.apache.org/jira/browse/SPARK-20962 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.1.1 >Reporter: Takeshi Yamamuro > > Currently, we do not support subquery column aliases; > {code} > scala> sql("SELECT * FROM (SELECT 1 AS col1, 1 AS col2) t(a, b)").show > org.apache.spark.sql.catalyst.parser.ParseException: > mismatched input '(' expecting {, ',', 'WHERE', 'GROUP', 'ORDER', > 'HAVING', 'LIMIT', 'JOIN', 'CROSS', 'INNER', 'LEFT', 'RIGHT', 'FULL', > 'NATURAL', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', > 'SORT', 'CLUSTER', 'DISTRIBUTE', 'ANTI'}(line 1, pos 45) > == SQL == > SELECT * FROM (SELECT 1 AS col1, 1 AS col2) t(a, b) > -^^^ > at > org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114) > at > org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48) > at > org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:68) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623) > {code} > We could support this by referring; > http://docs.aws.amazon.com/redshift/latest/dg/r_FROM_clause30.html -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
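Until this syntax is supported, the standard rewrite is to move the aliases into the subquery's SELECT list. A small sketch using Python's built-in sqlite3 (any SQL engine would do) to show the two forms are equivalent:

```python
import sqlite3

con = sqlite3.connect(":memory:")

# Unsupported in Spark 2.1.1:
#   SELECT * FROM (SELECT 1 AS col1, 1 AS col2) t(a, b)
# Equivalent form that Spark already accepts: alias in the SELECT list.
cur = con.execute(
    "SELECT col1 AS a, col2 AS b FROM (SELECT 1 AS col1, 1 AS col2) t")
names = [d[0] for d in cur.description]  # result column names
rows = cur.fetchall()
print(names, rows)  # ['a', 'b'] [(1, 1)]
```

The feature request is essentially to let the parser perform this renaming automatically from the `t(a, b)` alias list.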
[jira] [Updated] (SPARK-20947) Encoding/decoding issue in PySpark pipe implementation
[ https://issues.apache.org/jira/browse/SPARK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20947: -- Target Version/s: (was: 1.6.3) > Encoding/decoding issue in PySpark pipe implementation > -- > > Key: SPARK-20947 > URL: https://issues.apache.org/jira/browse/SPARK-20947 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0, > 2.1.1 >Reporter: Xiaozhe Wang > Attachments: fix-pipe-encoding-error.patch > > > Pipe action convert objects into strings using a way that was affected by the > default encoding setting of Python environment. > Here is the related code fragment (L717-721@python/pyspark/rdd.py): > {code} > def pipe_objs(out): > for obj in iterator: > s = str(obj).rstrip('\n') + '\n' > out.write(s.encode('utf-8')) > out.close() > {code} > The `str(obj)` part implicitly convert `obj` to an unicode string, then > encode it into a byte string using default encoding; On the other hand, the > `s.encode('utf-8')` part implicitly decode `s` into an unicode string using > default encoding and then encode it (AGAIN!) into a UTF-8 encoded byte string. > Typically the default encoding of Python environment would be 'ascii', which > means passing an unicode string containing characters beyond 'ascii' charset > will raise UnicodeEncodeError exception at `str(obj)` and passing a byte > string containing bytes greater than 128 will again raise UnicodeEncodeError > exception at 's.encode('utf-8')`. > Changing `str(obj)` to `unicode(obj)` would eliminate these problems. > The following code snippet reproduces these errors: > {code} > Welcome to > __ > / __/__ ___ _/ /__ > _\ \/ _ \/ _ `/ __/ '_/ >/__ / .__/\_,_/_/ /_/\_\ version 1.6.3 > /_/ > Using Python version 2.7.12 (default, Jul 25 2016 15:06:45) > SparkContext available as sc, HiveContext available as sqlContext. 
> >>> sc.parallelize([u'\u6d4b\u8bd5']).pipe('cat').collect() > [Stage 0:> (0 + 4) / > 4]Exception in thread Thread-1: > Traceback (most recent call last): > File > "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", > line 801, in __bootstrap_inner > self.run() > File > "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", > line 754, in run > self.__target(*self.__args, **self.__kwargs) > File > "/Users/wxz/Downloads/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line > 719, in pipe_objs > s = str(obj).rstrip('\n') + '\n' > UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: > ordinal not in range(128) > >>> > >>> sc.parallelize([u'\u6d4b\u8bd5']).map(lambda x: > >>> x.encode('utf-8')).pipe('cat').collect() > Exception in thread Thread-1: > Traceback (most recent call last): > File > "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", > line 801, in __bootstrap_inner > self.run() > File > "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", > line 754, in run > self.__target(*self.__args, **self.__kwargs) > File > "/Users/wxz/Downloads/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line > 720, in pipe_objs > out.write(s.encode('utf-8')) > UnicodeDecodeError: 'ascii' codec can't decode byte 0xe6 in position 0: > ordinal not in range(128) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
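The failure mode can be reproduced outside Spark. A minimal sketch (runs on Python 3; under Python 2 the same ASCII-codec error happens implicitly inside `str(obj)` and again inside `s.encode('utf-8')`, which is what the proposed `unicode(obj)` change avoids):

```python
s = u'\u6d4b\u8bd5'  # the two CJK characters from the traceback above

# Explicit equivalent of what Python 2's default 'ascii' codec does
# implicitly inside str(obj) for a non-ASCII unicode value:
try:
    s.encode('ascii')
    ascii_failed = False
except UnicodeEncodeError:
    ascii_failed = True

# Encoding exactly once, with an explicit codec, never touches the
# default codec and therefore cannot raise here:
data = s.encode('utf-8')
print(ascii_failed, data)  # True b'\xe6\xb5\x8b\xe8\xaf\x95'
```

Keeping the value as a unicode string until the single final `encode('utf-8')` removes both the encode error and the decode-then-re-encode error shown in the two tracebacks.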
[jira] [Resolved] (SPARK-20491) Synonym handling replacement issue in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-20491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20491. --- Resolution: Invalid This looks like a question for the mailing list or somewhere else, not a JIRA > Synonym handling replacement issue in Apache Spark > -- > > Key: SPARK-20491 > URL: https://issues.apache.org/jira/browse/SPARK-20491 > Project: Spark > Issue Type: Question > Components: Examples, ML >Affects Versions: 2.0.2 > Environment: Eclipse LUNA, Spring Boot >Reporter: Nishanth J > Labels: maven > > I am facing a major issue on replacement of Synonyms in my DataSet. > I am trying to replace the synonym of the Brand names to its equivalent names. > I have tried 2 methods to solve this issue. > Method 1 (regexp_replace) > Here i am using the regexp_replace method. > Hashtable manufacturerNames = new Hashtable(); > Enumeration names; > String str; > double bal; > manufacturerNames.put("Allen","Apex Tool Group"); > manufacturerNames.put("Armstrong","Apex Tool Group"); > manufacturerNames.put("Campbell","Apex Tool Group"); > manufacturerNames.put("Lubriplate","Apex Tool Group"); > manufacturerNames.put("Delta","Apex Tool Group"); > manufacturerNames.put("Gearwrench","Apex Tool Group"); > manufacturerNames.put("H.K. Porter","Apex Tool Group"); > /*100 MORE*/ > manufacturerNames.put("Stanco","Stanco Mfg"); > manufacturerNames.put("Stanco","Stanco Mfg"); > manufacturerNames.put("Standard Safety","Standard Safety Equipment > Company"); > manufacturerNames.put("Standard Safety","Standard Safety Equipment > Company"); > // Show all balances in hash table. 
> names = manufacturerNames.keys(); > Dataset dataFileContent = > sqlContext.load("com.databricks.spark.csv", options); > while(names.hasMoreElements()) { > str = (String) names.nextElement(); > dataFileContent=dataFileContent.withColumn("ManufacturerSource", > regexp_replace(col("ManufacturerSource"),str,manufacturerNames.get(str).toString())); > } > dataFileContent.show(); > I got to know that the amount of data is too huge for regexp_replace so got a > solution to use UDF > http://stackoverflow.com/questions/43413513/issue-in-regex-replace-in-apache-spark-java > Method 2 (UDF) > List data2 = Arrays.asList( > RowFactory.create("Allen", "Apex Tool Group"), > RowFactory.create("Armstrong","Apex Tool Group"), > RowFactory.create("DeWALT","StanleyBlack") > ); > StructType schema2 = new StructType(new StructField[] { > new StructField("label2", DataTypes.StringType, false, > Metadata.empty()), > new StructField("sentence2", DataTypes.StringType, false, > Metadata.empty()) > }); > Dataset sentenceDataFrame2 = spark.createDataFrame(data2, schema2); > UDF2 contains = new UDF2 Boolean>() { > private static final long serialVersionUID = -5239951370238629896L; > @Override > public Boolean call(String t1, String t2) throws Exception { > return t1.contains(t2); > } > }; > spark.udf().register("contains", contains, DataTypes.BooleanType); > UDF3 replaceWithTerm = new UDF3 String, String, String>() { > private static final long serialVersionUID = -2882956931420910207L; > @Override > public String call(String t1, String t2, String t3) throws Exception { > return t1.replaceAll(t2, t3); > } > }; > spark.udf().register("replaceWithTerm", replaceWithTerm, > DataTypes.StringType); > Dataset joined = sentenceDataFrame.join(sentenceDataFrame2, > callUDF("contains", sentenceDataFrame.col("sentence"), > sentenceDataFrame2.col("label2"))) > .withColumn("sentence_replaced", > callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), > sentenceDataFrame2.col("label2"), 
sentenceDataFrame2.col("sentence2"))) > .select(col("sentence_replaced")); > joined.show(false); > } > Got this output when there are multiple replacements done in a row. > Input- > Allen Armstrong jeevi pramod Allen > sandesh Armstrong jeevi > harsha nischay DeWALT > Output- > Apex Tool Group Armstrong jeevi pramod Apex Tool Group > Allen Apex Tool Group jeevi pramod Allen > sandesh Apex Tool Group jeevi > harsha nischay StanleyBlack > Expected Output- > Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group > sandesh Apex Tool Group jeevi > harsha nischay StanleyBlack > Is there any other method that should be followed to get the proper output? > Or is this a limitati
[jira] [Updated] (SPARK-20491) Synonym handling replacement issue in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-20491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20491: -- Target Version/s: (was: 2.0.2) > Synonym handling replacement issue in Apache Spark > -- > > Key: SPARK-20491 > URL: https://issues.apache.org/jira/browse/SPARK-20491 > Project: Spark > Issue Type: Question > Components: Examples, ML >Affects Versions: 2.0.2 > Environment: Eclipse LUNA, Spring Boot >Reporter: Nishanth J > Labels: maven > > I am facing a major issue on replacement of Synonyms in my DataSet. > I am trying to replace the synonym of the Brand names to its equivalent names. > I have tried 2 methods to solve this issue. > Method 1 (regexp_replace) > Here i am using the regexp_replace method. > Hashtable manufacturerNames = new Hashtable(); > Enumeration names; > String str; > double bal; > manufacturerNames.put("Allen","Apex Tool Group"); > manufacturerNames.put("Armstrong","Apex Tool Group"); > manufacturerNames.put("Campbell","Apex Tool Group"); > manufacturerNames.put("Lubriplate","Apex Tool Group"); > manufacturerNames.put("Delta","Apex Tool Group"); > manufacturerNames.put("Gearwrench","Apex Tool Group"); > manufacturerNames.put("H.K. Porter","Apex Tool Group"); > /*100 MORE*/ > manufacturerNames.put("Stanco","Stanco Mfg"); > manufacturerNames.put("Stanco","Stanco Mfg"); > manufacturerNames.put("Standard Safety","Standard Safety Equipment > Company"); > manufacturerNames.put("Standard Safety","Standard Safety Equipment > Company"); > // Show all balances in hash table. 
> names = manufacturerNames.keys(); > Dataset dataFileContent = > sqlContext.load("com.databricks.spark.csv", options); > while(names.hasMoreElements()) { > str = (String) names.nextElement(); > dataFileContent=dataFileContent.withColumn("ManufacturerSource", > regexp_replace(col("ManufacturerSource"),str,manufacturerNames.get(str).toString())); > } > dataFileContent.show(); > I got to know that the amount of data is too huge for regexp_replace so got a > solution to use UDF > http://stackoverflow.com/questions/43413513/issue-in-regex-replace-in-apache-spark-java > Method 2 (UDF) > List data2 = Arrays.asList( > RowFactory.create("Allen", "Apex Tool Group"), > RowFactory.create("Armstrong","Apex Tool Group"), > RowFactory.create("DeWALT","StanleyBlack") > ); > StructType schema2 = new StructType(new StructField[] { > new StructField("label2", DataTypes.StringType, false, > Metadata.empty()), > new StructField("sentence2", DataTypes.StringType, false, > Metadata.empty()) > }); > Dataset sentenceDataFrame2 = spark.createDataFrame(data2, schema2); > UDF2 contains = new UDF2 Boolean>() { > private static final long serialVersionUID = -5239951370238629896L; > @Override > public Boolean call(String t1, String t2) throws Exception { > return t1.contains(t2); > } > }; > spark.udf().register("contains", contains, DataTypes.BooleanType); > UDF3 replaceWithTerm = new UDF3 String, String, String>() { > private static final long serialVersionUID = -2882956931420910207L; > @Override > public String call(String t1, String t2, String t3) throws Exception { > return t1.replaceAll(t2, t3); > } > }; > spark.udf().register("replaceWithTerm", replaceWithTerm, > DataTypes.StringType); > Dataset joined = sentenceDataFrame.join(sentenceDataFrame2, > callUDF("contains", sentenceDataFrame.col("sentence"), > sentenceDataFrame2.col("label2"))) > .withColumn("sentence_replaced", > callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), > sentenceDataFrame2.col("label2"), 
sentenceDataFrame2.col("sentence2"))) > .select(col("sentence_replaced")); > joined.show(false); > } > I got this output when there are multiple replacements to do in a row. > Input- > Allen Armstrong jeevi pramod Allen > sandesh Armstrong jeevi > harsha nischay DeWALT > Output- > Apex Tool Group Armstrong jeevi pramod Apex Tool Group > Allen Apex Tool Group jeevi pramod Allen > sandesh Apex Tool Group jeevi > harsha nischay StanleyBlack > Expected Output- > Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group > sandesh Apex Tool Group jeevi > harsha nischay StanleyBlack > Is there any other method which must be followed to get the proper output? > Or is this a limitation of UDFs? > Kindly help us with this issue. -- This message was
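The overlapping-replacement problem in the output above arises because each key is applied in a separate pass (one join row, or one regexp_replace call, per key), so a sentence is only rewritten for one matching key at a time. A minimal sketch of an alternative, in plain Java outside Spark: replace whole tokens in a single pass via a map lookup, so every key in a sentence is substituted at once and already-replaced text is never re-scanned. The class and method names here are illustrative, not from the reporter's code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SynonymReplacer {

    // Single-pass, whole-token replacement: each token is looked up once,
    // so text produced by an earlier replacement is never matched again.
    static String replaceSynonyms(String sentence, Map<String, String> synonyms) {
        return Arrays.stream(sentence.split("\\s+"))
                .map(token -> synonyms.getOrDefault(token, token))
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        Map<String, String> manufacturerNames = new HashMap<>();
        manufacturerNames.put("Allen", "Apex Tool Group");
        manufacturerNames.put("Armstrong", "Apex Tool Group");
        manufacturerNames.put("DeWALT", "StanleyBlack");

        // Matches the "Expected Output" above for the first input row.
        System.out.println(replaceSynonyms("Allen Armstrong jeevi pramod Allen", manufacturerNames));
        // Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group
    }
}
```

In Spark, logic like this could be registered as a single one-argument UDF over the sentence column, avoiding the contains/replaceWithTerm join entirely; multi-word keys such as "H.K. Porter" would need a tokenizer-aware variant.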
[jira] [Commented] (SPARK-20676) Upload to PyPi
[ https://issues.apache.org/jira/browse/SPARK-20676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040579#comment-16040579 ] Sean Owen commented on SPARK-20676: --- [~holdenk] are all the PyPi changes done? > Upload to PyPi > -- > > Key: SPARK-20676 > URL: https://issues.apache.org/jira/browse/SPARK-20676 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.1.1, 2.2.0 >Reporter: holdenk >Assignee: holdenk > > Upload PySpark to PyPi. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-17799) InterfaceStability annotation
[ https://issues.apache.org/jira/browse/SPARK-17799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-17799. --- Resolution: Done > InterfaceStability annotation > - > > Key: SPARK-17799 > URL: https://issues.apache.org/jira/browse/SPARK-17799 > Project: Spark > Issue Type: Improvement > Components: Build >Reporter: Reynold Xin >Assignee: Reynold Xin > Labels: releasenotes > > Based on discussions on the dev list > (http://apache-spark-developers-list.1001551.n3.nabble.com/discuss-separate-API-annotation-into-two-components-InterfaceAudience-amp-InterfaceStability-td17470.html#none), > there is consensus to introduce an InterfaceStability annotation to > eventually replace the current DeveloperApi / Experimental annotations. > This is an umbrella ticket to track its progress. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17637) Packed scheduling for Spark tasks across executors
[ https://issues.apache.org/jira/browse/SPARK-17637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-17637: -- Target Version/s: (was: 2.2.0) > Packed scheduling for Spark tasks across executors > -- > > Key: SPARK-17637 > URL: https://issues.apache.org/jira/browse/SPARK-17637 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Reporter: Zhan Zhang >Assignee: Zhan Zhang >Priority: Minor > > Currently the Spark scheduler implements round-robin scheduling of tasks to > executors, which is great as it distributes the load evenly across the > cluster, but it leads to significant resource waste in some cases, > especially when dynamic allocation is enabled. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20448) Document how FileInputDStream works with object storage
[ https://issues.apache.org/jira/browse/SPARK-20448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20448: -- Target Version/s: (was: 2.2.0) > Document how FileInputDStream works with object storage > --- > > Key: SPARK-20448 > URL: https://issues.apache.org/jira/browse/SPARK-20448 > Project: Spark > Issue Type: Documentation > Components: Documentation >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Priority: Minor > > Object stores work differently from filesystems: intermediate writes are not > visible, and renames are really O(data) copies, not O(1) transactions. > This makes working with them in DStreams fundamentally different: you can > write straight into the destination. > 1. Document how FileInputDStream scans directories for changes > 2. Document how object stores behave differently, and the implications > for users. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-14151) Propose to refactor and expose Metrics Sink and Source interface
[ https://issues.apache.org/jira/browse/SPARK-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-14151: -- Target Version/s: (was: 2.2.0) > Propose to refactor and expose Metrics Sink and Source interface > > > Key: SPARK-14151 > URL: https://issues.apache.org/jira/browse/SPARK-14151 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Saisai Shao >Priority: Minor > > MetricsSystem is designed for plugging in different sources and sinks: users could > write their own sources and sinks, configure them through metrics.properties, > and MetricsSystem will register them through reflection. But the current Source and > Sink interfaces are private, which means users cannot create their own sources > and sinks unless they use the same package. > So this proposes to expose the Source and Sink interfaces; this will let users build > and maintain their own sources and sinks, alleviating the maintenance overhead of > the Spark codebase. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20929) LinearSVC should not use shared Param HasThresholds
[ https://issues.apache.org/jira/browse/SPARK-20929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-20929: -- Target Version/s: (was: 2.2.0) > LinearSVC should not use shared Param HasThresholds > --- > > Key: SPARK-20929 > URL: https://issues.apache.org/jira/browse/SPARK-20929 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.2.0 >Reporter: Joseph K. Bradley >Assignee: Joseph K. Bradley >Priority: Minor > > LinearSVC applies the Param 'threshold' to the rawPrediction, not the > probability. It has different semantics than the shared Param HasThreshold, > so it should not use the shared Param. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20149) Audit PySpark code base for 2.6 specific work arounds
[ https://issues.apache.org/jira/browse/SPARK-20149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20149. --- Resolution: Done > Audit PySpark code base for 2.6 specific work arounds > - > > Key: SPARK-20149 > URL: https://issues.apache.org/jira/browse/SPARK-20149 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.2.0 >Reporter: holdenk > > We should determine which areas in PySpark have specific 2.6 workarounds > and create issues for them. The audit can be started during 2.2.0, > but cleaning up all the 2.6-specific code is likely too much to try to get > in, so the actual fixing should probably be considered for 2.2.1 or 2.3 > (unless 2.2.0 is delayed). -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-12661) Drop Python 2.6 support in PySpark
[ https://issues.apache.org/jira/browse/SPARK-12661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-12661. --- Resolution: Done > Drop Python 2.6 support in PySpark > -- > > Key: SPARK-12661 > URL: https://issues.apache.org/jira/browse/SPARK-12661 > Project: Spark > Issue Type: Task > Components: PySpark >Reporter: Davies Liu > Labels: releasenotes > > 1. stop testing with 2.6 > 2. remove the code for python 2.6 > see discussion : > https://www.mail-archive.com/user@spark.apache.org/msg43423.html -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21008) Streaming applications read stale credentials file when recovering from checkpoint.
[ https://issues.apache.org/jira/browse/SPARK-21008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040565#comment-16040565 ] Apache Spark commented on SPARK-21008: -- User 'saturday-shi' has created a pull request for this issue: https://github.com/apache/spark/pull/18230 > Streaming applications read stale credentials file when recovering from > checkpoint. > --- > > Key: SPARK-21008 > URL: https://issues.apache.org/jira/browse/SPARK-21008 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 1.6.3, 2.0.2, 2.1.1, 2.2.0 >Reporter: Xing Shi > > On a security (Kerberos) enabled cluster, streaming applications renew HDFS > delegation tokens periodically and save them in the > {{/.sparkStaging//}} directory on HDFS. > The path of the credentials file will be written into the checkpoint, and reloaded > under the *old applicationId* when the application restarts, although the > application has changed to a new id. > This issue can be reproduced by restarting a checkpoint-enabled streaming > application on a kerberized cluster. > The application runs well - but with thousands of > {{java.io.FileNotFoundException}} logged - and finally fails due to token > expiration. > The log file is something like this: > {code:title=the_first_run.log} > 17/06/07 14:52:06 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > 17/06/07 14:52:06 INFO security.CredentialUpdater: Scheduling credentials > refresh from HDFS in 92263 ms. > {code} > {code:title=after_restart.log} > 17/06/07 15:11:24 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > ... 
> 17/06/07 15:12:24 WARN yarn.YarnSparkHadoopUtil: Error while attempting to > list files from application staging dir > java.io.FileNotFoundException: File > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035 > does not exist. > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697) > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105) > at > org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755) > at > org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525) > at > org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:257) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater.org$apache$spark$deploy$yarn$security$CredentialUpdater$$updateCredentialsIfRequired(CredentialUpdater.scala:72) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(CredentialUpdater.scala:53) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) > at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1.run(CredentialUpdater.scala:53) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Notice that the applicationId after restart is > application_1496384469444_{color:red}0036{color} but the application still > attempt to read credentials from 0035's directory. > Recently I used Spark 1.6 in my cluster, and tested this issue with Spark > 1.6.3 and
[jira] [Assigned] (SPARK-21008) Streaming applications read stale credentials file when recovering from checkpoint.
[ https://issues.apache.org/jira/browse/SPARK-21008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21008: Assignee: Apache Spark > Streaming applications read stale credentials file when recovering from > checkpoint. > --- > > Key: SPARK-21008 > URL: https://issues.apache.org/jira/browse/SPARK-21008 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 1.6.3, 2.0.2, 2.1.1, 2.2.0 >Reporter: Xing Shi >Assignee: Apache Spark > > On a security (Kerberos) enabled cluster, streaming applications renew HDFS > delegation tokens periodically and save them in the > {{/.sparkStaging//}} directory on HDFS. > The path of the credentials file will be written into the checkpoint, and reloaded > under the *old applicationId* when the application restarts, although the > application has changed to a new id. > This issue can be reproduced by restarting a checkpoint-enabled streaming > application on a kerberized cluster. > The application runs well - but with thousands of > {{java.io.FileNotFoundException}} logged - and finally fails due to token > expiration. > The log file is something like this: > {code:title=the_first_run.log} > 17/06/07 14:52:06 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > 17/06/07 14:52:06 INFO security.CredentialUpdater: Scheduling credentials > refresh from HDFS in 92263 ms. > {code} > {code:title=after_restart.log} > 17/06/07 15:11:24 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > ... 
> 17/06/07 15:12:24 WARN yarn.YarnSparkHadoopUtil: Error while attempting to > list files from application staging dir > java.io.FileNotFoundException: File > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035 > does not exist. > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697) > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105) > at > org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755) > at > org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525) > at > org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:257) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater.org$apache$spark$deploy$yarn$security$CredentialUpdater$$updateCredentialsIfRequired(CredentialUpdater.scala:72) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(CredentialUpdater.scala:53) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) > at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1.run(CredentialUpdater.scala:53) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Notice that the applicationId after restart is > application_1496384469444_{color:red}0036{color} but the application still > attempt to read credentials from 0035's directory. > Recently I used Spark 1.6 in my cluster, and tested this issue with Spark > 1.6.3 and 2.1.1. But it should affect all the versions from 1.5.x to current > master(2.3.x). -- Th
[jira] [Assigned] (SPARK-21008) Streaming applications read stale credentials file when recovering from checkpoint.
[ https://issues.apache.org/jira/browse/SPARK-21008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21008: Assignee: (was: Apache Spark) > Streaming applications read stale credentials file when recovering from > checkpoint. > --- > > Key: SPARK-21008 > URL: https://issues.apache.org/jira/browse/SPARK-21008 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 1.6.3, 2.0.2, 2.1.1, 2.2.0 >Reporter: Xing Shi > > On a security (Kerberos) enabled cluster, streaming applications renew HDFS > delegation tokens periodically and save them in the > {{/.sparkStaging//}} directory on HDFS. > The path of the credentials file will be written into the checkpoint, and reloaded > under the *old applicationId* when the application restarts, although the > application has changed to a new id. > This issue can be reproduced by restarting a checkpoint-enabled streaming > application on a kerberized cluster. > The application runs well - but with thousands of > {{java.io.FileNotFoundException}} logged - and finally fails due to token > expiration. > The log file is something like this: > {code:title=the_first_run.log} > 17/06/07 14:52:06 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > 17/06/07 14:52:06 INFO security.CredentialUpdater: Scheduling credentials > refresh from HDFS in 92263 ms. > {code} > {code:title=after_restart.log} > 17/06/07 15:11:24 INFO executor.CoarseGrainedExecutorBackend: Will > periodically update credentials from: > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 > ... 
> 17/06/07 15:12:24 WARN yarn.YarnSparkHadoopUtil: Error while attempting to > list files from application staging dir > java.io.FileNotFoundException: File > hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035 > does not exist. > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697) > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105) > at > org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755) > at > org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525) > at > org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:257) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater.org$apache$spark$deploy$yarn$security$CredentialUpdater$$updateCredentialsIfRequired(CredentialUpdater.scala:72) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(CredentialUpdater.scala:53) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) > at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962) > at > org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1.run(CredentialUpdater.scala:53) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Notice that the applicationId after restart is > application_1496384469444_{color:red}0036{color} but the application still > attempt to read credentials from 0035's directory. > Recently I used Spark 1.6 in my cluster, and tested this issue with Spark > 1.6.3 and 2.1.1. But it should affect all the versions from 1.5.x to current > master(2.3.x). -- This message was sent by At
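One way to picture the direction of a fix for the issue above (the actual change in pull request 18230 may well take a different approach): derive the credentials location from the application's current applicationId instead of reusing the path that was serialized into the checkpoint. A hedged sketch in plain Java; `rebaseCredentialsPath` is a hypothetical helper, not a Spark API.

```java
public class CredentialsPathFix {

    // Hypothetical helper: swap the applicationId segment of a checkpointed
    // .sparkStaging credentials path for the application's current id.
    static String rebaseCredentialsPath(String checkpointedPath, String currentAppId) {
        return checkpointedPath.replaceFirst(
                "(/\\.sparkStaging/)[^/]+(/)", "$1" + currentAppId + "$2");
    }

    public static void main(String[] args) {
        String stale = "hdfs://nameservice1/user/xxx/.sparkStaging/"
                + "application_1496384469444_0035/credentials-19a7c11e";
        // After restart the staging dir belongs to ..._0036, not ..._0035.
        System.out.println(rebaseCredentialsPath(stale, "application_1496384469444_0036"));
    }
}
```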
[jira] [Assigned] (SPARK-20966) Table data is not sorted by startTime time desc, time is not formatted and redundant code in JDBC/ODBC Server page.
[ https://issues.apache.org/jira/browse/SPARK-20966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-20966: - Assignee: guoxiaolongzte > Table data is not sorted by startTime time desc, time is not formatted and > redundant code in JDBC/ODBC Server page. > --- > > Key: SPARK-20966 > URL: https://issues.apache.org/jira/browse/SPARK-20966 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.1.1 >Reporter: guoxiaolongzte >Assignee: guoxiaolongzte >Priority: Minor > Fix For: 2.3.0 > > > Table data is not sorted by startTime time desc in JDBC/ODBC Server page. > Time is not formatted in JDBC/ODBC Server page. > Redundant code in the ThriftServerSessionPage.scala. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20966) Table data is not sorted by startTime time desc, time is not formatted and redundant code in JDBC/ODBC Server page.
[ https://issues.apache.org/jira/browse/SPARK-20966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20966. --- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 18186 [https://github.com/apache/spark/pull/18186] > Table data is not sorted by startTime time desc, time is not formatted and > redundant code in JDBC/ODBC Server page. > --- > > Key: SPARK-20966 > URL: https://issues.apache.org/jira/browse/SPARK-20966 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.1.1 >Reporter: guoxiaolongzte >Priority: Minor > Fix For: 2.3.0 > > > Table data is not sorted by startTime time desc in JDBC/ODBC Server page. > Time is not formatted in JDBC/ODBC Server page. > Redundant code in the ThriftServerSessionPage.scala. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20691) Difference between Storage Memory as seen internally and in web UI
[ https://issues.apache.org/jira/browse/SPARK-20691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040556#comment-16040556 ] Apache Spark commented on SPARK-20691: -- User 'mkesselaers' has created a pull request for this issue: https://github.com/apache/spark/pull/18229 > Difference between Storage Memory as seen internally and in web UI > -- > > Key: SPARK-20691 > URL: https://issues.apache.org/jira/browse/SPARK-20691 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski > Labels: starter > > I set Major priority as it's visible to a user. > There's a difference between the size of Storage Memory as managed internally > and as displayed to a user in the web UI. > I found it while answering [How does web UI calculate Storage Memory (in > Executors tab)?|http://stackoverflow.com/q/43801062/1305344] on StackOverflow. > In short (quoting the main parts), when you start a Spark app (say > spark-shell) you see 912.3 MB RAM for Storage Memory: > {code} > $ ./bin/spark-shell --conf spark.driver.memory=2g > ... > 17/05/07 15:20:50 INFO BlockManagerMasterEndpoint: Registering block manager > 192.168.1.8:57177 with 912.3 MB RAM, BlockManagerId(driver, 192.168.1.8, > 57177, None) > {code} > but in the web UI you'll see 956.6 MB due to the way the custom JavaScript > function {{formatBytes}} in > [utils.js|https://github.com/apache/spark/blob/master/core/src/main/resources/org/apache/spark/ui/static/utils.js#L40-L48] > calculates the value. 
That translates to the following Scala code: > {code} > def formatBytes(bytes: Double) = { > val k = 1000 > val i = math.floor(math.log(bytes) / math.log(k)) > val maxMemoryWebUI = bytes / math.pow(k, i) > f"$maxMemoryWebUI%1.1f" > } > scala> println(formatBytes(maxMemory)) > 956.6 > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
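To make the discrepancy concrete, here is a hedged Java translation of the formulas above: the web UI's {{formatBytes}} uses a decimal base (k = 1000), while Spark's internal "912.3 MB" figure is binary-based (k = 1024). The starting value below assumes the internally reported 912.3 MB.

```java
import java.util.Locale;

public class StorageMemoryFormat {

    // Same formula as the web UI's JavaScript formatBytes, parameterized by
    // the base: 1000 reproduces the UI figure, 1024 the internal one.
    static double format(double bytes, int k) {
        int i = (int) Math.floor(Math.log(bytes) / Math.log(k));
        return bytes / Math.pow(k, i);
    }

    public static void main(String[] args) {
        double maxMemory = 912.3 * 1024 * 1024;  // 912.3 MB, as logged internally
        System.out.printf(Locale.ROOT, "%.1f%n", format(maxMemory, 1000));  // 956.6 (web UI)
        System.out.printf(Locale.ROOT, "%.1f%n", format(maxMemory, 1024));  // 912.3 (internal)
    }
}
```

So both numbers describe the same quantity of bytes; only the unit base differs.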
[jira] [Assigned] (SPARK-20691) Difference between Storage Memory as seen internally and in web UI
[ https://issues.apache.org/jira/browse/SPARK-20691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20691: Assignee: Apache Spark > Difference between Storage Memory as seen internally and in web UI > -- > > Key: SPARK-20691 > URL: https://issues.apache.org/jira/browse/SPARK-20691 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski >Assignee: Apache Spark > Labels: starter > > I set Major priority as it's visible to a user. > There's a difference in what the size of Storage Memory is managed internally > and displayed to a user in web UI. > I found it while answering [How does web UI calculate Storage Memory (in > Executors tab)?|http://stackoverflow.com/q/43801062/1305344] on StackOverflow. > In short (quoting the main parts), when you start a Spark app (say > spark-shell) you see 912.3 MB RAM for Storage Memory: > {code} > $ ./bin/spark-shell --conf spark.driver.memory=2g > ... > 17/05/07 15:20:50 INFO BlockManagerMasterEndpoint: Registering block manager > 192.168.1.8:57177 with 912.3 MB RAM, BlockManagerId(driver, 192.168.1.8, > 57177, None) > {code} > but in the web UI you'll see 956.6 MB due to the way the custom JavaScript > function {{formatBytes}} in > [utils.js|https://github.com/apache/spark/blob/master/core/src/main/resources/org/apache/spark/ui/static/utils.js#L40-L48] > calculates the value. That translates to the following Scala code: > {code} > def formatBytes(bytes: Double) = { > val k = 1000 > val i = math.floor(math.log(bytes) / math.log(k)) > val maxMemoryWebUI = bytes / math.pow(k, i) > f"$maxMemoryWebUI%1.1f" > } > scala> println(formatBytes(maxMemory)) > 956.6 > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20691) Difference between Storage Memory as seen internally and in web UI
[ https://issues.apache.org/jira/browse/SPARK-20691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20691: Assignee: (was: Apache Spark) > Difference between Storage Memory as seen internally and in web UI > -- > > Key: SPARK-20691 > URL: https://issues.apache.org/jira/browse/SPARK-20691 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski > Labels: starter > > I set Major priority as it's visible to a user. > There's a difference in what the size of Storage Memory is managed internally > and displayed to a user in web UI. > I found it while answering [How does web UI calculate Storage Memory (in > Executors tab)?|http://stackoverflow.com/q/43801062/1305344] on StackOverflow. > In short (quoting the main parts), when you start a Spark app (say > spark-shell) you see 912.3 MB RAM for Storage Memory: > {code} > $ ./bin/spark-shell --conf spark.driver.memory=2g > ... > 17/05/07 15:20:50 INFO BlockManagerMasterEndpoint: Registering block manager > 192.168.1.8:57177 with 912.3 MB RAM, BlockManagerId(driver, 192.168.1.8, > 57177, None) > {code} > but in the web UI you'll see 956.6 MB due to the way the custom JavaScript > function {{formatBytes}} in > [utils.js|https://github.com/apache/spark/blob/master/core/src/main/resources/org/apache/spark/ui/static/utils.js#L40-L48] > calculates the value. That translates to the following Scala code: > {code} > def formatBytes(bytes: Double) = { > val k = 1000 > val i = math.floor(math.log(bytes) / math.log(k)) > val maxMemoryWebUI = bytes / math.pow(k, i) > f"$maxMemoryWebUI%1.1f" > } > scala> println(formatBytes(maxMemory)) > 956.6 > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21005) VectorIndexerModel does not prepare output column field correctly
[ https://issues.apache.org/jira/browse/SPARK-21005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Lin updated SPARK-21005: - Description: >From my understanding through reading the documentation, VectorIndexer >decides which features should be categorical based on the number of distinct >values, where features with at most maxCategories are declared categorical. >Meanwhile, those features which exceed maxCategories are declared continuous. Currently, VectorIndexerModel works all right with a dataset which has an empty schema. However, when VectorIndexerModel is transforming a dataset with `ML_ATTR` metadata, it may not output the expected result. For example, a feature with a nominal attribute which has distinct values exceeding maxCategories will not be treated as a continuous feature, as we expected, but still as a categorical feature. Thus, it may cause all the tree-based algorithms (like Decision Tree, Random Forest, GBDT, etc.) to throw errors such as "DecisionTree requires maxBins (= $maxPossibleBins) to be at least as large as the number of values in each categorical feature, but categorical feature $maxCategory has $maxCategoriesPerFeature values. Considering remove this and other categorical features with a large number of values, or add more training examples.". Correct me if my understanding is wrong. I will submit a PR soon to resolve this issue. was: >From my understanding through reading the documentation, VectorIndexer >decides which features should be categorical based on the number of distinct >values, where features with at most maxCategories are declared categorical. >Meanwhile, those features which exceed maxCategories are declared continuous. Currently, VectorIndexerModel works all right with a dataset which has an empty schema. However, when VectorIndexerModel is transforming a dataset with `ML_ATTR` metadata, it may not output the expected result. 
For example, a feature with nominal attribute which has distinct values exceeding maxCategorie will not be treated as a continuous feature as we expected but still a categorical feature. Thus, it may cause all the tree-based algorithms (like Decision Tree, Random Forest, GBDT, etc.) throw errors as "DecisionTree requires maxBins (= $maxPossibleBins) to be at least as large as the number of values in each categorical feature, but categorical feature $maxCategory has $maxCategoriesPerFeature values. Considering remove this and other categorical features with a large number of values, or add more training examples.". Correct me if my understanding is wrong. I will submit a PR soon to solve this issue. > VectorIndexerModel does not prepare output column field correctly > - > > Key: SPARK-21005 > URL: https://issues.apache.org/jira/browse/SPARK-21005 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 2.1.1 >Reporter: Chen Lin > > From my understanding through reading the documentation, VectorIndexer > decides which features should be categorical based on the number of distinct > values, where features with at most maxCategories are declared categorical. > Meanwhile, those features which exceed maxCategories are declared continuous. > Currently, VectorIndexerModel works all right with a dataset which has empty > schema. However, when VectorIndexerModel is transforming on a dataset with > `ML_ATTR` metadata, it may not output the expected result. For example, a > feature with nominal attribute which has distinct values exceeding > maxCategorie will not be treated as a continuous feature as we expected but > still a categorical feature. Thus, it may cause all the tree-based algorithms > (like Decision Tree, Random Forest, GBDT, etc.) 
throw errors as "DecisionTree > requires maxBins (= $maxPossibleBins) to be at least as large as the number > of values in each categorical feature, but categorical feature $maxCategory > has $maxCategoriesPerFeature values. Considering remove this and other > categorical features with a large number of values, or add more training > examples.". > Correct me if my understanding is wrong. > I will submit a PR soon to resolve this issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
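The classification rule the report describes can be sketched in a few lines of plain Python (this is an illustrative sketch of the documented rule, not Spark's implementation; the function name and parameters are hypothetical):

```python
# Illustrative sketch (not Spark code): how VectorIndexer's documented rule
# should classify a feature. Per the docs, a feature whose distinct-value
# count exceeds maxCategories should be continuous -- even if upstream
# ML_ATTR metadata marked it nominal, which is the case the bug is about.
def classify_feature(values, max_categories, nominal_metadata=False):
    """Return 'categorical' or 'continuous' per the documented rule."""
    distinct = len(set(values))
    if distinct <= max_categories:
        return "categorical"
    # Exceeding maxCategories means continuous, regardless of metadata;
    # the report says Spark instead keeps such a feature categorical.
    return "continuous"

feature = list(range(30))  # 30 distinct values
print(classify_feature(feature, max_categories=20, nominal_metadata=True))
# -> continuous (expected behavior; the reported bug keeps it categorical)
```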
[jira] [Created] (SPARK-21008) Streaming applications read stale credentials file when recovering from checkpoint.
Xing Shi created SPARK-21008: Summary: Streaming applications read stale credentials file when recovering from checkpoint. Key: SPARK-21008 URL: https://issues.apache.org/jira/browse/SPARK-21008 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.1.1, 2.0.2, 1.6.3, 2.2.0 Reporter: Xing Shi On a security (Kerberos) enabled cluster, streaming applications renew HDFS delegation tokens periodically and save them in the {{/.sparkStaging//}} directory on HDFS. The path of the credentials file is written into the checkpoint, and is reloaded with the *old applicationId* when the application restarts, although the application has been assigned a new id. This issue can be reproduced by restarting a checkpoint-enabled streaming application on a kerberized cluster. The application runs well - but with thousands of {{java.io.FileNotFoundException}}s logged - and finally fails on token expiration. The log file looks something like this: {code:title=the_first_run.log} 17/06/07 14:52:06 INFO executor.CoarseGrainedExecutorBackend: Will periodically update credentials from: hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 17/06/07 14:52:06 INFO security.CredentialUpdater: Scheduling credentials refresh from HDFS in 92263 ms. {code} {code:title=after_restart.log} 17/06/07 15:11:24 INFO executor.CoarseGrainedExecutorBackend: Will periodically update credentials from: hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035/credentials-19a7c11e-8c93-478c-ab0a-cdbfae5b2ae5 ... 17/06/07 15:12:24 WARN yarn.YarnSparkHadoopUtil: Error while attempting to list files from application staging dir java.io.FileNotFoundException: File hdfs://nameservice1/user/xxx/.sparkStaging/application_1496384469444_0035 does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697) at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105) at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755) at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525) at org.apache.spark.deploy.SparkHadoopUtil.listFilesSorted(SparkHadoopUtil.scala:257) at org.apache.spark.deploy.yarn.security.CredentialUpdater.org$apache$spark$deploy$yarn$security$CredentialUpdater$$updateCredentialsIfRequired(CredentialUpdater.scala:72) at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(CredentialUpdater.scala:53) at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1$$anonfun$run$1.apply(CredentialUpdater.scala:53) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962) at org.apache.spark.deploy.yarn.security.CredentialUpdater$$anon$1.run(CredentialUpdater.scala:53) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} Notice that the applicationId after restart is application_1496384469444_{color:red}0036{color}, but the application still attempts to read credentials from 0035's directory. I currently use Spark 1.6 in my cluster, and have tested this issue with Spark 1.6.3 and 2.1.1, but it should affect all versions from 1.5.x to the current master (2.3.x). -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
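The failure mode above can be sketched in a few lines (a minimal sketch; the path layout is simplified and the helper name is hypothetical, not Spark code):

```python
# Minimal sketch of the reported bug: the credentials directory is derived
# from the applicationId at submission time and stored in the checkpoint,
# so after a restart the recovered path still points at the old
# application's staging directory, which YARN has already cleaned up.
def staging_dir(user, app_id):
    return f"hdfs:///user/{user}/.sparkStaging/{app_id}"

checkpointed = staging_dir("xxx", "application_1496384469444_0035")  # saved in checkpoint
current = staging_dir("xxx", "application_1496384469444_0036")       # actual dir after restart

print(checkpointed == current)
# -> False: the recovered app polls the stale 0035 path and gets
#    FileNotFoundException until its tokens expire
```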
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040516#comment-16040516 ] Wladimir Schmidt commented on SPARK-20928: -- I am really looking forward to it! This sub-millisecond streaming is really exciting. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished.
*/ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user-requested change in parallelism). -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
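The offset contract in the Epoch sketch above (exclusive start, inclusive end, with the end advancing while the epoch runs) can be illustrated with a toy model (names and structure are hypothetical, not the proposed Spark API):

```python
# Toy model of the Epoch contract described in the proposal: startOffset is
# exclusive, endOffset is inclusive, and endOffset keeps advancing while the
# epoch's data is still being processed.
class Epoch:
    def __init__(self, records, start_offset):
        self.records = records            # list of (offset, value) pairs
        self.start_offset = start_offset  # exclusive
        self.end_offset = start_offset    # inclusive; grows during processing

    def data(self):
        # Records strictly after start_offset, up to and including end_offset.
        return [v for off, v in self.records
                if self.start_offset < off <= self.end_offset]

epoch = Epoch([(1, "a"), (2, "b"), (3, "c")], start_offset=1)
epoch.end_offset = 3   # incrementally updated as processing proceeds
print(epoch.data())    # -> ['b', 'c']
```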
[jira] [Commented] (SPARK-20622) Parquet partition discovery for non key=value named directories
[ https://issues.apache.org/jira/browse/SPARK-20622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040515#comment-16040515 ] Noam Asor commented on SPARK-20622: --- The provided pull request is not complete and is rather in a POC state. If it proves useful enough to be looked at and considered as part of Spark, it should be polished first. > Parquet partition discovery for non key=value named directories > --- > > Key: SPARK-20622 > URL: https://issues.apache.org/jira/browse/SPARK-20622 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Noam Asor >Priority: Minor > > h4. Why > There are cases where traditional M/R jobs and RDD based Spark jobs writes > out partitioned parquet in 'value only' named directories i.e. > {{hdfs:///some/base/path/2017/05/06}} and not in 'key=value' named > directories i.e. {{hdfs:///some/base/path/year=2017/month=05/day=06}} which > prevents users from leveraging Spark SQL parquet partition discovery when > reading the former back. > h4. What > This issue is a proposal for a solution which will allow Spark SQL to > discover parquet partitions for 'value only' named directories. > h4. How > By introducing a new Spark SQL read option *partitionTemplate*. > *partitionTemplate* is in a Path form and it should include base path > followed by the missing 'key=' as a template for transforming 'value only' > named dirs to 'key=value' named dirs. In the example above this will look > like: > {{hdfs:///some/base/path/year=/month=/day=/}}. > To simplify the solution this option should be tied with *basePath* option, > meaning that *partitionTemplate* option is valid only if *basePath* is set > also. > In the end for the above scenario, this will look something like: > {code} > spark.read > .option("basePath", "hdfs:///some/base/path") > .option("partitionTemplate", "hdfs:///some/base/path/year=/month=/day=/") > .parquet(...)
> {code} > which will allow Spark SQL to do parquet partition discovery on the following > directory tree: > {code} > some > |--base >|--path > |--2016 > |--... > |--2017 >|--01 >|--02 >|--... >|--15 >|--... >|--... > {code} > adding to the schema of the resulted DataFrame the columns year, month, day > and their respective values as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20622) Parquet partition discovery for non key=value named directories
[ https://issues.apache.org/jira/browse/SPARK-20622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Noam Asor updated SPARK-20622: -- Priority: Minor (was: Major) > Parquet partition discovery for non key=value named directories > --- > > Key: SPARK-20622 > URL: https://issues.apache.org/jira/browse/SPARK-20622 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Noam Asor >Priority: Minor > > h4. Why > There are cases where traditional M/R jobs and RDD based Spark jobs writes > out partitioned parquet in 'value only' named directories i.e. > {{hdfs:///some/base/path/2017/05/06}} and not in 'key=value' named > directories i.e. {{hdfs:///some/base/path/year=2017/month=05/day=06}} which > prevents users from leveraging Spark SQL parquet partition discovery when > reading the former back. > h4. What > This issue is a proposal for a solution which will allow Spark SQL to > discover parquet partitions for 'value only' named directories. > h4. How > By introducing a new Spark SQL read option *partitionTemplate*. > *partitionTemplate* is in a Path form and it should include base path > followed by the missing 'key=' as a template for transforming 'value only' > named dirs to 'key=value' named dirs. In the example above this will look > like: > {{hdfs:///some/base/path/year=/month=/day=/}}. > To simplify the solution this option should be tied with *basePath* option, > meaning that *partitionTemplate* option is valid only if *basePath* is set > also. > In the end for the above scenario, this will look something like: > {code} > spark.read > .option("basePath", "hdfs:///some/base/path") > .option("partitionTemplate", "hdfs:///some/base/path/year=/month=/day=/") > .parquet(...) > {code} > which will allow Spark SQL to do parquet partition discovery on the following > directory tree: > {code} > some > |--base >|--path > |--2016 > |--... > |--2017 >|--01 >|--02 >|--... >|--15 >|--... >|--... 
> {code} > adding to the schema of the resulted DataFrame the columns year, month, day > and their respective values as expected. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
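The intended mapping from a 'value only' directory path to partition columns can be sketched as follows (a hypothetical helper illustrating the proposal's semantics, not the actual PR code):

```python
# Sketch of what the proposed partitionTemplate option would do: pair the
# template's key names (year=, month=, day=) with the corresponding value
# segments of a 'value only' directory path, yielding key=value partitions.
def discover_partitions(path, base_path, template):
    """Zip template keys like 'year=' with the path's value segments."""
    # 'year=/month=/day=/' -> ['year', 'month', 'day']
    keys = [seg[:-1] for seg in template[len(base_path):].strip("/").split("/")]
    # '/2017/05/06' -> ['2017', '05', '06']
    values = path[len(base_path):].strip("/").split("/")
    return dict(zip(keys, values))

parts = discover_partitions(
    "hdfs:///some/base/path/2017/05/06",
    "hdfs:///some/base/path",
    "hdfs:///some/base/path/year=/month=/day=/",
)
print(parts)  # -> {'year': '2017', 'month': '05', 'day': '06'}
```

These become the year, month, and day columns added to the resulting DataFrame's schema.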
[jira] [Commented] (SPARK-21007) Add SQL function - RIGHT && LEFT
[ https://issues.apache.org/jira/browse/SPARK-21007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040510#comment-16040510 ] Apache Spark commented on SPARK-21007: -- User '10110346' has created a pull request for this issue: https://github.com/apache/spark/pull/18228 > Add SQL function - RIGHT && LEFT > - > > Key: SPARK-21007 > URL: https://issues.apache.org/jira/browse/SPARK-21007 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: liuxian > > Add SQL function - RIGHT && LEFT, same as MySQL: > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-21007) Add SQL function - RIGHT && LEFT
[ https://issues.apache.org/jira/browse/SPARK-21007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21007: Assignee: (was: Apache Spark) > Add SQL function - RIGHT && LEFT > - > > Key: SPARK-21007 > URL: https://issues.apache.org/jira/browse/SPARK-21007 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: liuxian > > Add SQL function - RIGHT && LEFT, same as MySQL: > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-21007) Add SQL function - RIGHT && LEFT
[ https://issues.apache.org/jira/browse/SPARK-21007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-21007: Assignee: Apache Spark > Add SQL function - RIGHT && LEFT > - > > Key: SPARK-21007 > URL: https://issues.apache.org/jira/browse/SPARK-21007 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: liuxian >Assignee: Apache Spark > > Add SQL function - RIGHT && LEFT, same as MySQL: > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21007) Add SQL function - RIGHT && LEFT
[ https://issues.apache.org/jira/browse/SPARK-21007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liuxian updated SPARK-21007: Description: Add SQL function - RIGHT && LEFT, same as MySQL: https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right was: Add SQL function - RIGHT && LEFT, same as MySQL: https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right > Add SQL function - RIGHT && LEFT > - > > Key: SPARK-21007 > URL: https://issues.apache.org/jira/browse/SPARK-21007 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: liuxian > > Add SQL function - RIGHT && LEFT, same as MySQL: > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21007) Add SQL function - RIGHT && LEFT
liuxian created SPARK-21007: --- Summary: Add SQL function - RIGHT && LEFT Key: SPARK-21007 URL: https://issues.apache.org/jira/browse/SPARK-21007 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: liuxian Add SQL function - RIGHT && LEFT, same as MySQL: https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_left https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_right -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
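The MySQL semantics the proposal points to are simple to state: LEFT(str, len) returns the leftmost len characters and RIGHT(str, len) the rightmost len. A pure-Python sketch of those semantics (not the Spark implementation):

```python
# Pure-Python sketch of the MySQL LEFT/RIGHT string-function semantics
# referenced by the linked docs (function names here are hypothetical).
def sql_left(s, n):
    # Leftmost n characters; non-positive n yields the empty string.
    return s[:max(n, 0)]

def sql_right(s, n):
    # Rightmost n characters; non-positive n yields the empty string.
    return s[-n:] if n > 0 else ""

print(sql_left("foobarbar", 5))   # -> fooba
print(sql_right("foobarbar", 4))  # -> rbar
```

These match the examples in the MySQL reference manual pages linked in the issue.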
[jira] [Resolved] (SPARK-18385) Make the transformers native in the ml framework to avoid extra conversion
[ https://issues.apache.org/jira/browse/SPARK-18385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-18385. --- Resolution: Won't Fix > Make the transformers native in the ml framework to avoid extra conversion > - > > Key: SPARK-18385 > URL: https://issues.apache.org/jira/browse/SPARK-18385 > Project: Spark > Issue Type: Improvement > Components: ML >Reporter: Sandeep Singh >Priority: Minor > > follow up of [SPARK-14615] > see TODO's here https://github.com/apache/spark/pull/12627 > will improve performance since we avoid converting to/from MLlib vectors -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-16856) Link the application's executor page to the master's UI
[ https://issues.apache.org/jira/browse/SPARK-16856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-16856. --- Resolution: Won't Fix > Link the application's executor page to the master's UI > --- > > Key: SPARK-16856 > URL: https://issues.apache.org/jira/browse/SPARK-16856 > Project: Spark > Issue Type: Improvement > Components: Spark Core, Web UI >Reporter: Tao Lin > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-16124) Throws exception when executing query on `build/sbt hive/console`
[ https://issues.apache.org/jira/browse/SPARK-16124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-16124. --- Resolution: Not A Problem > Throws exception when executing query on `build/sbt hive/console` > - > > Key: SPARK-16124 > URL: https://issues.apache.org/jira/browse/SPARK-16124 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: MIN-FU YANG >Priority: Minor > > When I execute `val query = sql("SELECT * FROM src WHERE key = 92 ")` on hive > console which is from `build/sbt hive/console`, It throws exception: > org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: > file:/Users/xxx/git/spark/sql/hive/target/scala-2.11/spark-hive_2.11-2.0.0-SNAPSHOT.jar!/data/files/kv1.txt; > at > org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:242) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67) > at > org.apache.spark.sql.execution.QueryExecution.hiveResultString(QueryExecution.scala:128) > at > org.apache.spark.sql.hive.test.TestHiveSparkSession$SqlCmd$$anonfun$cmd$1.apply$mcV$sp(TestHive.scala:192) > at > org.apache.spark.sql.hive.test.TestHiveSparkSession$$anonfun$loadTestTable$2.apply(TestHive.scala:376) > at > org.apache.spark.sql.hive.test.TestHiveSparkSession$$anonfun$loadTestTable$2.apply(TestHive.scala:376) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) > at > org.apache.spark.sql.hive.test.TestHiveSparkSession.loadTestTable(TestHive.scala:376) > at > org.apache.spark.sql.hive.test.TestHiveQueryExecution$$anonfun$analyzed$2.apply(TestHive.scala:462) > at > 
org.apache.spark.sql.hive.test.TestHiveQueryExecution$$anonfun$analyzed$2.apply(TestHive.scala:462) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.sql.hive.test.TestHiveQueryExecution.analyzed$lzycompute(TestHive.scala:462) > at > org.apache.spark.sql.hive.test.TestHiveQueryExecution.analyzed(TestHive.scala:450) > at > org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:573) > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682) > ... 42 elided -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20515) Issue with reading Hive ORC tables having char/varchar columns in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-20515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20515. --- Resolution: Duplicate > Issue with reading Hive ORC tables having char/varchar columns in Spark SQL > --- > > Key: SPARK-20515 > URL: https://issues.apache.org/jira/browse/SPARK-20515 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 > Environment: AWS EMR Cluster >Reporter: Udit Mehrotra > > Reading from a Hive ORC table containing char/varchar columns fails in Spark > SQL. This is caused by the fact that Spark SQL internally replaces the > char/varchar columns with String data type. So, while reading from the table > created in Hive which has varchar/char columns, it ends up using the wrong > reader and causes a ClassCastException. > > Here is the exception: > > java.lang.ClassCastException: > org.apache.hadoop.hive.serde2.io.HiveVarcharWritable cannot be cast to > org.apache.hadoop.io.Text > at > org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector.getPrimitiveWritableObject(WritableStringObjectInspector.java:41) > at > org.apache.spark.sql.hive.HiveInspectors$class.unwrap(HiveInspectors.scala:324) > at > org.apache.spark.sql.hive.HadoopTableReader$.unwrap(TableReader.scala:333) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:419) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:419) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:435) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:426) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247) > at > 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > While the issue has been fixed in Spark 2.1.1 and 2.2.0 with SPARK-19459, it > still needs to be fixed in Spark 2.0. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org