[jira] [Closed] (SPARK-8966) Design a mechanism to ensure that temporary files created in tasks are cleaned up after failures
[ https://issues.apache.org/jira/browse/SPARK-8966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin closed SPARK-8966. -- Resolution: Later > Design a mechanism to ensure that temporary files created in tasks are > cleaned up after failures > > > Key: SPARK-8966 > URL: https://issues.apache.org/jira/browse/SPARK-8966 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Josh Rosen > > It's important to avoid leaking temporary files, such as spill files created > by the external sorter. Individual operators should still make an effort to > clean up their own files / perform their own error handling, but I think that > we should add a safety-net mechanism to track file creation on a per-task > basis and automatically clean up leaked files. > During tests, this mechanism should throw an exception when a leak is > detected. In production deployments, it should log a warning and clean up the > leak itself. This is similar to the TaskMemoryManager's leak detection and > cleanup code. > We may be able to implement this via a convenience method that registers task > completion handlers with TaskContext. > We might also explore techniques that will cause files to be cleaned up > automatically when their file descriptors are closed (e.g. by calling unlink > on an open file). These techniques should not be our last line of defense > against file resource leaks, though, since they might be platform-specific > and may clean up resources later than we'd like. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
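A minimal sketch of the safety-net idea described above, using TaskContext's task-completion callback. The TempFileTracker helper and its behavior are assumptions for illustration only, not the actual implementation proposed in the issue:
{code}
import java.io.File
import java.nio.file.Files

import org.apache.spark.TaskContext

// Hypothetical helper: every temp file created through it is registered with the
// running task, so anything the operator forgot to delete is cleaned up (or, in
// tests, reported) when the task completes.
object TempFileTracker {
  def createTempFile(prefix: String): File = {
    val file = Files.createTempFile(prefix, ".tmp").toFile
    Option(TaskContext.get()).foreach { ctx =>
      ctx.addTaskCompletionListener { _ =>
        if (file.exists()) {
          // Production mode: clean up the leak; a test mode could throw instead.
          file.delete()
        }
      }
    }
    file
  }
}
{code}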
[jira] [Commented] (SPARK-18519) map type can not be used in EqualTo
[ https://issues.apache.org/jira/browse/SPARK-18519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689184#comment-15689184 ] Apache Spark commented on SPARK-18519: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/15988 > map type can not be used in EqualTo > --- > > Key: SPARK-18519 > URL: https://issues.apache.org/jira/browse/SPARK-18519 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Wenchen Fan >Assignee: Wenchen Fan > Fix For: 2.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18556) Suboptimal number of tasks when writing partitioned data with desired number of files per directory
Damian Momot created SPARK-18556: Summary: Suboptimal number of tasks when writing partitioned data with desired number of files per directory Key: SPARK-18556 URL: https://issues.apache.org/jira/browse/SPARK-18556 Project: Spark Issue Type: Improvement Affects Versions: 2.0.2, 2.0.1, 2.0.0 Reporter: Damian Momot It's not possible to get an optimal number of write tasks when the desired number of files per directory is known. Example, when saving data to hdfs:
1. Data is supposed to be partitioned by a column (for example date) - it contains, for example, 90 different dates
2. There is upfront knowledge that each date should be written into X files (for example 4, because of recommended hdfs/parquet block size etc.)
3. During processing, the dataset was partitioned into 200 partitions (for example because of some grouping operations)
Currently we can do:
{code}
val data: Dataset[Row] = ???

data
  .write
  .partitionBy("date")
  .parquet("/xyz")
{code}
This will properly write data into 90 date directories (see point '1'), but each directory will contain 200 files (see point '3'). We can force the number of files by using repartition/coalesce:
{code}
val data: Dataset[Row] = ???

data
  .repartition(4)
  .write
  .partitionBy("date")
  .parquet("/xyz")
{code}
This will properly save 90 directories with 4 files each... but it will be done using only 4 tasks, which is way too slow - 360 files could be written in parallel using 360 tasks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
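A possible workaround for the scenario above (not a fix for the issue itself) is to repartition by the partition column plus a salt column before the write, so each date ends up in roughly the desired number of files while still using many tasks. The salt approach and the numbers below are assumptions for illustration; hash collisions make the split approximate:
{code}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, rand}

val filesPerDir = 4
val dates = 90
val data: Dataset[Row] = ???

data
  .withColumn("salt", (rand() * filesPerDir).cast("int"))      // values 0..3
  .repartition(dates * filesPerDir, col("date"), col("salt"))  // ~360 write tasks
  .drop("salt")                                                // dropping a column does not move rows
  .write
  .partitionBy("date")
  .parquet("/xyz")
{code}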
[jira] [Resolved] (SPARK-18179) Throws analysis exception with a proper message for unsupported argument types in reflect/java_method function
[ https://issues.apache.org/jira/browse/SPARK-18179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-18179. - Resolution: Fixed Assignee: Hyukjin Kwon Fix Version/s: 2.1.0 > Throws analysis exception with a proper message for unsupported argument > types in reflect/java_method function > -- > > Key: SPARK-18179 > URL: https://issues.apache.org/jira/browse/SPARK-18179 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Minor > Fix For: 2.1.0 > > > {code} > scala> spark.range(1).selectExpr("reflect('java.lang.String', 'valueOf', > cast('1990-01-01' as timestamp))") > {code} > throws an exception as below: > {code} > java.util.NoSuchElementException: key not found: TimestampType > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:59) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:59) > at > org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection$$anonfun$findMethod$1$$anonfun$apply$1.apply(CallMethodViaReflection.scala:159) > at > org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection$$anonfun$findMethod$1$$anonfun$apply$1.apply(CallMethodViaReflection.scala:158) > at > scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38) > {code} > We should throw analysis exception with a better message when the types are > unsupported rather than {{java.util.NoSuchElementException: key not found: > TimestampType}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
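For context, the same reflect call succeeds when the argument is first cast to a supported type such as string; a sketch of that workaround is below (the workaround is an assumption for illustration, not part of the issue or its fix):
{code}
// Cast the timestamp to string before passing it to reflect, since string is a supported argument type.
spark.range(1).selectExpr(
  "reflect('java.lang.String', 'valueOf', cast(cast('1990-01-01' as timestamp) as string))")
{code}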
[jira] [Commented] (SPARK-18525) Kafka DirectInputStream cannot be aware of new partition
[ https://issues.apache.org/jira/browse/SPARK-18525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689015#comment-15689015 ] Zhiwen Sun commented on SPARK-18525: Hi Cody: Thanks for your reply. We are still using Kafka 0.8.2. Is there a way to solve this problem? > Kafka DirectInputStream cannot be aware of new partition > > > Key: SPARK-18525 > URL: https://issues.apache.org/jira/browse/SPARK-18525 > Project: Spark > Issue Type: Improvement > Components: Input/Output >Affects Versions: 2.0.2 >Reporter: Zhiwen Sun > > It seems that DirectKafkaInputStream does not support reading new partitions while > Spark Streaming is running. > Related spark code: > https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101 > How to produce it: > {code:title=KafkaDirectTest.scala|borderStyle=solid} > object KafkaDirectTest { > def main(args: Array[String]) { > val conf = new SparkConf().setAppName("kafka direct test 5") > conf.setIfMissing("spark.master", "local[3]") > conf.set("spark.streaming.kafka.maxRatePerPartition", "10") > val ssc = new StreamingContext(conf, Seconds(1)) > val zkQuorum = Config("common").getString("kafka.zkquorum") > val topic = "test_use" > val groupId = "stream-test-0809" > val kafkaParams = Map( > "metadata.broker.list" -> "dev-002:9092,dev-004:9092", > "group.id" -> groupId > ) > val fromOffsets: Map[TopicAndPartition, Long] = Map( > new TopicAndPartition(topic, 0) -> 0L, > new TopicAndPartition(topic, 1) -> 0L, > new TopicAndPartition(topic, 2) -> 0L, > new TopicAndPartition(topic, 3) -> 0L > ) > val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd > val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, Set(topic)) > lines.foreachRDD { rdd => > rdd.foreach { row => > println(s"\n row: ${row} ") > } > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > offsetRanges.foreach { offset => > println(s"\n- offset: ${offset.topic} ${offset.partition} > ${offset.fromOffset} ${offset.untilOffset}") > } > } > ssc.start() > ssc.awaitTermination() > } > } > {code} > 1. Start the job. > 2. Add a new partition to the test_use topic. > The job cannot read the new partition's data. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18515) AlterTableDropPartitions fails for non-string columns
[ https://issues.apache.org/jira/browse/SPARK-18515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688851#comment-15688851 ] Apache Spark commented on SPARK-18515: -- User 'dongjoon-hyun' has created a pull request for this issue: https://github.com/apache/spark/pull/15987 > AlterTableDropPartitions fails for non-string columns > - > > Key: SPARK-18515 > URL: https://issues.apache.org/jira/browse/SPARK-18515 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Herman van Hovell >Assignee: Dongjoon Hyun > > AlterTableDropPartitions fails with a scala MatchError if you use non-string > partitioning columns: > {noformat} > spark.sql("drop table if exists tbl_x") > spark.sql("create table tbl_x (a int) partitioned by (p int)") > spark.sql("alter table tbl_x add partition (p=10)") > spark.sql("alter table tbl_x drop partition (p=10)") > {noformat} > Yields the following error: > {noformat} > scala.MatchError: (cast(p#8 as int) = 10) (of class > org.apache.spark.sql.catalyst.expressions.EqualTo) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10$$anonfun$11.apply(ddl.scala:462) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10$$anonfun$11.apply(ddl.scala:462) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10.apply(ddl.scala:462) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10.apply(ddl.scala:461) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand.run(ddl.scala:461) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) > at org.apache.spark.sql.Dataset.(Dataset.scala:185) > at 
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:591) > ... 39 elided > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18515) AlterTableDropPartitions fails for non-string columns
[ https://issues.apache.org/jira/browse/SPARK-18515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18515: Assignee: Dongjoon Hyun (was: Apache Spark) > AlterTableDropPartitions fails for non-string columns > - > > Key: SPARK-18515 > URL: https://issues.apache.org/jira/browse/SPARK-18515 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Herman van Hovell >Assignee: Dongjoon Hyun > > AlterTableDropPartitions fails with a scala MatchError if you use non-string > partitioning columns: > {noformat} > spark.sql("drop table if exists tbl_x") > spark.sql("create table tbl_x (a int) partitioned by (p int)") > spark.sql("alter table tbl_x add partition (p=10)") > spark.sql("alter table tbl_x drop partition (p=10)") > {noformat} > Yields the following error: > {noformat} > scala.MatchError: (cast(p#8 as int) = 10) (of class > org.apache.spark.sql.catalyst.expressions.EqualTo) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10$$anonfun$11.apply(ddl.scala:462) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10$$anonfun$11.apply(ddl.scala:462) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10.apply(ddl.scala:462) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10.apply(ddl.scala:461) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand.run(ddl.scala:461) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) > at org.apache.spark.sql.Dataset.(Dataset.scala:185) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:591) > 
... 39 elided > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18515) AlterTableDropPartitions fails for non-string columns
[ https://issues.apache.org/jira/browse/SPARK-18515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18515: Assignee: Apache Spark (was: Dongjoon Hyun) > AlterTableDropPartitions fails for non-string columns > - > > Key: SPARK-18515 > URL: https://issues.apache.org/jira/browse/SPARK-18515 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Herman van Hovell >Assignee: Apache Spark > > AlterTableDropPartitions fails with a scala MatchError if you use non-string > partitioning columns: > {noformat} > spark.sql("drop table if exists tbl_x") > spark.sql("create table tbl_x (a int) partitioned by (p int)") > spark.sql("alter table tbl_x add partition (p=10)") > spark.sql("alter table tbl_x drop partition (p=10)") > {noformat} > Yields the following error: > {noformat} > scala.MatchError: (cast(p#8 as int) = 10) (of class > org.apache.spark.sql.catalyst.expressions.EqualTo) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10$$anonfun$11.apply(ddl.scala:462) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10$$anonfun$11.apply(ddl.scala:462) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10.apply(ddl.scala:462) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand$$anonfun$10.apply(ddl.scala:461) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand.run(ddl.scala:461) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) > at org.apache.spark.sql.Dataset.(Dataset.scala:185) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:591) > ... 
39 elided > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18555) na.fill messes up original values in long integers
[ https://issues.apache.org/jira/browse/SPARK-18555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mahmoud Rawas updated SPARK-18555: -- Description: Mainly the issue is clarified in the following example. Given a Dataset:
scala> data.show
| a| b|
| 1| 2|
| -1| -2|
|9123146099426677101|9123146560113991650|
Theoretically, when we call na.fill(0) nothing should change, while the current result is:
scala> data.na.fill(0).show
| a| b|
| 1| 2|
| -1| -2|
|9123146099426676736|9123146560113991680|

was: Mainly the issue is clarified in the following example. Given a Dataset:
scala> data.show
+---+---+
| a| b|
+---+---+
| 1| 2|
| -1| -2|
|9123146099426677101|9123146560113991650|
+---+---+
Theoretically, when we call na.fill(0) nothing should change, while the current result is:
scala> data.na.fill(0).show
+---+---+
| a| b|
+---+---+
| 1| 2|
| -1| -2|
|9123146099426676736|9123146560113991680|
+---+---+

> na.fill messes up original values in long integers > > > Key: SPARK-18555 > URL: https://issues.apache.org/jira/browse/SPARK-18555 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Mahmoud Rawas >Priority: Critical > > Mainly the issue is clarified in the following example. Given a Dataset:
> scala> data.show
> | a| b|
> | 1| 2|
> | -1| -2|
> |9123146099426677101|9123146560113991650|
> Theoretically, when we call na.fill(0) nothing should change, while the current result is:
> scala> data.na.fill(0).show
> | a| b|
> | 1| 2|
> | -1| -2|
> |9123146099426676736|9123146560113991680| -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18555) na.fill messes up original values in long integers
Mahmoud Rawas created SPARK-18555: - Summary: na.fill messes up original values in long integers Key: SPARK-18555 URL: https://issues.apache.org/jira/browse/SPARK-18555 Project: Spark Issue Type: Bug Affects Versions: 2.0.2, 2.0.1, 2.0.0 Reporter: Mahmoud Rawas Priority: Critical Mainly the issue is clarified in the following example. Given a Dataset:
scala> data.show
+---+---+
| a| b|
+---+---+
| 1| 2|
| -1| -2|
|9123146099426677101|9123146560113991650|
+---+---+
Theoretically, when we call na.fill(0) nothing should change, while the current result is:
scala> data.na.fill(0).show
+---+---+
| a| b|
+---+---+
| 1| 2|
| -1| -2|
|9123146099426676736|9123146560113991680|
+---+---+
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
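The before/after values above are consistent with the long column being passed through a double somewhere inside fill(): a double has only 53 bits of mantissa, so longs of this magnitude are rounded to the nearest multiple of 1024. That is an assumed cause, not a confirmed diagnosis, but the rounding alone reproduces the reported numbers:
{code}
scala> val original = 9123146099426677101L
original: Long = 9123146099426677101

scala> original.toDouble.toLong  // Long -> Double -> Long loses the low-order bits
res0: Long = 9123146099426676736

scala> 9123146560113991650L.toDouble.toLong
res1: Long = 9123146560113991680
{code}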
[jira] [Created] (SPARK-18554) leader master lost the leadership; when the slave becomes master, the previous app's state displays as waiting
liujianhui created SPARK-18554: -- Summary: leader master lost the leadership; when the slave becomes master, the previous app's state displays as waiting Key: SPARK-18554 URL: https://issues.apache.org/jira/browse/SPARK-18554 Project: Spark Issue Type: Bug Components: Deploy, Web UI Affects Versions: 1.6.1 Environment: java1.8 Reporter: liujianhui Priority: Minor When the leader master loses the leadership and the slave becomes master, the state of the app in the web UI will display as WAITING; the relevant code is as follows:
case MasterChangeAcknowledged(appId) => {
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      app.state = ApplicationState.WAITING
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }
  if (canCompleteRecovery) { completeRecovery() }
}
The state of the app should be RUNNING instead of WAITING. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18553) Executor loss may cause TaskSetManager to be leaked
[ https://issues.apache.org/jira/browse/SPARK-18553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688759#comment-15688759 ] Apache Spark commented on SPARK-18553: -- User 'JoshRosen' has created a pull request for this issue: https://github.com/apache/spark/pull/15986 > Executor loss may cause TaskSetManager to be leaked > --- > > Key: SPARK-18553 > URL: https://issues.apache.org/jira/browse/SPARK-18553 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 1.6.0, 2.0.0, 2.1.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Blocker > > Due to a bug in TaskSchedulerImpl, the complete sudden loss of an executor > may cause a TaskSetManager to be leaked, causing ShuffleDependencies and > other data structures to be kept alive indefinitely, leading to various types > of resource leaks (including shuffle file leaks). > In a nutshell, the problem is that TaskSchedulerImpl did not maintain its own > mapping from executorId to running task ids, leaving it unable to clean up > taskId to taskSetManager maps when an executor is totally lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
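A minimal sketch of the kind of executorId-to-running-task bookkeeping the description says was missing. The class and method names below are invented for illustration and are not the actual TaskSchedulerImpl code:
{code}
import scala.collection.mutable

// Illustrative bookkeeping: remember which task ids run on which executor so that
// losing an executor can also clear the taskId -> TaskSetManager entries it owned.
class RunningTaskBookkeeping[Manager] {
  private val executorIdToRunningTaskIds = mutable.HashMap.empty[String, mutable.HashSet[Long]]
  private val taskIdToTaskSetManager = mutable.HashMap.empty[Long, Manager]

  def taskLaunched(execId: String, taskId: Long, manager: Manager): Unit = {
    executorIdToRunningTaskIds.getOrElseUpdate(execId, mutable.HashSet.empty) += taskId
    taskIdToTaskSetManager(taskId) = manager
  }

  def taskFinished(execId: String, taskId: Long): Unit = {
    executorIdToRunningTaskIds.get(execId).foreach(_ -= taskId)
    taskIdToTaskSetManager -= taskId
  }

  def executorLost(execId: String): Unit = {
    // Without this reverse mapping, the tasks of a suddenly lost executor would keep
    // their TaskSetManager (and its ShuffleDependencies) reachable indefinitely.
    executorIdToRunningTaskIds.remove(execId).foreach(_.foreach(taskIdToTaskSetManager -= _))
  }
}
{code}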
[jira] [Assigned] (SPARK-18553) Executor loss may cause TaskSetManager to be leaked
[ https://issues.apache.org/jira/browse/SPARK-18553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18553: Assignee: Apache Spark (was: Josh Rosen) > Executor loss may cause TaskSetManager to be leaked > --- > > Key: SPARK-18553 > URL: https://issues.apache.org/jira/browse/SPARK-18553 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 1.6.0, 2.0.0, 2.1.0 >Reporter: Josh Rosen >Assignee: Apache Spark >Priority: Blocker > > Due to a bug in TaskSchedulerImpl, the complete sudden loss of an executor > may cause a TaskSetManager to be leaked, causing ShuffleDependencies and > other data structures to be kept alive indefinitely, leading to various types > of resource leaks (including shuffle file leaks). > In a nutshell, the problem is that TaskSchedulerImpl did not maintain its own > mapping from executorId to running task ids, leaving it unable to clean up > taskId to taskSetManager maps when an executor is totally lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18553) Executor loss may cause TaskSetManager to be leaked
[ https://issues.apache.org/jira/browse/SPARK-18553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18553: Assignee: Josh Rosen (was: Apache Spark) > Executor loss may cause TaskSetManager to be leaked > --- > > Key: SPARK-18553 > URL: https://issues.apache.org/jira/browse/SPARK-18553 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 1.6.0, 2.0.0, 2.1.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Blocker > > Due to a bug in TaskSchedulerImpl, the complete sudden loss of an executor > may cause a TaskSetManager to be leaked, causing ShuffleDependencies and > other data structures to be kept alive indefinitely, leading to various types > of resource leaks (including shuffle file leaks). > In a nutshell, the problem is that TaskSchedulerImpl did not maintain its own > mapping from executorId to running task ids, leaving it unable to clean up > taskId to taskSetManager maps when an executor is totally lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-18501) SparkR spark.glm error on collinear data
[ https://issues.apache.org/jira/browse/SPARK-18501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanbo Liang resolved SPARK-18501. - Resolution: Fixed Fix Version/s: 2.1.0 > SparkR spark.glm error on collinear data > - > > Key: SPARK-18501 > URL: https://issues.apache.org/jira/browse/SPARK-18501 > Project: Spark > Issue Type: Bug > Components: ML, SparkR >Reporter: Yanbo Liang >Assignee: Yanbo Liang > Fix For: 2.1.0 > > > Spark {{GeneralizedLinearRegression}} can handle collinear data since the > underlying {{WeightedLeastSquares}} can be solved by local "l-bfgs" (rather > than "normal"). But the SparkR wrapper {{spark.glm}} throws errors when > fitting on collinear data: > {code} > > df <- read.df("data/mllib/sample_binary_classification_data.txt", source = > > "libsvm") > > model <- spark.glm(df, label ~ features, family = binomial(link = "logit")) > > summary(model) > Error in `rownames<-`(`*tmp*`, value = c("(Intercept)", "features_0", : > length of 'dimnames' [1] not equal to array extent > {code} > After studying this error in depth, I found it was caused by the standard error of > coefficients, t value and p value not being available when the underlying > {{WeightedLeastSquares}} is solved by local "l-bfgs". As a result, generating the coefficients > matrix failed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18553) Executor loss may cause TaskSetManager to be leaked
Josh Rosen created SPARK-18553: -- Summary: Executor loss may cause TaskSetManager to be leaked Key: SPARK-18553 URL: https://issues.apache.org/jira/browse/SPARK-18553 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.0.0, 1.6.0, 2.1.0 Reporter: Josh Rosen Assignee: Josh Rosen Priority: Blocker Due to a bug in TaskSchedulerImpl, the complete sudden loss of an executor may cause a TaskSetManager to be leaked, causing ShuffleDependencies and other data structures to be kept alive indefinitely, leading to various types of resource leaks (including shuffle file leaks). In a nutshell, the problem is that TaskSchedulerImpl did not maintain its own mapping from executorId to running task ids, leaving it unable to clean up taskId to taskSetManager maps when an executor is totally lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18552) Watermark should not rely on sinks to proceed
Liwei Lin created SPARK-18552: - Summary: Watermark should not rely on sinks to proceed Key: SPARK-18552 URL: https://issues.apache.org/jira/browse/SPARK-18552 Project: Spark Issue Type: Bug Components: Structured Streaming Reporter: Liwei Lin Priority: Critical Today, for the watermark to be collected and to proceed correctly, a sink should trigger the real execution of the dataset it receives in every batch. However, during the recovery process, a sink might skip a batch (such as in https://github.com/apache/spark/blob/v2.0.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala#L202-L204), and then the watermark just goes wrong. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18545) Verify number of hive client RPCs in PartitionedTablePerfStatsSuite
[ https://issues.apache.org/jira/browse/SPARK-18545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18545: Assignee: (was: Apache Spark) > Verify number of hive client RPCs in PartitionedTablePerfStatsSuite > --- > > Key: SPARK-18545 > URL: https://issues.apache.org/jira/browse/SPARK-18545 > Project: Spark > Issue Type: Test > Components: SQL >Reporter: Eric Liang >Priority: Minor > > To avoid performance regressions like > https://issues.apache.org/jira/browse/SPARK-18507 in the future, we should > add a metric for the number of Hive client RPC issued and check it in the > perf stats suite. > cc [~cloud_fan] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18545) Verify number of hive client RPCs in PartitionedTablePerfStatsSuite
[ https://issues.apache.org/jira/browse/SPARK-18545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688674#comment-15688674 ] Apache Spark commented on SPARK-18545: -- User 'ericl' has created a pull request for this issue: https://github.com/apache/spark/pull/15985 > Verify number of hive client RPCs in PartitionedTablePerfStatsSuite > --- > > Key: SPARK-18545 > URL: https://issues.apache.org/jira/browse/SPARK-18545 > Project: Spark > Issue Type: Test > Components: SQL >Reporter: Eric Liang >Priority: Minor > > To avoid performance regressions like > https://issues.apache.org/jira/browse/SPARK-18507 in the future, we should > add a metric for the number of Hive client RPC issued and check it in the > perf stats suite. > cc [~cloud_fan] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18545) Verify number of hive client RPCs in PartitionedTablePerfStatsSuite
[ https://issues.apache.org/jira/browse/SPARK-18545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18545: Assignee: Apache Spark > Verify number of hive client RPCs in PartitionedTablePerfStatsSuite > --- > > Key: SPARK-18545 > URL: https://issues.apache.org/jira/browse/SPARK-18545 > Project: Spark > Issue Type: Test > Components: SQL >Reporter: Eric Liang >Assignee: Apache Spark >Priority: Minor > > To avoid performance regressions like > https://issues.apache.org/jira/browse/SPARK-18507 in the future, we should > add a metric for the number of Hive client RPC issued and check it in the > perf stats suite. > cc [~cloud_fan] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18551) Add functionality to delete event logs from the History Server UI
[ https://issues.apache.org/jira/browse/SPARK-18551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18551: Assignee: Apache Spark > Add functionality to delete event logs from the History Server UI > - > > Key: SPARK-18551 > URL: https://issues.apache.org/jira/browse/SPARK-18551 > Project: Spark > Issue Type: New Feature > Components: Spark Core, Web UI >Reporter: Alex Bozarth >Assignee: Apache Spark > > Sometimes a Spark user will only have access to a History Server to interact > with their (past) applications. But without access to the server they can > only delete applications through use of the FS Cleaner feature, which itself > can only clean logs older than a set date. > I propose adding the ability to delete specific applications via the History > Server UI with the default setting to off. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18551) Add functionality to delete event logs from the History Server UI
[ https://issues.apache.org/jira/browse/SPARK-18551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688516#comment-15688516 ] Apache Spark commented on SPARK-18551: -- User 'ajbozarth' has created a pull request for this issue: https://github.com/apache/spark/pull/15984 > Add functionality to delete event logs from the History Server UI > - > > Key: SPARK-18551 > URL: https://issues.apache.org/jira/browse/SPARK-18551 > Project: Spark > Issue Type: New Feature > Components: Spark Core, Web UI >Reporter: Alex Bozarth > > Sometimes a Spark user will only have access to a History Server to interact > with their (past) applications. But without access to the server they can > only delete applications through use of the FS Cleaner feature, which itself > can only clean logs older than a set date. > I propose adding the ability to delete specific applications via the History > Server UI with the default setting to off. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18551) Add functionality to delete event logs from the History Server UI
[ https://issues.apache.org/jira/browse/SPARK-18551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18551: Assignee: (was: Apache Spark) > Add functionality to delete event logs from the History Server UI > - > > Key: SPARK-18551 > URL: https://issues.apache.org/jira/browse/SPARK-18551 > Project: Spark > Issue Type: New Feature > Components: Spark Core, Web UI >Reporter: Alex Bozarth > > Sometimes a Spark user will only have access to a History Server to interact > with their (past) applications. But without access to the server they can > only delete applications through use of the FS Cleaner feature, which itself > can only clean logs older than a set date. > I propose adding the ability to delete specific applications via the History > Server UI with the default setting to off. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1677) Allow users to avoid Hadoop output checks if desired
[ https://issues.apache.org/jira/browse/SPARK-1677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688509#comment-15688509 ] Apache Spark commented on SPARK-1677: - User 'CodingCat' has created a pull request for this issue: https://github.com/apache/spark/pull/947 > Allow users to avoid Hadoop output checks if desired > > > Key: SPARK-1677 > URL: https://issues.apache.org/jira/browse/SPARK-1677 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.0.0 >Reporter: Patrick Wendell >Assignee: Nan Zhu > Fix For: 1.0.1, 1.1.0 > > > For compatibility with older versions of Spark it would be nice to have an > option `spark.hadoop.validateOutputSpecs` (default true) and a description > "If set to true, validates the output specification used in saveAsHadoopFile > and other variants. This can be disabled to silence exceptions due to > pre-existing output directories." > This would just wrap the checking done in this PR: > https://issues.apache.org/jira/browse/SPARK-1100 > https://github.com/apache/spark/pull/11 > By first checking the spark conf. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1677) Allow users to avoid Hadoop output checks if desired
[ https://issues.apache.org/jira/browse/SPARK-1677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688502#comment-15688502 ] Yang Li commented on SPARK-1677: Hi Spark Community, I'm curious about the behavior of this "spark.hadoop.validateOutputSpecs" option. If I set it to 'false', will existing files in the output directory get wiped out beforehand? For example, if the Spark job is to output file Y under directory A, which already contains file X, do we expect both files X and Y under folder A? Or will just Y be retained after the job completes? Thanks! > Allow users to avoid Hadoop output checks if desired > > > Key: SPARK-1677 > URL: https://issues.apache.org/jira/browse/SPARK-1677 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.0.0 >Reporter: Patrick Wendell >Assignee: Nan Zhu > Fix For: 1.0.1, 1.1.0 > > > For compatibility with older versions of Spark it would be nice to have an > option `spark.hadoop.validateOutputSpecs` (default true) and a description > "If set to true, validates the output specification used in saveAsHadoopFile > and other variants. This can be disabled to silence exceptions due to > pre-existing output directories." > This would just wrap the checking done in this PR: > https://issues.apache.org/jira/browse/SPARK-1100 > https://github.com/apache/spark/pull/11 > By first checking the spark conf. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
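For reference, the option quoted in the description is set on the SparkConf before the context is created; a minimal sketch is below. It only shows how the flag is set - whether pre-existing files are removed or kept is not stated by the description and is not assumed here:
{code}
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("validate-output-specs-example")
  // Disable the pre-existing output directory check described in the issue.
  .set("spark.hadoop.validateOutputSpecs", "false")

val sc = new SparkContext(conf)
sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("/tmp/output-dir")
{code}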
[jira] [Commented] (SPARK-18544) Append with df.saveAsTable writes data to wrong location
[ https://issues.apache.org/jira/browse/SPARK-18544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688471#comment-15688471 ] Apache Spark commented on SPARK-18544: -- User 'ericl' has created a pull request for this issue: https://github.com/apache/spark/pull/15983 > Append with df.saveAsTable writes data to wrong location > > > Key: SPARK-18544 > URL: https://issues.apache.org/jira/browse/SPARK-18544 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Eric Liang >Priority: Blocker > > When using saveAsTable in append mode, data will be written to the wrong > location for non-managed Datasource tables. The following example illustrates > this. > It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from > DataFrameWriter. Also, we should probably remove the repair table call at the > end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the > Hive or Datasource case. > {code} > scala> spark.sqlContext.range(100).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test") > scala> sql("create table test (id long, A int, B int) USING parquet OPTIONS > (path '/tmp/test') PARTITIONED BY (A, B)") > scala> sql("msck repair table test") > scala> sql("select * from test where A = 1").count > res6: Long = 1 > scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("append").saveAsTable("test") > scala> sql("select * from test where A = 1").count > res8: Long = 1 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18544) Append with df.saveAsTable writes data to wrong location
[ https://issues.apache.org/jira/browse/SPARK-18544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18544: Assignee: Apache Spark > Append with df.saveAsTable writes data to wrong location > > > Key: SPARK-18544 > URL: https://issues.apache.org/jira/browse/SPARK-18544 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Eric Liang >Assignee: Apache Spark >Priority: Blocker > > When using saveAsTable in append mode, data will be written to the wrong > location for non-managed Datasource tables. The following example illustrates > this. > It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from > DataFrameWriter. Also, we should probably remove the repair table call at the > end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the > Hive or Datasource case. > {code} > scala> spark.sqlContext.range(100).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test") > scala> sql("create table test (id long, A int, B int) USING parquet OPTIONS > (path '/tmp/test') PARTITIONED BY (A, B)") > scala> sql("msck repair table test") > scala> sql("select * from test where A = 1").count > res6: Long = 1 > scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("append").saveAsTable("test") > scala> sql("select * from test where A = 1").count > res8: Long = 1 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18544) Append with df.saveAsTable writes data to wrong location
[ https://issues.apache.org/jira/browse/SPARK-18544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18544: Assignee: (was: Apache Spark) > Append with df.saveAsTable writes data to wrong location > > > Key: SPARK-18544 > URL: https://issues.apache.org/jira/browse/SPARK-18544 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Eric Liang >Priority: Blocker > > When using saveAsTable in append mode, data will be written to the wrong > location for non-managed Datasource tables. The following example illustrates > this. > It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from > DataFrameWriter. Also, we should probably remove the repair table call at the > end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the > Hive or Datasource case. > {code} > scala> spark.sqlContext.range(100).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test") > scala> sql("create table test (id long, A int, B int) USING parquet OPTIONS > (path '/tmp/test') PARTITIONED BY (A, B)") > scala> sql("msck repair table test") > scala> sql("select * from test where A = 1").count > res6: Long = 1 > scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("append").saveAsTable("test") > scala> sql("select * from test where A = 1").count > res8: Long = 1 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-18530) Kafka timestamp should be TimestampType
[ https://issues.apache.org/jira/browse/SPARK-18530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-18530. -- Resolution: Fixed Fix Version/s: 2.1.0 > Kafka timestamp should be TimestampType > --- > > Key: SPARK-18530 > URL: https://issues.apache.org/jira/browse/SPARK-18530 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Reporter: Michael Armbrust >Assignee: Shixiong Zhu >Priority: Blocker > Fix For: 2.1.0 > > > Otherwise every time you try to use it you have to do a manual conversion. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
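The "manual conversion" mentioned in the description would look roughly like the sketch below, assuming the Kafka source exposes the timestamp column as epoch milliseconds (that unit, plus the broker and topic names, are assumptions for illustration):
{code}
import org.apache.spark.sql.functions.col

val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()

// Manual conversion: epoch milliseconds -> TimestampType
val withEventTime = kafka.withColumn("eventTime", (col("timestamp") / 1000).cast("timestamp"))
{code}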
[jira] [Updated] (SPARK-15380) Generate code that stores a float/double value in each column from ColumnarBatch when DataFrame.cache() is used
[ https://issues.apache.org/jira/browse/SPARK-15380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-15380: -- Target Version/s: 2.2.0 (was: 2.1.0) > Generate code that stores a float/double value in each column from > ColumnarBatch when DataFrame.cache() is used > --- > > Key: SPARK-15380 > URL: https://issues.apache.org/jira/browse/SPARK-15380 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Kazuaki Ishizaki > > When DataFrame.cache() is called, data will be stored as column-oriented > storage in CachedBatch. The current Catalyst generates Java program to store > a computed value to an InternalRow and then the value is stored into > CachedBatch even if data is read from ColumnarBatch for ParquetReader. > This JIRA generates Java code to store a value into a ColumnarBatch, and > store data from the ColumnarBatch to the CachedBatch. This JIRA handles only > float and double types for a value. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-15117) Generate code that get a value in each compressed column from CachedBatch when DataFrame.cache() is called
[ https://issues.apache.org/jira/browse/SPARK-15117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-15117: -- Target Version/s: 2.2.0 (was: 2.1.0) > Generate code that get a value in each compressed column from CachedBatch > when DataFrame.cache() is called > -- > > Key: SPARK-15117 > URL: https://issues.apache.org/jira/browse/SPARK-15117 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Kazuaki Ishizaki > > Once SPARK-14098 is merged, we will migrate a feature in this JIRA entry. > When DataFrame.cache() is called, data is stored as column-oriented storage > in CachedBatch. The current Catalyst generates Java program to get a value of > a column from an InternalRow that is translated from CachedBatch. This issue > generates Java code to get a value of a column from CachedBatch. This JIRA > entry supports other primitive types (boolean/byte/short/int/long) whose > column may be compressed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18551) Add functionality to delete event logs from the History Server UI
[ https://issues.apache.org/jira/browse/SPARK-18551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688393#comment-15688393 ] Alex Bozarth commented on SPARK-18551: -- I will have a wip pr open for this shortly > Add functionality to delete event logs from the History Server UI > - > > Key: SPARK-18551 > URL: https://issues.apache.org/jira/browse/SPARK-18551 > Project: Spark > Issue Type: New Feature > Components: Spark Core, Web UI >Reporter: Alex Bozarth > > Sometimes a Spark user will only have access to a History Server to interact > with their (past) applications. But without access to the server they can > only delete applications through use of the FS Cleaner feature, which itself > can only clean logs older than a set date. > I propose adding the ability to delete specific applications via the History > Server UI with the default setting to off. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18551) Add functionality to delete event logs from the History Server UI
Alex Bozarth created SPARK-18551: Summary: Add functionality to delete event logs from the History Server UI Key: SPARK-18551 URL: https://issues.apache.org/jira/browse/SPARK-18551 Project: Spark Issue Type: New Feature Components: Spark Core, Web UI Reporter: Alex Bozarth Sometimes a Spark user will only have access to a History Server to interact with their (past) applications. But without access to the server they can only delete applications through use of the FS Cleaner feature, which itself can only clean logs older than a set date. I propose adding the ability to delete specific applications via the History Server UI with the default setting to off. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-18550) Make the queue capacity of LiveListenerBus configurable.
[ https://issues.apache.org/jira/browse/SPARK-18550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell closed SPARK-18550. - Resolution: Duplicate > Make the queue capacity of LiveListenerBus configurable. > > > Key: SPARK-18550 > URL: https://issues.apache.org/jira/browse/SPARK-18550 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Zhan Zhang >Priority: Minor > > We have hit issues where the driver listener bus cannot keep up with the speed of incoming > events. The current value is fixed at 1000. This value should be configurable per > job. Otherwise, when events are dropped, the UI is totally useless. > Bus: Dropping SparkListenerEvent because no remaining room in event queue. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18550) Make the queue capacity of LiveListenerBus configurable.
[ https://issues.apache.org/jira/browse/SPARK-18550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688361#comment-15688361 ] Zhan Zhang commented on SPARK-18550: I was not aware it had already been fixed. Please help to close it. > Make the queue capacity of LiveListenerBus configurable. > > > Key: SPARK-18550 > URL: https://issues.apache.org/jira/browse/SPARK-18550 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Zhan Zhang >Priority: Minor > > We have hit issues where the driver listener bus cannot keep up with the speed of incoming > events. The current value is fixed at 1000. This value should be configurable per > job. Otherwise, when events are dropped, the UI is totally useless. > Bus: Dropping SparkListenerEvent because no remaining room in event queue. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
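The duplicate resolution above suggests the capacity is already configurable in newer versions; if so, it would be set roughly as in the sketch below. The config key spark.scheduler.listenerbus.eventqueue.size is an assumption here and should be checked against the documentation of the Spark version in use:
{code}
import org.apache.spark.SparkConf

// Assumed config key; verify against the Spark version's documentation.
val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")
{code}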
[jira] [Commented] (SPARK-18513) Record and recover watermark
[ https://issues.apache.org/jira/browse/SPARK-18513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688357#comment-15688357 ] Apache Spark commented on SPARK-18513: -- User 'tcondie' has created a pull request for this issue: https://github.com/apache/spark/pull/15949 > Record and recover watermark > > > Key: SPARK-18513 > URL: https://issues.apache.org/jira/browse/SPARK-18513 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Reporter: Liwei Lin >Assignee: Tyson Condie >Priority: Blocker > > We should record the watermark into the persistent log and recover it to > ensure determinism. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18533) Raise correct error upon specification of schema for datasource tables created through CTAS
[ https://issues.apache.org/jira/browse/SPARK-18533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-18533: Assignee: Dilip Biswal > Raise correct error upon specification of schema for datasource tables > created through CTAS > --- > > Key: SPARK-18533 > URL: https://issues.apache.org/jira/browse/SPARK-18533 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Dilip Biswal >Assignee: Dilip Biswal >Priority: Minor > Fix For: 2.1.0 > > > Currently, Hive serde tables created through CTAS do not allow explicit > specification of a schema, as it is inferred from the select clause. Currently a > semantic error is raised for this case. However, for data source tables > we currently raise a parser error, which is not as informative. We should > raise a consistent error for both forms of tables. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-18533) Raise correct error upon specification of schema for datasource tables created through CTAS
[ https://issues.apache.org/jira/browse/SPARK-18533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-18533. - Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 15968 [https://github.com/apache/spark/pull/15968] > Raise correct error upon specification of schema for datasource tables > created through CTAS > --- > > Key: SPARK-18533 > URL: https://issues.apache.org/jira/browse/SPARK-18533 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Dilip Biswal >Priority: Minor > Fix For: 2.1.0 > > > Currently, Hive serde tables created through CTAS do not allow explicit > specification of a schema, as it is inferred from the select clause. Currently a > semantic error is raised for this case. However, for data source tables > we currently raise a parser error, which is not as informative. We should > raise a consistent error for both forms of tables. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688345#comment-15688345 ] Herman van Hovell commented on SPARK-18394: --- Great! Ping me if you need any assistance. > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > 
public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new >
[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688333#comment-15688333 ] Jonny Serencsa commented on SPARK-18394: Yes I would. > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if 
(currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new >
[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688330#comment-15688330 ] Herman van Hovell commented on SPARK-18394: --- Nice, this is a good find. I think we either need to make AttributeSet iteration deterministic, or make a change to the following line in the SparkPlanner: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala#L96 Would you be interested in working on this? > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > 
this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new >
[jira] [Commented] (SPARK-18550) Make the queue capacity of LiveListenerBus configurable.
[ https://issues.apache.org/jira/browse/SPARK-18550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688323#comment-15688323 ] Marcelo Vanzin commented on SPARK-18550: SPARK-15703? > Make the queue capacity of LiveListenerBus configurable. > > > Key: SPARK-18550 > URL: https://issues.apache.org/jira/browse/SPARK-18550 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Zhan Zhang >Priority: Minor > > We meet issues that driver listener bus cannot catch up the speed of incoming > event. Current value is fixed as 1000. This value should be configurable per > job. Otherwise, when event is dropped, the UI is totally useless. > Bus: Dropping SparkListenerEvent because no remaining room in event queue. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18550) Make the queue capacity of LiveListenerBus configurable.
Zhan Zhang created SPARK-18550: -- Summary: Make the queue capacity of LiveListenerBus configurable. Key: SPARK-18550 URL: https://issues.apache.org/jira/browse/SPARK-18550 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Zhan Zhang Priority: Minor We have hit cases where the driver listener bus cannot keep up with the rate of incoming events. The queue capacity is currently fixed at 1000; it should be configurable per job, because once events are dropped the UI becomes useless. Bus: Dropping SparkListenerEvent because no remaining room in event queue. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
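A minimal sketch of what the requested knob could look like is below: size the event queue from SparkConf instead of a hard-coded constant. The configuration key mirrors the one later associated with SPARK-15703 (mentioned in the comment above) and the default shown is illustrative; treat both as assumptions rather than the merged implementation.
{code}
import java.util.concurrent.LinkedBlockingQueue
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListenerEvent

// Illustrative sketch only: read the queue capacity from SparkConf instead of
// a hard-coded 1000. Key name and default are assumptions for this example.
val conf = new SparkConf()
val capacity = conf.getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)

// LiveListenerBus would then build its bounded queue from that value, so a
// busy driver can be given more headroom before events start being dropped.
val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity)
{code}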
[jira] [Commented] (SPARK-18169) Suppress warnings when dropping views on a dropped table
[ https://issues.apache.org/jira/browse/SPARK-18169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688303#comment-15688303 ] Dongjoon Hyun commented on SPARK-18169: --- This issue is superceded by SPARK-18549 . > Suppress warnings when dropping views on a dropped table > > > Key: SPARK-18169 > URL: https://issues.apache.org/jira/browse/SPARK-18169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1 >Reporter: Dongjoon Hyun >Priority: Minor > > Apache Spark 2.0.0 ~ 2.0.2-rc1 shows an inconsistent *AnalysisException* > warning message when dropping a *view* on a dropped table. This does not > happen on dropping *temporary views*. Also, Spark 1.6.x does not show > warnings. We had better suppress this to be more consistent in Spark 2.x and > with Spark 1.6.x. > {code} > scala> sql("create table t(a int)") > scala> sql("create view v as select * from t") > scala> sql("create temporary view tv as select * from t") > scala> sql("drop table t") > scala> sql("drop view tv") > scala> sql("drop view v") > 16/10/29 15:50:03 WARN DropTableCommand: > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > ... > res5: org.apache.spark.sql.DataFrame = [] > {code} > Note that this is different case of dropping non-exist view. For the > non-exist view, Spark raises NoSuchTableException. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18169) Suppress warnings when dropping views on a dropped table
[ https://issues.apache.org/jira/browse/SPARK-18169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-18169: -- Target Version/s: (was: 2.1.0) > Suppress warnings when dropping views on a dropped table > > > Key: SPARK-18169 > URL: https://issues.apache.org/jira/browse/SPARK-18169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1 >Reporter: Dongjoon Hyun >Priority: Minor > > Apache Spark 2.0.0 ~ 2.0.2-rc1 shows an inconsistent *AnalysisException* > warning message when dropping a *view* on a dropped table. This does not > happen on dropping *temporary views*. Also, Spark 1.6.x does not show > warnings. We had better suppress this to be more consistent in Spark 2.x and > with Spark 1.6.x. > {code} > scala> sql("create table t(a int)") > scala> sql("create view v as select * from t") > scala> sql("create temporary view tv as select * from t") > scala> sql("drop table t") > scala> sql("drop view tv") > scala> sql("drop view v") > 16/10/29 15:50:03 WARN DropTableCommand: > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > ... > res5: org.apache.spark.sql.DataFrame = [] > {code} > Note that this is different case of dropping non-exist view. For the > non-exist view, Spark raises NoSuchTableException. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-18169) Suppress warnings when dropping views on a dropped table
[ https://issues.apache.org/jira/browse/SPARK-18169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun closed SPARK-18169. - Resolution: Fixed > Suppress warnings when dropping views on a dropped table > > > Key: SPARK-18169 > URL: https://issues.apache.org/jira/browse/SPARK-18169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1 >Reporter: Dongjoon Hyun >Priority: Minor > > Apache Spark 2.0.0 ~ 2.0.2-rc1 shows an inconsistent *AnalysisException* > warning message when dropping a *view* on a dropped table. This does not > happen on dropping *temporary views*. Also, Spark 1.6.x does not show > warnings. We had better suppress this to be more consistent in Spark 2.x and > with Spark 1.6.x. > {code} > scala> sql("create table t(a int)") > scala> sql("create view v as select * from t") > scala> sql("create temporary view tv as select * from t") > scala> sql("drop table t") > scala> sql("drop view tv") > scala> sql("drop view v") > 16/10/29 15:50:03 WARN DropTableCommand: > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > ... > res5: org.apache.spark.sql.DataFrame = [] > {code} > Note that this is different case of dropping non-exist view. For the > non-exist view, Spark raises NoSuchTableException. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18549) Failed to Uncache a View that References a Dropped Table.
[ https://issues.apache.org/jira/browse/SPARK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688286#comment-15688286 ] Dongjoon Hyun commented on SPARK-18549: --- `InMemoryRelation` of `CachedData` has `tableName: Option[String]`. Can we use that? > Failed to Uncache a View that References a Dropped Table. > - > > Key: SPARK-18549 > URL: https://issues.apache.org/jira/browse/SPARK-18549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Xiao Li >Priority: Critical > > {code} > spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") > spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") > sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") > // Cache is empty at the beginning > assert(spark.sharedState.cacheManager.isEmpty) > sql("CACHE TABLE testView") > assert(spark.catalog.isCached("testView")) > // Cache is not empty > assert(!spark.sharedState.cacheManager.isEmpty) > {code} > {code} > // drop a table referenced by a cached view > sql("DROP TABLE jt1") > -- So far everything is fine > // Failed to unache the view > val e = intercept[AnalysisException] { > sql("UNCACHE TABLE testView") > }.getMessage > assert(e.contains("Table or view not found: `default`.`jt1`")) > // We are unable to drop it from the cache > assert(!spark.sharedState.cacheManager.isEmpty) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
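To make the suggestion above concrete, here is a self-contained sketch of uncaching by the recorded name. The stub types are simplified stand-ins invented for the example, not Spark's actual CacheManager, CachedData, or InMemoryRelation classes; the point is only that a name-based lookup never needs to re-analyze the view's plan, so it still works after a referenced table has been dropped.
{code}
// Simplified stand-ins for illustration only.
case class InMemoryRelationStub(tableName: Option[String])
case class CachedDataStub(relation: InMemoryRelationStub)

class SimpleCacheManager {
  private var cached: List[CachedDataStub] = Nil

  def add(entry: CachedDataStub): Unit = cached = entry :: cached

  // Uncache by the recorded table/view name. Because this never re-analyzes
  // the view's logical plan, it keeps working after a table such as `jt1`
  // referenced by the view has been dropped.
  def uncacheByName(name: String): Unit =
    cached = cached.filterNot(_.relation.tableName.exists(_.equalsIgnoreCase(name)))

  def isEmpty: Boolean = cached.isEmpty
}

// Usage sketch mirroring the scenario in the issue description.
val mgr = new SimpleCacheManager
mgr.add(CachedDataStub(InMemoryRelationStub(Some("testView"))))
mgr.uncacheByName("testView")
assert(mgr.isEmpty)
{code}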
[jira] [Commented] (SPARK-18549) Failed to Uncache a View that References a Dropped Table.
[ https://issues.apache.org/jira/browse/SPARK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688274#comment-15688274 ] Xiao Li commented on SPARK-18549: - cc [~hvanhovell] [~dongjoon] [~rxin] [~marmbrus] [~cloud_fan] Any suggestion is welcomed. > Failed to Uncache a View that References a Dropped Table. > - > > Key: SPARK-18549 > URL: https://issues.apache.org/jira/browse/SPARK-18549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Xiao Li >Priority: Critical > > {code} > spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") > spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") > sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") > // Cache is empty at the beginning > assert(spark.sharedState.cacheManager.isEmpty) > sql("CACHE TABLE testView") > assert(spark.catalog.isCached("testView")) > // Cache is not empty > assert(!spark.sharedState.cacheManager.isEmpty) > {code} > {code} > // drop a table referenced by a cached view > sql("DROP TABLE jt1") > -- So far everything is fine > // Failed to unache the view > val e = intercept[AnalysisException] { > sql("UNCACHE TABLE testView") > }.getMessage > assert(e.contains("Table or view not found: `default`.`jt1`")) > // We are unable to drop it from the cache > assert(!spark.sharedState.cacheManager.isEmpty) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18549) Failed to Uncache a View that References a Dropped Table.
[ https://issues.apache.org/jira/browse/SPARK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-18549: Priority: Critical (was: Major) > Failed to Uncache a View that References a Dropped Table. > - > > Key: SPARK-18549 > URL: https://issues.apache.org/jira/browse/SPARK-18549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Xiao Li >Priority: Critical > > {code} > spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") > spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") > sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") > // Cache is empty at the beginning > assert(spark.sharedState.cacheManager.isEmpty) > sql("CACHE TABLE testView") > assert(spark.catalog.isCached("testView")) > // Cache is not empty > assert(!spark.sharedState.cacheManager.isEmpty) > {code} > {code} > // drop a table referenced by a cached view > sql("DROP TABLE jt1") > -- So far everything is fine > // Failed to unache the view > val e = intercept[AnalysisException] { > sql("UNCACHE TABLE testView") > }.getMessage > assert(e.contains("Table or view not found: `default`.`jt1`")) > // We are unable to drop it from the cache > assert(!spark.sharedState.cacheManager.isEmpty) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18549) Failed to Uncache a View that References a Dropped Table.
[ https://issues.apache.org/jira/browse/SPARK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-18549: Description: {code} spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") // Cache is empty at the beginning assert(spark.sharedState.cacheManager.isEmpty) sql("CACHE TABLE testView") assert(spark.catalog.isCached("testView")) // Cache is not empty assert(!spark.sharedState.cacheManager.isEmpty) // drop a table referenced by a cached view sql("DROP TABLE jt1") was: {code} spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") // Cache is empty at the beginning assert(spark.sharedState.cacheManager.isEmpty) sql("CACHE TABLE testView") assert(spark.catalog.isCached("testView")) // Cache is not empty assert(!spark.sharedState.cacheManager.isEmpty) // drop a table referenced by a cached view sql("DROP TABLE jt1") -- So far everything is fine // Failed to unache the view val e = intercept[AnalysisException] { sql("UNCACHE TABLE testView") }.getMessage assert(e.contains("Table or view not found: `default`.`jt1`")) // We are unable to drop it from the cache assert(!spark.sharedState.cacheManager.isEmpty) {code} > Failed to Uncache a View that References a Dropped Table. > - > > Key: SPARK-18549 > URL: https://issues.apache.org/jira/browse/SPARK-18549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Xiao Li > > {code} > spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") > spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") > sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") > // Cache is empty at the beginning > assert(spark.sharedState.cacheManager.isEmpty) > sql("CACHE TABLE testView") > assert(spark.catalog.isCached("testView")) > // Cache is not empty > assert(!spark.sharedState.cacheManager.isEmpty) > // drop a table referenced by a cached view > sql("DROP TABLE jt1") -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18549) Failed to Uncache a View that References a Dropped Table.
[ https://issues.apache.org/jira/browse/SPARK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-18549: Description: {code} spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") // Cache is empty at the beginning assert(spark.sharedState.cacheManager.isEmpty) sql("CACHE TABLE testView") assert(spark.catalog.isCached("testView")) // Cache is not empty assert(!spark.sharedState.cacheManager.isEmpty) {code} {code} // drop a table referenced by a cached view sql("DROP TABLE jt1") -- So far everything is fine // Failed to unache the view val e = intercept[AnalysisException] { sql("UNCACHE TABLE testView") }.getMessage assert(e.contains("Table or view not found: `default`.`jt1`")) // We are unable to drop it from the cache assert(!spark.sharedState.cacheManager.isEmpty) {code} was: {code} spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") // Cache is empty at the beginning assert(spark.sharedState.cacheManager.isEmpty) sql("CACHE TABLE testView") assert(spark.catalog.isCached("testView")) // Cache is not empty assert(!spark.sharedState.cacheManager.isEmpty) // drop a table referenced by a cached view sql("DROP TABLE jt1") > Failed to Uncache a View that References a Dropped Table. > - > > Key: SPARK-18549 > URL: https://issues.apache.org/jira/browse/SPARK-18549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Xiao Li > > {code} > spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") > spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") > sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") > // Cache is empty at the beginning > assert(spark.sharedState.cacheManager.isEmpty) > sql("CACHE TABLE testView") > assert(spark.catalog.isCached("testView")) > // Cache is not empty > assert(!spark.sharedState.cacheManager.isEmpty) > {code} > {code} > // drop a table referenced by a cached view > sql("DROP TABLE jt1") > -- So far everything is fine > // Failed to unache the view > val e = intercept[AnalysisException] { > sql("UNCACHE TABLE testView") > }.getMessage > assert(e.contains("Table or view not found: `default`.`jt1`")) > // We are unable to drop it from the cache > assert(!spark.sharedState.cacheManager.isEmpty) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18549) Failed to Uncache a View that References a Dropped Table.
Xiao Li created SPARK-18549: --- Summary: Failed to Uncache a View that References a Dropped Table. Key: SPARK-18549 URL: https://issues.apache.org/jira/browse/SPARK-18549 Project: Spark Issue Type: Bug Components: SQL Reporter: Xiao Li -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18549) Failed to Uncache a View that References a Dropped Table.
[ https://issues.apache.org/jira/browse/SPARK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-18549: Affects Version/s: 2.0.2 Target Version/s: 2.1.0 Description: {code} spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") // Cache is empty at the beginning assert(spark.sharedState.cacheManager.isEmpty) sql("CACHE TABLE testView") assert(spark.catalog.isCached("testView")) // Cache is not empty assert(!spark.sharedState.cacheManager.isEmpty) // drop a table referenced by a cached view sql("DROP TABLE jt1") -- So far everything is fine // Failed to unache the view val e = intercept[AnalysisException] { sql("UNCACHE TABLE testView") }.getMessage assert(e.contains("Table or view not found: `default`.`jt1`")) // We are unable to drop it from the cache assert(!spark.sharedState.cacheManager.isEmpty) {code} > Failed to Uncache a View that References a Dropped Table. > - > > Key: SPARK-18549 > URL: https://issues.apache.org/jira/browse/SPARK-18549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2 >Reporter: Xiao Li > > {code} > spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") > spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") > sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") > // Cache is empty at the beginning > assert(spark.sharedState.cacheManager.isEmpty) > sql("CACHE TABLE testView") > assert(spark.catalog.isCached("testView")) > // Cache is not empty > assert(!spark.sharedState.cacheManager.isEmpty) > // drop a table referenced by a cached view > sql("DROP TABLE jt1") > -- So far everything is fine > // Failed to unache the view > val e = intercept[AnalysisException] { > sql("UNCACHE TABLE testView") > }.getMessage > assert(e.contains("Table or view not found: `default`.`jt1`")) > // We are unable to drop it from the cache > assert(!spark.sharedState.cacheManager.isEmpty) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-16803) SaveAsTable does not work when source DataFrame is built on a Hive Table
[ https://issues.apache.org/jira/browse/SPARK-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-16803: --- Assignee: Xiao Li > SaveAsTable does not work when source DataFrame is built on a Hive Table > > > Key: SPARK-16803 > URL: https://issues.apache.org/jira/browse/SPARK-16803 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Xiao Li >Assignee: Xiao Li > Fix For: 2.1.0 > > > {noformat} > scala> sql("create table sample.sample stored as SEQUENCEFILE as select 1 as > key, 'abc' as value") > res2: org.apache.spark.sql.DataFrame = [] > scala> val df = sql("select key, value as value from sample.sample") > df: org.apache.spark.sql.DataFrame = [key: int, value: string] > scala> df.write.mode("append").saveAsTable("sample.sample") > scala> sql("select * from sample.sample").show() > +---+-+ > |key|value| > +---+-+ > | 1| abc| > | 1| abc| > +---+-+ > {noformat} > In Spark 1.6, it works, but Spark 2.0 does not work. The error message from > Spark 2.0 is > {noformat} > scala> df.write.mode("append").saveAsTable("sample.sample") > org.apache.spark.sql.AnalysisException: Saving data in MetastoreRelation > sample, sample > is not supported.; > {noformat} > So far, we do not plan to support it in Spark 2.0. Spark 1.6 works because it > internally uses {{insertInto}}. But, if we change it back it will break the > semantic of {{saveAsTable}} (this method uses by-name resolution instead of > using by-position resolution used by {{insertInto}}). > Instead, users should use {{insertInto}} API. We should correct the error > messages. Users can understand how to bypass it before we support it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
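The workaround suggested in the description is the {{insertInto}} API. A short sketch, reusing the example table from the issue and assuming a spark-shell style {{sql()}} helper, is below; note that {{insertInto}} matches columns by position rather than by name, so the DataFrame's column order must line up with the table's.
{code}
// Workaround sketched for illustration, per the description's suggestion.
val df = sql("select key, value from sample.sample")

// insertInto resolves columns by position, not by name, so the DataFrame's
// column order must match the table's column order exactly.
df.write.mode("append").insertInto("sample.sample")

sql("select * from sample.sample").show()
{code}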
[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688244#comment-15688244 ] Jonny Serencsa commented on SPARK-18394: Already did that. The problem happens with both HiveScanExec and InMemoryScanExec. The indeterminate ordering is an artifact of the hash code for AttributeEquals involving a hash code of the exprId. Thus, when you iterate through an AttributeSet, the order or the attributes is not consistent. > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > 
this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new >
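The root cause described at the start of the comment above (AttributeEquals hashes on the exprId, so AttributeSet iteration order varies between otherwise identical plans) can be illustrated with a small snippet. This is a hypothetical example with invented column names, not code from Spark's planner, and sorting is shown only as one possible way to obtain a stable column order; it is not the eventual fix.
{code}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.DoubleType

// Each analysis mints fresh exprIds, and in the 2.0/2.1 code discussed here
// the set's iteration order follows the hash of each attribute's exprId, so
// two runs of the same query can enumerate the columns in different orders.
val attrs = Seq("l_quantity", "l_extendedprice", "l_discount", "l_tax")
  .map(name => AttributeReference(name, DoubleType)())

val asSet = AttributeSet(attrs)
println(asSet.toSeq.map(_.name))                      // order depends on exprId hashes

// One way to get a stable order for a given plan is to sort before codegen,
// e.g. by exprId (or by position in the child's output), so identical queries
// produce identical programs and hit the CodeGenerator cache.
println(asSet.toSeq.sortBy(_.exprId.id).map(_.name))  // deterministic for this plan
{code}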
[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688246#comment-15688246 ] Ofir Manor commented on SPARK-18475: Cody, for me you are the main gatekeeper for everything Kafka and the main Kafka expert, so I wanted your perspective, not Michael's (except the generic "order" guarantee, which I still think does not exist). I thought that if someone made the effort of building, testing and trying to contribute it, it is an indication that it hurts in the real world, especially when you said it is a repeated request. I guess in many places, getting read access to a potentially huge, shared topic is not the same as having Kafka admin rights or being the only or main consumer or being able to easily fix bad past decisions around partitions and keys... Anyway, it is totally up to you, you'll have to maintain it. I personally have no use for this feature. > Be able to provide higher parallelization for StructuredStreaming Kafka Source > -- > > Key: SPARK-18475 > URL: https://issues.apache.org/jira/browse/SPARK-18475 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Burak Yavuz > > Right now the StructuredStreaming Kafka Source creates as many Spark tasks as > there are TopicPartitions that we're going to read from Kafka. > This doesn't work well when we have data skew, and there is no reason why we > shouldn't be able to increase parallelism further, i.e. have multiple Spark > tasks reading from the same Kafka TopicPartition. > What this will mean is that we won't be able to use the "CachedKafkaConsumer" > for what it is defined for (being cached) in this use case, but the extra > overhead is worth handling data skew and increasing parallelism especially in > ETL use cases. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
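The mechanical core of the proposal in the issue description is to split each TopicPartition's pending offset range into several slices so that more than one Spark task can read the same partition. Below is a minimal, self-contained sketch of that slicing step; the {{OffsetSlice}} type and function names are invented for illustration and are not the Kafka connector's API.
{code}
// Hypothetical offset-range model; the real source tracks ranges per
// (topic, partition) between the previously committed and current offsets.
case class OffsetSlice(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Split one partition's [fromOffset, untilOffset) range into at most `parts`
// slices so several Spark tasks can read the same TopicPartition in parallel.
def splitRange(topic: String, partition: Int,
               fromOffset: Long, untilOffset: Long, parts: Int): Seq[OffsetSlice] = {
  val total = untilOffset - fromOffset
  if (total <= 0 || parts <= 1) {
    Seq(OffsetSlice(topic, partition, fromOffset, untilOffset))
  } else {
    val step = math.max(1L, math.ceil(total.toDouble / parts).toLong)
    (fromOffset until untilOffset by step).map { s =>
      OffsetSlice(topic, partition, s, math.min(s + step, untilOffset))
    }
  }
}

// e.g. a skewed partition with 1,000,000 pending records becomes 4 tasks:
// splitRange("events", 0, 0L, 1000000L, 4)
//   -> [0,250000), [250000,500000), [500000,750000), [750000,1000000)
{code}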
[jira] [Updated] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-18394: -- Shepherd: Herman van Hovell Target Version/s: 2.2.0 > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > 
if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new >
[jira] [Resolved] (SPARK-16803) SaveAsTable does not work when source DataFrame is built on a Hive Table
[ https://issues.apache.org/jira/browse/SPARK-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-16803. - Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 15926 [https://github.com/apache/spark/pull/15926] > SaveAsTable does not work when source DataFrame is built on a Hive Table > > > Key: SPARK-16803 > URL: https://issues.apache.org/jira/browse/SPARK-16803 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Xiao Li > Fix For: 2.1.0 > > > {noformat} > scala> sql("create table sample.sample stored as SEQUENCEFILE as select 1 as > key, 'abc' as value") > res2: org.apache.spark.sql.DataFrame = [] > scala> val df = sql("select key, value as value from sample.sample") > df: org.apache.spark.sql.DataFrame = [key: int, value: string] > scala> df.write.mode("append").saveAsTable("sample.sample") > scala> sql("select * from sample.sample").show() > +---+-+ > |key|value| > +---+-+ > | 1| abc| > | 1| abc| > +---+-+ > {noformat} > In Spark 1.6, it works, but Spark 2.0 does not work. The error message from > Spark 2.0 is > {noformat} > scala> df.write.mode("append").saveAsTable("sample.sample") > org.apache.spark.sql.AnalysisException: Saving data in MetastoreRelation > sample, sample > is not supported.; > {noformat} > So far, we do not plan to support it in Spark 2.0. Spark 1.6 works because it > internally uses {{insertInto}}. But, if we change it back it will break the > semantic of {{saveAsTable}} (this method uses by-name resolution instead of > using by-position resolution used by {{insertInto}}). > Instead, users should use {{insertInto}} API. We should correct the error > messages. Users can understand how to bypass it before we support it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18169) Suppress warnings when dropping views on a dropped table
[ https://issues.apache.org/jira/browse/SPARK-18169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-18169: -- Target Version/s: 2.1.0 > Suppress warnings when dropping views on a dropped table > > > Key: SPARK-18169 > URL: https://issues.apache.org/jira/browse/SPARK-18169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1 >Reporter: Dongjoon Hyun >Priority: Minor > > Apache Spark 2.0.0 ~ 2.0.2-rc1 shows an inconsistent *AnalysisException* > warning message when dropping a *view* on a dropped table. This does not > happen on dropping *temporary views*. Also, Spark 1.6.x does not show > warnings. We had better suppress this to be more consistent in Spark 2.x and > with Spark 1.6.x. > {code} > scala> sql("create table t(a int)") > scala> sql("create view v as select * from t") > scala> sql("create temporary view tv as select * from t") > scala> sql("drop table t") > scala> sql("drop view tv") > scala> sql("drop view v") > 16/10/29 15:50:03 WARN DropTableCommand: > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > org.apache.spark.sql.AnalysisException: Table or view not found: > `default`.`t`; line 1 pos 91; > 'SubqueryAlias v, `default`.`v` > +- 'Project ['gen_attr_0 AS a#19] >+- 'SubqueryAlias t > +- 'Project ['gen_attr_0] > +- 'SubqueryAlias gen_subquery_0 > +- 'Project ['a AS gen_attr_0#18] >+- 'UnresolvedRelation `default`.`t` > ... > res5: org.apache.spark.sql.DataFrame = [] > {code} > Note that this is different case of dropping non-exist view. For the > non-exist view, Spark raises NoSuchTableException. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-18394: -- Priority: Major (was: Minor) > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < 
numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder)); >
[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688225#comment-15688225 ] Herman van Hovell commented on SPARK-18394: --- Ok, that is fair. What strikes me as odd is that the column order that the columnar cache produces is different between the two plans. This is what causes the code generator to create two different 'programs' and what, in the end, causes your caching problems. Could you re-run this without the in-memory cache and see if you are still hitting this problem? I'll have a look on my end to see what is going on in the in-memory cache. > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa >Priority: Minor > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. 
> Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new >
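For illustration, a minimal sketch of why a compile cache keyed on the exact generated source text misses when the in-memory columnar cache emits the columns in a different order on each run. This is not Spark's actual CodeGenerator; the names and program strings below are hypothetical, it only demonstrates that semantically equivalent but textually different programs each trigger a compilation.
{code}
import scala.collection.mutable

// Toy compile cache keyed on the program text, as a stand-in for the real codegen cache.
object CodeGenCacheSketch {
  private val cache = mutable.HashMap.empty[String, AnyRef]
  var compilations = 0

  def compile(source: String): AnyRef =
    cache.getOrElseUpdate(source, { compilations += 1; new Object })

  def main(args: Array[String]): Unit = {
    // Two equivalent programs whose accessor order differs, mimicking the column-order
    // difference between the two runs described above.
    val run1 = "accessor = new DoubleColumnAccessor(buffers[0]); accessor1 = new StringColumnAccessor(buffers[1]);"
    val run2 = "accessor = new StringColumnAccessor(buffers[0]); accessor1 = new DoubleColumnAccessor(buffers[1]);"
    compile(run1)
    compile(run2)
    println(s"compilations = $compilations") // prints 2: a cache miss despite an identical query
  }
}
{code}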
[jira] [Created] (SPARK-18548) OnlineLDAOptimizer reads the same broadcast data after deletion
Xiaoye Sun created SPARK-18548: -- Summary: OnlineLDAOptimizer reads the same broadcast data after deletion Key: SPARK-18548 URL: https://issues.apache.org/jira/browse/SPARK-18548 Project: Spark Issue Type: Improvement Components: MLlib Affects Versions: 1.6.1 Reporter: Xiaoye Sun Priority: Trivial In submitMiniBatch() called by OnlineLDAOptimizer, the broadcast variable expElogbeta is deleted before its second use, which causes the executor to read the same large broadcast data twice. I suggest moving the broadcast data deletion (expElogbetaBc.unpersist()) later. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
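For illustration, a minimal sketch of the suggested ordering. The method below is a simplified, hypothetical stand-in for submitMiniBatch() (types and body are invented); only the placement of unpersist() reflects the suggestion.
{code}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Keep the broadcast alive until the last job that reads it has run, then unpersist it.
object BroadcastLifetimeSketch {
  def miniBatchStep(sc: SparkContext, docs: RDD[Array[Double]], expElogbeta: Array[Double]): Double = {
    val expElogbetaBc = sc.broadcast(expElogbeta)

    // First use of the broadcast value.
    val stats = docs.map(doc => doc.zip(expElogbetaBc.value).map { case (d, e) => d * e }.sum).sum()

    // Unpersisting here (before the second use below) would evict the cached copies and
    // force executors to fetch the same large broadcast data again -- the reported problem.
    // expElogbetaBc.unpersist()

    // Second use of the broadcast value.
    val norm = docs.map(_ => expElogbetaBc.value.sum).sum()

    // Suggested ordering: release the broadcast only after its last use.
    expElogbetaBc.unpersist()
    if (norm == 0.0) 0.0 else stats / norm
  }
}
{code}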
[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688135#comment-15688135 ] Jonny Serencsa commented on SPARK-18394: This problem was discovered during high concurrency experiments where I was running the aforementioned query thousands of times repeatedly via many concurrent clients. Eventually, after 5K-10K executions, the JVM-level CodeGenCache had to be purged, resulting in 10-second-long pauses. My expectation was that since the exact same query is being executed, Spark would not have to re-generate the byte code (because of its own CodeGenCache). After removing the WHERE clauses from the query, this was in fact the case and the JVM-level cache purging disappeared. I made this a Major issue because it didn't seem like Spark's CodeGenCache was working as expected. Perhaps I am mistaken. My above repro of the issue required me to set breakpoints through the debugger. > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa >Priority: Minor > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. 
> Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] =
[jira] [Resolved] (SPARK-18529) Timeouts shouldn't be AssertionErrors
[ https://issues.apache.org/jira/browse/SPARK-18529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tathagata Das resolved SPARK-18529. --- Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 15820 [https://github.com/apache/spark/pull/15820] > Timeouts shouldn't be AssertionErrors > - > > Key: SPARK-18529 > URL: https://issues.apache.org/jira/browse/SPARK-18529 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Reporter: Michael Armbrust >Assignee: Shixiong Zhu > Fix For: 2.1.0 > > > A timeout should inherit from {{RuntimeException}} as it's not a fatal error. > {code} > java.lang.AssertionError: assertion failed: Failed to get records for > spark-kafka-source-26d6f51c-0781-45a4-ab8e-bc6bd6603917-86874470-executor > service-log-0 49350201 after polling for 1000 > at scala.Predef$.assert(Predef.scala:170) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:65) > at > org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.next(KafkaSourceRDD.scala:146) > at > org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.next(KafkaSourceRDD.scala:142) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
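For illustration, a hedged sketch of the requested behavior: surface the polling timeout as a RuntimeException subclass instead of letting a bare assert throw an AssertionError. The class and method names below are hypothetical and do not match Spark's actual Kafka consumer code.
{code}
// Non-fatal timeout that callers can catch and retry.
class KafkaTimeoutException(msg: String) extends RuntimeException(msg)

object ConsumerSketch {
  def fetchRecord[T](poll: () => Option[T], pollTimeoutMs: Long): T =
    poll().getOrElse {
      // Previously expressed as assert(..., "Failed to get records ... after polling for ..."),
      // which surfaces as java.lang.AssertionError, an Error usually treated as fatal.
      throw new KafkaTimeoutException(s"Failed to get records after polling for $pollTimeoutMs ms")
    }
}
{code}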
[jira] [Resolved] (SPARK-18373) Make KafkaSource's failOnDataLoss=false work with Spark jobs
[ https://issues.apache.org/jira/browse/SPARK-18373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tathagata Das resolved SPARK-18373. --- Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 15820 [https://github.com/apache/spark/pull/15820] > Make KafkaSource's failOnDataLoss=false work with Spark jobs > > > Key: SPARK-18373 > URL: https://issues.apache.org/jira/browse/SPARK-18373 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.1.0 > > > Right now failOnDataLoss=false doesn't affect Spark jobs launched by > KafkaSource. The job may still fail the query when some topics are deleted or > some data is aged out. We should handle these corner cases in Spark jobs as > well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
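For context, a usage sketch of the flag on a Structured Streaming Kafka source. It assumes the spark-sql-kafka-0-10 connector is on the classpath; the broker and topic names are placeholders. With failOnDataLoss set to false the source is expected to warn and continue when offsets are aged out or a topic is deleted, and this issue extends that behavior to the Spark jobs the source launches.
{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-source-sketch").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder broker
  .option("subscribe", "service-log")                // placeholder topic
  .option("failOnDataLoss", "false")
  .load()
{code}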
[jira] [Commented] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688059#comment-15688059 ] Herman van Hovell commented on SPARK-18394: --- I am not able to reproduce this. Could you also explain to me why this is a major issue? I have used the following script: {noformat} sc.setLogLevel("INFO") spark.sql("create database if not exists tpc") spark.sql("drop table if exists tpc.lineitem") spark.sql(""" create table tpc.lineitem ( L_ORDERKEY bigint, L_PARTKEY bigint, L_SUPPKEY bigint, L_LINENUMBER bigint, L_QUANTITY double, L_EXTENDEDPRICE double, L_DISCOUNT double, L_TAX double, L_RETURNFLAG string, L_LINESTATUS string, L_SHIPDATE string, L_COMMITDATE string, L_RECEIPTDATE string, L_SHIPINSTRUCT string, L_SHIPMODE string, L_COMMENT string ) using parquet """) spark.sql(s""" insert into tpc.lineitem select id as L_ORDERKEY, id % 10 as L_PARTKEY, id % 50 as L_SUPPKEY, id as L_LINENUMBER, rand(3) * 10 as L_QUANTITY, rand(5) * 50 as L_EXTENDEDPRICE, rand(7) * 20 as L_DISCOUNT, 0.18d as L_TAX, case when rand(11) < 0.7d then 'Y' else 'N' end as L_RETURNFLAG, case when rand(13) < 0.4d then 'A' when rand(17) < 0.2d then 'B' else 'C' end as L_LINESTATUS, date_format(date_add(date '1998-08-05', id % 365), '-MM-dd') as L_SHIPDATE, date_format(date_add(date '1998-08-01', id % 365), '-MM-dd') as L_COMMITDATE, date_format(date_add(date '1998-08-03', id % 365), '-MM-dd') as L_RECEIPTDATE, 'DUMMY' as L_SHIPINSTRUCT, case when rand(19) < 0.7d then 'AIR' else 'LAND' end as L_SHIPMODE, 'DUMMY' as L_COMMENT from range(100) """) val df = spark.sql(""" select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from tpc.lineitem where l_shipdate <= date_sub('1998-12-01', '90') group by l_returnflag, l_linestatus """) df.show() df.show() {noformat} > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. 
> Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private
[jira] [Updated] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-18394: -- Priority: Minor (was: Major) > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa >Priority: Minor > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if 
(currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new >
[jira] [Updated] (SPARK-18394) Executing the same query twice in a row results in CodeGenerator cache misses
[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-18394: -- Target Version/s: (was: 2.1.0) > Executing the same query twice in a row results in CodeGenerator cache misses > - > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop >Reporter: Jonny Serencsa >Priority: Minor > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice (in succession), will result in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > Turns out, the generated code is not exactly the same, resulting in cache > misses when performing the lookup in the CodeGenerator cache. Yet, the code > is equivalent. > Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if 
(currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new >
[jira] [Updated] (SPARK-18544) Append with df.saveAsTable writes data to wrong location
[ https://issues.apache.org/jira/browse/SPARK-18544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eric Liang updated SPARK-18544: --- Description: When using saveAsTable in append mode, data will be written to the wrong location for non-managed Datasource tables. The following example illustrates this. It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from DataFrameWriter. Also, we should probably remove the repair table call at the end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the Hive or Datasource case. {code} scala> spark.sqlContext.range(100).selectExpr("id", "id as A", "id as B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test") scala> sql("create table test (id long, A int, B int) USING parquet OPTIONS (path '/tmp/test') PARTITIONED BY (A, B)") scala> sql("msck repair table test") scala> sql("select * from test where A = 1").count res6: Long = 1 scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as B").write.partitionBy("A", "B").mode("append").saveAsTable("test") scala> sql("select * from test where A = 1").count res8: Long = 1 {code} was: When using saveAsTable in append mode, data will be written to the wrong location for non-managed Datasource tables. The following example illustrates this. It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from DataFrameWriter. Also, we should probably remove the repair table call at the end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the Hive or Datasource case. {code} scala> spark.sqlContext.range(1).selectExpr("id", "id as A", "id as B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test_10k") scala> sql("msck repair table test_10k") scala> sql("select * from test_10k where A = 1").count res6: Long = 1 scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as B").write.partitionBy("A", "B").mode("append").parquet("/tmp/test_10k") scala> sql("select * from test_10k where A = 1").count res8: Long = 1 {code} > Append with df.saveAsTable writes data to wrong location > > > Key: SPARK-18544 > URL: https://issues.apache.org/jira/browse/SPARK-18544 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Eric Liang >Priority: Blocker > > When using saveAsTable in append mode, data will be written to the wrong > location for non-managed Datasource tables. The following example illustrates > this. > It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from > DataFrameWriter. Also, we should probably remove the repair table call at the > end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the > Hive or Datasource case. > {code} > scala> spark.sqlContext.range(100).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test") > scala> sql("create table test (id long, A int, B int) USING parquet OPTIONS > (path '/tmp/test') PARTITIONED BY (A, B)") > scala> sql("msck repair table test") > scala> sql("select * from test where A = 1").count > res6: Long = 1 > scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("append").saveAsTable("test") > scala> sql("select * from test where A = 1").count > res8: Long = 1 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18531) Apache Spark FPGrowth algorithm implementation fails with java.lang.StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-18531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687998#comment-15687998 ] yuhao yang commented on SPARK-18531: I think you should use spark.driver.extraJavaOptions > Apache Spark FPGrowth algorithm implementation fails with > java.lang.StackOverflowError > -- > > Key: SPARK-18531 > URL: https://issues.apache.org/jira/browse/SPARK-18531 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.6.1 >Reporter: Saleem Ansari > > More details can be found here: > https://gist.github.com/tuxdna/37a69b53e6f9a9442fa3b1d5e53c2acb > *Spark FPGrowth algorithm croaks with a small dataset as shown below* > $ spark-shell --master "local[*]" --driver-memory 5g > Welcome to > __ > / __/__ ___ _/ /__ > _\ \/ _ \/ _ `/ __/ '_/ >/___/ .__/\_,_/_/ /_/\_\ version 1.6.1 > /_/ > Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_102) > Spark context available as sc. > SQL context available as sqlContext. > scala> import org.apache.spark.mllib.fpm.FPGrowth > import org.apache.spark.mllib.fpm.FPGrowth > scala> import org.apache.spark.rdd.RDD > import org.apache.spark.rdd.RDD > scala> import org.apache.spark.sql.SQLContext > import org.apache.spark.sql.SQLContext > scala> import org.apache.spark.{SparkConf, SparkContext} > import org.apache.spark.{SparkConf, SparkContext} > scala> val data = sc.textFile("bug.data") > data: org.apache.spark.rdd.RDD[String] = bug.data MapPartitionsRDD[1] at > textFile at :31 > scala> val transactions: RDD[Array[String]] = data.map(l => > l.split(",").distinct) > transactions: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] > at map at :33 > scala> transactions.cache() > res0: transactions.type = MapPartitionsRDD[2] at map at :33 > scala> val fpg = new FPGrowth().setMinSupport(0.05).setNumPartitions(10) > fpg: org.apache.spark.mllib.fpm.FPGrowth = > org.apache.spark.mllib.fpm.FPGrowth@66d62c59 > scala> val model = fpg.run(transactions) > model: org.apache.spark.mllib.fpm.FPGrowthModel[String] = > org.apache.spark.mllib.fpm.FPGrowthModel@6e92f150 > scala> model.freqItemsets.take(1).foreach { i => i.items.mkString("[", ",", > "]") + ", " + i.freq } > [Stage 3:> (0 + 2) / > 2]16/11/21 23:56:14 ERROR Executor: Managed memory leak detected; size = > 18068980 bytes, TID = 14 > 16/11/21 23:56:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 14) > java.lang.StackOverflowError > at org.xerial.snappy.Snappy.arrayCopy(Snappy.java:84) > at > org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:273) > at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:115) > at > org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > *This failure is likely due to the size of baskets which contains over > thousands of items.* > scala> val maxBasketSize = transactions.map(_.length).max() > maxBasketSize: Int = 1171 > > scala> transactions.filter(_.length == maxBasketSize).collect() > res3: Array[Array[String]] = 
Array(Array(3858, 109, 5842, 2184, 2481, 534 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18544) Append with df.saveAsTable writes data to wrong location
[ https://issues.apache.org/jira/browse/SPARK-18544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yin Huai updated SPARK-18544: - Priority: Blocker (was: Major) > Append with df.saveAsTable writes data to wrong location > > > Key: SPARK-18544 > URL: https://issues.apache.org/jira/browse/SPARK-18544 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Eric Liang >Priority: Blocker > > When using saveAsTable in append mode, data will be written to the wrong > location for non-managed Datasource tables. The following example illustrates > this. > It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from > DataFrameWriter. Also, we should probably remove the repair table call at the > end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the > Hive or Datasource case. > {code} > scala> spark.sqlContext.range(1).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test_10k") > scala> sql("msck repair table test_10k") > scala> sql("select * from test_10k where A = 1").count > res6: Long = 1 > scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("append").parquet("/tmp/test_10k") > scala> sql("select * from test_10k where A = 1").count > res8: Long = 1 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18546) UnsafeShuffleWriter corrupts encrypted shuffle files when merging
[ https://issues.apache.org/jira/browse/SPARK-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18546: Assignee: Apache Spark (was: Marcelo Vanzin) > UnsafeShuffleWriter corrupts encrypted shuffle files when merging > - > > Key: SPARK-18546 > URL: https://issues.apache.org/jira/browse/SPARK-18546 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin >Assignee: Apache Spark >Priority: Critical > > The merging algorithm in {{UnsafeShuffleWriter}} does not consider > encryption, and when it tries to merge encrypted files the result data cannot > be read, since data encrypted with different initial vectors is interleaved > in the same partition data. This leads to exceptions when trying to read the > files during shuffle: > {noformat} > com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: > Corrupt input data, block did not start with 2 byte signature ('ZV') followed > by type byte, 2-byte length) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:142) > at com.esotericsoftware.kryo.io.Input.require(Input.java:155) > at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) > at > org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:512) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:533) > ... > {noformat} > (This is our internal branch so don't worry if lines don't necessarily match.) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18546) UnsafeShuffleWriter corrupts encrypted shuffle files when merging
[ https://issues.apache.org/jira/browse/SPARK-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18546: Assignee: Marcelo Vanzin (was: Apache Spark) > UnsafeShuffleWriter corrupts encrypted shuffle files when merging > - > > Key: SPARK-18546 > URL: https://issues.apache.org/jira/browse/SPARK-18546 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin >Priority: Critical > > The merging algorithm in {{UnsafeShuffleWriter}} does not consider > encryption, and when it tries to merge encrypted files the result data cannot > be read, since data encrypted with different initial vectors is interleaved > in the same partition data. This leads to exceptions when trying to read the > files during shuffle: > {noformat} > com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: > Corrupt input data, block did not start with 2 byte signature ('ZV') followed > by type byte, 2-byte length) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:142) > at com.esotericsoftware.kryo.io.Input.require(Input.java:155) > at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) > at > org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:512) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:533) > ... > {noformat} > (This is our internal branch so don't worry if lines don't necessarily match.) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18546) UnsafeShuffleWriter corrupts encrypted shuffle files when merging
[ https://issues.apache.org/jira/browse/SPARK-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687952#comment-15687952 ] Apache Spark commented on SPARK-18546: -- User 'vanzin' has created a pull request for this issue: https://github.com/apache/spark/pull/15982 > UnsafeShuffleWriter corrupts encrypted shuffle files when merging > - > > Key: SPARK-18546 > URL: https://issues.apache.org/jira/browse/SPARK-18546 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin >Priority: Critical > > The merging algorithm in {{UnsafeShuffleWriter}} does not consider > encryption, and when it tries to merge encrypted files the result data cannot > be read, since data encrypted with different initial vectors is interleaved > in the same partition data. This leads to exceptions when trying to read the > files during shuffle: > {noformat} > com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: > Corrupt input data, block did not start with 2 byte signature ('ZV') followed > by type byte, 2-byte length) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:142) > at com.esotericsoftware.kryo.io.Input.require(Input.java:155) > at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) > at > org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:512) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:533) > ... > {noformat} > (This is our internal branch so don't worry if lines don't necessarily match.) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18547) Decouple I/O encryption key propagation from UserGroupInformation
[ https://issues.apache.org/jira/browse/SPARK-18547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687937#comment-15687937 ] Apache Spark commented on SPARK-18547: -- User 'vanzin' has created a pull request for this issue: https://github.com/apache/spark/pull/15981 > Decouple I/O encryption key propagation from UserGroupInformation > - > > Key: SPARK-18547 > URL: https://issues.apache.org/jira/browse/SPARK-18547 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin > > Currently, the encryption key used by the shuffle code is propagated using > {{UserGroupInformation}} and thus only works on YARN. That makes it really > painful to write unit tests in core that include encryption functionality. > We should change that so that writing these tests is possible, and also > because that would allow shuffle encryption to work with other cluster > managers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18547) Decouple I/O encryption key propagation from UserGroupInformation
[ https://issues.apache.org/jira/browse/SPARK-18547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18547: Assignee: Marcelo Vanzin (was: Apache Spark) > Decouple I/O encryption key propagation from UserGroupInformation > - > > Key: SPARK-18547 > URL: https://issues.apache.org/jira/browse/SPARK-18547 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin > > Currently, the encryption key used by the shuffle code is propagated using > {{UserGroupInformation}} and thus only works on YARN. That makes it really > painful to write unit tests in core that include encryption functionality. > We should change that so that writing these tests is possible, and also > because that would allow shuffle encryption to work with other cluster > managers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18547) Decouple I/O encryption key propagation from UserGroupInformation
[ https://issues.apache.org/jira/browse/SPARK-18547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18547: Assignee: Apache Spark (was: Marcelo Vanzin) > Decouple I/O encryption key propagation from UserGroupInformation > - > > Key: SPARK-18547 > URL: https://issues.apache.org/jira/browse/SPARK-18547 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin >Assignee: Apache Spark > > Currently, the encryption key used by the shuffle code is propagated using > {{UserGroupInformation}} and thus only works on YARN. That makes it really > painful to write unit tests in core that include encryption functionality. > We should change that so that writing these tests is possible, and also > because that would allow shuffle encryption to work with other cluster > managers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18546) UnsafeShuffleWriter corrupts encrypted shuffle files when merging
[ https://issues.apache.org/jira/browse/SPARK-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687930#comment-15687930 ] Marcelo Vanzin commented on SPARK-18546: I'm marking this as blocked by SPARK-18547 because that feature allows adding the necessary unit tests for making sure this bug is fixed. > UnsafeShuffleWriter corrupts encrypted shuffle files when merging > - > > Key: SPARK-18546 > URL: https://issues.apache.org/jira/browse/SPARK-18546 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin >Priority: Critical > > The merging algorithm in {{UnsafeShuffleWriter}} does not consider > encryption, and when it tries to merge encrypted files the result data cannot > be read, since data encrypted with different initial vectors is interleaved > in the same partition data. This leads to exceptions when trying to read the > files during shuffle: > {noformat} > com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: > Corrupt input data, block did not start with 2 byte signature ('ZV') followed > by type byte, 2-byte length) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:142) > at com.esotericsoftware.kryo.io.Input.require(Input.java:155) > at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) > at > org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:512) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:533) > ... > {noformat} > (This is our internal branch so don't worry if lines don't necessarily match.) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18547) Decouple I/O encryption key propagation from UserGroupInformation
Marcelo Vanzin created SPARK-18547: -- Summary: Decouple I/O encryption key propagation from UserGroupInformation Key: SPARK-18547 URL: https://issues.apache.org/jira/browse/SPARK-18547 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 2.1.0 Reporter: Marcelo Vanzin Assignee: Marcelo Vanzin Currently, the encryption key used by the shuffle code is propagated using {{UserGroupInformation}} and thus only works on YARN. That makes it really painful to write unit tests in core that include encryption functionality. We should change that so that writing these tests is possible, and also because that would allow shuffle encryption to work with other cluster managers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
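For illustration, a rough sketch of the propagation path being replaced: stashing the key in the Hadoop UGI credentials, which YARN ships to executors but other cluster managers do not. The key alias below is hypothetical, and this is only a minimal outline of the mechanism, not Spark's actual code.
{code}
import javax.crypto.KeyGenerator
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation

object UgiKeySketch {
  private val alias = new Text("spark.io.encryption.key") // hypothetical alias

  // Driver side: generate a key and stash it in the current user's credentials.
  def storeKey(): Unit = {
    val keyGen = KeyGenerator.getInstance("AES")
    keyGen.init(128)
    val key = keyGen.generateKey().getEncoded
    UserGroupInformation.getCurrentUser.getCredentials.addSecretKey(alias, key)
  }

  // Executor side: read the key back from the propagated credentials.
  def loadKey(): Array[Byte] =
    UserGroupInformation.getCurrentUser.getCredentials.getSecretKey(alias)
}
{code}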
[jira] [Created] (SPARK-18546) UnsafeShuffleWriter corrupts encrypted shuffle files when merging
Marcelo Vanzin created SPARK-18546: -- Summary: UnsafeShuffleWriter corrupts encrypted shuffle files when merging Key: SPARK-18546 URL: https://issues.apache.org/jira/browse/SPARK-18546 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.0 Reporter: Marcelo Vanzin Assignee: Marcelo Vanzin Priority: Critical The merging algorithm in {{UnsafeShuffleWriter}} does not consider encryption, and when it tries to merge encrypted files the result data cannot be read, since data encrypted with different initial vectors is interleaved in the same partition data. This leads to exceptions when trying to read the files during shuffle: {noformat} com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length) at com.esotericsoftware.kryo.io.Input.fill(Input.java:142) at com.esotericsoftware.kryo.io.Input.require(Input.java:155) at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169) at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:512) at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:533) ... {noformat} (This is our internal branch so don't worry if lines don't necessarily match.) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
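For illustration, a self-contained toy example (plain JCE, not Spark's actual shuffle codec) of the failure mode described above: concatenating bytes encrypted under different initialization vectors produces data that cannot be decrypted as a single stream.
{code}
import java.security.SecureRandom
import javax.crypto.{Cipher, KeyGenerator}
import javax.crypto.spec.IvParameterSpec

object MergeCiphertextDemo {
  def main(args: Array[String]): Unit = {
    val keyGen = KeyGenerator.getInstance("AES")
    keyGen.init(128)
    val key = keyGen.generateKey()

    // Encrypt a chunk under a fresh random IV, as each spill file effectively is.
    def encrypt(data: Array[Byte]): (Array[Byte], Array[Byte]) = {
      val iv = new Array[Byte](16)
      new SecureRandom().nextBytes(iv)
      val c = Cipher.getInstance("AES/CTR/NoPadding")
      c.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv))
      (iv, c.doFinal(data))
    }

    val (iv1, spill1) = encrypt("partition-0 data from spill file 1;".getBytes("UTF-8"))
    val (_, spill2)   = encrypt("partition-0 data from spill file 2;".getBytes("UTF-8"))

    // Naive merge: raw byte concatenation, as a byte-copying merge would do.
    val merged = spill1 ++ spill2

    // Decrypting the merged stream with the first IV turns the second half into garbage,
    // because that half was encrypted under a different IV.
    val d = Cipher.getInstance("AES/CTR/NoPadding")
    d.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv1))
    println(new String(d.doFinal(merged), "UTF-8"))
  }
}
{code}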
[jira] [Closed] (SPARK-13649) Move CalendarInterval out of unsafe package
[ https://issues.apache.org/jira/browse/SPARK-13649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin closed SPARK-13649. --- Resolution: Won't Fix Target Version/s: (was: 2.1.0) > Move CalendarInterval out of unsafe package > --- > > Key: SPARK-13649 > URL: https://issues.apache.org/jira/browse/SPARK-13649 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0 >Reporter: Cheng Lian > > {{CalendarInterval}} is part of the user-facing interface as it can be returned in > DataFrame contents, but historically it was put in {{unsafe.types}} package > for no good reason. We should probably move it out. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18544) Append with df.saveAsTable writes data to wrong location
[ https://issues.apache.org/jira/browse/SPARK-18544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687884#comment-15687884 ] Eric Liang commented on SPARK-18544: cc [~yhuai] [~cloud_fan] I'll try to look at this today but might not get to it. > Append with df.saveAsTable writes data to wrong location > > > Key: SPARK-18544 > URL: https://issues.apache.org/jira/browse/SPARK-18544 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Eric Liang > > When using saveAsTable in append mode, data will be written to the wrong > location for non-managed Datasource tables. The following example illustrates > this. > It seems somehow pass the wrong table path to InsertIntoHadoopFsRelation from > DataFrameWriter. Also, we should probably remove the repair table call at the > end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the > Hive or Datasource case. > {code} > scala> spark.sqlContext.range(1).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test_10k") > scala> sql("msck repair table test_10k") > scala> sql("select * from test_10k where A = 1").count > res6: Long = 1 > scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as > B").write.partitionBy("A", "B").mode("append").parquet("/tmp/test_10k") > scala> sql("select * from test_10k where A = 1").count > res8: Long = 1 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18545) Verify number of hive client RPCs in PartitionedTablePerfStatsSuite
Eric Liang created SPARK-18545: -- Summary: Verify number of hive client RPCs in PartitionedTablePerfStatsSuite Key: SPARK-18545 URL: https://issues.apache.org/jira/browse/SPARK-18545 Project: Spark Issue Type: Test Components: SQL Reporter: Eric Liang Priority: Minor To avoid performance regressions like https://issues.apache.org/jira/browse/SPARK-18507 in the future, we should add a metric for the number of Hive client RPCs issued and check it in the perf stats suite. cc [~cloud_fan] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
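For illustration, a hedged sketch of the kind of check being proposed: count metastore client calls and assert an upper bound in the test. All names below are hypothetical and do not correspond to the actual HiveClient or test suite.
{code}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical counter incremented wherever a Hive client RPC is issued.
object HiveClientCallCounter {
  val calls = new AtomicLong(0)
  def record(): Unit = calls.incrementAndGet()
  def reset(): Unit = calls.set(0)
}

object PartitionedTablePerfStatsCheckSketch {
  // Run a block of work and fail if it issued more Hive client RPCs than expected.
  def assertMaxHiveClientCalls(max: Long)(body: => Unit): Unit = {
    HiveClientCallCounter.reset()
    body
    val n = HiveClientCallCounter.calls.get()
    assert(n <= max, s"expected at most $max Hive client RPCs, but $n were issued")
  }
}
{code}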
[jira] [Resolved] (SPARK-18465) Uncache Table shouldn't throw an exception when table doesn't exist
[ https://issues.apache.org/jira/browse/SPARK-18465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell resolved SPARK-18465. --- Resolution: Fixed Assignee: Burak Yavuz Fix Version/s: 2.1.0 > Uncache Table shouldn't throw an exception when table doesn't exist > --- > > Key: SPARK-18465 > URL: https://issues.apache.org/jira/browse/SPARK-18465 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Burak Yavuz >Assignee: Burak Yavuz > Fix For: 2.1.0 > > > While this behavior is debatable, consider the following use case: > {code} > UNCACHE TABLE foo; > CACHE TABLE foo AS > SELECT * FROM bar > {code} > The command above fails the first time you run it. But I want to run the > command above over and over again, and I don't want to change my code just > for the first run of it. > The issue is that subsequent `CACHE TABLE` commands do not overwrite the > existing table. > There are alternate solutions, e.g. > 1. > {code} > UNCACHE TABLE IF EXISTS foo > {code} > 2. > {code} > CACHE AND REPLACE TABLE foo > {code} > Which will require additional work with the parser. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18544) Append with df.saveAsTable writes data to wrong location
Eric Liang created SPARK-18544: -- Summary: Append with df.saveAsTable writes data to wrong location Key: SPARK-18544 URL: https://issues.apache.org/jira/browse/SPARK-18544 Project: Spark Issue Type: Bug Components: SQL Reporter: Eric Liang When using saveAsTable in append mode, data will be written to the wrong location for non-managed Datasource tables. The following example illustrates this. It seems the wrong table path is somehow passed to InsertIntoHadoopFsRelation from DataFrameWriter. Also, we should probably remove the repair table call at the end of saveAsTable in DataFrameWriter. That shouldn't be needed in either the Hive or Datasource case. {code} scala> spark.sqlContext.range(1).selectExpr("id", "id as A", "id as B").write.partitionBy("A", "B").mode("overwrite").parquet("/tmp/test_10k") scala> sql("msck repair table test_10k") scala> sql("select * from test_10k where A = 1").count res6: Long = 1 scala> spark.sqlContext.range(10).selectExpr("id", "id as A", "id as B").write.partitionBy("A", "B").mode("append").parquet("/tmp/test_10k") scala> sql("select * from test_10k where A = 1").count res8: Long = 1 {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13649) Move CalendarInterval out of unsafe package
[ https://issues.apache.org/jira/browse/SPARK-13649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687878#comment-15687878 ] Herman van Hovell commented on SPARK-13649: --- I have to agree with Reynold. We return CalendarIntervals to the end user, for example: {noformat} import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.sql.Row val df = sql("select * from values (1, interval 10 days), (2, interval 2 months), (3, interval 5 seconds 4 years) as t(id, duration)") df.collect().foreach { case Row(id: Int, duration: CalendarInterval) => println(s"[$id]: $duration") } {noformat} ...yields the following result: {noformat} [1]: interval 1 weeks 3 days [2]: interval 2 months [3]: interval 4 years 5 seconds {noformat} So I don't think we can do this any time soon. > Move CalendarInterval out of unsafe package > --- > > Key: SPARK-13649 > URL: https://issues.apache.org/jira/browse/SPARK-13649 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0 >Reporter: Cheng Lian > > {{CalendarInterval}} is part of the user-facing interface, as it can be returned in > DataFrame contents, but historically it was put in the {{unsafe.types}} package > for no good reason. We should probably move it out. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-18507) Major performance regression in SHOW PARTITIONS on partitioned Hive tables
[ https://issues.apache.org/jira/browse/SPARK-18507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Or resolved SPARK-18507. --- Resolution: Fixed Assignee: Wenchen Fan Fix Version/s: 2.1.0 Target Version/s: 2.1.0 > Major performance regression in SHOW PARTITIONS on partitioned Hive tables > -- > > Key: SPARK-18507 > URL: https://issues.apache.org/jira/browse/SPARK-18507 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Michael Allman >Assignee: Wenchen Fan >Priority: Critical > Fix For: 2.1.0 > > > Commit {{ccb11543048dccd4cc590a8db1df1d9d5847d112}} > (https://github.com/apache/spark/commit/ccb11543048dccd4cc590a8db1df1d9d5847d112) > appears to have introduced a major regression in the performance of the Hive > {{SHOW PARTITIONS}} command. Running that command on a Hive table with 17,337 > partitions in the {{spark-sql}} shell with the parent commit of {{ccb1154}} > takes approximately 7.3 seconds. Running the same command with commit > {{ccb1154}} takes approximately 250 seconds. > I have not had the opportunity to complete a thorough investigation, but I > suspect the problem lies in the diff hunk beginning at > https://github.com/apache/spark/commit/ccb11543048dccd4cc590a8db1df1d9d5847d112#diff-159191585e10542f013cb3a714f26075L675. > If that's the case, this performance issue should manifest itself in other > areas as this programming pattern was used elsewhere in this commit. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
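For context on the kind of programming pattern that can cause this: fetching partition metadata one RPC at a time instead of in a single batched call. The trait and method names below are assumptions used purely for illustration, not the code changed in {{ccb1154}}:

{code}
// Illustrative sketch only; MetastoreClient and its methods are assumed names.
// With ~17,000 partitions, the "slow" shape issues one metastore round trip
// per partition, which is consistent with a ~7 second command growing to
// roughly 250 seconds.
trait MetastoreClient {
  def listPartitionNames(table: String): Seq[String]      // 1 RPC total
  def getPartition(table: String, name: String): AnyRef   // 1 RPC per call
  def listPartitions(table: String): Seq[AnyRef]          // 1 RPC, batched
}

def showPartitionsSlow(client: MetastoreClient, table: String): Seq[AnyRef] =
  client.listPartitionNames(table).map(name => client.getPartition(table, name))

def showPartitionsFast(client: MetastoreClient, table: String): Seq[AnyRef] =
  client.listPartitions(table)
{code}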
[jira] [Resolved] (SPARK-18504) Scalar subquery with extra group by columns returning incorrect result
[ https://issues.apache.org/jira/browse/SPARK-18504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell resolved SPARK-18504. --- Resolution: Fixed Assignee: Nattavut Sutyanyong Fix Version/s: 2.1.0 2.0.3 > Scalar subquery with extra group by columns returning incorrect result > -- > > Key: SPARK-18504 > URL: https://issues.apache.org/jira/browse/SPARK-18504 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Nattavut Sutyanyong >Assignee: Nattavut Sutyanyong > Fix For: 2.0.3, 2.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
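The ticket carries no repro, but the shape of the problem is a correlated scalar subquery whose GROUP BY mentions a column that is not part of the correlation condition. The query below is only an assumed illustration of that shape (t1 and t2 are hypothetical tables), not a test case from the fix:

{code}
// Assumed illustration: because t2.c is not part of the correlation
// predicate, the subquery can produce several rows per outer row, and
// silently returning one of them yields an incorrect, non-deterministic
// result. Restricting the allowed group-by columns avoids this.
spark.sql("""
  SELECT t1.a,
         (SELECT sum(t2.b) FROM t2 WHERE t2.a = t1.a GROUP BY t2.c) AS total_b
  FROM t1
""")
{code}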
[jira] [Updated] (SPARK-18513) Record and recover watermark
[ https://issues.apache.org/jira/browse/SPARK-18513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-18513: - Assignee: Tyson Condie > Record and recover watermark > > > Key: SPARK-18513 > URL: https://issues.apache.org/jira/browse/SPARK-18513 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Reporter: Liwei Lin >Assignee: Tyson Condie >Priority: Blocker > > We should record the watermark into the persistent log and recover it to > ensure determinism. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
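A rough sketch of the mechanism, with the file layout and helper names invented for illustration (the real change belongs in Structured Streaming's offset log, not in ad-hoc files): persist the watermark alongside each batch's metadata and read the latest value back on restart.

{code}
// Hedged sketch only: the file format and helper names are assumptions, not
// Spark's OffsetSeqLog. One metadata file per batch records the event-time
// watermark; on recovery the latest file wins, so the query restarts with
// the same watermark instead of falling back to zero.
import java.nio.file.{Files, Paths, StandardOpenOption}

def recordWatermark(logDir: String, batchId: Long, watermarkMs: Long): Unit = {
  val path = Paths.get(logDir, batchId.toString)
  Files.write(path, watermarkMs.toString.getBytes("UTF-8"),
    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
}

def recoverWatermark(logDir: String): Option[Long] = {
  val files = Option(Paths.get(logDir).toFile.listFiles()).getOrElse(Array.empty)
  if (files.isEmpty) None
  else {
    val latest = files.maxBy(_.getName.toLong)
    Some(new String(Files.readAllBytes(latest.toPath), "UTF-8").trim.toLong)
  }
}
{code}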
[jira] [Commented] (SPARK-12469) Data Property Accumulators for Spark (formerly Consistent Accumulators)
[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687763#comment-15687763 ] holdenk commented on SPARK-12469: - Cool - I'll bug y'all after the 2.1 release is out so hopefully we can get the jump on getting it in for 2.2 :) > Data Property Accumulators for Spark (formerly Consistent Accumulators) > --- > > Key: SPARK-12469 > URL: https://issues.apache.org/jira/browse/SPARK-12469 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: holdenk > > Tasks executed on Spark workers are unable to modify values from the driver, > and accumulators are the one exception for this. Accumulators in Spark are > implemented in such a way that when a stage is recomputed (say for cache > eviction) the accumulator will be updated a second time. This makes > accumulators inside of transformations more difficult to use for things like > counting invalid records (one of the primary potential use cases of > collecting side information during a transformation). However in some cases > this counting during re-evaluation is exactly the behaviour we want (say in > tracking total execution time for a particular function). Spark would benefit > from a version of accumulators which did not double count even if stages were > re-executed. > Motivating example: > {code} > val parseTime = sc.accumulator(0L) > val parseFailures = sc.accumulator(0L) > val parsedData = sc.textFile(...).flatMap { line => > val start = System.currentTimeMillis() > val parsed = Try(parse(line)) > if (parsed.isFailure) parseFailures += 1 > parseTime += System.currentTimeMillis() - start > parsed.toOption > } > parsedData.cache() > val resultA = parsedData.map(...).filter(...).count() > // some intervening code. Almost anything could happen here -- some of > parsedData may > // get kicked out of the cache, or an executor where data was cached might > get lost > val resultB = parsedData.filter(...).map(...).flatMap(...).count() > // now we look at the accumulators > {code} > Here we would want parseFailures to only have been added to once for every > line which failed to parse. Unfortunately, the current Spark accumulator API > doesn’t support the current parseFailures use case since if some data had > been evicted its possible that it will be double counted. > See the full design document at > https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18528) limit + groupBy leads to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/SPARK-18528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18528: Assignee: Apache Spark > limit + groupBy leads to java.lang.NullPointerException > --- > > Key: SPARK-18528 > URL: https://issues.apache.org/jira/browse/SPARK-18528 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.1 > Environment: CentOS release 6.6, Linux 2.6.32-504.el6.x86_64 >Reporter: Corey >Assignee: Apache Spark > > Using limit on a DataFrame prior to groupBy will lead to a crash. > Repartitioning will avoid the crash. > *will crash:* {{df.limit(3).groupBy("user_id").count().show()}} > *will work:* {{df.limit(3).coalesce(1).groupBy('user_id').count().show()}} > *will work:* > {{df.limit(3).repartition('user_id').groupBy('user_id').count().show()}} > Here is a reproducible example along with the error message: > {quote} > >>> df = spark.createDataFrame([ (1, 1), (1, 3), (2, 1), (3, 2), (3, 3) ], > >>> ["user_id", "genre_id"]) > >>> > >>> df.show() > +---++ > |user_id|genre_id| > +---++ > | 1| 1| > | 1| 3| > | 2| 1| > | 3| 2| > | 3| 3| > +---++ > >>> df.groupBy("user_id").count().show() > +---+-+ > |user_id|count| > +---+-+ > | 1|2| > | 3|2| > | 2|1| > +---+-+ > >>> df.limit(3).groupBy("user_id").count().show() > [Stage 8:===>(1964 + 24) / > 2000]16/11/21 01:59:27 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID > 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-18528) limit + groupBy leads to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/SPARK-18528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-18528: Assignee: (was: Apache Spark) > limit + groupBy leads to java.lang.NullPointerException > --- > > Key: SPARK-18528 > URL: https://issues.apache.org/jira/browse/SPARK-18528 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.1 > Environment: CentOS release 6.6, Linux 2.6.32-504.el6.x86_64 >Reporter: Corey > > Using limit on a DataFrame prior to groupBy will lead to a crash. > Repartitioning will avoid the crash. > *will crash:* {{df.limit(3).groupBy("user_id").count().show()}} > *will work:* {{df.limit(3).coalesce(1).groupBy('user_id').count().show()}} > *will work:* > {{df.limit(3).repartition('user_id').groupBy('user_id').count().show()}} > Here is a reproducible example along with the error message: > {quote} > >>> df = spark.createDataFrame([ (1, 1), (1, 3), (2, 1), (3, 2), (3, 3) ], > >>> ["user_id", "genre_id"]) > >>> > >>> df.show() > +---++ > |user_id|genre_id| > +---++ > | 1| 1| > | 1| 3| > | 2| 1| > | 3| 2| > | 3| 3| > +---++ > >>> df.groupBy("user_id").count().show() > +---+-+ > |user_id|count| > +---+-+ > | 1|2| > | 3|2| > | 2|1| > +---+-+ > >>> df.limit(3).groupBy("user_id").count().show() > [Stage 8:===>(1964 + 24) / > 2000]16/11/21 01:59:27 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID > 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18528) limit + groupBy leads to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/SPARK-18528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687690#comment-15687690 ] Apache Spark commented on SPARK-18528: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/15980 > limit + groupBy leads to java.lang.NullPointerException > --- > > Key: SPARK-18528 > URL: https://issues.apache.org/jira/browse/SPARK-18528 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.1 > Environment: CentOS release 6.6, Linux 2.6.32-504.el6.x86_64 >Reporter: Corey > > Using limit on a DataFrame prior to groupBy will lead to a crash. > Repartitioning will avoid the crash. > *will crash:* {{df.limit(3).groupBy("user_id").count().show()}} > *will work:* {{df.limit(3).coalesce(1).groupBy('user_id').count().show()}} > *will work:* > {{df.limit(3).repartition('user_id').groupBy('user_id').count().show()}} > Here is a reproducible example along with the error message: > {quote} > >>> df = spark.createDataFrame([ (1, 1), (1, 3), (2, 1), (3, 2), (3, 3) ], > >>> ["user_id", "genre_id"]) > >>> > >>> df.show() > +---++ > |user_id|genre_id| > +---++ > | 1| 1| > | 1| 3| > | 2| 1| > | 3| 2| > | 3| 3| > +---++ > >>> df.groupBy("user_id").count().show() > +---+-+ > |user_id|count| > +---+-+ > | 1|2| > | 3|2| > | 2|1| > +---+-+ > >>> df.limit(3).groupBy("user_id").count().show() > [Stage 8:===>(1964 + 24) / > 2000]16/11/21 01:59:27 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID > 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-12469) Data Property Accumulators for Spark (formerly Consistent Accumulators)
[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687668#comment-15687668 ] Reynold Xin edited comment on SPARK-12469 at 11/22/16 7:36 PM: --- Sorry there is no way to get this in 2.1, given the size of the change to the critical path. was (Author: rxin): Sorry there is no way to get this in 2.1, given the size of the change. > Data Property Accumulators for Spark (formerly Consistent Accumulators) > --- > > Key: SPARK-12469 > URL: https://issues.apache.org/jira/browse/SPARK-12469 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: holdenk > > Tasks executed on Spark workers are unable to modify values from the driver, > and accumulators are the one exception for this. Accumulators in Spark are > implemented in such a way that when a stage is recomputed (say for cache > eviction) the accumulator will be updated a second time. This makes > accumulators inside of transformations more difficult to use for things like > counting invalid records (one of the primary potential use cases of > collecting side information during a transformation). However in some cases > this counting during re-evaluation is exactly the behaviour we want (say in > tracking total execution time for a particular function). Spark would benefit > from a version of accumulators which did not double count even if stages were > re-executed. > Motivating example: > {code} > val parseTime = sc.accumulator(0L) > val parseFailures = sc.accumulator(0L) > val parsedData = sc.textFile(...).flatMap { line => > val start = System.currentTimeMillis() > val parsed = Try(parse(line)) > if (parsed.isFailure) parseFailures += 1 > parseTime += System.currentTimeMillis() - start > parsed.toOption > } > parsedData.cache() > val resultA = parsedData.map(...).filter(...).count() > // some intervening code. Almost anything could happen here -- some of > parsedData may > // get kicked out of the cache, or an executor where data was cached might > get lost > val resultB = parsedData.filter(...).map(...).flatMap(...).count() > // now we look at the accumulators > {code} > Here we would want parseFailures to only have been added to once for every > line which failed to parse. Unfortunately, the current Spark accumulator API > doesn’t support the current parseFailures use case since if some data had > been evicted its possible that it will be double counted. > See the full design document at > https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12469) Data Property Accumulators for Spark (formerly Consistent Accumulators)
[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687668#comment-15687668 ] Reynold Xin commented on SPARK-12469: - Sorry there is no way to get this in 2.1, given the size of the change. > Data Property Accumulators for Spark (formerly Consistent Accumulators) > --- > > Key: SPARK-12469 > URL: https://issues.apache.org/jira/browse/SPARK-12469 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: holdenk > > Tasks executed on Spark workers are unable to modify values from the driver, > and accumulators are the one exception for this. Accumulators in Spark are > implemented in such a way that when a stage is recomputed (say for cache > eviction) the accumulator will be updated a second time. This makes > accumulators inside of transformations more difficult to use for things like > counting invalid records (one of the primary potential use cases of > collecting side information during a transformation). However in some cases > this counting during re-evaluation is exactly the behaviour we want (say in > tracking total execution time for a particular function). Spark would benefit > from a version of accumulators which did not double count even if stages were > re-executed. > Motivating example: > {code} > val parseTime = sc.accumulator(0L) > val parseFailures = sc.accumulator(0L) > val parsedData = sc.textFile(...).flatMap { line => > val start = System.currentTimeMillis() > val parsed = Try(parse(line)) > if (parsed.isFailure) parseFailures += 1 > parseTime += System.currentTimeMillis() - start > parsed.toOption > } > parsedData.cache() > val resultA = parsedData.map(...).filter(...).count() > // some intervening code. Almost anything could happen here -- some of > parsedData may > // get kicked out of the cache, or an executor where data was cached might > get lost > val resultB = parsedData.filter(...).map(...).flatMap(...).count() > // now we look at the accumulators > {code} > Here we would want parseFailures to only have been added to once for every > line which failed to parse. Unfortunately, the current Spark accumulator API > doesn’t support the current parseFailures use case since if some data had > been evicted its possible that it will be double counted. > See the full design document at > https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12469) Data Property Accumulators for Spark (formerly Consistent Accumulators)
[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687604#comment-15687604 ] holdenk commented on SPARK-12469: - In some ways I agree; on the other hand it's slipped 2.0 already (as a result of the API rewrite [~rxin] undertook around that time, it wasn't feasible to keep it up to date and get it in) - so I'm ok with us pushing this out to 2.2, but I'm worried we will get in a habit of just pushing this off. We've added features larger than this before during the window between RCs, provided the PR existed beforehand and was primarily additive (and this is primarily additive: existing accumulators don't change, but we do add new options), and this clearly meets that bar. There is potentially a lot of review work left, though, so I think it would be pretty ok if we get this in near the start of 2.2 so that people have some time to poke at the new API. What do [~squito]/[~imranr]/[~rxin] think? > Data Property Accumulators for Spark (formerly Consistent Accumulators) > --- > > Key: SPARK-12469 > URL: https://issues.apache.org/jira/browse/SPARK-12469 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: holdenk > > Tasks executed on Spark workers are unable to modify values from the driver, > and accumulators are the one exception for this. Accumulators in Spark are > implemented in such a way that when a stage is recomputed (say for cache > eviction) the accumulator will be updated a second time. This makes > accumulators inside of transformations more difficult to use for things like > counting invalid records (one of the primary potential use cases of > collecting side information during a transformation). However in some cases > this counting during re-evaluation is exactly the behaviour we want (say in > tracking total execution time for a particular function). Spark would benefit > from a version of accumulators which did not double count even if stages were > re-executed. > Motivating example: > {code} > val parseTime = sc.accumulator(0L) > val parseFailures = sc.accumulator(0L) > val parsedData = sc.textFile(...).flatMap { line => > val start = System.currentTimeMillis() > val parsed = Try(parse(line)) > if (parsed.isFailure) parseFailures += 1 > parseTime += System.currentTimeMillis() - start > parsed.toOption > } > parsedData.cache() > val resultA = parsedData.map(...).filter(...).count() > // some intervening code. Almost anything could happen here -- some of > parsedData may > // get kicked out of the cache, or an executor where data was cached might > get lost > val resultB = parsedData.filter(...).map(...).flatMap(...).count() > // now we look at the accumulators > {code} > Here we would want parseFailures to only have been added to once for every > line which failed to parse. Unfortunately, the current Spark accumulator API > doesn’t support the current parseFailures use case since if some data had > been evicted its possible that it will be double counted. > See the full design document at > https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-14146) Imported implicits can't be found in Spark REPL in some cases
[ https://issues.apache.org/jira/browse/SPARK-14146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687597#comment-15687597 ] Shixiong Zhu edited comment on SPARK-14146 at 11/22/16 7:10 PM: Hm..., this issue seems worse than I thought. It will ignore all wildcard imports. E.g., {code} scala> import java.text._ import java.text._ scala> class Foo() { val f = new SimpleDateFormat("hh") } r: false h: import java.text._ wanted: Set(scala, SimpleDateFormat) r: false h: import org.apache.spark.sql.functions._ wanted: Set(scala, SimpleDateFormat) r: false h: import spark.sql wanted: Set(scala, SimpleDateFormat) r: false h: import spark.implicits._ wanted: Set(scala, SimpleDateFormat) r: false h: import org.apache.spark.SparkContext._ wanted: Set(scala, SimpleDateFormat) :13: error: not found: type SimpleDateFormat class Foo() { val f = new SimpleDateFormat("hh") } {code} was (Author: zsxwing): Hm..., this issue seems worse than I thought. It will ignore all wildcard imports. E.g., {code} bin/scala -Yrepl-class-based Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45). Type in expressions for evaluation. Or try :help. scala> import java.text._ import java.text._ scala> class Foo() { val f = new SimpleDateFormat("hh") } :11: error: not found: type SimpleDateFormat class Foo() { val f = new SimpleDateFormat("hh") } {code} > Imported implicits can't be found in Spark REPL in some cases > - > > Key: SPARK-14146 > URL: https://issues.apache.org/jira/browse/SPARK-14146 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0 >Reporter: Wenchen Fan > > {code} > class I(i: Int) { > def double: Int = i * 2 > } > class Context { > implicit def toI(i: Int): I = new I(i) > } > val c = new Context > import c._ > // OK > 1.double > // Fail > class A; 1.double > {code} > The above code snippets can work in Scala REPL however. > This will affect our Dataset functionality, for example: > {code} > class A; Seq(1 -> "a").toDS() // fail > {code} > or in paste mode: > {code} > :paste > class A > Seq(1 -> "a").toDS() // fail > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14146) Imported implicits can't be found in Spark REPL in some cases
[ https://issues.apache.org/jira/browse/SPARK-14146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687597#comment-15687597 ] Shixiong Zhu commented on SPARK-14146: -- Hm..., this issue seems worse than I thought. It will ignore all wildcard imports. E.g., {code} bin/scala -Yrepl-class-based Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45). Type in expressions for evaluation. Or try :help. scala> import java.text._ import java.text._ scala> class Foo() { val f = new SimpleDateFormat("hh") } :11: error: not found: type SimpleDateFormat class Foo() { val f = new SimpleDateFormat("hh") } {code} > Imported implicits can't be found in Spark REPL in some cases > - > > Key: SPARK-14146 > URL: https://issues.apache.org/jira/browse/SPARK-14146 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0 >Reporter: Wenchen Fan > > {code} > class I(i: Int) { > def double: Int = i * 2 > } > class Context { > implicit def toI(i: Int): I = new I(i) > } > val c = new Context > import c._ > // OK > 1.double > // Fail > class A; 1.double > {code} > The above code snippets can work in Scala REPL however. > This will affect our Dataset functionality, for example: > {code} > class A; Seq(1 -> "a").toDS() // fail > {code} > or in paste mode: > {code} > :paste > class A > Seq(1 -> "a").toDS() // fail > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
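Until the wildcard-import handling is fixed in the class-based REPL, importing the needed symbol explicitly appears to sidestep the failure in the example above. This is only an assumed workaround sketch, not part of any fix:

{code}
// Assumed workaround sketch: an explicit import names the symbol the REPL
// marks as "wanted", so it should not be pruned the way the wildcard import is.
import java.text.SimpleDateFormat
class Foo() { val f = new SimpleDateFormat("hh") }
{code}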
[jira] [Commented] (SPARK-12469) Data Property Accumulators for Spark (formerly Consistent Accumulators)
[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687587#comment-15687587 ] Herman van Hovell commented on SPARK-12469: --- [~holdenk] We are really close to RC. It does not make sense to add such a large feature at this point. Can we push this to 2.2? > Data Property Accumulators for Spark (formerly Consistent Accumulators) > --- > > Key: SPARK-12469 > URL: https://issues.apache.org/jira/browse/SPARK-12469 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: holdenk > > Tasks executed on Spark workers are unable to modify values from the driver, > and accumulators are the one exception for this. Accumulators in Spark are > implemented in such a way that when a stage is recomputed (say for cache > eviction) the accumulator will be updated a second time. This makes > accumulators inside of transformations more difficult to use for things like > counting invalid records (one of the primary potential use cases of > collecting side information during a transformation). However in some cases > this counting during re-evaluation is exactly the behaviour we want (say in > tracking total execution time for a particular function). Spark would benefit > from a version of accumulators which did not double count even if stages were > re-executed. > Motivating example: > {code} > val parseTime = sc.accumulator(0L) > val parseFailures = sc.accumulator(0L) > val parsedData = sc.textFile(...).flatMap { line => > val start = System.currentTimeMillis() > val parsed = Try(parse(line)) > if (parsed.isFailure) parseFailures += 1 > parseTime += System.currentTimeMillis() - start > parsed.toOption > } > parsedData.cache() > val resultA = parsedData.map(...).filter(...).count() > // some intervening code. Almost anything could happen here -- some of > parsedData may > // get kicked out of the cache, or an executor where data was cached might > get lost > val resultB = parsedData.filter(...).map(...).flatMap(...).count() > // now we look at the accumulators > {code} > Here we would want parseFailures to only have been added to once for every > line which failed to parse. Unfortunately, the current Spark accumulator API > doesn’t support the current parseFailures use case since if some data had > been evicted its possible that it will be double counted. > See the full design document at > https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17772) Add helper testing methods for instance weighting
[ https://issues.apache.org/jira/browse/SPARK-17772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell updated SPARK-17772: -- Target Version/s: 2.2.0 (was: 2.1.0) > Add helper testing methods for instance weighting > - > > Key: SPARK-17772 > URL: https://issues.apache.org/jira/browse/SPARK-17772 > Project: Spark > Issue Type: Test > Components: ML >Reporter: Seth Hendrickson >Assignee: Seth Hendrickson >Priority: Minor > > More and more ML algos are accepting instance weights. We keep replicating > code to test instance weighting in every test suite, which will get out of > hand rather quickly. We can and should implement some generic instance weight > test helper methods so that we can reduce duplicated code and standardize > these tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
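One plausible shape for such a helper (all names below are assumptions, not the utilities that were ultimately added to the ML test code): fit the estimator once on data replicated k times with unit weights and once on the original data with weight k on every row, then let the caller compare the resulting models.

{code}
// Hedged sketch of a generic instance-weighting test helper; method and
// parameter names are assumptions. The estimator is expected to already have
// its weight column parameter set to `weightCol`.
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.sql.{DataFrame, functions => F}

def testReplicationMatchesWeighting[M <: Model[M]](
    estimator: Estimator[M],
    data: DataFrame,
    k: Int,
    weightCol: String)(compare: (M, M) => Unit): Unit = {
  // Fit on the data replicated k times, each copy carrying unit weight.
  val replicated = Seq.fill(k)(data).reduce(_ union _).withColumn(weightCol, F.lit(1.0))
  // Fit on the original data with every instance weighted by k.
  val weighted = data.withColumn(weightCol, F.lit(k.toDouble))
  compare(estimator.fit(replicated), estimator.fit(weighted))
}
{code}

A suite could then call this once per algorithm with an algorithm-specific comparison (for example, checking that coefficients agree within a tolerance), instead of re-implementing the replication-versus-weighting check in every test file.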