spark git commit: [SPARK-13996] [SQL] Add more not null attributes for Filter codegen
Repository: spark
Updated Branches:
  refs/heads/master 1cf701834 -> c2f25b1a1

[SPARK-13996] [SQL] Add more not null attributes for Filter codegen

## What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-13996

Filter codegen determines which attributes are not null by looking for `IsNotNull(a)` expressions where `a` is in `child.output`. The current check is not comprehensive, though, and can be improved. E.g., for this plan:

    val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2")))
    val schema = new StructType().add("k", IntegerType).add("v", StringType)
    val smallDF = sqlContext.createDataFrame(rdd, schema)
    val df = smallDF.filter("isnotnull(k + 1)")

The code snippet generated without this patch:

    /* 031 */   protected void processNext() throws java.io.IOException {
    /* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
    /* 033 */
    /* 034 */     /*** PRODUCE: INPUT */
    /* 035 */
    /* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
    /* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
    /* 039 */       /* input[0, int] */
    /* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
    /* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
    /* 042 */
    /* 043 */       /* isnotnull((input[0, int] + 1)) */
    /* 044 */       /* (input[0, int] + 1) */
    /* 045 */       boolean filter_isNull3 = true;
    /* 046 */       int filter_value3 = -1;
    /* 047 */
    /* 048 */       if (!filter_isNull) {
    /* 049 */         filter_isNull3 = false; // resultCode could change nullability.
    /* 050 */         filter_value3 = filter_value + 1;
    /* 051 */
    /* 052 */       }
    /* 053 */       if (!(!(filter_isNull3))) continue;
    /* 054 */
    /* 055 */       filter_metricValue.add(1);

With this patch:

    /* 031 */   protected void processNext() throws java.io.IOException {
    /* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
    /* 033 */
    /* 034 */     /*** PRODUCE: INPUT */
    /* 035 */
    /* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
    /* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
    /* 039 */       /* input[0, int] */
    /* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
    /* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
    /* 042 */
    /* 043 */       if (filter_isNull) continue;
    /* 044 */
    /* 045 */       filter_metricValue.add(1);

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh

Closes #11810 from viirya/add-more-not-null-attrs.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2f25b1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2f25b1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2f25b1a
Branch: refs/heads/master
Commit: c2f25b1a148eeb1791ea7018b14b3a665c13212a
Parents: 1cf7018
Author: Liang-Chi Hsieh
Authored: Sat Apr 2 19:34:38 2016 -0700
Committer: Davies Liu
Committed: Sat Apr 2 19:34:38 2016 -0700

--
 .../org/apache/spark/sql/execution/basicOperators.scala | 8
 1 file changed, 4 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c2f25b1a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a6a14df..fb1c618 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -79,12 +79,12 @@ case class Filter(condition: Expression, child: SparkPlan)
   // Split out all the IsNotNulls from condition.
   private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
-    case IsNotNull(a) if child.output.exists(_.semanticEquals(a)) => true
+    case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true
     case _ => false
   }

   // The columns that will filtered out by `IsNotNull` could be considered as not nullable.
-  private val notNullAttributes = notNullPreds.flatMap(_.references)
+  private val notNullAttributes =
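The effect of the new pattern can be illustrated with a small standalone sketch (toy expression classes, not Spark's Catalyst types; `NullIntolerant` marks expressions that return null whenever any input is null): every attribute referenced under an `IsNotNull` of a null-intolerant expression may be treated as not null, which is what lets the generated filter above collapse to a single null check.

```scala
// Minimal sketch with toy expression types (not Catalyst).
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { def references: Set[String] = Set(name) }
case class Lit(value: Int) extends Expr { def references: Set[String] = Set.empty }
// Add is null-intolerant: a null input always produces a null output.
case class Add(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}
case class IsNotNull(child: Expr) extends Expr {
  def references: Set[String] = child.references
}

def notNullAttributes(conjuncts: Seq[Expr], childOutput: Set[String]): Set[String] = {
  val (notNullPreds, _) = conjuncts.partition {
    // Old behaviour: only a bare IsNotNull(attribute) qualified.
    // New behaviour: any IsNotNull over a null-intolerant expression qualifies,
    // as long as all of its references come from the child's output.
    case IsNotNull(e) => e.references.subsetOf(childOutput)
    case _            => false
  }
  notNullPreds.flatMap(_.references).toSet
}

// isnotnull(k + 1) now marks `k` itself as not null:
// notNullAttributes(Seq(IsNotNull(Add(Attr("k"), Lit(1)))), Set("k", "v"))  ==> Set("k")
```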
spark git commit: [SPARK-14056] Appends s3 specific configurations and spark.hadoop con…
Repository: spark
Updated Branches:
  refs/heads/master 03d130f97 -> 1cf701834

[SPARK-14056] Appends s3 specific configurations and spark.hadoop con…

## What changes were proposed in this pull request?

Appends S3-specific configurations and spark.hadoop.* configurations to the Hive configuration.

## How was this patch tested?

Tested by running a job on a cluster.

…figurations to hive configuration.

Author: Sital Kedia

Closes #11876 from sitalkedia/hiveConf.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cf70183
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cf70183
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cf70183
Branch: refs/heads/master
Commit: 1cf70183423b938ec064925b20fd4a5b9e355991
Parents: 03d130f
Author: Sital Kedia
Authored: Sat Apr 2 19:17:25 2016 -0700
Committer: Sean Owen
Committed: Sat Apr 2 19:17:25 2016 -0700

--
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 19 +--
 .../org/apache/spark/sql/hive/TableReader.scala |  4 ++--
 2 files changed, 15 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1cf70183/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 06b7b38..4e8e363 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -74,13 +74,12 @@ class SparkHadoopUtil extends Logging {
     }
   }

-  /**
-   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
-   * subsystems.
-   */
-  def newConfiguration(conf: SparkConf): Configuration = {
-    val hadoopConf = new Configuration()
+  /**
+   * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
+   * configuration.
+   */
+  def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
     // Note: this null check is around more than just access to the "conf" object to maintain
     // the behavior of the old implementation of this code, for backwards compatibility.
     if (conf != null) {
@@ -106,7 +105,15 @@ class SparkHadoopUtil extends Logging {
       val bufferSize = conf.get("spark.buffer.size", "65536")
       hadoopConf.set("io.file.buffer.size", bufferSize)
     }
+  }

+  /**
+   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
+   * subsystems.
+   */
+  def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new Configuration()
+    appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
     hadoopConf
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1cf70183/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 80b24dc..54afe9c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}

 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,8 +75,7 @@ class HadoopTableReader(
     math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
   }

-  // TODO: set aws s3 credentials.
-
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveExtraConf)

   private val _broadcastedHiveConf =
     sc.sparkContext.broadcast(new SerializableConfiguration(hiveExtraConf))
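The spark.hadoop.* propagation that the new `appendS3AndSparkHadoopConfigurations` helper performs can be sketched roughly as follows (a simplified illustration, not the full method, which also copies AWS credential settings and `spark.buffer.size`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Simplified sketch: every "spark.hadoop.foo" entry in the Spark configuration
// is copied into the Hadoop configuration under the key "foo".
def copySparkHadoopProps(conf: SparkConf, hadoopConf: Configuration): Unit = {
  conf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
    }
  }
}
```

With this commit the same copy is applied to the Hive-side configuration in `HadoopTableReader`, so S3 credentials set through `spark.hadoop.*` reach Hive table scans as well.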
spark git commit: [SPARK-14342][CORE][DOCS][TESTS] Remove straggler references to Tachyon
Repository: spark
Updated Branches:
  refs/heads/master 4a6e78abd -> 03d130f97

[SPARK-14342][CORE][DOCS][TESTS] Remove straggler references to Tachyon

## What changes were proposed in this pull request?

Straggler references to Tachyon were removed:
- for docs, `tachyon` has been generalized as `off-heap memory`;
- for Mesos test suites, the key-value `tachyon:true`/`tachyon:false` has been changed to `os:centos`/`os:ubuntu`, since `os` is an example constraint used by the [Mesos official docs](http://mesos.apache.org/documentation/attributes-resources/).

## How was this patch tested?

Existing test suites.

Author: Liwei Lin

Closes #12129 from lw-lin/tachyon-cleanup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03d130f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03d130f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03d130f9
Branch: refs/heads/master
Commit: 03d130f9734be66e8aefc4ffaa207ee13e837629
Parents: 4a6e78a
Author: Liwei Lin
Authored: Sat Apr 2 17:55:46 2016 -0700
Committer: Reynold Xin
Committed: Sat Apr 2 17:55:46 2016 -0700

--
 .../apache/spark/api/java/StorageLevels.java    |  4 +--
 .../cluster/mesos/MesosSchedulerUtils.scala     |  4 +--
 .../mesos/MesosSchedulerUtilsSuite.scala        | 32 ++--
 docs/running-on-mesos.md                        |  4 +--
 docs/streaming-programming-guide.md             |  2 +-
 python/pyspark/storagelevel.py                  |  2 +-
 6 files changed, 24 insertions(+), 24 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/03d130f9/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
--
diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
index 666c797..23673d3 100644
--- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
+++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -39,8 +39,8 @@ public class StorageLevels {
   /**
    * Create a new StorageLevel object.
    * @param useDisk saved to disk, if true
-   * @param useMemory saved to memory, if true
-   * @param useOffHeap saved to Tachyon, if true
+   * @param useMemory saved to on-heap memory, if true
+   * @param useOffHeap saved to off-heap memory, if true
    * @param deserialized saved as deserialized objects, if true
    * @param replication replication factor
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/03d130f9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 35f9143..233bdc2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -283,11 +283,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
    * multiple values (comma separated). For example:
    * {{{
-   * parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
+   * parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b")
    * // would result in
    *
    * Map(
-   *   "tachyon" -> Set("true"),
+   *   "os" -> Set("centos7"),
    *   "zone": -> Set("us-east-1a", "us-east-1b")
    * )
    * }}}

http://git-wip-us.apache.org/repos/asf/spark/blob/03d130f9/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index 85437b2..ceb3a52 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -59,10 +59,10 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   test("parse a non-empty constraint string correctly") {
     val expectedMap = Map(
-      "tachyon" -> Set("true"),
+      "os" -> Set("centos7"),
       "zone" -> Set("us-east-1a", "us-east-1b")
     )
-    utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
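For reference, the constraint-string format used in the updated example can be sketched as below (a simplified stand-in for `MesosSchedulerUtils.parseConstraintString`, which additionally handles valueless attributes and malformed input): pairs are separated by ';', key and values by ':', and multiple values by ','.

```scala
// Simplified sketch of the constraint-string format from the example above.
def parseConstraints(constraints: String): Map[String, Set[String]] = {
  if (constraints.isEmpty) Map.empty[String, Set[String]]
  else {
    constraints.split(";").map { pair =>
      val Array(key, values) = pair.split(":", 2)
      key -> values.split(",").toSet
    }.toMap
  }
}

// parseConstraints("os:centos7;zone:us-east-1a,us-east-1b")
// ==> Map("os" -> Set("centos7"), "zone" -> Set("us-east-1a", "us-east-1b"))
```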
[2/2] spark git commit: [MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.
[MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.

## What changes were proposed in this pull request?

This PR converts all Scala-style multiline comments into Java-style multiline comments in the Scala code. (All comment-only changes over 77 files: +786 lines, -747 lines)

## How was this patch tested?

Manual.

Author: Dongjoon Hyun

Closes #12130 from dongjoon-hyun/use_multiine_javadoc_comments.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a6e78ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a6e78ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a6e78ab
Branch: refs/heads/master
Commit: 4a6e78abd9d5edc4a5092738dff0006bbe202a89
Parents: f705037
Author: Dongjoon Hyun
Authored: Sat Apr 2 17:50:40 2016 -0700
Committer: Reynold Xin
Committed: Sat Apr 2 17:50:40 2016 -0700

--
 .../scala/org/apache/spark/FutureAction.scala       |  14 +-
 .../scala/org/apache/spark/SSLOptions.scala         |  57 +++---
 .../scala/org/apache/spark/SparkContext.scala       |  42 ++--
 .../org/apache/spark/api/java/JavaPairRDD.scala     |   8 +-
 .../spark/api/java/JavaSparkContext.scala           |  60 +++---
 .../spark/deploy/worker/CommandUtils.scala          |   2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala         |  10 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala       |   4 +-
 .../mesos/CoarseMesosSchedulerBackend.scala         |  24 +--
 .../cluster/mesos/MesosClusterScheduler.scala       |  12 +-
 .../cluster/mesos/MesosSchedulerUtils.scala         |   4 +-
 .../apache/spark/shuffle/ShuffleManager.scala       |   6 +-
 .../spark/storage/memory/MemoryStore.scala          |  20 +-
 .../scala/org/apache/spark/util/Utils.scala         |  17 +-
 .../test/scala/org/apache/spark/Smuggle.scala       |  46 ++---
 .../spark/memory/MemoryManagerSuite.scala           |  24 +--
 .../apache/spark/examples/BroadcastTest.scala       |   4 +-
 .../spark/examples/DFSReadWriteTest.scala           |  20 +-
 .../org/apache/spark/examples/GroupByTest.scala     |   4 +-
 .../spark/examples/MultiBroadcastTest.scala         |   4 +-
 .../examples/SimpleSkewedGroupByTest.scala          |   4 +-
 .../spark/examples/SkewedGroupByTest.scala          |   4 +-
 .../clickstream/PageViewGenerator.scala             |  23 +--
 .../streaming/clickstream/PageViewStream.scala      |  21 +-
 .../streaming/flume/FlumeInputDStream.scala         |  15 +-
 .../streaming/kafka/KafkaRDDPartition.scala         |  15 +-
 .../org/apache/spark/graphx/GraphOps.scala          |  10 +-
 .../spark/graphx/lib/ConnectedComponents.scala      |  18 +-
 .../spark/ml/feature/ElementwiseProduct.scala       |   6 +-
 .../python/GaussianMixtureModelWrapper.scala        |   8 +-
 .../mllib/api/python/Word2VecModelWrapper.scala     |   4 +-
 .../apache/spark/mllib/linalg/Matrices.scala        |  16 +-
 .../StreamingLinearRegressionWithSGD.scala          |   4 +-
 .../org/apache/spark/repl/SparkILoop.scala          |  21 +-
 .../org/apache/spark/repl/SparkImports.scala        |   5 +-
 .../scala/org/apache/spark/sql/Encoder.scala        |  24 +--
 .../spark/sql/catalyst/analysis/Analyzer.scala      |  20 +-
 .../sql/catalyst/expressions/Projection.scala       |   6 +-
 .../expressions/codegen/CodeGenerator.scala         |  26 +--
 .../sql/catalyst/expressions/grouping.scala         |  18 +-
 .../spark/sql/catalyst/expressions/misc.scala       |   4 +-
 .../sql/catalyst/optimizer/Optimizer.scala          |  40 ++--
 .../spark/sql/catalyst/parser/AstBuilder.scala      |   4 +-
 .../spark/sql/catalyst/planning/patterns.scala      |  28 +--
 .../spark/sql/catalyst/plans/QueryPlan.scala        |   4 +-
 .../catalyst/plans/physical/partitioning.scala      |   6 +-
 .../optimizer/OptimizerExtendableSuite.scala        |  14 +-
 .../scala/org/apache/spark/sql/SQLContext.scala     |  14 +-
 .../spark/sql/execution/CacheManager.scala          |   7 +-
 .../apache/spark/sql/execution/SparkPlan.scala      |   4 +-
 .../spark/sql/execution/WholeStageCodegen.scala     | 172 -
 .../org/apache/spark/sql/execution/Window.scala     |  36 ++--
 .../aggregate/AggregationIterator.scala             |  22 +--
 .../SortBasedAggregationIterator.scala              |   6 +-
 .../execution/aggregate/TungstenAggregate.scala     |  16 +-
 .../execution/datasources/SqlNewHadoopRDD.scala     |   8 +-
 .../datasources/csv/CSVInferSchema.scala            |  22 +--
 .../datasources/csv/DefaultSource.scala             |   4 +-
 .../spark/sql/execution/datasources/ddl.scala       |   8 +-
 .../sql/execution/joins/BroadcastHashJoin.scala     |  22 +--
 .../sql/execution/joins/CartesianProduct.scala      |   8 +-
 .../spark/sql/execution/joins/HashJoin.scala        |   8 +-
 .../sql/execution/joins/HashedRelation.scala        |  76 
 .../sql/execution/joins/SortMergeJoin.scala         |  36 ++--
 .../state/HDFSBackedStateStoreProvider.scala        |   8 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala     |   4 +-
 .../scala/org/apache/spark/sql/functions.scala      | 191 +---
[1/2] spark git commit: [MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.
Repository: spark
Updated Branches:
  refs/heads/master f70503761 -> 4a6e78abd

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 9bdf611..9f539c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf

 /**
-  * An interface for those physical operators that support codegen.
-  */
+ * An interface for those physical operators that support codegen.
+ */
 trait CodegenSupport extends SparkPlan {

   /** Prefix used in the current operator's variable names. */
@@ -46,10 +46,10 @@ trait CodegenSupport extends SparkPlan {
   }

   /**
-    * Creates a metric using the specified name.
-    *
-    * @return name of the variable representing the metric
-    */
+   * Creates a metric using the specified name.
+   *
+   * @return name of the variable representing the metric
+   */
   def metricTerm(ctx: CodegenContext, name: String): String = {
     val metric = ctx.addReferenceObj(name, longMetric(name))
     val value = ctx.freshName("metricValue")
@@ -59,25 +59,25 @@ trait CodegenSupport extends SparkPlan {
   }

   /**
-    * Whether this SparkPlan support whole stage codegen or not.
-    */
+   * Whether this SparkPlan support whole stage codegen or not.
+   */
   def supportCodegen: Boolean = true

   /**
-    * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
-    */
+   * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
+   */
   protected var parent: CodegenSupport = null

   /**
-    * Returns all the RDDs of InternalRow which generates the input rows.
-    *
-    * Note: right now we support up to two RDDs.
-    */
+   * Returns all the RDDs of InternalRow which generates the input rows.
+   *
+   * Note: right now we support up to two RDDs.
+   */
   def upstreams(): Seq[RDD[InternalRow]]

   /**
-    * Returns Java source code to process the rows from upstream.
-    */
+   * Returns Java source code to process the rows from upstream.
+   */
   final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
     this.parent = parent
     ctx.freshNamePrefix = variablePrefix
@@ -89,28 +89,28 @@ trait CodegenSupport extends SparkPlan {
   }

   /**
-    * Generate the Java source code to process, should be overridden by subclass to support codegen.
-    *
-    * doProduce() usually generate the framework, for example, aggregation could generate this:
-    *
-    * if (!initialized) {
-    *   # create a hash map, then build the aggregation hash map
-    *   # call child.produce()
-    *   initialized = true;
-    * }
-    * while (hashmap.hasNext()) {
-    *   row = hashmap.next();
-    *   # build the aggregation results
-    *   # create variables for results
-    *   # call consume(), which will call parent.doConsume()
+   * Generate the Java source code to process, should be overridden by subclass to support codegen.
+   *
+   * doProduce() usually generate the framework, for example, aggregation could generate this:
+   *
+   * if (!initialized) {
+   *   # create a hash map, then build the aggregation hash map
+   *   # call child.produce()
+   *   initialized = true;
+   * }
+   * while (hashmap.hasNext()) {
+   *   row = hashmap.next();
+   *   # build the aggregation results
+   *   # create variables for results
+   *   # call consume(), which will call parent.doConsume()
    *   if (shouldStop()) return;
-    * }
-    */
+   * }
+   */
   protected def doProduce(ctx: CodegenContext): String

   /**
-    * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
-    */
+   * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
+   */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {

     val inputVars =
       if (row != null) {
@@ -158,9 +158,9 @@ trait CodegenSupport extends SparkPlan {
   }

   /**
-    * Returns source code to evaluate all the variables, and clear the code of them, to prevent
-    * them to be evaluated twice.
-    */
+   * Returns source code to evaluate all the variables, and clear the code of them, to prevent
+   * them to be evaluated twice.
+   */
   protected def evaluateVariables(variables: Seq[ExprCode]): String = {
     val evaluate = variables.filter(_.code !=
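A small side-by-side illustration of the two indentation conventions this change standardizes (a sketch only): in the JavaDoc style adopted here, continuation asterisks line up under the first `*` of `/**`, rather than under the second as in the older ScalaDoc style.

```scala
object CommentStyleExample {
  /** Scala-style (before): continuation asterisks indented under the second `*`.
    *
    * @param n an input value
    */
  def scalaStyle(n: Int): Int = n + 1

  /**
   * Java-style (after): continuation asterisks indented under the first `*`,
   * the convention this commit applies across the code base.
   *
   * @param n an input value
   */
  def javaStyle(n: Int): Int = n + 1
}
```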
spark git commit: [SPARK-14338][SQL] Improve `SimplifyConditionals` rule to handle `null` in IF/CASEWHEN
Repository: spark
Updated Branches:
  refs/heads/master a3e293542 -> f70503761

[SPARK-14338][SQL] Improve `SimplifyConditionals` rule to handle `null` in IF/CASEWHEN

## What changes were proposed in this pull request?

Currently, `SimplifyConditionals` handles `true` and `false` to optimize branches. This PR improves `SimplifyConditionals` to take advantage of `null` conditions for `if` and `CaseWhen` expressions, too.

**Before**
```
scala> sql("SELECT IF(null, 1, 0)").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [if (null) 1 else 0 AS (IF(CAST(NULL AS BOOLEAN), 1, 0))#4]
:     +- INPUT
+- Scan OneRowRelation[]

scala> sql("select case when cast(null as boolean) then 1 else 2 end").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [CASE WHEN null THEN 1 ELSE 2 END AS CASE WHEN CAST(NULL AS BOOLEAN) THEN 1 ELSE 2 END#14]
:     +- INPUT
+- Scan OneRowRelation[]
```

**After**
```
scala> sql("SELECT IF(null, 1, 0)").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [0 AS (IF(CAST(NULL AS BOOLEAN), 1, 0))#4]
:     +- INPUT
+- Scan OneRowRelation[]

scala> sql("select case when cast(null as boolean) then 1 else 2 end").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [2 AS CASE WHEN CAST(NULL AS BOOLEAN) THEN 1 ELSE 2 END#4]
:     +- INPUT
+- Scan OneRowRelation[]
```

**Hive**
```
hive> select if(null,1,2);
OK
2
hive> select case when cast(null as boolean) then 1 else 2 end;
OK
2
```

## How was this patch tested?

Pass the Jenkins tests (including new extended test cases).

Author: Dongjoon Hyun

Closes #12122 from dongjoon-hyun/SPARK-14338.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7050376
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7050376
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7050376
Branch: refs/heads/master
Commit: f705037617d55bb479ec60bcb1e55c736224be94
Parents: a3e2935
Author: Dongjoon Hyun
Authored: Sat Apr 2 17:48:53 2016 -0700
Committer: Reynold Xin
Committed: Sat Apr 2 17:48:53 2016 -0700

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala  | 13 ++---
 .../optimizer/SimplifyConditionalSuite.scala      | 16 +++-
 2 files changed, 21 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f7050376/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 326933e..a5ab390 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -527,7 +527,7 @@ object LikeSimplification extends Rule[LogicalPlan] {
  * Null value propagation from bottom to top of the expression tree.
  */
 object NullPropagation extends Rule[LogicalPlan] {
-  def nonNullLiteral(e: Expression): Boolean = e match {
+  private def nonNullLiteral(e: Expression): Boolean = e match {
     case Literal(null, _) => false
     case _ => true
   }
@@ -773,17 +773,24 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
  * Simplifies conditional expressions (if / case).
  */
 object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
+  private def falseOrNullLiteral(e: Expression): Boolean = e match {
+    case FalseLiteral => true
+    case Literal(null, _) => true
+    case _ => false
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsUp {
       case If(TrueLiteral, trueValue, _) => trueValue
       case If(FalseLiteral, _, falseValue) => falseValue
+      case If(Literal(null, _), _, falseValue) => falseValue

-      case e @ CaseWhen(branches, elseValue) if branches.exists(_._1 == FalseLiteral) =>
+      case e @ CaseWhen(branches, elseValue) if branches.exists(x => falseOrNullLiteral(x._1)) =>
         // If there are branches that are always false, remove them.
         // If there are no more branches left, just use the else value.
         // Note that these two are handled together here in a single case statement because
         // otherwise we cannot determine the data type for the elseValue if it is None (i.e. null).
-        val newBranches = branches.filter(_._1 != FalseLiteral)
+        val newBranches = branches.filter(x => !falseOrNullLiteral(x._1))
         if (newBranches.isEmpty) {
           elseValue.getOrElse(Literal.create(null, e.dataType))
         } else {
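The semantics the improved rule relies on can be sketched with toy types (not the Catalyst implementation): a branch condition that is literally FALSE or NULL can never select its branch, so it may be dropped, and `IF(NULL, a, b)` always yields the false-branch value, matching the Hive results quoted above.

```scala
// Toy sketch of the rewrite, not Catalyst itself.
object SimplifyConditionalsSketch {
  sealed trait Cond
  case object TrueLit extends Cond
  case object FalseLit extends Cond
  case object NullLit extends Cond
  case class NonConstant(name: String) extends Cond

  private def falseOrNull(c: Cond): Boolean = c == FalseLit || c == NullLit

  // IF: a literal TRUE picks the true branch; FALSE or NULL picks the false branch;
  // a non-constant condition cannot be folded.
  def simplifyIf[T](cond: Cond, trueValue: T, falseValue: T): Option[T] = cond match {
    case TrueLit            => Some(trueValue)
    case FalseLit | NullLit => Some(falseValue)
    case NonConstant(_)     => None
  }

  // CASE WHEN: drop branches whose condition is FALSE or NULL; if none survive,
  // the whole expression collapses to the else value.
  def simplifyCaseWhen[T](branches: Seq[(Cond, T)], elseValue: T): Either[Seq[(Cond, T)], T] = {
    val kept = branches.filterNot { case (c, _) => falseOrNull(c) }
    if (kept.isEmpty) Right(elseValue) else Left(kept)
  }
}

// SimplifyConditionalsSketch.simplifyIf(SimplifyConditionalsSketch.NullLit, 1, 0) ==> Some(0)
```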
spark git commit: [HOTFIX] Disable StateStoreSuite.maintenance
Repository: spark
Updated Branches:
  refs/heads/master 06694f1c6 -> a3e293542

[HOTFIX] Disable StateStoreSuite.maintenance

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3e29354
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3e29354
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3e29354
Branch: refs/heads/master
Commit: a3e293542a6e7df9bcc7d9bbd22b3c93a81bcc38
Parents: 06694f1
Author: Reynold Xin
Authored: Sat Apr 2 12:44:02 2016 -0700
Committer: Reynold Xin
Committed: Sat Apr 2 12:44:02 2016 -0700

--
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a3e29354/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 0e5936d..dd23925 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -352,7 +352,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     }
   }

-  test("maintenance") {
+  ignore("maintenance") {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
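For context, the one-word change relies on ScalaTest's `ignore`, which keeps the test body compiling but skips it at run time and reports it as ignored rather than failed. A minimal sketch (import path assumed for the FunSuite-based ScalaTest API Spark used at the time):

```scala
import org.scalatest.FunSuite

class MaintenanceLikeSuite extends FunSuite {
  test("this test runs") {
    assert(1 + 1 == 2)
  }

  // Swapping `test` for `ignore` is all the hotfix does: the body still compiles,
  // but it is skipped and reported as ignored, so it can be re-enabled later.
  ignore("this test is skipped") {
    assert(false)
  }
}
```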
spark git commit: [MINOR] Typo fixes
Repository: spark
Updated Branches:
  refs/heads/master 67d753516 -> 06694f1c6

[MINOR] Typo fixes

## What changes were proposed in this pull request?

Typo fixes. No functional changes.

## How was this patch tested?

Built the sources and ran with samples.

Author: Jacek Laskowski

Closes #11802 from jaceklaskowski/typo-fixes.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06694f1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06694f1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06694f1c
Branch: refs/heads/master
Commit: 06694f1c68cb752ea311144f0dbe50e92e1393cf
Parents: 67d7535
Author: Jacek Laskowski
Authored: Sat Apr 2 08:12:04 2016 -0700
Committer: Sean Owen
Committed: Sat Apr 2 08:12:04 2016 -0700

--
 .../streaming/RecoverableNetworkWordCount.scala    |  2 +-
 .../scala/org/apache/spark/ml/Pipeline.scala       |  2 +-
 .../spark/ml/regression/LinearRegression.scala     |  2 +-
 .../catalyst/plans/logical/LogicalPlan.scala       |  4 ++--
 .../apache/spark/sql/ExperimentalMethods.scala     |  2 +-
 .../sql/execution/joins/BroadcastHashJoin.scala    |  2 +-
 .../scala/org/apache/spark/sql/functions.scala     | 12 +--
 .../spark/streaming/StreamingContext.scala         | 13 ++--
 .../dstream/ConstantInputDStream.scala             |  2 +-
 .../spark/streaming/dstream/DStream.scala          |  8 +++
 .../dstream/DStreamCheckpointData.scala            |  6 +++---
 .../spark/streaming/dstream/InputDStream.scala     |  6 +++---
 .../dstream/ReducedWindowedDStream.scala           |  2 +-
 .../spark/streaming/dstream/StateDStream.scala     | 12 +--
 .../scheduler/ReceivedBlockTracker.scala           |  4 ++--
 .../scheduler/rate/RateEstimator.scala             | 22 +++-
 16 files changed, 52 insertions(+), 49 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 05f8e65..b6b8bc3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -141,7 +141,7 @@ object RecoverableNetworkWordCount {

   def main(args: Array[String]) {
     if (args.length != 4) {
-      System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
       System.err.println(
         """
           |Usage: RecoverableNetworkWordCount

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 3a99979..afefaaa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -147,7 +147,7 @@ class Pipeline @Since("1.4.0") (
           t
         case _ =>
           throw new IllegalArgumentException(
-            s"Do not support stage $stage of type ${stage.getClass}")
+            s"Does not support stage $stage of type ${stage.getClass}")
       }
       if (index < indexOfLastEstimator) {
         curDataset = transformer.transform(curDataset)

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index ba5ad4c..2633c06 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -58,7 +58,7 @@ private[regression] trait LinearRegressionParams extends PredictorParams
  * The specific squared error loss function used is:
  *   L = 1/2n ||A coefficients - y||^2^
  *
- * This support multiple types of regularization:
+ * This supports multiple types of regularization:
  *  - none (a.k.a. ordinary least squares)
  *  - L2 (ridge regression)
  *  - L1 (Lasso)
spark git commit: [HOTFIX] Fix compilation break.
Repository: spark
Updated Branches:
  refs/heads/master d7982a3a9 -> 67d753516

[HOTFIX] Fix compilation break.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67d75351
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67d75351
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67d75351
Branch: refs/heads/master
Commit: 67d753516da9b6318cd4001bb7ae91703aaf098d
Parents: d7982a3
Author: Reynold Xin
Authored: Sat Apr 2 00:00:19 2016 -0700
Committer: Reynold Xin
Committed: Sat Apr 2 00:00:19 2016 -0700

--
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 1 +
 .../spark/sql/streaming/StreamingAggregationSuite.scala      | 8 +++-
 2 files changed, 4 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/67d75351/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index d5db9db..1328142 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.net.URI
 import java.util.ConcurrentModificationException

+import scala.language.implicitConversions
 import scala.util.Random

 import org.apache.hadoop.conf.Configuration

http://git-wip-us.apache.org/repos/asf/spark/blob/67d75351/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index b63ce89..3af7c01 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -18,8 +18,9 @@ package org.apache.spark.sql.streaming
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{Encoder, StreamTest, SumOf, TypedColumn}
+import org.apache.spark.sql.StreamTest
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext

@@ -118,11 +119,8 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
   }

   test("typed aggregators") {
-    def sum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] =
-      new SumOf(f).toColumn
-
     val inputData = MemoryStream[(String, Int)]
-    val aggregated = inputData.toDS().groupByKey(_._1).agg(sum(_._2))
+    val aggregated = inputData.toDS().groupByKey(_._1).agg(typed.sumLong(_._2))

     testStream(aggregated)(
       AddData(inputData, ("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)),
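A rough sketch of the typed aggregation the fixed test now uses: `typed.sumLong` builds a `TypedColumn` that sums a `Long` extracted from each element of a grouped Dataset. The entry-point setup below is an assumption beyond what the diff shows, and the `typed` package has moved in later Spark versions; only the import and the `groupByKey(...).agg(typed.sumLong(...))` call come from the diff above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.scala.typed

// Sketch only: names outside the diff are assumed for illustration.
object TypedSumLongSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("typed-sum"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
    // Group by the key and sum the values with the built-in typed aggregator,
    // replacing the hand-rolled SumOf aggregator the broken test used.
    val summed = ds.groupByKey(_._1).agg(typed.sumLong(_._2))
    summed.show()  // expected: (a, 30), (b, 3), (c, 1)
  }
}
```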