spark git commit: [SPARK-20217][CORE] Executor should not fail stage if killed task throws non-interrupted exception
Repository: spark Updated Branches: refs/heads/master 4000f128b -> 5142e5d4e [SPARK-20217][CORE] Executor should not fail stage if killed task throws non-interrupted exception ## What changes were proposed in this pull request? If tasks throw non-interrupted exceptions on kill (e.g. java.nio.channels.ClosedByInterruptException), their death is reported back as TaskFailed instead of TaskKilled. This causes stage failure in some cases. This is reproducible as follows. Run the following, and then use SparkContext.killTaskAttempt to kill one of the tasks. The entire stage will fail since we threw a RuntimeException instead of InterruptedException. ``` spark.range(100).repartition(100).foreach { i => try { Thread.sleep(1000) } catch { case t: InterruptedException => throw new RuntimeException(t) } } ``` Based on the code in TaskSetManager, I think this also affects kills of speculative tasks. However, since the number of speculated tasks is few, and usually you need to fail a task a few times before the stage is cancelled, it is unlikely this would be noticed in production unless both speculation was enabled and the number of allowed task failures was 1. We should probably unconditionally return TaskKilled instead of TaskFailed if the task was killed by the driver, regardless of the actual exception thrown. ## How was this patch tested? Unit test. The test fails before the change in Executor.scala. cc JoshRosen Author: Eric Liang Closes #17531 from ericl/fix-task-interrupt. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5142e5d4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5142e5d4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5142e5d4 Branch: refs/heads/master Commit: 5142e5d4e09c7cb36cf1d792934a21c5305c6d42 Parents: 4000f12 Author: Eric Liang Authored: Wed Apr 5 19:37:21 2017 -0700 Committer: Yin Huai Committed: Wed Apr 5 19:37:21 2017 -0700 -- core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 8 +++- 2 files changed, 8 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5142e5d4/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 99b1608..83469c5 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -432,7 +432,7 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) -case _: InterruptedException if task.reasonIfKilled.isDefined => +case NonFatal(_) if task != null && task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() http://git-wip-us.apache.org/repos/asf/spark/blob/5142e5d4/core/src/test/scala/org/apache/spark/SparkContextSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 2c94755..735f445 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -572,7 +572,13 @@ class SparkContextSuite 
extends SparkFunSuite with LocalSparkContext with Eventu // first attempt will hang if (!SparkContextSuite.isTaskStarted) { SparkContextSuite.isTaskStarted = true - Thread.sleep(999) + try { +Thread.sleep(999) + } catch { +case t: Throwable => + // SPARK-20217 should not fail stage if task throws non-interrupted exception + throw new RuntimeException("killed") + } } // second attempt succeeds immediately } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
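For reference, a minimal driver-side sketch of the scenario described above. It assumes a local SparkSession and that a running task attempt is killed from another thread via SparkContext.killTaskAttempt; with this fix the killed attempt is reported as TaskKilled and retried, so the stage completes instead of failing.

```
import org.apache.spark.sql.SparkSession

object KillTaskRepro {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup; any running SparkSession behaves the same way.
    val spark = SparkSession.builder().master("local[4]").appName("SPARK-20217 repro").getOrCreate()

    // Tasks that translate the kill interrupt into a plain RuntimeException,
    // exactly as in the reproduction in the commit message above.
    spark.range(100).repartition(100).foreach { _ =>
      try {
        Thread.sleep(1000)
      } catch {
        case t: InterruptedException => throw new RuntimeException(t)
      }
    }

    // From a separate thread, for some running attempt id:
    //   spark.sparkContext.killTaskAttempt(taskId, interruptThread = true, reason = "manual kill")
    // Before this change the wrapped RuntimeException was reported as TaskFailed and could
    // fail the stage; after it, the attempt is reported as TaskKilled and simply re-run.

    spark.stop()
  }
}
```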
spark git commit: [SPARK-20231][SQL] Refactor star schema code for the subsequent star join detection in CBO
Repository: spark Updated Branches: refs/heads/master 12206058e -> 4000f128b [SPARK-20231][SQL] Refactor star schema code for the subsequent star join detection in CBO ## What changes were proposed in this pull request? This commit moves star schema code from ```join.scala``` to ```StarSchemaDetection.scala```. It also applies some minor fixes in ```StarJoinReorderSuite.scala```. ## How was this patch tested? Run existing ```StarJoinReorderSuite.scala```. Author: Ioana Delaney Closes #17544 from ioana-delaney/starSchemaCBOv2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4000f128 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4000f128 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4000f128 Branch: refs/heads/master Commit: 4000f128b7101484ba618115504ca916c22fa84a Parents: 1220605 Author: Ioana Delaney Authored: Wed Apr 5 18:02:53 2017 -0700 Committer: Xiao Li Committed: Wed Apr 5 18:02:53 2017 -0700 -- .../optimizer/StarSchemaDetection.scala | 351 +++ .../spark/sql/catalyst/optimizer/joins.scala| 328 + .../optimizer/StarJoinReorderSuite.scala| 4 +- 3 files changed, 354 insertions(+), 329 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4000f128/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala new file mode 100644 index 000..91cb004 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.annotation.tailrec + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf + +/** + * Encapsulates star-schema detection logic. + */ +case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper { + + /** + * Star schema consists of one or more fact tables referencing a number of dimension + * tables. In general, star-schema joins are detected using the following conditions: + * 1. Informational RI constraints (reliable detection) + * + Dimension contains a primary key that is being joined to the fact table. + * + Fact table contains foreign keys referencing multiple dimension tables. + * 2. Cardinality based heuristics + * + Usually, the table with the highest cardinality is the fact table. 
+ * + Table being joined with the most number of tables is the fact table. + * + * To detect star joins, the algorithm uses a combination of the above two conditions. + * The fact table is chosen based on the cardinality heuristics, and the dimension + * tables are chosen based on the RI constraints. A star join will consist of the largest + * fact table joined with the dimension tables on their primary keys. To detect that a + * column is a primary key, the algorithm uses table and column statistics. + * + * The algorithm currently returns only the star join with the largest fact table. + * Choosing the largest fact table on the driving arm to avoid large inners is in + * general a good heuristic. This restriction will be lifted to observe multiple + * star joins. + * + * The highlights of the algorithm are the following: + * + * Given a set of joined tables/plans, the algorithm first verifies if they are eligible + * for star join detection. An eligible plan is a base table access with valid statistics. + * A base table access represents Project or Filter operators above a LeafNo
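As a rough illustration of the cardinality-based part of the heuristic sketched in the scaladoc above: among the joined plans, the one with the largest row count is taken as the fact-table candidate. The table names and counts below are made up, and the real StarSchemaDetection additionally verifies table and column statistics, primary-key uniqueness on the dimension side, and selective predicates before reporting a star join.

```
// Simplified, hypothetical sketch of the size ordering only.
case class PlanStats(table: String, rowCount: BigInt)

def factTableCandidate(plans: Seq[PlanStats]): Option[PlanStats] =
  if (plans.size < 2) None
  else Some(plans.maxBy(_.rowCount)) // the largest table drives the star join

val joined = Seq(
  PlanStats("store_sales", BigInt("2879987999")),
  PlanStats("date_dim", BigInt(73049)),
  PlanStats("store", BigInt(1002)),
  PlanStats("item", BigInt(300000)))

factTableCandidate(joined) // Some(PlanStats(store_sales, 2879987999))
```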
spark git commit: [SPARK-20214][ML] Make sure converted csc matrix has sorted indices
Repository: spark Updated Branches: refs/heads/branch-2.0 15ea5eaa2 -> 9016e17af [SPARK-20214][ML] Make sure converted csc matrix has sorted indices ## What changes were proposed in this pull request? `_convert_to_vector` converts a scipy sparse matrix to csc matrix for initializing `SparseVector`. However, it doesn't guarantee the converted csc matrix has sorted indices and so a failure happens when you do something like that: from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) lil[1, 0] = 1 lil[3, 0] = 2 _convert_to_vector(lil.todok()) File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", line 78, in _convert_to_vector return SparseVector(l.shape[0], csc.indices, csc.data) File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", line 556, in __init__ % (self.indices[i], self.indices[i + 1])) TypeError: Indices 3 and 1 are not strictly increasing A simple test can confirm that `dok_matrix.tocsc()` won't guarantee sorted indices: >>> from scipy.sparse import lil_matrix >>> lil = lil_matrix((4, 1)) >>> lil[1, 0] = 1 >>> lil[3, 0] = 2 >>> dok = lil.todok() >>> csc = dok.tocsc() >>> csc.has_sorted_indices 0 >>> csc.indices array([3, 1], dtype=int32) I checked the source codes of scipy. The only way to guarantee it is `csc_matrix.tocsr()` and `csr_matrix.tocsc()`. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh Closes #17532 from viirya/make-sure-sorted-indices. (cherry picked from commit 12206058e8780e202c208b92774df3773eff36ae) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9016e17a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9016e17a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9016e17a Branch: refs/heads/branch-2.0 Commit: 9016e17af6048e63841938244d6207fa64b010b1 Parents: 15ea5ea Author: Liang-Chi Hsieh Authored: Wed Apr 5 17:46:44 2017 -0700 Committer: Joseph K. Bradley Committed: Wed Apr 5 17:47:59 2017 -0700 -- python/pyspark/ml/linalg/__init__.py| 3 +++ python/pyspark/mllib/linalg/__init__.py | 3 +++ python/pyspark/mllib/tests.py | 11 +++ 3 files changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9016e17a/python/pyspark/ml/linalg/__init__.py -- diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py index bd0e186..22001e8 100644 --- a/python/pyspark/ml/linalg/__init__.py +++ b/python/pyspark/ml/linalg/__init__.py @@ -72,7 +72,10 @@ def _convert_to_vector(l): return DenseVector(l) elif _have_scipy and scipy.sparse.issparse(l): assert l.shape[1] == 1, "Expected column vector" +# Make sure the converted csc_matrix has sorted indices. csc = l.tocsc() +if not csc.has_sorted_indices: +csc.sort_indices() return SparseVector(l.shape[0], csc.indices, csc.data) else: raise TypeError("Cannot convert type %s into Vector" % type(l)) http://git-wip-us.apache.org/repos/asf/spark/blob/9016e17a/python/pyspark/mllib/linalg/__init__.py -- diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index d37e715..12f87fd 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -74,7 +74,10 @@ def _convert_to_vector(l): return DenseVector(l) elif _have_scipy and scipy.sparse.issparse(l): assert l.shape[1] == 1, "Expected column vector" +# Make sure the converted csc_matrix has sorted indices. 
csc = l.tocsc() +if not csc.has_sorted_indices: +csc.sort_indices() return SparseVector(l.shape[0], csc.indices, csc.data) else: raise TypeError("Cannot convert type %s into Vector" % type(l)) http://git-wip-us.apache.org/repos/asf/spark/blob/9016e17a/python/pyspark/mllib/tests.py -- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index c519883..523b3f1 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -853,6 +853,17 @@ class SciPyTests(MLlibTestCase): self.assertEqual(sv, serialize(lil.tocsr())) self.assertEqual(sv, serialize(lil.todok())) +def test_convert_to_vector(self): +from scipy.sparse import csc_matrix +# Create a CSC matrix with non-sorted indices +indptr = array(
spark git commit: [SPARK-20214][ML] Make sure converted csc matrix has sorted indices
Repository: spark Updated Branches: refs/heads/branch-2.1 2b85e059b -> fb81a412e [SPARK-20214][ML] Make sure converted csc matrix has sorted indices ## What changes were proposed in this pull request? `_convert_to_vector` converts a scipy sparse matrix to csc matrix for initializing `SparseVector`. However, it doesn't guarantee the converted csc matrix has sorted indices and so a failure happens when you do something like that: from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) lil[1, 0] = 1 lil[3, 0] = 2 _convert_to_vector(lil.todok()) File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", line 78, in _convert_to_vector return SparseVector(l.shape[0], csc.indices, csc.data) File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", line 556, in __init__ % (self.indices[i], self.indices[i + 1])) TypeError: Indices 3 and 1 are not strictly increasing A simple test can confirm that `dok_matrix.tocsc()` won't guarantee sorted indices: >>> from scipy.sparse import lil_matrix >>> lil = lil_matrix((4, 1)) >>> lil[1, 0] = 1 >>> lil[3, 0] = 2 >>> dok = lil.todok() >>> csc = dok.tocsc() >>> csc.has_sorted_indices 0 >>> csc.indices array([3, 1], dtype=int32) I checked the source codes of scipy. The only way to guarantee it is `csc_matrix.tocsr()` and `csr_matrix.tocsc()`. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh Closes #17532 from viirya/make-sure-sorted-indices. (cherry picked from commit 12206058e8780e202c208b92774df3773eff36ae) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb81a412 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb81a412 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb81a412 Branch: refs/heads/branch-2.1 Commit: fb81a412eea1e60bd503cb5bb879ae468be24e56 Parents: 2b85e05 Author: Liang-Chi Hsieh Authored: Wed Apr 5 17:46:44 2017 -0700 Committer: Joseph K. Bradley Committed: Wed Apr 5 17:46:55 2017 -0700 -- python/pyspark/ml/linalg/__init__.py| 3 +++ python/pyspark/mllib/linalg/__init__.py | 3 +++ python/pyspark/mllib/tests.py | 11 +++ 3 files changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb81a412/python/pyspark/ml/linalg/__init__.py -- diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py index 1705c15..eed9946 100644 --- a/python/pyspark/ml/linalg/__init__.py +++ b/python/pyspark/ml/linalg/__init__.py @@ -72,7 +72,10 @@ def _convert_to_vector(l): return DenseVector(l) elif _have_scipy and scipy.sparse.issparse(l): assert l.shape[1] == 1, "Expected column vector" +# Make sure the converted csc_matrix has sorted indices. csc = l.tocsc() +if not csc.has_sorted_indices: +csc.sort_indices() return SparseVector(l.shape[0], csc.indices, csc.data) else: raise TypeError("Cannot convert type %s into Vector" % type(l)) http://git-wip-us.apache.org/repos/asf/spark/blob/fb81a412/python/pyspark/mllib/linalg/__init__.py -- diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 031f22c..7b24b3c 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -74,7 +74,10 @@ def _convert_to_vector(l): return DenseVector(l) elif _have_scipy and scipy.sparse.issparse(l): assert l.shape[1] == 1, "Expected column vector" +# Make sure the converted csc_matrix has sorted indices. 
csc = l.tocsc() +if not csc.has_sorted_indices: +csc.sort_indices() return SparseVector(l.shape[0], csc.indices, csc.data) else: raise TypeError("Cannot convert type %s into Vector" % type(l)) http://git-wip-us.apache.org/repos/asf/spark/blob/fb81a412/python/pyspark/mllib/tests.py -- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index c519883..523b3f1 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -853,6 +853,17 @@ class SciPyTests(MLlibTestCase): self.assertEqual(sv, serialize(lil.tocsr())) self.assertEqual(sv, serialize(lil.todok())) +def test_convert_to_vector(self): +from scipy.sparse import csc_matrix +# Create a CSC matrix with non-sorted indices +indptr = array(
spark git commit: [SPARK-20214][ML] Make sure converted csc matrix has sorted indices
Repository: spark Updated Branches: refs/heads/master 9d68c6723 -> 12206058e [SPARK-20214][ML] Make sure converted csc matrix has sorted indices ## What changes were proposed in this pull request? `_convert_to_vector` converts a scipy sparse matrix to csc matrix for initializing `SparseVector`. However, it doesn't guarantee the converted csc matrix has sorted indices and so a failure happens when you do something like that: from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) lil[1, 0] = 1 lil[3, 0] = 2 _convert_to_vector(lil.todok()) File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", line 78, in _convert_to_vector return SparseVector(l.shape[0], csc.indices, csc.data) File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", line 556, in __init__ % (self.indices[i], self.indices[i + 1])) TypeError: Indices 3 and 1 are not strictly increasing A simple test can confirm that `dok_matrix.tocsc()` won't guarantee sorted indices: >>> from scipy.sparse import lil_matrix >>> lil = lil_matrix((4, 1)) >>> lil[1, 0] = 1 >>> lil[3, 0] = 2 >>> dok = lil.todok() >>> csc = dok.tocsc() >>> csc.has_sorted_indices 0 >>> csc.indices array([3, 1], dtype=int32) I checked the source codes of scipy. The only way to guarantee it is `csc_matrix.tocsr()` and `csr_matrix.tocsc()`. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh Closes #17532 from viirya/make-sure-sorted-indices. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12206058 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12206058 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12206058 Branch: refs/heads/master Commit: 12206058e8780e202c208b92774df3773eff36ae Parents: 9d68c67 Author: Liang-Chi Hsieh Authored: Wed Apr 5 17:46:44 2017 -0700 Committer: Joseph K. Bradley Committed: Wed Apr 5 17:46:44 2017 -0700 -- python/pyspark/ml/linalg/__init__.py| 3 +++ python/pyspark/mllib/linalg/__init__.py | 3 +++ python/pyspark/mllib/tests.py | 11 +++ 3 files changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/12206058/python/pyspark/ml/linalg/__init__.py -- diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py index b765343..ad1b487 100644 --- a/python/pyspark/ml/linalg/__init__.py +++ b/python/pyspark/ml/linalg/__init__.py @@ -72,7 +72,10 @@ def _convert_to_vector(l): return DenseVector(l) elif _have_scipy and scipy.sparse.issparse(l): assert l.shape[1] == 1, "Expected column vector" +# Make sure the converted csc_matrix has sorted indices. csc = l.tocsc() +if not csc.has_sorted_indices: +csc.sort_indices() return SparseVector(l.shape[0], csc.indices, csc.data) else: raise TypeError("Cannot convert type %s into Vector" % type(l)) http://git-wip-us.apache.org/repos/asf/spark/blob/12206058/python/pyspark/mllib/linalg/__init__.py -- diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 031f22c..7b24b3c 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -74,7 +74,10 @@ def _convert_to_vector(l): return DenseVector(l) elif _have_scipy and scipy.sparse.issparse(l): assert l.shape[1] == 1, "Expected column vector" +# Make sure the converted csc_matrix has sorted indices. 
csc = l.tocsc() +if not csc.has_sorted_indices: +csc.sort_indices() return SparseVector(l.shape[0], csc.indices, csc.data) else: raise TypeError("Cannot convert type %s into Vector" % type(l)) http://git-wip-us.apache.org/repos/asf/spark/blob/12206058/python/pyspark/mllib/tests.py -- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index c519883..523b3f1 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -853,6 +853,17 @@ class SciPyTests(MLlibTestCase): self.assertEqual(sv, serialize(lil.tocsr())) self.assertEqual(sv, serialize(lil.todok())) +def test_convert_to_vector(self): +from scipy.sparse import csc_matrix +# Create a CSC matrix with non-sorted indices +indptr = array([0, 2]) +indices = array([3, 1]) +data = array([2.0, 1.0]) +csc = csc_matrix((data, indi
spark git commit: [SPARK-20204][SQL][FOLLOWUP] SQLConf should react to change in default timezone settings
Repository: spark Updated Branches: refs/heads/master 9543fc0e0 -> 9d68c6723 [SPARK-20204][SQL][FOLLOWUP] SQLConf should react to change in default timezone settings ## What changes were proposed in this pull request? Make sure SESSION_LOCAL_TIMEZONE reflects the change in JVM's default timezone setting. Currently several timezone related tests fail as the change to default timezone is not picked up by SQLConf. ## How was this patch tested? Added an unit test in ConfigEntrySuite Author: Dilip Biswal Closes #17537 from dilipbiswal/timezone_debug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d68c672 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d68c672 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d68c672 Branch: refs/heads/master Commit: 9d68c67235481fa33983afb766916b791ca8212a Parents: 9543fc0 Author: Dilip Biswal Authored: Thu Apr 6 08:33:14 2017 +0800 Committer: Wenchen Fan Committed: Thu Apr 6 08:33:14 2017 +0800 -- .../spark/internal/config/ConfigBuilder.scala | 8 .../apache/spark/internal/config/ConfigEntry.scala | 17 + .../spark/internal/config/ConfigEntrySuite.scala | 9 + .../org/apache/spark/sql/internal/SQLConf.scala| 2 +- 4 files changed, 35 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9d68c672/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala index b992113..e5d60a7 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala @@ -147,6 +147,14 @@ private[spark] class TypedConfigBuilder[T]( } } + /** Creates a [[ConfigEntry]] with a function to determine the default value */ + def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = { +val entry = new ConfigEntryWithDefaultFunction[T](parent.key, defaultFunc, converter, + stringConverter, parent._doc, parent._public) +parent._onCreate.foreach(_ (entry)) +entry + } + /** * Creates a [[ConfigEntry]] that has a default value. The default value is provided as a * [[String]] and must be a valid value for the entry. 
http://git-wip-us.apache.org/repos/asf/spark/blob/9d68c672/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala index 4f3e42b..e86712e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala @@ -78,7 +78,24 @@ private class ConfigEntryWithDefault[T] ( def readFrom(reader: ConfigReader): T = { reader.get(key).map(valueConverter).getOrElse(_defaultValue) } +} + +private class ConfigEntryWithDefaultFunction[T] ( + key: String, + _defaultFunction: () => T, + valueConverter: String => T, + stringConverter: T => String, + doc: String, + isPublic: Boolean) + extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) { + + override def defaultValue: Option[T] = Some(_defaultFunction()) + override def defaultValueString: String = stringConverter(_defaultFunction()) + + def readFrom(reader: ConfigReader): T = { +reader.get(key).map(valueConverter).getOrElse(_defaultFunction()) + } } private class ConfigEntryWithDefaultString[T] ( http://git-wip-us.apache.org/repos/asf/spark/blob/9d68c672/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala index 3ff7e84..e2ba0d2 100644 --- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala @@ -251,4 +251,13 @@ class ConfigEntrySuite extends SparkFunSuite { .createWithDefault(null) testEntryRef(nullConf, ref(nullConf)) } + + test("conf entry : default function") { +var data = 0 +val conf = new SparkConf() +val iConf = ConfigBuilder(testKey("intval")).intConf.createWithDefaultFunction(() => data) +assert(conf.get(iConf) === 0) +data = 2 +assert(conf.get(iCo
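To see why a default function (rather than a fixed default value) matters here, a plain-Scala sketch of the difference is below. The internal ConfigBuilder API is private[spark], so this only mimics the behaviour: createWithDefaultFunction re-evaluates its function on every read, which is how spark.sql.session.timeZone can now track a later change to the JVM default timezone.

```
import java.util.TimeZone

// Eagerly captured default: evaluated once, so a later change to the
// JVM default timezone is never reflected.
val eagerDefault: String = TimeZone.getDefault.getID

// Function-valued default: evaluated on every read, which is what
// createWithDefaultFunction gives a ConfigEntryWithDefaultFunction.
val functionDefault: () => String = () => TimeZone.getDefault.getID

TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
println(eagerDefault)      // still the zone ID captured at initialization
println(functionDefault()) // "UTC"
```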
spark git commit: [SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState
Repository: spark Updated Branches: refs/heads/master e2773996b -> 9543fc0e0 [SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState ## What changes were proposed in this pull request? - Fixed bug in Java API not passing timeout conf to scala API - Updated markdown docs - Updated scala docs - Added scala and Java example ## How was this patch tested? Manually ran examples. Author: Tathagata Das Closes #17539 from tdas/SPARK-20224. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9543fc0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9543fc0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9543fc0e Branch: refs/heads/master Commit: 9543fc0e08a21680961689ea772441c49fcd52ee Parents: e277399 Author: Tathagata Das Authored: Wed Apr 5 16:03:04 2017 -0700 Committer: Tathagata Das Committed: Wed Apr 5 16:03:04 2017 -0700 -- docs/structured-streaming-programming-guide.md | 98 ++- .../streaming/JavaStructuredSessionization.java | 255 +++ .../streaming/StructuredSessionization.scala| 151 +++ .../spark/sql/KeyValueGroupedDataset.scala | 2 +- .../apache/spark/sql/streaming/GroupState.scala | 15 +- 5 files changed, 509 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9543fc0e/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index b5cf9f1..37a1d61 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1,6 +1,6 @@ --- layout: global -displayTitle: Structured Streaming Programming Guide [Alpha] +displayTitle: Structured Streaming Programming Guide [Experimental] title: Structured Streaming Programming Guide --- @@ -871,6 +871,65 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat +### Streaming Deduplication +You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking. + +- *With watermark* - If there is a upper bound on how late a duplicate record may arrive, then you can define a watermark on a event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of the state the query has to maintain. + +- *Without watermark* - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state. + + + + +{% highlight scala %} +val streamingDf = spark.readStream. ... // columns: guid, eventTime, ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid") + +// With watermark using guid and eventTime columns +streamingDf + .withWatermark("eventTime", "10 seconds") + .dropDuplicates("guid", "eventTime") +{% endhighlight %} + + + + +{% highlight java %} +Dataset streamingDf = spark.readStream. ...; // columns: guid, eventTime, ... 
+ +// Without watermark using guid column +streamingDf.dropDuplicates("guid"); + +// With watermark using guid and eventTime columns +streamingDf + .withWatermark("eventTime", "10 seconds") + .dropDuplicates("guid", "eventTime"); +{% endhighlight %} + + + + + +{% highlight python %} +streamingDf = spark.readStream. ... + +// Without watermark using guid column +streamingDf.dropDuplicates("guid") + +// With watermark using guid and eventTime columns +streamingDf \ + .withWatermark("eventTime", "10 seconds") \ + .dropDuplicates("guid", "eventTime") +{% endhighlight %} + + + + +### Arbitrary Stateful Operations +Many uscases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupStat
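The commit adds complete sessionization examples (StructuredSessionization.scala and JavaStructuredSessionization.java); a stripped-down, hypothetical sketch of the mapGroupsWithState update function they revolve around looks like this, assuming a streaming Dataset[Event] keyed by user id:

```
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical event, state and output types for a per-user session count.
case class Event(userId: String, eventTime: Timestamp)
case class SessionInfo(numEvents: Long)
case class SessionUpdate(userId: String, numEvents: Long)

def updateSession(
    userId: String,
    events: Iterator[Event],
    state: GroupState[SessionInfo]): SessionUpdate = {
  val current = state.getOption.getOrElse(SessionInfo(0L))
  val updated = SessionInfo(current.numEvents + events.size)
  state.update(updated)
  SessionUpdate(userId, updated.numEvents)
}

// With a streaming Dataset[Event] named `events` and spark.implicits._ in scope:
//   val sessionCounts = events
//     .groupByKey(_.userId)
//     .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateSession _)
```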
spark git commit: [SPARK-19454][PYTHON][SQL] DataFrame.replace improvements
Repository: spark Updated Branches: refs/heads/master a2d8d767d -> e2773996b [SPARK-19454][PYTHON][SQL] DataFrame.replace improvements ## What changes were proposed in this pull request? - Allows skipping `value` argument if `to_replace` is a `dict`: ```python df = sc.parallelize([("Alice", 1, 3.0)]).toDF() df.replace({"Alice": "Bob"}).show() - Adds validation step to ensure homogeneous values / replacements. - Simplifies internal control flow. - Improves unit tests coverage. ## How was this patch tested? Existing unit tests, additional unit tests, manual testing. Author: zero323 Closes #16793 from zero323/SPARK-19454. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2773996 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2773996 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2773996 Branch: refs/heads/master Commit: e2773996b8d1c0214d9ffac634a059b4923caf7b Parents: a2d8d76 Author: zero323 Authored: Wed Apr 5 11:47:40 2017 -0700 Committer: Holden Karau Committed: Wed Apr 5 11:47:40 2017 -0700 -- python/pyspark/sql/dataframe.py | 81 +--- python/pyspark/sql/tests.py | 72 2 files changed, 128 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e2773996/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a24512f..774caf5 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -25,6 +25,8 @@ if sys.version >= '3': else: from itertools import imap as map +import warnings + from pyspark import copy_func, since from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer @@ -1281,7 +1283,7 @@ class DataFrame(object): return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx) @since(1.4) -def replace(self, to_replace, value, subset=None): +def replace(self, to_replace, value=None, subset=None): """Returns a new :class:`DataFrame` replacing a value with another value. :func:`DataFrame.replace` and :func:`DataFrameNaFunctions.replace` are aliases of each other. @@ -1326,43 +1328,72 @@ class DataFrame(object): |null| null|null| ++--++ """ -if not isinstance(to_replace, (float, int, long, basestring, list, tuple, dict)): +# Helper functions +def all_of(types): +"""Given a type or tuple of types and a sequence of xs +check if each x is instance of type(s) + +>>> all_of(bool)([True, False]) +True +>>> all_of(basestring)(["a", 1]) +False +""" +def all_of_(xs): +return all(isinstance(x, types) for x in xs) +return all_of_ + +all_of_bool = all_of(bool) +all_of_str = all_of(basestring) +all_of_numeric = all_of((float, int, long)) + +# Validate input types +valid_types = (bool, float, int, long, basestring, list, tuple) +if not isinstance(to_replace, valid_types + (dict, )): raise ValueError( -"to_replace should be a float, int, long, string, list, tuple, or dict") +"to_replace should be a float, int, long, string, list, tuple, or dict. " +"Got {0}".format(type(to_replace))) -if not isinstance(value, (float, int, long, basestring, list, tuple)): -raise ValueError("value should be a float, int, long, string, list, or tuple") +if not isinstance(value, valid_types) and not isinstance(to_replace, dict): +raise ValueError("If to_replace is not a dict, value should be " + "a float, int, long, string, list, or tuple. 
" + "Got {0}".format(type(value))) + +if isinstance(to_replace, (list, tuple)) and isinstance(value, (list, tuple)): +if len(to_replace) != len(value): +raise ValueError("to_replace and value lists should be of the same length. " + "Got {0} and {1}".format(len(to_replace), len(value))) -rep_dict = dict() +if not (subset is None or isinstance(subset, (list, tuple, basestring))): +raise ValueError("subset should be a list or tuple of column names, " + "column name or None. Got {0}".format(type(subset))) +# Reshape input arguments if necessary if isinstance(to_
spark git commit: [SPARK-20223][SQL] Fix typo in tpcds q77.sql
Repository: spark Updated Branches: refs/heads/branch-2.0 90eb37343 -> 15ea5eaa2 [SPARK-20223][SQL] Fix typo in tpcds q77.sql ## What changes were proposed in this pull request? Fix typo in tpcds q77.sql ## How was this patch tested? N/A Author: wangzhenhua Closes #17538 from wzhfy/typoQ77. (cherry picked from commit a2d8d767d933321426a4eb9df1583e017722d7d6) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15ea5eaa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15ea5eaa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15ea5eaa Branch: refs/heads/branch-2.0 Commit: 15ea5eaa2e45bee4b8221be96b2b666e6d64498b Parents: 90eb373 Author: wangzhenhua Authored: Wed Apr 5 10:21:43 2017 -0700 Committer: Xiao Li Committed: Wed Apr 5 10:22:10 2017 -0700 -- sql/core/src/test/resources/tpcds/q77.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/15ea5eaa/sql/core/src/test/resources/tpcds/q77.sql -- diff --git a/sql/core/src/test/resources/tpcds/q77.sql b/sql/core/src/test/resources/tpcds/q77.sql index 7830f96..a69df9f 100755 --- a/sql/core/src/test/resources/tpcds/q77.sql +++ b/sql/core/src/test/resources/tpcds/q77.sql @@ -36,7 +36,7 @@ WITH ss AS sum(cr_net_loss) AS profit_loss FROM catalog_returns, date_dim WHERE cr_returned_date_sk = d_date_sk -AND d_date BETWEEN cast('2000-08-03]' AS DATE) AND +AND d_date BETWEEN cast('2000-08-03' AS DATE) AND (cast('2000-08-03' AS DATE) + INTERVAL 30 days)), ws AS (SELECT - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20223][SQL] Fix typo in tpcds q77.sql
Repository: spark Updated Branches: refs/heads/branch-2.1 efc72dcc3 -> 2b85e059b [SPARK-20223][SQL] Fix typo in tpcds q77.sql ## What changes were proposed in this pull request? Fix typo in tpcds q77.sql ## How was this patch tested? N/A Author: wangzhenhua Closes #17538 from wzhfy/typoQ77. (cherry picked from commit a2d8d767d933321426a4eb9df1583e017722d7d6) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b85e059 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b85e059 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b85e059 Branch: refs/heads/branch-2.1 Commit: 2b85e059b634bfc4b015c76b7b232b732460bf12 Parents: efc72dc Author: wangzhenhua Authored: Wed Apr 5 10:21:43 2017 -0700 Committer: Xiao Li Committed: Wed Apr 5 10:21:53 2017 -0700 -- sql/core/src/test/resources/tpcds/q77.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2b85e059/sql/core/src/test/resources/tpcds/q77.sql -- diff --git a/sql/core/src/test/resources/tpcds/q77.sql b/sql/core/src/test/resources/tpcds/q77.sql index 7830f96..a69df9f 100755 --- a/sql/core/src/test/resources/tpcds/q77.sql +++ b/sql/core/src/test/resources/tpcds/q77.sql @@ -36,7 +36,7 @@ WITH ss AS sum(cr_net_loss) AS profit_loss FROM catalog_returns, date_dim WHERE cr_returned_date_sk = d_date_sk -AND d_date BETWEEN cast('2000-08-03]' AS DATE) AND +AND d_date BETWEEN cast('2000-08-03' AS DATE) AND (cast('2000-08-03' AS DATE) + INTERVAL 30 days)), ws AS (SELECT - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20223][SQL] Fix typo in tpcds q77.sql
Repository: spark Updated Branches: refs/heads/master 71c3c4815 -> a2d8d767d [SPARK-20223][SQL] Fix typo in tpcds q77.sql ## What changes were proposed in this pull request? Fix typo in tpcds q77.sql ## How was this patch tested? N/A Author: wangzhenhua Closes #17538 from wzhfy/typoQ77. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d8d767 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d8d767 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d8d767 Branch: refs/heads/master Commit: a2d8d767d933321426a4eb9df1583e017722d7d6 Parents: 71c3c48 Author: wangzhenhua Authored: Wed Apr 5 10:21:43 2017 -0700 Committer: Xiao Li Committed: Wed Apr 5 10:21:43 2017 -0700 -- sql/core/src/test/resources/tpcds/q77.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a2d8d767/sql/core/src/test/resources/tpcds/q77.sql -- diff --git a/sql/core/src/test/resources/tpcds/q77.sql b/sql/core/src/test/resources/tpcds/q77.sql index 7830f96..a69df9f 100755 --- a/sql/core/src/test/resources/tpcds/q77.sql +++ b/sql/core/src/test/resources/tpcds/q77.sql @@ -36,7 +36,7 @@ WITH ss AS sum(cr_net_loss) AS profit_loss FROM catalog_returns, date_dim WHERE cr_returned_date_sk = d_date_sk -AND d_date BETWEEN cast('2000-08-03]' AS DATE) AND +AND d_date BETWEEN cast('2000-08-03' AS DATE) AND (cast('2000-08-03' AS DATE) + INTERVAL 30 days)), ws AS (SELECT - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19807][WEB UI] Add reason for cancellation when a stage is killed using web UI
Repository: spark Updated Branches: refs/heads/master 6f09dc70d -> 71c3c4815 [SPARK-19807][WEB UI] Add reason for cancellation when a stage is killed using web UI ## What changes were proposed in this pull request? When a user kills a stage using web UI (in Stages page), StagesTab.handleKillRequest requests SparkContext to cancel the stage without giving a reason. SparkContext has cancelStage(stageId: Int, reason: String) that Spark could use to pass the information for monitoring/debugging purposes. ## How was this patch tested? manual tests Please review http://spark.apache.org/contributing.html before opening a pull request. Author: shaolinliu Author: lvdongr Closes #17258 from shaolinliu/SPARK-19807. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71c3c481 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71c3c481 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71c3c481 Branch: refs/heads/master Commit: 71c3c48159fe7eb4a46fc2a1b78b72088ccfa824 Parents: 6f09dc7 Author: shaolinliu Authored: Wed Apr 5 13:47:44 2017 +0100 Committer: Sean Owen Committed: Wed Apr 5 13:47:44 2017 +0100 -- core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/71c3c481/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index c1f2511..181465b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -42,7 +42,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" val stageId = Option(request.getParameter("id")).map(_.toInt) stageId.foreach { id => if (progressListener.activeStages.contains(id)) { - sc.foreach(_.cancelStage(id)) + sc.foreach(_.cancelStage(id, "killed via the Web UI")) // Do a quick pause here to give Spark time to kill the stage so it shows up as // killed after the refresh. Note that this will block the serving thread so the // time should be limited in duration. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
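The reason-taking overload is equally available to applications; a minimal hypothetical sketch, assuming a running SparkContext `sc` and a known active stage id:

```
// Cancel a specific stage with an explanatory reason; the reason string is what
// now appears for stages killed from the web UI ("killed via the Web UI").
val stageId: Int = 7 // hypothetical id, e.g. read from the UI or a SparkListener
sc.cancelStage(stageId, "cancelled by operator: stage exceeded expected runtime")
```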
spark git commit: [SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode
Repository: spark Updated Branches: refs/heads/branch-2.1 00c124884 -> efc72dcc3 [SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode with spark.ui.reverseProxy=true, full path URLs like /log will point to the master web endpoint which is serving the worker UI as reverse proxy. To access a REST endpoint in the worker in reverse proxy mode , the leading /proxy/"target"/ part of the base URI must be retained. Added logic to log-view.js to handle this, similar to executorspage.js Patch was tested manually Author: Oliver Köth Closes #17370 from okoethibm/master. (cherry picked from commit 6f09dc70d9808cae004ceda9ad615aa9be50f43d) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efc72dcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efc72dcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efc72dcc Branch: refs/heads/branch-2.1 Commit: efc72dcc3f964ea9931fb47a454db253556d0f8a Parents: 00c1248 Author: Oliver Köth Authored: Wed Apr 5 08:09:42 2017 +0100 Committer: Sean Owen Committed: Wed Apr 5 08:09:52 2017 +0100 -- .../org/apache/spark/ui/static/log-view.js | 19 --- 1 file changed, 16 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/efc72dcc/core/src/main/resources/org/apache/spark/ui/static/log-view.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/log-view.js b/core/src/main/resources/org/apache/spark/ui/static/log-view.js index 1782b4f..b5c43e5 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/log-view.js +++ b/core/src/main/resources/org/apache/spark/ui/static/log-view.js @@ -51,13 +51,26 @@ function noNewAlert() { window.setTimeout(function () {alert.css("display", "none");}, 4000); } + +function getRESTEndPoint() { + // If the worker is served from the master through a proxy (see doc on spark.ui.reverseProxy), + // we need to retain the leading ../proxy// part of the URL when making REST requests. + // Similar logic is contained in executorspage.js function createRESTEndPoint. + var words = document.baseURI.split('/'); + var ind = words.indexOf("proxy"); + if (ind > 0) { + return words.slice(0, ind + 2).join('/') + "/log"; + } + return "/log" +} + function loadMore() { var offset = Math.max(startByte - byteLength, 0); var moreByteLength = Math.min(byteLength, startByte); $.ajax({ type: "GET", -url: "/log" + baseParams + "&offset=" + offset + "&byteLength=" + moreByteLength, +url: getRESTEndPoint() + baseParams + "&offset=" + offset + "&byteLength=" + moreByteLength, success: function (data) { var oldHeight = $(".log-content")[0].scrollHeight; var newlineIndex = data.indexOf('\n'); @@ -83,14 +96,14 @@ function loadMore() { function loadNew() { $.ajax({ type: "GET", -url: "/log" + baseParams + "&byteLength=0", +url: getRESTEndPoint() + baseParams + "&byteLength=0", success: function (data) { var dataInfo = data.substring(0, data.indexOf('\n')).match(/\d+/g); var newDataLen = dataInfo[2] - totalLogLength; if (newDataLen != 0) { $.ajax({ type: "GET", - url: "/log" + baseParams + "&byteLength=" + newDataLen, + url: getRESTEndPoint() + baseParams + "&byteLength=" + newDataLen, success: function (data) { var newlineIndex = data.indexOf('\n'); var dataInfo = data.substring(0, newlineIndex).match(/\d+/g); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode
Repository: spark Updated Branches: refs/heads/master dad499f32 -> 6f09dc70d [SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode with spark.ui.reverseProxy=true, full path URLs like /log will point to the master web endpoint which is serving the worker UI as reverse proxy. To access a REST endpoint in the worker in reverse proxy mode , the leading /proxy/"target"/ part of the base URI must be retained. Added logic to log-view.js to handle this, similar to executorspage.js Patch was tested manually Author: Oliver Köth Closes #17370 from okoethibm/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f09dc70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f09dc70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f09dc70 Branch: refs/heads/master Commit: 6f09dc70d9808cae004ceda9ad615aa9be50f43d Parents: dad499f Author: Oliver Köth Authored: Wed Apr 5 08:09:42 2017 +0100 Committer: Sean Owen Committed: Wed Apr 5 08:09:42 2017 +0100 -- .../org/apache/spark/ui/static/log-view.js | 19 --- 1 file changed, 16 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6f09dc70/core/src/main/resources/org/apache/spark/ui/static/log-view.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/log-view.js b/core/src/main/resources/org/apache/spark/ui/static/log-view.js index 1782b4f..b5c43e5 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/log-view.js +++ b/core/src/main/resources/org/apache/spark/ui/static/log-view.js @@ -51,13 +51,26 @@ function noNewAlert() { window.setTimeout(function () {alert.css("display", "none");}, 4000); } + +function getRESTEndPoint() { + // If the worker is served from the master through a proxy (see doc on spark.ui.reverseProxy), + // we need to retain the leading ../proxy// part of the URL when making REST requests. + // Similar logic is contained in executorspage.js function createRESTEndPoint. + var words = document.baseURI.split('/'); + var ind = words.indexOf("proxy"); + if (ind > 0) { + return words.slice(0, ind + 2).join('/') + "/log"; + } + return "/log" +} + function loadMore() { var offset = Math.max(startByte - byteLength, 0); var moreByteLength = Math.min(byteLength, startByte); $.ajax({ type: "GET", -url: "/log" + baseParams + "&offset=" + offset + "&byteLength=" + moreByteLength, +url: getRESTEndPoint() + baseParams + "&offset=" + offset + "&byteLength=" + moreByteLength, success: function (data) { var oldHeight = $(".log-content")[0].scrollHeight; var newlineIndex = data.indexOf('\n'); @@ -83,14 +96,14 @@ function loadMore() { function loadNew() { $.ajax({ type: "GET", -url: "/log" + baseParams + "&byteLength=0", +url: getRESTEndPoint() + baseParams + "&byteLength=0", success: function (data) { var dataInfo = data.substring(0, data.indexOf('\n')).match(/\d+/g); var newDataLen = dataInfo[2] - totalLogLength; if (newDataLen != 0) { $.ajax({ type: "GET", - url: "/log" + baseParams + "&byteLength=" + newDataLen, + url: getRESTEndPoint() + baseParams + "&byteLength=" + newDataLen, success: function (data) { var newlineIndex = data.indexOf('\n'); var dataInfo = data.substring(0, newlineIndex).match(/\d+/g); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org