spark git commit: [SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations
Repository: spark Updated Branches: refs/heads/master c09b31eb8 -> 5df99bd36 [SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations ## What changes were proposed in this pull request? Remove time metrics since it seems no way to measure it in non per-row tracking. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #18558 from viirya/SPARK-20703-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5df99bd3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5df99bd3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5df99bd3 Branch: refs/heads/master Commit: 5df99bd364561c6f4c02308149ba5eb71f89247e Parents: c09b31e Author: Liang-Chi Hsieh Authored: Fri Jul 7 13:12:20 2017 +0800 Committer: Wenchen Fan Committed: Fri Jul 7 13:12:20 2017 +0800 -- .../execution/command/DataWritingCommand.scala | 10 - .../datasources/FileFormatWriter.scala | 22 +++- .../sql/hive/execution/SQLMetricsSuite.scala| 3 --- 3 files changed, 3 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 0c381a2..700f7f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -30,7 +30,6 @@ trait DataWritingCommand extends RunnableCommand { override lazy val metrics: Map[String, SQLMetric] = { val sparkContext = SparkContext.getActive.get Map( - "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"), "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"), "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"), "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), @@ -47,23 +46,14 @@ trait DataWritingCommand extends RunnableCommand { var numFiles = 0 var totalNumBytes: Long = 0L var totalNumOutput: Long = 0L -var totalWritingTime: Long = 0L writeSummaries.foreach { summary => numPartitions += summary.updatedPartitions.size numFiles += summary.numOutputFile totalNumBytes += summary.numOutputBytes totalNumOutput += summary.numOutputRows - totalWritingTime += summary.totalWritingTime } -val avgWritingTime = if (numFiles > 0) { - (totalWritingTime / numFiles).toLong -} else { - 0L -} - -metrics("avgTime").add(avgWritingTime) metrics("numFiles").add(numFiles) metrics("numOutputBytes").add(totalNumBytes) metrics("numOutputRows").add(totalNumOutput) http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 6486663..9eb9eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -275,8 +275,6 @@ object 
FileFormatWriter extends Logging { /** * The data structures used to measure metrics during writing. */ -protected var totalWritingTime: Long = 0L -protected var timeOnCurrentFile: Long = 0L protected var numOutputRows: Long = 0L protected var numOutputBytes: Long = 0L @@ -343,9 +341,7 @@ object FileFormatWriter extends Logging { } val internalRow = iter.next() -val startTime = System.nanoTime() currentWriter.write(internalRow) -timeOnCurrentFile += (System.nanoTime() - startTime) recordsInFile += 1 } releaseResources() @@ -355,17 +351,13 @@ object FileFormatWriter extends Logging { updatedPartitions = Set.empty, numOutputFile = fileCounter + 1, numOutputBytes = numOutputBytes, -numOutputRows = numOutputRows, -totalWritingTime =
spark git commit: [SPARK-21217][SQL] Support ColumnVector.Array.toArray()
Repository: spark Updated Branches: refs/heads/master 53c2eb59b -> c09b31eb8 [SPARK-21217][SQL] Support ColumnVector.Array.toArray() ## What changes were proposed in this pull request? This PR implements bulk-copy for `ColumnVector.Array.toArray()` methods (e.g. `toIntArray()`) in `ColumnVector.Array` by using `System.arrayCopy()` or `Platform.copyMemory()`. Before this PR, when one of these method is called, the generic method in `ArrayData` is called. It is not fast since element-wise copy is performed. This PR can improve performance of a benchmark program by 1.9x and 3.2x. Without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Int ArrayBest/Avg Time(ms)Rate(M/s) Per Row(ns) ON_HEAP586 / 628 14.3 69.9 OFF_HEAP 893 / 902 9.4 106.5 ``` With this PR ``` OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Int ArrayBest/Avg Time(ms)Rate(M/s) Per Row(ns) ON_HEAP306 / 331 27.4 36.4 OFF_HEAP 282 / 287 29.8 33.6 ``` Source program ``` (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val len = 8 * 1024 * 1024 val column = ColumnVector.allocate(len * 2, new ArrayType(IntegerType, false), memMode) val data = column.arrayData var i = 0 while (i < len) { data.putInt(i, i) i += 1 } column.putArray(0, 0, len) val benchmark = new Benchmark("Int Array", len, minNumIters = 20) benchmark.addCase(s"$memMode") { iter => var i = 0 while (i < 50) { column.getArray(0).toIntArray i += 1 } } benchmark.run }} ``` ## How was this patch tested? Added test suite Author: Kazuaki IshizakiCloses #18425 from kiszk/SPARK-21217. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c09b31eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c09b31eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c09b31eb Branch: refs/heads/master Commit: c09b31eb8fa83d5463a045c9278f5874ae505a8e Parents: 53c2eb5 Author: Kazuaki Ishizaki Authored: Fri Jul 7 13:09:32 2017 +0800 Committer: Wenchen Fan Committed: Fri Jul 7 13:09:32 2017 +0800 -- .../sql/execution/vectorized/ColumnVector.java | 56 +++ .../vectorized/OffHeapColumnVector.java | 58 .../vectorized/OnHeapColumnVector.java | 58 .../vectorized/ColumnarBatchSuite.scala | 49 + 4 files changed, 221 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c09b31eb/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 24260a6..0c027f8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -100,6 +100,27 @@ public abstract class ColumnVector implements AutoCloseable { throw new UnsupportedOperationException(); } +@Override +public boolean[] toBooleanArray() { return data.getBooleans(offset, length); } + +@Override +public byte[] toByteArray() { return data.getBytes(offset, length); } + +@Override +public short[] toShortArray() { return data.getShorts(offset, length); } + +@Override +public int[] toIntArray() { return data.getInts(offset, length); } + +@Override +public long[] toLongArray() { return data.getLongs(offset, length); } + +@Override +public float[] 
toFloatArray() { return data.getFloats(offset, length); } + +@Override +public double[] toDoubleArray() { return data.getDoubles(offset, length); } + // TODO: this is extremely expensive. @Override public Object[] array() { @@ -367,6 +388,11 @@ public abstract class ColumnVector implements
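A minimal, self-contained sketch of the bulk-copy idea behind the new `toArray()` implementations (this is not the actual `OnHeapColumnVector` code; the backing array, offset, and length are assumed to be valid):

```scala
object BulkCopySketch {
  // Element-wise copy, roughly what the generic ArrayData fallback did before this change.
  def toIntArrayPerElement(backing: Array[Int], offset: Int, length: Int): Array[Int] = {
    val out = new Array[Int](length)
    var i = 0
    while (i < length) {
      out(i) = backing(offset + i)
      i += 1
    }
    out
  }

  // Single bulk copy, the approach this commit takes for the on-heap case.
  def toIntArrayBulk(backing: Array[Int], offset: Int, length: Int): Array[Int] = {
    val out = new Array[Int](length)
    System.arraycopy(backing, offset, out, 0, length)
    out
  }

  def main(args: Array[String]): Unit = {
    val data = Array.tabulate(1024)(identity)
    assert(toIntArrayPerElement(data, 16, 256).sameElements(toIntArrayBulk(data, 16, 256)))
  }
}
```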
spark git commit: [SPARK-21327][SQL][PYSPARK] ArrayConstructor should handle an array of typecode 'l' as long rather than int in Python 2.
Repository: spark
Updated Branches:
  refs/heads/master d451b7f43 -> 53c2eb59b

[SPARK-21327][SQL][PYSPARK] ArrayConstructor should handle an array of typecode 'l' as long rather than int in Python 2.

## What changes were proposed in this pull request?

Currently `ArrayConstructor` handles an array of typecode `'l'` as `int` when converting a Python 2 object into a Java object, so if a value is larger than `Integer.MAX_VALUE` or smaller than `Integer.MIN_VALUE`, an overflow occurs.

```python
import array
data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
df = spark.createDataFrame(data)
df.show(truncate=False)
```

```
+----------+
|longarray |
+----------+
|[0, 0, -1]|
+----------+
```

This should be:

```
+----------------------------------------------+
|longarray                                     |
+----------------------------------------------+
|[-9223372036854775808, 0, 9223372036854775807]|
+----------------------------------------------+
```

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN

Closes #18553 from ueshin/issues/SPARK-21327.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53c2eb59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53c2eb59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53c2eb59

Branch: refs/heads/master
Commit: 53c2eb59b2cc557081f6a252748dc38511601b0d
Parents: d451b7f
Author: Takuya UESHIN
Authored: Fri Jul 7 14:05:22 2017 +0900
Committer: Takuya UESHIN
Committed: Fri Jul 7 14:05:22 2017 +0900

--
 .../scala/org/apache/spark/api/python/SerDeUtil.scala | 10 ++
 python/pyspark/sql/tests.py | 6 ++
 2 files changed, 16 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/53c2eb59/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 6e4eab4..42f67e8 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -73,6 +73,16 @@ private[spark] object SerDeUtil extends Logging { // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) construct(typecode, machineCodes(typecode), data) + } else if (args.length == 2 && args(0) == "l") { +// On Python 2, an array of typecode 'l' should be handled as long rather than int. +val values = args(1).asInstanceOf[JArrayList[_]] +val result = new Array[Long](values.size) +var i = 0 +while (i < values.size) { + result(i) = values.get(i).asInstanceOf[Number].longValue() + i += 1 +} +result } else { super.construct(args) }

http://git-wip-us.apache.org/repos/asf/spark/blob/53c2eb59/python/pyspark/sql/tests.py

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index c0e3b8d..9db2f40 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2342,6 +2342,12 @@ class SQLTests(ReusedPySparkTestCase): self.assertEquals(types[2], np.bool) self.assertEquals(types[3], np.float32) +def test_create_dataframe_from_array_of_long(self): +import array +data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))] +df = self.spark.createDataFrame(data) +self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807])) + class HiveSparkSubmitTests(SparkSubmitTests):
spark git commit: [SPARK-21326][SPARK-21066][ML] Use TextFileFormat in LibSVMFileFormat and allow multiple input paths for determining numFeatures
Repository: spark Updated Branches: refs/heads/master e5bb26174 -> d451b7f43 [SPARK-21326][SPARK-21066][ML] Use TextFileFormat in LibSVMFileFormat and allow multiple input paths for determining numFeatures ## What changes were proposed in this pull request? This is related with [SPARK-19918](https://issues.apache.org/jira/browse/SPARK-19918) and [SPARK-18362](https://issues.apache.org/jira/browse/SPARK-18362). This PR proposes to use `TextFileFormat` and allow multiple input paths (but with a warning) when determining the number of features in LibSVM data source via an extra scan. There are three points here: - The main advantage of this change should be to remove file-listing bottlenecks in driver side. - Another advantage is ones from using `FileScanRDD`. For example, I guess we can use `spark.sql.files.ignoreCorruptFiles` option when determining the number of features. - We can unify the schema inference code path in text based data sources. This is also a preparation for [SPARK-21289](https://issues.apache.org/jira/browse/SPARK-21289). ## How was this patch tested? Unit tests in `LibSVMRelationSuite`. Closes #18288 Author: hyukjinkwonCloses #18556 from HyukjinKwon/libsvm-schema. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d451b7f4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d451b7f4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d451b7f4 Branch: refs/heads/master Commit: d451b7f43d559aa1efd7ac3d1cbec5249f3a7a24 Parents: e5bb261 Author: hyukjinkwon Authored: Fri Jul 7 12:24:03 2017 +0800 Committer: Wenchen Fan Committed: Fri Jul 7 12:24:03 2017 +0800 -- .../spark/ml/source/libsvm/LibSVMRelation.scala | 26 ++-- .../org/apache/spark/mllib/util/MLUtils.scala | 25 +-- .../ml/source/libsvm/LibSVMRelationSuite.scala | 17 ++--- 3 files changed, 49 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d451b7f4/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index f68847a..dec1183 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.internal.Logging import org.apache.spark.TaskContext import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.{Vectors, VectorUDT} @@ -66,7 +67,10 @@ private[libsvm] class LibSVMOutputWriter( /** @see [[LibSVMDataSource]] for public documentation. */ // If this is moved or renamed, please update DataSource's backwardCompatibilityMap. -private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister { +private[libsvm] class LibSVMFileFormat + extends TextBasedFileFormat + with DataSourceRegister + with Logging { override def shortName(): String = "libsvm" @@ -89,18 +93,14 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour files: Seq[FileStatus]): Option[StructType] = { val libSVMOptions = new LibSVMOptions(options) val numFeatures: Int = libSVMOptions.numFeatures.getOrElse { - // Infers number of features if the user doesn't specify (a valid) one. 
- val dataFiles = files.filterNot(_.getPath.getName startsWith "_") - val path = if (dataFiles.length == 1) { -dataFiles.head.getPath.toUri.toString - } else if (dataFiles.isEmpty) { -throw new IOException("No input path specified for libsvm data") - } else { -throw new IOException("Multiple input paths are not supported for libsvm data.") - } - - val sc = sparkSession.sparkContext - val parsed = MLUtils.parseLibSVMFile(sc, path, sc.defaultParallelism) + require(files.nonEmpty, "No input path specified for libsvm data") + logWarning( +"'numFeatures' option not specified, determining the number of features by going " + +"though the input. If you know the number in advance, please specify it via " + +"'numFeatures' option to avoid the extra scan.") + + val paths = files.map(_.getPath.toUri.toString) + val parsed = MLUtils.parseLibSVMFile(sparkSession, paths) MLUtils.computeNumFeatures(parsed) }
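A brief usage sketch of the behavior described above (the paths and feature count are hypothetical, and `spark` is an existing SparkSession): supplying `numFeatures` avoids the extra scan the new warning refers to, while omitting it now works even when several input paths are given.

```scala
// Hypothetical input paths pointing at LibSVM part files.
val paths = Seq("data/libsvm/part-00000", "data/libsvm/part-00001")

// With 'numFeatures' supplied, no extra pass over the data is needed.
val dfExplicit = spark.read.format("libsvm")
  .option("numFeatures", "780")
  .load(paths: _*)

// Without it, the number of features is determined by scanning the input (a warning is logged).
val dfInferred = spark.read.format("libsvm").load(paths: _*)
```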
spark git commit: [SPARK-21329][SS] Make EventTimeWatermarkExec explicitly UnaryExecNode
Repository: spark
Updated Branches:
  refs/heads/master 40c7add3a -> e5bb26174

[SPARK-21329][SS] Make EventTimeWatermarkExec explicitly UnaryExecNode

## What changes were proposed in this pull request?

Make `EventTimeWatermarkExec` explicitly a `UnaryExecNode`.

/cc tdas zsxwing

## How was this patch tested?

Local build.

Author: Jacek Laskowski

Closes #18509 from jaceklaskowski/EventTimeWatermarkExec-UnaryExecNode.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5bb2617
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5bb2617
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5bb2617

Branch: refs/heads/master
Commit: e5bb26174d3336e07dd670eec4fd2137df346163
Parents: 40c7add
Author: Jacek Laskowski
Authored: Thu Jul 6 18:11:41 2017 -0700
Committer: Shixiong Zhu
Committed: Thu Jul 6 18:11:41 2017 -0700

--
 .../spark/sql/execution/streaming/EventTimeWatermarkExec.scala | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e5bb2617/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 25cf609..87e5b78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.AccumulatorV2 @@ -81,7 +81,7 @@ class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTime case class EventTimeWatermarkExec( eventTime: Attribute, delay: CalendarInterval, -child: SparkPlan) extends SparkPlan { +child: SparkPlan) extends UnaryExecNode { val eventTimeStats = new EventTimeStatsAccum() val delayMs = EventTimeWatermark.getDelayMs(delay) @@ -117,6 +117,4 @@ case class EventTimeWatermarkExec( a } } - - override def children: Seq[SparkPlan] = child :: Nil }
spark git commit: [SPARK-20946][SQL] Do not update conf for existing SparkContext in SparkSession.getOrCreate
Repository: spark Updated Branches: refs/heads/master 0217dfd26 -> 40c7add3a [SPARK-20946][SQL] Do not update conf for existing SparkContext in SparkSession.getOrCreate ## What changes were proposed in this pull request? SparkContext is shared by all sessions, we should not update its conf for only one session. ## How was this patch tested? existing tests Author: Wenchen FanCloses #18536 from cloud-fan/config. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40c7add3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40c7add3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40c7add3 Branch: refs/heads/master Commit: 40c7add3a4c811202d1fa2be9606aca08df81266 Parents: 0217dfd Author: Wenchen Fan Authored: Fri Jul 7 08:44:31 2017 +0800 Committer: Wenchen Fan Committed: Fri Jul 7 08:44:31 2017 +0800 -- .../spark/ml/recommendation/ALSSuite.scala | 4 +--- .../apache/spark/ml/tree/impl/TreeTests.scala| 2 -- .../org/apache/spark/sql/SparkSession.scala | 19 +++ .../spark/sql/SparkSessionBuilderSuite.scala | 8 +++- 4 files changed, 11 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40c7add3/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 3094f52..b57fc8d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -818,15 +818,13 @@ class ALSCleanerSuite extends SparkFunSuite { FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet try { conf.set("spark.local.dir", localDir.getAbsolutePath) - val sc = new SparkContext("local[2]", "test", conf) + val sc = new SparkContext("local[2]", "ALSCleanerSuite", conf) try { sc.setCheckpointDir(checkpointDir.getAbsolutePath) // Generate test data val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0) // Implicitly test the cleaning of parents during ALS training val spark = SparkSession.builder - .master("local[2]") - .appName("ALSCleanerSuite") .sparkContext(sc) .getOrCreate() import spark.implicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/40c7add3/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala index 92a2369..b6894b3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala @@ -43,8 +43,6 @@ private[ml] object TreeTests extends SparkFunSuite { categoricalFeatures: Map[Int, Int], numClasses: Int): DataFrame = { val spark = SparkSession.builder() - .master("local[2]") - .appName("TreeTests") .sparkContext(data.sparkContext) .getOrCreate() import spark.implicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/40c7add3/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 0ddcd21..6dfe8a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -867,7 +867,7 @@ object SparkSession { 
* * @since 2.2.0 */ -def withExtensions(f: SparkSessionExtensions => Unit): Builder = { +def withExtensions(f: SparkSessionExtensions => Unit): Builder = synchronized { f(extensions) this } @@ -912,21 +912,16 @@ object SparkSession { // No active nor global default session. Create a new one. val sparkContext = userSuppliedContext.getOrElse { - // set app name if not given - val randomAppName = java.util.UUID.randomUUID().toString val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } + + // set a random app name if not given. if (!sparkConf.contains("spark.app.name")) { -
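A small sketch of the resulting behavior (local mode and the shuffle-partitions option are only examples): when a SparkContext already exists, `getOrCreate` reuses it, and options passed to the builder become session-level settings instead of being written into the shared context's configuration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// A pre-existing, shared SparkContext.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("existing"))

// Reuses the existing context; the option below only affects this session's SQL conf.
val spark = SparkSession.builder()
  .config("spark.sql.shuffle.partitions", "4")
  .getOrCreate()

assert(spark.sparkContext eq sc)
println(spark.conf.get("spark.sql.shuffle.partitions"))  // "4" for this session only
```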
spark git commit: [SPARK-21267][SS][DOCS] Update Structured Streaming Documentation
Repository: spark Updated Branches: refs/heads/branch-2.2 4e53a4edd -> 576fd4c3a [SPARK-21267][SS][DOCS] Update Structured Streaming Documentation ## What changes were proposed in this pull request? Few changes to the Structured Streaming documentation - Clarify that the entire stream input table is not materialized - Add information for Ganglia - Add Kafka Sink to the main docs - Removed a couple of leftover experimental tags - Added more associated reading material and talk videos. In addition, https://github.com/apache/spark/pull/16856 broke the link to the RDD programming guide in several places while renaming the page. This PR fixes those sameeragarwal cloud-fan. - Added a redirection to avoid breaking internal and possible external links. - Removed unnecessary redirection pages that were there since the separate scala, java, and python programming guides were merged together in 2013 or 2014. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tathagata DasCloses #18485 from tdas/SPARK-21267. (cherry picked from commit 0217dfd26f89133f146197359b556c9bf5aca172) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/576fd4c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/576fd4c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/576fd4c3 Branch: refs/heads/branch-2.2 Commit: 576fd4c3a67b4affc5ac50979e27ae929472f0d9 Parents: 4e53a4e Author: Tathagata Das Authored: Thu Jul 6 17:28:20 2017 -0700 Committer: Shixiong Zhu Committed: Thu Jul 6 17:28:28 2017 -0700 -- docs/_layouts/global.html | 7 +- docs/index.md | 13 +- docs/java-programming-guide.md | 7 - docs/programming-guide.md | 7 + docs/python-programming-guide.md| 7 - docs/rdd-programming-guide.md | 2 +- docs/scala-programming-guide.md | 7 - docs/sql-programming-guide.md | 16 +- docs/structured-streaming-programming-guide.md | 172 --- .../scala/org/apache/spark/sql/Dataset.scala| 3 - 10 files changed, 169 insertions(+), 72 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/576fd4c3/docs/_layouts/global.html -- diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index c00d0db..570483c 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -69,11 +69,10 @@ Programming Guides Quick Start -Spark Programming Guide - -Spark Streaming -DataFrames, Datasets and SQL +RDDs, Accumulators, Broadcasts Vars +SQL, DataFrames, and Datasets Structured Streaming +Spark Streaming (DStreams) MLlib (Machine Learning) GraphX (Graph Processing) SparkR (R on Spark) http://git-wip-us.apache.org/repos/asf/spark/blob/576fd4c3/docs/index.md -- diff --git a/docs/index.md b/docs/index.md index a757fa0..51641c9 100644 --- a/docs/index.md +++ b/docs/index.md @@ -88,13 +88,12 @@ options for deployment: **Programming Guides:** * [Quick Start](quick-start.html): a quick introduction to the Spark API; start here! 
-* [Spark Programming Guide](programming-guide.html): detailed overview of Spark - in all supported languages (Scala, Java, Python, R) -* Modules built on Spark: - * [Spark Streaming](streaming-programming-guide.html): processing real-time data streams - * [Spark SQL, Datasets, and DataFrames](sql-programming-guide.html): support for structured data and relational queries - * [MLlib](ml-guide.html): built-in machine learning library - * [GraphX](graphx-programming-guide.html): Spark's new API for graph processing +* [RDD Programming Guide](programming-guide.html): overview of Spark basics - RDDs (core but old API), accumulators, and broadcast variables +* [Spark SQL, Datasets, and
spark git commit: [SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval
Repository: spark Updated Branches: refs/heads/master 48e44b24a -> bf66335ac [SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval ## What changes were proposed in this pull request? Rename org.apache.spark.sql.catalyst.plans.logical.statsEstimation.Range to ValueInterval. The current naming is identical to logical operator "range". Refactoring it to ValueInterval is more accurate. ## How was this patch tested? unit test Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Wang GengliangCloses #18549 from gengliangwang/ValueInterval. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf66335a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf66335a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf66335a Branch: refs/heads/master Commit: bf66335acab3c0c188f6c378eb8aa6948a259cb2 Parents: 48e44b2 Author: Wang Gengliang Authored: Thu Jul 6 13:58:27 2017 -0700 Committer: Reynold Xin Committed: Thu Jul 6 13:58:27 2017 -0700 -- .../statsEstimation/FilterEstimation.scala | 36 .../statsEstimation/JoinEstimation.scala| 14 +-- .../plans/logical/statsEstimation/Range.scala | 88 --- .../logical/statsEstimation/ValueInterval.scala | 91 4 files changed, 117 insertions(+), 112 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 5a3bee7..e13db85 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -316,8 +316,8 @@ case class FilterEstimation(plan: Filter) extends Logging { // decide if the value is in [min, max] of the column. // We currently don't store min/max for binary/string type. // Hence, we assume it is in boundary for binary/string type. 
-val statsRange = Range(colStat.min, colStat.max, attr.dataType) -if (statsRange.contains(literal)) { +val statsInterval = ValueInterval(colStat.min, colStat.max, attr.dataType) +if (statsInterval.contains(literal)) { if (update) { // We update ColumnStat structure after apply this equality predicate: // Set distinctCount to 1, nullCount to 0, and min/max values (if exist) to the literal @@ -388,9 +388,10 @@ case class FilterEstimation(plan: Filter) extends Logging { // use [min, max] to filter the original hSet dataType match { case _: NumericType | BooleanType | DateType | TimestampType => -val statsRange = Range(colStat.min, colStat.max, dataType).asInstanceOf[NumericRange] +val statsInterval = + ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval] val validQuerySet = hSet.filter { v => - v != null && statsRange.contains(Literal(v, dataType)) + v != null && statsInterval.contains(Literal(v, dataType)) } if (validQuerySet.isEmpty) { @@ -440,12 +441,13 @@ case class FilterEstimation(plan: Filter) extends Logging { update: Boolean): Option[BigDecimal] = { val colStat = colStatsMap(attr) -val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange] -val max = statsRange.max.toBigDecimal -val min = statsRange.min.toBigDecimal +val statsInterval = + ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval] +val max = statsInterval.max.toBigDecimal +val min = statsInterval.min.toBigDecimal val ndv = BigDecimal(colStat.distinctCount) -// determine the overlapping degree between predicate range and column's range +// determine the overlapping degree between predicate interval and column's interval val numericLiteral = if (literal.dataType == BooleanType) { if (literal.value.asInstanceOf[Boolean]) BigDecimal(1) else BigDecimal(0) } else { @@ -566,18 +568,18 @@ case class FilterEstimation(plan: Filter) extends Logging { } val colStatLeft = colStatsMap(attrLeft) -val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) - .asInstanceOf[NumericRange] -val maxLeft =
spark git commit: [SPARK-21204][SQL] Add support for Scala Set collection types in serialization
Repository: spark Updated Branches: refs/heads/master 26ac085de -> 48e44b24a [SPARK-21204][SQL] Add support for Scala Set collection types in serialization ## What changes were proposed in this pull request? Currently we can't produce a `Dataset` containing `Set` in SparkSQL. This PR tries to support serialization/deserialization of `Set`. Because there's no corresponding internal data type in SparkSQL for a `Set`, the most proper choice for serializing a set should be an array. ## How was this patch tested? Added unit tests. Author: Liang-Chi HsiehCloses #18416 from viirya/SPARK-21204. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48e44b24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48e44b24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48e44b24 Branch: refs/heads/master Commit: 48e44b24a7663142176102ac4c6bf4242f103804 Parents: 26ac085 Author: Liang-Chi Hsieh Authored: Fri Jul 7 01:07:45 2017 +0800 Committer: Wenchen Fan Committed: Fri Jul 7 01:07:45 2017 +0800 -- .../spark/sql/catalyst/ScalaReflection.scala| 28 -- .../catalyst/expressions/objects/objects.scala | 5 ++-- .../org/apache/spark/sql/SQLImplicits.scala | 10 +++ .../spark/sql/DataFrameAggregateSuite.scala | 10 +++ .../spark/sql/DatasetPrimitiveSuite.scala | 31 5 files changed, 79 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/48e44b24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 814f2c1..4d5401f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -309,7 +309,10 @@ object ScalaReflection extends ScalaReflection { Invoke(arrayData, primitiveMethod, arrayCls, returnNullable = false) } - case t if t <:< localTypeOf[Seq[_]] => + // We serialize a `Set` to Catalyst array. When we deserialize a Catalyst array + // to a `Set`, if there are duplicated elements, the elements will be de-duplicated. + case t if t <:< localTypeOf[Seq[_]] || + t <:< localTypeOf[scala.collection.Set[_]] => val TypeRef(_, _, Seq(elementType)) = t val Schema(dataType, elementNullable) = schemaFor(elementType) val className = getClassNameFromType(elementType) @@ -327,8 +330,10 @@ object ScalaReflection extends ScalaReflection { } val companion = t.normalize.typeSymbol.companionSymbol.typeSignature -val cls = companion.declaration(newTermName("newBuilder")) match { - case NoSymbol => classOf[Seq[_]] +val cls = companion.member(newTermName("newBuilder")) match { + case NoSymbol if t <:< localTypeOf[Seq[_]] => classOf[Seq[_]] + case NoSymbol if t <:< localTypeOf[scala.collection.Set[_]] => +classOf[scala.collection.Set[_]] case _ => mirror.runtimeClass(t.typeSymbol.asClass) } UnresolvedMapObjects(mapFunction, getPath, Some(cls)) @@ -502,6 +507,19 @@ object ScalaReflection extends ScalaReflection { serializerFor(_, valueType, valuePath, seenTypeSet), valueNullable = !valueType.typeSymbol.asClass.isPrimitive) + case t if t <:< localTypeOf[scala.collection.Set[_]] => +val TypeRef(_, _, Seq(elementType)) = t + +// There's no corresponding Catalyst type for `Set`, we serialize a `Set` to Catalyst array. +// Note that the property of `Set` is only kept when manipulating the data as domain object. 
+val newInput = + Invoke( + inputObject, + "toSeq", + ObjectType(classOf[Seq[_]])) + +toCatalystArray(newInput, elementType) + case t if t <:< localTypeOf[String] => StaticInvoke( classOf[UTF8String], @@ -713,6 +731,10 @@ object ScalaReflection extends ScalaReflection { val Schema(valueDataType, valueNullable) = schemaFor(valueType) Schema(MapType(schemaFor(keyType).dataType, valueDataType, valueContainsNull = valueNullable), nullable = true) + case t if t <:< localTypeOf[Set[_]] => +val TypeRef(_, _, Seq(elementType)) = t +val Schema(dataType, nullable) = schemaFor(elementType) +Schema(ArrayType(dataType, containsNull = nullable), nullable = true) case t if t <:< localTypeOf[String] =>
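A minimal sketch of what this change enables (it assumes the implicit `Set` encoder added to `SQLImplicits` by this change, and an existing SparkSession named `spark`):

```scala
import spark.implicits._

// Each Set is serialized to a Catalyst array; any duplicated elements in the underlying
// array are removed again when the array is deserialized back into a Set.
val ds = Seq(Set(1, 2, 3), Set(4, 5)).toDS()
ds.show()

val roundTripped: Array[Set[Int]] = ds.collect()
assert(roundTripped.head == Set(1, 2, 3))
```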
spark git commit: [SPARK-21228][SQL] InSet incorrect handling of structs
Repository: spark Updated Branches: refs/heads/master 565e7a8d4 -> 26ac085de [SPARK-21228][SQL] InSet incorrect handling of structs ## What changes were proposed in this pull request? When data type is struct, InSet now uses TypeUtils.getInterpretedOrdering (similar to EqualTo) to build a TreeSet. In other cases it will use a HashSet as before (which should be faster). Similarly, In.eval uses Ordering.equiv instead of equals. ## How was this patch tested? New test in SQLQuerySuite. Author: Bogdan RaducanuCloses #18455 from bogdanrdc/SPARK-21228. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26ac085d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26ac085d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26ac085d Branch: refs/heads/master Commit: 26ac085debb54d0104762d1cd4187cdf73f301ba Parents: 565e7a8 Author: Bogdan Raducanu Authored: Fri Jul 7 01:04:57 2017 +0800 Committer: Wenchen Fan Committed: Fri Jul 7 01:04:57 2017 +0800 -- .../sql/catalyst/expressions/predicates.scala | 57 +--- .../catalyst/expressions/PredicateSuite.scala | 31 ++- .../catalyst/optimizer/OptimizeInSuite.scala| 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 22 4 files changed, 78 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26ac085d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index f3fe58c..7bf10f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.catalyst.expressions +import scala.collection.immutable.TreeSet + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => BasePredicate} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateSafeProjection, GenerateUnsafeProjection, Predicate => BasePredicate} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ @@ -175,20 +176,23 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { |[${sub.output.map(_.dataType.catalogString).mkString(", ")}]. 
""".stripMargin) } else { -TypeCheckResult.TypeCheckSuccess +TypeUtils.checkForOrderingExpr(value.dataType, s"function $prettyName") } } case _ => -if (list.exists(l => l.dataType != value.dataType)) { - TypeCheckResult.TypeCheckFailure("Arguments must be same type") +val mismatchOpt = list.find(l => l.dataType != value.dataType) +if (mismatchOpt.isDefined) { + TypeCheckResult.TypeCheckFailure(s"Arguments must be same type but were: " + +s"${value.dataType} != ${mismatchOpt.get.dataType}") } else { - TypeCheckResult.TypeCheckSuccess + TypeUtils.checkForOrderingExpr(value.dataType, s"function $prettyName") } } } override def children: Seq[Expression] = value +: list lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal]) + private lazy val ordering = TypeUtils.getInterpretedOrdering(value.dataType) override def nullable: Boolean = children.exists(_.nullable) override def foldable: Boolean = children.forall(_.foldable) @@ -203,10 +207,10 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { var hasNull = false list.foreach { e => val v = e.eval(input) -if (v == evaluatedValue) { - return true -} else if (v == null) { +if (v == null) { hasNull = true +} else if (ordering.equiv(v, evaluatedValue)) { + return true } } if (hasNull) { @@ -265,7 +269,7 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with override def nullable: Boolean = child.nullable || hasNull protected override def nullSafeEval(value: Any): Any = { -if (hset.contains(value)) { +if (set.contains(value)) { true
spark git commit: [SPARK-20950][CORE] add a new config to diskWriteBufferSize which is hard coded before
Repository: spark
Updated Branches:
  refs/heads/master d540dfbff -> 565e7a8d4

[SPARK-20950][CORE] add a new config to diskWriteBufferSize which is hard coded before

## What changes were proposed in this pull request?

This PR makes two improvements:

1. With `spark.shuffle.spill.diskWriteBufferSize`, the `diskWriteBufferSize` of `ShuffleExternalSorter` becomes configurable. When changing the size of `diskWriteBufferSize` to test `forceSorterToSpill`, the average performance over 10 runs is as follows (unit: ms):

```
diskWriteBufferSize:   1M   512K  256K  128K   64K   32K   16K    8K    4K
RecordSize = 2.5M     742    722   694   686   667   668   671   669   683
RecordSize = 1M       294    293   292   287   283   285   281   279   285
```

2. Remove `outputBufferSizeInBytes` and `inputBufferSizeInBytes` and initialize them in the `mergeSpillsWithFileStream` function.

## How was this patch tested?

The unit test.

Author: caoxuewen

Closes #18174 from heary-cao/buffersize.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e7a8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e7a8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e7a8d

Branch: refs/heads/master
Commit: 565e7a8d4ae7879ee704fb94ae9b3da31e202d7e
Parents: d540dfb
Author: caoxuewen
Authored: Thu Jul 6 19:49:34 2017 +0800
Committer: Wenchen Fan
Committed: Thu Jul 6 19:49:34 2017 +0800

--
 .../shuffle/sort/ShuffleExternalSorter.java | 11 +---
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 14 +++---
 .../unsafe/sort/UnsafeSorterSpillWriter.java | 24 ++---
 .../apache/spark/internal/config/package.scala | 27
 4 files changed, 60 insertions(+), 16 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index c33d1e3..338faaa 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -43,6 +43,7 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.Utils; +import org.apache.spark.internal.config.package$; /** * An external sorter that is specialized for sort-based shuffle. @@ -82,6 +83,9 @@ final class ShuffleExternalSorter extends MemoryConsumer { /** The buffer size to use when writing spills using DiskBlockObjectWriter */ private final int fileBufferSizeBytes; + /** The buffer size to use when writing the sorted records to an on-disk file */ + private final int diskWriteBufferSize; + /** * Memory pages that hold the records being sorted.
The pages in this list are freed when * spilling, although in principle we could recycle these pages across spills (on the other hand, @@ -116,13 +120,14 @@ final class ShuffleExternalSorter extends MemoryConsumer { this.taskContext = taskContext; this.numPartitions = numPartitions; // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided -this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; +this.fileBufferSizeBytes = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; this.numElementsForSpillThreshold = conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024); this.writeMetrics = writeMetrics; this.inMemSorter = new ShuffleInMemorySorter( this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true)); this.peakMemoryUsedBytes = getMemoryUsage(); +this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE()); } /** @@ -155,7 +160,7 @@ final class ShuffleExternalSorter extends MemoryConsumer { // be an API to directly transfer bytes from managed memory to the disk writer, we buffer // data through a byte array. This array does not need to be large enough to hold a single // record; -final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE]; +final byte[] writeBuffer = new byte[diskWriteBufferSize]; // Because this output will be read during shuffle, its compression codec
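A minimal configuration sketch for the new setting (the sizes below are illustrative values taken from the benchmark above, not recommendations):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("spill-buffer-demo")
  // Existing knob: buffer used by DiskBlockObjectWriter when writing spill files.
  .set("spark.shuffle.file.buffer", "32k")
  // New knob added here: buffer used when writing sorted records to disk during a spill.
  .set("spark.shuffle.spill.diskWriteBufferSize", "1m")
```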
spark git commit: [SPARK-21273][SQL][FOLLOW-UP] Add missing test cases back and revise code style
Repository: spark Updated Branches: refs/heads/master b8e4d567a -> d540dfbff [SPARK-21273][SQL][FOLLOW-UP] Add missing test cases back and revise code style ## What changes were proposed in this pull request? Add missing test cases back and revise code style Follow up the previous PR: https://github.com/apache/spark/pull/18479 ## How was this patch tested? Unit test Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Wang GengliangCloses #18548 from gengliangwang/stat_propagation_revise. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d540dfbf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d540dfbf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d540dfbf Branch: refs/heads/master Commit: d540dfbff33aa2f8571e0de149dfa3f4e7321113 Parents: b8e4d56 Author: Wang Gengliang Authored: Thu Jul 6 19:12:15 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 19:12:15 2017 +0800 -- .../plans/logical/LogicalPlanVisitor.scala | 2 +- .../BasicStatsEstimationSuite.scala | 45 2 files changed, 46 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d540dfbf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala index b230458..2652f6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala @@ -38,10 +38,10 @@ trait LogicalPlanVisitor[T] { case p: Range => visitRange(p) case p: Repartition => visitRepartition(p) case p: RepartitionByExpression => visitRepartitionByExpr(p) +case p: ResolvedHint => visitHint(p) case p: Sample => visitSample(p) case p: ScriptTransformation => visitScriptTransform(p) case p: Union => visitUnion(p) -case p: ResolvedHint => visitHint(p) case p: LogicalPlan => default(p) } http://git-wip-us.apache.org/repos/asf/spark/blob/d540dfbf/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala index 5fd21a0..913be6d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala @@ -78,6 +78,37 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase { checkStats(globalLimit, stats) } + test("sample estimation") { +val sample = Sample(0.0, 0.5, withReplacement = false, (math.random * 1000).toLong, plan) +checkStats(sample, Statistics(sizeInBytes = 60, rowCount = Some(5))) + +// Child doesn't have rowCount in stats +val childStats = Statistics(sizeInBytes = 120) +val childPlan = DummyLogicalPlan(childStats, childStats) +val sample2 = + Sample(0.0, 0.11, withReplacement = false, (math.random * 1000).toLong, childPlan) +checkStats(sample2, Statistics(sizeInBytes = 14)) + } + + test("estimate statistics when the conf changes") { +val expectedDefaultStats = + Statistics( 
+sizeInBytes = 40, +rowCount = Some(10), +attributeStats = AttributeMap(Seq( + AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4 +val expectedCboStats = + Statistics( +sizeInBytes = 4, +rowCount = Some(1), +attributeStats = AttributeMap(Seq( + AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4 + +val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats) +checkStats( + plan, expectedStatsCboOn = expectedCboStats, expectedStatsCboOff = expectedDefaultStats) + } + /** Check estimated stats when cbo is turned on/off. */ private def checkStats( plan: LogicalPlan, @@ -99,3 +130,17 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase { private def checkStats(plan: LogicalPlan,
spark-website git commit: Update Sandy.
Repository: spark-website Updated Branches: refs/heads/asf-site 8a9ae6b7d -> 878dcfd84 Update Sandy. Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/878dcfd8 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/878dcfd8 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/878dcfd8 Branch: refs/heads/asf-site Commit: 878dcfd8422f92842a016933d7ce2c8dfb25f46e Parents: 8a9ae6b Author: Dongjoon Hyun Authored: Wed Jul 5 00:08:01 2017 -0700 Committer: Dongjoon Hyun Committed: Wed Jul 5 00:08:01 2017 -0700 -- committers.md| 2 +- site/committers.html | 8 2 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/878dcfd8/committers.md -- diff --git a/committers.md b/committers.md index c7bcca1..e850f8b 100644 --- a/committers.md +++ b/committers.md @@ -46,7 +46,7 @@ navigation: |Imran Rashid|Cloudera| |Charles Reiss|UC Berkeley| |Josh Rosen|Databricks| -|Sandy Ryza|Clover Health| +|Sandy Ryza|Remix| |Kousuke Saruta|NTT Data| |Prashant Sharma|IBM| |Ram Sriharsha|Databricks| http://git-wip-us.apache.org/repos/asf/spark-website/blob/878dcfd8/site/committers.html -- diff --git a/site/committers.html b/site/committers.html index 2999465..4136677 100644 --- a/site/committers.html +++ b/site/committers.html @@ -214,7 +214,7 @@ Felix Cheung - Automattic + Microsoft Mosharaf Chowdhury @@ -294,7 +294,7 @@ Xiao Li - IBM + Databricks Davies Liu @@ -350,7 +350,7 @@ Sandy Ryza - Clover Health + Remix Kousuke Saruta @@ -370,7 +370,7 @@ Takuya Ueshin - + Databricks Marcelo Vanzin
spark-website git commit: Update committer page.
Repository: spark-website Updated Branches: refs/heads/asf-site 9749c8e2f -> 8a9ae6b7d Update committer page. Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/8a9ae6b7 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/8a9ae6b7 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/8a9ae6b7 Branch: refs/heads/asf-site Commit: 8a9ae6b7d17d3a8e8879a37f8fca1fe9a28edc12 Parents: 9749c8e Author: Dongjoon Hyun Authored: Tue Jul 4 23:44:01 2017 -0700 Committer: Dongjoon Hyun Committed: Tue Jul 4 23:44:01 2017 -0700 -- committers.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/8a9ae6b7/committers.md -- diff --git a/committers.md b/committers.md index a97bb72..c7bcca1 100644 --- a/committers.md +++ b/committers.md @@ -12,7 +12,7 @@ navigation: ||| |Michael Armbrust|Databricks| |Joseph Bradley|Databricks| -|Felix Cheung|Automattic| +|Felix Cheung|Microsoft| |Mosharaf Chowdhury|University of Michigan, Ann Arbor| |Jason Dai|Intel| |Tathagata Das|Databricks| @@ -32,7 +32,7 @@ navigation: |Andy Konwinski|Databricks| |Ryan LeCompte|Quantifind| |Haoyuan Li|Alluxio, UC Berkeley| -|Xiao Li|IBM| +|Xiao Li|Databricks| |Davies Liu|Databricks| |Cheng Lian|Databricks| |Yanbo Liang|Hortonworks| @@ -51,7 +51,7 @@ navigation: |Prashant Sharma|IBM| |Ram Sriharsha|Databricks| |DB Tsai|Netflix| -|Takuya Ueshin|| +|Takuya Ueshin|Databricks| |Marcelo Vanzin|Cloudera| |Shivaram Venkataraman|UC Berkeley| |Patrick Wendell|Databricks|
spark git commit: [SPARK-21324][TEST] Improve statistics test suites
Repository: spark Updated Branches: refs/heads/master 6ff05a66f -> b8e4d567a [SPARK-21324][TEST] Improve statistics test suites ## What changes were proposed in this pull request? 1. move `StatisticsCollectionTestBase` to a separate file. 2. move some test cases to `StatisticsCollectionSuite` so that `hive/StatisticsSuite` only keeps tests that need hive support. 3. clear up some test cases. ## How was this patch tested? Existing tests. Author: wangzhenhuaAuthor: Zhenhua Wang Closes #18545 from wzhfy/cleanStatSuites. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8e4d567 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8e4d567 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8e4d567 Branch: refs/heads/master Commit: b8e4d567a7d6c2ff277700d4e7707e57e87c7808 Parents: 6ff05a66 Author: wangzhenhua Authored: Thu Jul 6 16:00:31 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 16:00:31 2017 +0800 -- .../spark/sql/StatisticsCollectionSuite.scala | 193 --- .../sql/StatisticsCollectionTestBase.scala | 192 ++ .../apache/spark/sql/hive/StatisticsSuite.scala | 124 3 files changed, 258 insertions(+), 251 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index d9392de..843ced7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -17,19 +17,12 @@ package org.apache.spark.sql -import java.{lang => jl} -import java.sql.{Date, Timestamp} - import scala.collection.mutable -import scala.util.Random import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} -import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SQLTestData.ArrayData import org.apache.spark.sql.types._ @@ -58,6 +51,37 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } + test("analyzing views is not supported") { +def assertAnalyzeUnsupported(analyzeCommand: String): Unit = { + val err = intercept[AnalysisException] { +sql(analyzeCommand) + } + assert(err.message.contains("ANALYZE TABLE is not supported")) +} + +val tableName = "tbl" +withTable(tableName) { + spark.range(10).write.saveAsTable(tableName) + val viewName = "view" + withView(viewName) { +sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName") +assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") +assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") + } +} + } + + test("statistics collection of a table with zero column") { +val table_no_cols = "table_no_cols" +withTable(table_no_cols) { + val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty) + val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty)) + 
dfNoCols.write.format("json").saveAsTable(table_no_cols) + sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS") + checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10)) +} + } + test("analyze column command - unsupported types and invalid columns") { val tableName = "column_stats_test1" withTable(tableName) { @@ -239,154 +263,3 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } - - -/** - * The base for test cases that we want to include in both the hive module (for verifying behavior - * when using the Hive external catalog) as well as in the sql/core module. - */ -abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils { - import testImplicits._ - - private val dec1 = new java.math.BigDecimal("1.00") - private val dec2 = new java.math.BigDecimal("8.00") - private val d1 =
spark git commit: [SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations
Repository: spark Updated Branches: refs/heads/master 5800144a5 -> 6ff05a66f [SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations ## What changes were proposed in this pull request? Right now in the UI, after SPARK-20213, we can show the operations that write data out. However, there is no way to associate metrics with those data writes, so we should show the relevant metrics on the operations. Supported commands: this change supports updating metrics for file-based data writing operations, including `InsertIntoHadoopFsRelationCommand` and `InsertIntoHiveTable`. Supported metrics: * number of written files * number of dynamic partitions * total bytes of written data * total number of output rows * average writing data out time (ms) * (TODO) min/med/max number of output rows per file/partition * (TODO) min/med/max bytes of written data per file/partition Commands not supported: `InsertIntoDataSourceCommand`, `SaveIntoDataSourceCommand`: these two commands use DataSource APIs to write data out, i.e., the logic of writing data out is delegated to the DataSource implementations, such as `InsertableRelation.insert` and `CreatableRelationProvider.createRelation`, so we can't obtain metrics from the delegated methods for now. `CreateHiveTableAsSelectCommand`, `CreateDataSourceTableAsSelectCommand`: these two commands invoke other commands to write data out, and the invoked commands can even write to non-file-based data sources. We leave them as a future TODO. How to update metrics of writing files out: a `RunnableCommand` that wants to update metrics needs to override its `metrics` and provide the metrics data structure to `ExecutedCommandExec`. The metrics are prepared during the execution of `FileFormatWriter`. The callback function passed to `FileFormatWriter` accepts the metrics and updates them accordingly. There is a metrics-updating function in `RunnableCommand`; at runtime, the function is bound to the Spark context and the `metrics` of `ExecutedCommandExec` and passed to `FileFormatWriter`. ## How was this patch tested? Updated unit tests. Author: Liang-Chi Hsieh Closes #18159 from viirya/SPARK-20703-2.
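To make the mechanism above concrete, here is a minimal, hypothetical sketch of the pattern: a writing command exposing its own `metrics` map and an update callback of the kind that could be handed to `FileFormatWriter`. The `WriteSummarySketch` type and the trait name are illustrative assumptions, not the classes added by this patch; in the real change the summaries come from the executed write tasks.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical stand-in for the per-write summary the writer reports back.
case class WriteSummarySketch(numFiles: Int, numBytes: Long, numRows: Long)

trait MetricsWritingCommandSketch {
  // The command exposes a metrics map that the physical ExecutedCommandExec node can display.
  lazy val metrics: Map[String, SQLMetric] = {
    val sc = SparkContext.getOrCreate()
    Map(
      "numFiles" -> SQLMetrics.createMetric(sc, "number of written files"),
      "numOutputBytes" -> SQLMetrics.createMetric(sc, "bytes of written output"),
      "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"))
  }

  // Callback handed to the file writer: aggregate the summaries into the SQL metrics.
  def updateWritingMetrics(summaries: Seq[WriteSummarySketch]): Unit = {
    metrics("numFiles").add(summaries.map(_.numFiles.toLong).sum)
    metrics("numOutputBytes").add(summaries.map(_.numBytes).sum)
    metrics("numOutputRows").add(summaries.map(_.numRows).sum)
  }
}
```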
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ff05a66 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ff05a66 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ff05a66 Branch: refs/heads/master Commit: 6ff05a66fe83e721063efe5c28d2ffeb850fecc7 Parents: 5800144 Author: Liang-Chi Hsieh Authored: Thu Jul 6 15:47:09 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 15:47:09 2017 +0800 -- .../scala/org/apache/spark/util/Utils.scala | 9 ++ .../execution/command/DataWritingCommand.scala | 75 ++ .../spark/sql/execution/command/commands.scala | 12 ++ .../datasources/FileFormatWriter.scala | 121 +--- .../InsertIntoHadoopFsRelationCommand.scala | 18 ++- .../sql/sources/PartitionedWriteSuite.scala | 21 +-- .../hive/execution/InsertIntoHiveTable.scala| 8 +- .../sql/hive/execution/SQLMetricsSuite.scala| 139 +++ 8 files changed, 362 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 26f61e2..b4caf68 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1003,6 +1003,15 @@ private[spark] object Utils extends Logging { } /** + * Lists files recursively. + */ + def recursiveList(f: File): Array[File] = { +require(f.isDirectory) +val current = f.listFiles +current ++ current.filter(_.isDirectory).flatMap(recursiveList) + } + + /** * Delete a file or directory and its contents recursively. * Don't follow directories if they are symlinks. * Throws an exception if deletion is unsuccessful. http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala new file mode 100644 index 000..0c381a2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache
spark git commit: [SPARK-21012][SUBMIT] Add glob support for resources adding to Spark
Repository: spark Updated Branches: refs/heads/master 60043f224 -> 5800144a5 [SPARK-21012][SUBMIT] Add glob support for resources adding to Spark ## What changes were proposed in this pull request? Currently "--jars (spark.jars)", "--files (spark.files)", "--py-files (spark.submit.pyFiles)" and "--archives (spark.yarn.dist.archives)" only support non-glob paths. This is OK for most cases, but when a user needs to add many jars or files to Spark, it is too verbose to list them one by one. So here we propose to add glob path support for resources. This change also improves the code for downloading resources. ## How was this patch tested? UT added, also verified manually in local cluster. Author: jerryshao Closes #18235 from jerryshao/SPARK-21012. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5800144a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5800144a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5800144a Branch: refs/heads/master Commit: 5800144a54f5c0180ccf67392f32c3e8a51119b1 Parents: 60043f2 Author: jerryshao Authored: Thu Jul 6 15:32:49 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 15:32:49 2017 +0800 -- .../org/apache/spark/deploy/SparkSubmit.scala | 166 +++ .../spark/deploy/SparkSubmitArguments.scala | 2 +- .../apache/spark/deploy/SparkSubmitSuite.scala | 68 +++- docs/configuration.md | 6 +- 4 files changed, 196 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5800144a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d13fb41..abde040 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -17,17 +17,21 @@ package org.apache.spark.deploy -import java.io.{File, IOException} +import java.io._ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException} import java.net.URL import java.nio.file.Files -import java.security.PrivilegedExceptionAction +import java.security.{KeyStore, PrivilegedExceptionAction} +import java.security.cert.X509Certificate import java.text.ParseException +import javax.net.ssl._ import scala.annotation.tailrec import scala.collection.mutable.{ArrayBuffer, HashMap, Map} import scala.util.Properties +import com.google.common.io.ByteStreams +import org.apache.commons.io.FileUtils import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} import org.apache.hadoop.fs.{FileSystem, Path} @@ -310,33 +314,33 @@ object SparkSubmit extends CommandLineUtils { RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose) } -// In client mode, download remote files. -if (deployMode == CLIENT) { - val hadoopConf = new HadoopConfiguration() - args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull - args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull - args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull - args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull -} - -// Require all python files to be local, so we can add them to the PYTHONPATH -// In YARN cluster mode, python files are distributed as regular files, which can be non-local. -// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
-if (args.isPython && !isYarnCluster && !isMesosCluster) { - if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) { -printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}") +val hadoopConf = new HadoopConfiguration() +val targetDir = Files.createTempDirectory("tmp").toFile +// scalastyle:off runtimeaddshutdownhook +Runtime.getRuntime.addShutdownHook(new Thread() { + override def run(): Unit = { +FileUtils.deleteQuietly(targetDir) } - val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",") - if (nonLocalPyFiles.nonEmpty) { -printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles") - } -} +}) +// scalastyle:on runtimeaddshutdownhook -// Require all R files to be local -if (args.isR && !isYarnCluster && !isMesosCluster) { - if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) { -printErrorAndExit(s"Only local R files are
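As a rough illustration of the glob expansion that SPARK-21012 above introduces for comma-separated resource lists such as `--jars` and `--files`, the following sketch expands each entry with Hadoop's `FileSystem.globStatus`. The helper name and its fallback behavior are assumptions for illustration, not necessarily the method added in this patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: expand each entry of a comma-separated path list via Hadoop glob matching,
// keeping entries that match nothing unchanged so non-glob paths pass through as before.
def expandGlobPaths(paths: String, hadoopConf: Configuration): String = {
  paths.split(",").filter(_.nonEmpty).flatMap { p =>
    val path = new Path(p)
    val fs = path.getFileSystem(hadoopConf)
    // globStatus may return null or an empty array when nothing matches.
    Option(fs.globStatus(path)) match {
      case Some(statuses) if statuses.nonEmpty => statuses.map(_.getPath.toString).toSeq
      case _ => Seq(p)
    }
  }.mkString(",")
}
```

For example, `expandGlobPaths("/data/jars/*.jar,/tmp/extra.jar", new Configuration())` would return every jar matching the glob plus the literal second entry, joined by commas.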
spark git commit: [SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir should be deleted
Repository: spark Updated Branches: refs/heads/branch-2.2 6e1081cbe -> 4e53a4edd [SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir should be deleted ## What changes were proposed in this pull request? Stopping a query while it is being initialized can throw an interrupt exception, in which case temporary checkpoint directories will not be deleted, and the test will fail. Author: Tathagata Das Closes #18442 from tdas/DatastreamReaderWriterSuite-fix. (cherry picked from commit 60043f22458668ac7ecba94fa78953f23a6bdcec) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e53a4ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e53a4ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e53a4ed Branch: refs/heads/branch-2.2 Commit: 4e53a4edd72e372583f243c660bbcc0572205716 Parents: 6e1081c Author: Tathagata Das Authored: Thu Jul 6 00:20:26 2017 -0700 Committer: Tathagata Das Committed: Thu Jul 6 00:20:40 2017 -0700 -- .../spark/sql/streaming/test/DataStreamReaderWriterSuite.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e53a4ed/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index dc2506a..bae9d81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -641,6 +641,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { test("temp checkpoint dir should be deleted if a query is stopped without errors") { import testImplicits._ val query = MemoryStream[Int].toDS.writeStream.format("console").start() +query.processAllAvailable() val checkpointDir = new Path( query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot) val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
spark git commit: [SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir should be deleted
Repository: spark Updated Branches: refs/heads/master 14a3bb3a0 -> 60043f224 [SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir should be deleted ## What changes were proposed in this pull request? Stopping a query while it is being initialized can throw an interrupt exception, in which case temporary checkpoint directories will not be deleted, and the test will fail. Author: Tathagata Das Closes #18442 from tdas/DatastreamReaderWriterSuite-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60043f22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60043f22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60043f22 Branch: refs/heads/master Commit: 60043f22458668ac7ecba94fa78953f23a6bdcec Parents: 14a3bb3 Author: Tathagata Das Authored: Thu Jul 6 00:20:26 2017 -0700 Committer: Tathagata Das Committed: Thu Jul 6 00:20:26 2017 -0700 -- .../spark/sql/streaming/test/DataStreamReaderWriterSuite.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/60043f22/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 3de0ae6..e8a6202 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -641,6 +641,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { test("temp checkpoint dir should be deleted if a query is stopped without errors") { import testImplicits._ val query = MemoryStream[Int].toDS.writeStream.format("console").start() +query.processAllAvailable() val checkpointDir = new Path( query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot) val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
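The one-line fix above works because `processAllAvailable()` blocks until the query has finished initializing and processed any pending data, so the subsequent stop cannot interrupt initialization. A small, self-contained sketch of the pattern (assuming a local `SparkSession`; this is not the suite's exact code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("tmp-checkpoint-sketch").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

// Start a query without an explicit checkpointLocation, so a temporary
// checkpoint directory is created for it.
val input = MemoryStream[Int]
val query = input.toDS().writeStream.format("console").start()

// Wait until the query is fully initialized and idle before stopping it;
// stopping during initialization can be interrupted and leave the temporary
// checkpoint directory behind.
query.processAllAvailable()
query.stop()
```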
spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
Repository: spark Updated Branches: refs/heads/branch-2.1 8f1ca6957 -> 7f7b63bb6 [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream ## What changes were proposed in this pull request? Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes. ## How was this patch tested? Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset. Author: Sumedh WaleCloses #18535 from sumwale/SPARK-21312. (cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f7b63bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f7b63bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f7b63bb Branch: refs/heads/branch-2.1 Commit: 7f7b63bb634c3b89db80cee99848ee94f9dca6ba Parents: 8f1ca69 Author: Sumedh Wale Authored: Thu Jul 6 14:47:22 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 14:48:12 2017 +0800 -- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +- .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index d205547..b8e9388 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo */ public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { if (baseObject instanceof byte[]) { - int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); + int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET); out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); } else { int dataRemaining = sizeInBytes; http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala index a32763d..a5f904c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala @@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite { MemoryAllocator.UNSAFE.free(offheapRowPage) } } +val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = { + val baos = new ByteArrayOutputStream() + val numBytes = arrayBackedUnsafeRow.getSizeInBytes + val bytesWithOffset = new Array[Byte](numBytes + 100) + System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0, +bytesWithOffset, 100, numBytes) + val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields()) + arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes) + arrayBackedRow.writeToStream(baos, null) + (baos.toByteArray, arrayBackedRow.getString(0)) +} assert(bytesFromArrayBackedRow === 
bytesFromOffheapRow) assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow) +assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset) +assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset) } test("calling getDouble() and getFloat() on null columns") {
spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
Repository: spark Updated Branches: refs/heads/branch-2.2 770fd2a23 -> 6e1081cbe [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream ## What changes were proposed in this pull request? Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes. ## How was this patch tested? Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset. Author: Sumedh WaleCloses #18535 from sumwale/SPARK-21312. (cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e1081cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e1081cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e1081cb Branch: refs/heads/branch-2.2 Commit: 6e1081cbeac58826526b6ff7f2938a556b31ca9e Parents: 770fd2a Author: Sumedh Wale Authored: Thu Jul 6 14:47:22 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 14:47:43 2017 +0800 -- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +- .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 86de909..56994fa 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo */ public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { if (baseObject instanceof byte[]) { - int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); + int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET); out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); } else { int dataRemaining = sizeInBytes; http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala index a32763d..a5f904c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala @@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite { MemoryAllocator.UNSAFE.free(offheapRowPage) } } +val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = { + val baos = new ByteArrayOutputStream() + val numBytes = arrayBackedUnsafeRow.getSizeInBytes + val bytesWithOffset = new Array[Byte](numBytes + 100) + System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0, +bytesWithOffset, 100, numBytes) + val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields()) + arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes) + arrayBackedRow.writeToStream(baos, null) + (baos.toByteArray, arrayBackedRow.getString(0)) +} assert(bytesFromArrayBackedRow === 
bytesFromOffheapRow) assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow) +assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset) +assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset) } test("calling getDouble() and getFloat() on null columns") {
spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
Repository: spark Updated Branches: refs/heads/master 75b168fd3 -> 14a3bb3a0 [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream ## What changes were proposed in this pull request? Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes. ## How was this patch tested? Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset. Author: Sumedh WaleCloses #18535 from sumwale/SPARK-21312. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14a3bb3a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14a3bb3a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14a3bb3a Branch: refs/heads/master Commit: 14a3bb3a008c302aac908d7deaf0942a98c63be7 Parents: 75b168f Author: Sumedh Wale Authored: Thu Jul 6 14:47:22 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 14:47:22 2017 +0800 -- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +- .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 86de909..56994fa 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo */ public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { if (baseObject instanceof byte[]) { - int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); + int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET); out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); } else { int dataRemaining = sizeInBytes; http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala index a32763d..a5f904c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala @@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite { MemoryAllocator.UNSAFE.free(offheapRowPage) } } +val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = { + val baos = new ByteArrayOutputStream() + val numBytes = arrayBackedUnsafeRow.getSizeInBytes + val bytesWithOffset = new Array[Byte](numBytes + 100) + System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0, +bytesWithOffset, 100, numBytes) + val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields()) + arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes) + arrayBackedRow.writeToStream(baos, null) + (baos.toByteArray, arrayBackedRow.getString(0)) +} assert(bytesFromArrayBackedRow === bytesFromOffheapRow) assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow) 
+assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset) +assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset) } test("calling getDouble() and getFloat() on null columns") {
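To see why the corrected arithmetic in SPARK-21312 matters: when an `UnsafeRow` is backed by a `byte[]`, its `baseOffset` is `Platform.BYTE_ARRAY_OFFSET` plus the row's starting index inside that array, so recovering the index requires subtracting in the order the fix uses. A small illustrative sketch (the values are made up):

```scala
import org.apache.spark.unsafe.Platform

// Suppose a row's bytes start at index 100 of its backing byte[].
val startIndex = 100L
val baseOffset: Long = Platform.BYTE_ARRAY_OFFSET + startIndex

// Fixed computation: recovers the array index of the row's first byte.
val offsetInByteArray = (baseOffset - Platform.BYTE_ARRAY_OFFSET).toInt   // 100

// The previous expression, BYTE_ARRAY_OFFSET - baseOffset, yields -100 here;
// it only gave the right answer when the row happened to start at index 0.
val buggyOffset = (Platform.BYTE_ARRAY_OFFSET - baseOffset).toInt         // -100
```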
spark git commit: [SPARK-21308][SQL] Remove SQLConf parameters from the optimizer
Repository: spark Updated Branches: refs/heads/master ab866f117 -> 75b168fd3 [SPARK-21308][SQL] Remove SQLConf parameters from the optimizer ### What changes were proposed in this pull request? This PR removes SQLConf parameters from the optimizer rules ### How was this patch tested? The existing test cases Author: gatorsmileCloses #18533 from gatorsmile/rmSQLConfOptimizer. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75b168fd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75b168fd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75b168fd Branch: refs/heads/master Commit: 75b168fd30bb9a52ae223b6f1df73da4b1316f2e Parents: ab866f1 Author: gatorsmile Authored: Thu Jul 6 14:18:50 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 14:18:50 2017 +0800 -- .../optimizer/CostBasedJoinReorder.scala| 7 ++-- .../sql/catalyst/optimizer/Optimizer.scala | 36 +--- .../optimizer/StarSchemaDetection.scala | 4 ++- .../sql/catalyst/optimizer/expressions.scala| 14 .../spark/sql/catalyst/optimizer/joins.scala| 6 ++-- .../BinaryComparisonSimplificationSuite.scala | 2 +- .../optimizer/BooleanSimplificationSuite.scala | 2 +- .../optimizer/CombiningLimitsSuite.scala| 2 +- .../optimizer/ConstantFoldingSuite.scala| 2 +- .../optimizer/DecimalAggregatesSuite.scala | 2 +- .../optimizer/EliminateMapObjectsSuite.scala| 2 +- .../optimizer/JoinOptimizationSuite.scala | 2 +- .../catalyst/optimizer/JoinReorderSuite.scala | 27 --- .../catalyst/optimizer/LimitPushdownSuite.scala | 2 +- .../optimizer/OptimizeCodegenSuite.scala| 2 +- .../catalyst/optimizer/OptimizeInSuite.scala| 24 +++-- .../StarJoinCostBasedReorderSuite.scala | 36 +++- .../optimizer/StarJoinReorderSuite.scala| 25 +++--- .../catalyst/optimizer/complexTypesSuite.scala | 2 +- .../spark/sql/catalyst/plans/PlanTest.scala | 4 +-- .../execution/OptimizeMetadataOnlyQuery.scala | 8 ++--- .../spark/sql/execution/SparkOptimizer.scala| 6 ++-- .../sql/internal/BaseSessionStateBuilder.scala | 2 +- 23 files changed, 137 insertions(+), 82 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/75b168fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala index 3a7543e..db7baf6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -32,7 +32,10 @@ import org.apache.spark.sql.internal.SQLConf * We may have several join reorder algorithms in the future. This class is the entry of these * algorithms, and chooses which one to use. */ -case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { +object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper { + + private def conf = SQLConf.get + def apply(plan: LogicalPlan): LogicalPlan = { if (!conf.cboEnabled || !conf.joinReorderEnabled) { plan @@ -379,7 +382,7 @@ object JoinReorderDPFilters extends PredicateHelper { if (conf.joinReorderDPStarFilter) { // Compute the tables in a star-schema relationship. 
- val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) + val starJoin = StarSchemaDetection.findStarJoins(items, conditions.toSeq) val nonStarJoin = items.filterNot(starJoin.contains(_)) if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { http://git-wip-us.apache.org/repos/asf/spark/blob/75b168fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 946fa7b..d82af94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -34,10 +34,10 @@ import org.apache.spark.sql.types._ * Abstract class all optimizers should inherit of, contains the standard batches (extending * Optimizers can
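The diff above illustrates the new convention: optimizer rules stop taking a `SQLConf` constructor parameter and instead read the active session's config through `SQLConf.get`. A minimal sketch of the pattern (the rule name and the guarded flag below are placeholders, not rules touched by this patch):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

// Before this change, a rule would be written as
//   case class SomeRule(conf: SQLConf) extends Rule[LogicalPlan] { ... }
// and the optimizer had to thread a SQLConf into every rule it constructed.

// After this change, the rule is a parameterless object that reads the
// per-session config lazily via SQLConf.get whenever it runs.
object SomeRuleSketch extends Rule[LogicalPlan] {
  private def conf: SQLConf = SQLConf.get

  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Placeholder guard: bail out when cost-based optimization is disabled.
    if (!conf.cboEnabled) {
      plan
    } else {
      plan // a real rule would transform the plan here
    }
  }
}
```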