svn commit: r29982 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_09_16_02-faf73dc-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Oct 9 23:17:14 2018
New Revision: 29982

Log:
Apache Spark 3.0.0-SNAPSHOT-2018_10_09_16_02-faf73dc docs

[This commit notification would consist of 1482 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-25559][FOLLOW-UP] Add comments for partial pushdown of conjuncts in Parquet
Repository: spark
Updated Branches:
  refs/heads/master 3eee9e024 -> faf73dcd3

[SPARK-25559][FOLLOW-UP] Add comments for partial pushdown of conjuncts in Parquet

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/22574. Renamed the parameter and added comments.

## How was this patch tested?

N/A

Closes #22679 from gatorsmile/followupSPARK-25559.

Authored-by: gatorsmile
Signed-off-by: DB Tsai

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/faf73dcd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/faf73dcd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/faf73dcd

Branch: refs/heads/master
Commit: faf73dcd33d04365c28c2846d3a1f845785f69df
Parents: 3eee9e0
Author: gatorsmile
Authored: Tue Oct 9 21:10:33 2018 +
Committer: DB Tsai
Committed: Tue Oct 9 21:10:33 2018 +

--
 .../datasources/parquet/ParquetFilters.scala | 31 ++--
 1 file changed, 22 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/faf73dcd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 44a0d20..21ab9c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -394,13 +394,22 @@ private[parquet] class ParquetFilters(
    */
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToParquetField = getFieldMap(schema)
-    createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true)
+    createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true)
   }
 
+  /**
+   * @param nameToParquetField a map from the field name to its field name and data type.
+   *                           This only includes the root fields whose types are primitive types.
+   * @param predicate the input filter predicates. Not all the predicates can be pushed down.
+   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
+   *                                    down safely. Pushing ONLY one side of AND down is safe to
+   *                                    do at the top level or none of its ancestors is NOT and OR.
+   * @return the Parquet-native filter predicates that are eligible for pushdown.
+   */
   private def createFilterHelper(
       nameToParquetField: Map[String, ParquetField],
       predicate: sources.Filter,
-      canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
+      canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
     // Decimal type must make sure that filter value's scale matched the file.
     // If doesn't matched, which would cause data corruption.
     def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
@@ -505,24 +514,28 @@ private[parquet] class ParquetFilters(
         // Pushing one side of AND down is only safe to do at the top level or in the child
         // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
         // can be safely removed.
-        val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
-        val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
+        val lhsFilterOption =
+          createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
+        val rhsFilterOption =
+          createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
 
         (lhsFilterOption, rhsFilterOption) match {
           case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
-          case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
-          case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
+          case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
+          case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter)
           case _ => None
         }
 
       case sources.Or(lhs, rhs) =>
         for {
-          lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
-          rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
+          lhsF
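The rule documented above — one conjunct of an AND may be dropped at the top level, but nothing may be partially pushed beneath an OR or a NOT — can be illustrated with a small standalone sketch. The Scala below is not Spark's `ParquetFilters` API; the `Filter` ADT and the `push` function are hypothetical stand-ins that only mirror the shape of `createFilterHelper`.

```scala
// Minimal sketch of partial pushdown of conjuncts. Assumed names: Attr is a
// pushable leaf predicate, Unsupported is a predicate that cannot be pushed.
sealed trait Filter
case class Attr(name: String) extends Filter
case class Unsupported(desc: String) extends Filter
case class And(left: Filter, right: Filter) extends Filter
case class Or(left: Filter, right: Filter) extends Filter
case class Not(child: Filter) extends Filter

// Returns the part of the predicate that is safe to push down, if any.
def push(f: Filter, canPartialPushDownConjuncts: Boolean): Option[Filter] = f match {
  case Attr(_)        => Some(f)
  case Unsupported(_) => None
  case And(l, r) =>
    val lhs = push(l, canPartialPushDownConjuncts)
    val rhs = push(r, canPartialPushDownConjuncts)
    (lhs, rhs) match {
      case (Some(lf), Some(rf))                             => Some(And(lf, rf))
      // Dropping one conjunct only widens the pushed filter, so it is safe when allowed.
      case (Some(lf), None) if canPartialPushDownConjuncts  => Some(lf)
      case (None, Some(rf)) if canPartialPushDownConjuncts  => Some(rf)
      case _                                                => None
    }
  case Or(l, r) =>
    // Both sides must be fully convertible; a widened child under OR could drop rows.
    for {
      lf <- push(l, canPartialPushDownConjuncts = false)
      rf <- push(r, canPartialPushDownConjuncts = false)
    } yield Or(lf, rf)
  case Not(c) =>
    // NOT of a widened child is stricter than NOT of the original, so no partial pushdown here.
    push(c, canPartialPushDownConjuncts = false).map(Not(_))
}
```

Dropping one side of an AND only widens the pushed-down filter and the full predicate is still evaluated afterwards, so correctness is preserved; under OR or NOT a widened child could instead drop matching rows, which is why the flag is reset to false there.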
svn commit: r29973 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_09_08_03-3eee9e0-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Oct 9 15:17:22 2018
New Revision: 29973

Log:
Apache Spark 3.0.0-SNAPSHOT-2018_10_09_08_03-3eee9e0 docs

[This commit notification would consist of 1482 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-25535][CORE] Work around bad error handling in commons-crypto.
Repository: spark
Updated Branches:
  refs/heads/master deb9588b2 -> 3eee9e024

[SPARK-25535][CORE] Work around bad error handling in commons-crypto.

The commons-crypto library does some questionable error handling internally, which can lead to JVM crashes if a call into native code fails and cleans up state it should not.

Until the library is fixed, this change adds workarounds in Spark code so that when an error is detected on the commons-crypto side, Spark avoids making further calls into the library.

Tested with existing and added unit tests.

Closes #22557 from vanzin/SPARK-25535.

Authored-by: Marcelo Vanzin
Signed-off-by: Imran Rashid

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3eee9e02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3eee9e02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3eee9e02

Branch: refs/heads/master
Commit: 3eee9e02463e10570a29fad00823c953debd945e
Parents: deb9588
Author: Marcelo Vanzin
Authored: Tue Oct 9 09:27:08 2018 -0500
Committer: Imran Rashid
Committed: Tue Oct 9 09:27:08 2018 -0500

--
 .../apache/spark/network/crypto/AuthEngine.java |  95 -
 .../spark/network/crypto/TransportCipher.java   |  60 ++--
 .../spark/network/crypto/AuthEngineSuite.java   |  17 +++
 .../spark/security/CryptoStreamUtils.scala      | 137 +--
 .../spark/security/CryptoStreamUtilsSuite.scala |  37 -
 5 files changed, 295 insertions(+), 51 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3eee9e02/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
--
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
index 056505e..64fdb32 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
@@ -159,15 +159,21 @@ class AuthEngine implements Closeable {
     // accurately report the errors when they happen.
     RuntimeException error = null;
     byte[] dummy = new byte[8];
-    try {
-      doCipherOp(encryptor, dummy, true);
-    } catch (Exception e) {
-      error = new RuntimeException(e);
+    if (encryptor != null) {
+      try {
+        doCipherOp(Cipher.ENCRYPT_MODE, dummy, true);
+      } catch (Exception e) {
+        error = new RuntimeException(e);
+      }
+      encryptor = null;
     }
-    try {
-      doCipherOp(decryptor, dummy, true);
-    } catch (Exception e) {
-      error = new RuntimeException(e);
+    if (decryptor != null) {
+      try {
+        doCipherOp(Cipher.DECRYPT_MODE, dummy, true);
+      } catch (Exception e) {
+        error = new RuntimeException(e);
+      }
+      decryptor = null;
     }
 
     random.close();
 
@@ -189,11 +195,11 @@ class AuthEngine implements Closeable {
   }
 
   private byte[] decrypt(byte[] in) throws GeneralSecurityException {
-    return doCipherOp(decryptor, in, false);
+    return doCipherOp(Cipher.DECRYPT_MODE, in, false);
   }
 
   private byte[] encrypt(byte[] in) throws GeneralSecurityException {
-    return doCipherOp(encryptor, in, false);
+    return doCipherOp(Cipher.ENCRYPT_MODE, in, false);
   }
 
   private void initializeForAuth(String cipher, byte[] nonce, SecretKeySpec key)
@@ -205,11 +211,13 @@ class AuthEngine implements Closeable {
     byte[] iv = new byte[conf.ivLength()];
     System.arraycopy(nonce, 0, iv, 0, Math.min(nonce.length, iv.length));
 
-    encryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
-    encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
+    CryptoCipher _encryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
+    _encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
+    this.encryptor = _encryptor;
 
-    decryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
-    decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+    CryptoCipher _decryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
+    _decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+    this.decryptor = _decryptor;
   }
 
   /**
@@ -241,29 +249,52 @@ class AuthEngine implements Closeable {
     return new SecretKeySpec(key.getEncoded(), conf.keyAlgorithm());
   }
 
-  private byte[] doCipherOp(CryptoCipher cipher, byte[] in, boolean isFinal)
+  private byte[] doCipherOp(int mode, byte[] in, boolean isFinal)
     throws GeneralSecurityException {
-    Preconditions.checkState(cipher != null);
+    CryptoCipher cipher;
+    switch (mode) {
+      case Cipher.ENC
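The workaround described in this commit boils down to one pattern: once a call into the fragile native library fails, never hand it another call. The following is a minimal Scala sketch of that pattern; `NativeCipher` and `SafeCipherWrapper` are hypothetical names, not Spark's `AuthEngine` or `TransportCipher` classes.

```scala
// Hypothetical stand-in for a cipher backed by native code that may corrupt
// its own state on failure.
trait NativeCipher {
  def update(in: Array[Byte]): Array[Byte]
  def close(): Unit
}

// Wrapper that stops calling into the native library after the first error.
final class SafeCipherWrapper(private var cipher: NativeCipher) {

  def transform(in: Array[Byte]): Array[Byte] = {
    val c = cipher
    require(c != null, "cipher is closed or in an error state")
    try {
      c.update(in)
    } catch {
      case e: Exception =>
        // On the first failure, forget the native handle entirely; later calls
        // fail fast in JVM code instead of re-entering possibly corrupted native state.
        cipher = null
        throw e
    }
  }

  def close(): Unit = {
    val c = cipher
    cipher = null // make close idempotent and block any further use
    if (c != null) c.close()
  }
}
```

The design choice is the same as in the diff above: the reference to the native object is the single gate, and clearing it both blocks further use and makes cleanup idempotent.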
spark git commit: [SPARK-24851][UI] Map a Stage ID to its Associated Job ID
Repository: spark
Updated Branches:
  refs/heads/master e3133f4ab -> deb9588b2

[SPARK-24851][UI] Map a Stage ID to its Associated Job ID

It would be nice to have a field in the Stage page UI that shows the mapping from the current stage ID to the job IDs that the stage belongs to.

## What changes were proposed in this pull request?

Added a field to the Stage UI that displays the corresponding job IDs for that particular stage.

## How was this patch tested?

[Screenshot: https://user-images.githubusercontent.com/8190/43220447-a8e94f80-900f-11e8-8a20-a235bbd5a369.png]

Closes #21809 from pgandhi999/SPARK-24851.

Authored-by: pgandhi
Signed-off-by: Thomas Graves

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/deb9588b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/deb9588b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/deb9588b

Branch: refs/heads/master
Commit: deb9588b2ab6596b30ab17f56c59951cabf57162
Parents: e3133f4
Author: pgandhi
Authored: Tue Oct 9 08:59:21 2018 -0500
Committer: Thomas Graves
Committed: Tue Oct 9 08:59:21 2018 -0500

--
 .../scala/org/apache/spark/status/AppStatusStore.scala |  8 +---
 .../apache/spark/status/api/v1/StagesResource.scala    |  2 +-
 .../scala/org/apache/spark/ui/jobs/StagePage.scala     | 13 +++--
 3 files changed, 17 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/deb9588b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index e237281..9839cbb 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -112,10 +112,12 @@ private[spark] class AppStatusStore(
     }
   }
 
-  def stageAttempt(stageId: Int, stageAttemptId: Int, details: Boolean = false): v1.StageData = {
+  def stageAttempt(stageId: Int, stageAttemptId: Int,
+    details: Boolean = false): (v1.StageData, Seq[Int]) = {
     val stageKey = Array(stageId, stageAttemptId)
-    val stage = store.read(classOf[StageDataWrapper], stageKey).info
-    if (details) stageWithDetails(stage) else stage
+    val stageDataWrapper = store.read(classOf[StageDataWrapper], stageKey)
+    val stage = if (details) stageWithDetails(stageDataWrapper.info) else stageDataWrapper.info
+    (stage, stageDataWrapper.jobIds.toSeq)
   }
 
   def taskCount(stageId: Int, stageAttemptId: Int): Long = {

http://git-wip-us.apache.org/repos/asf/spark/blob/deb9588b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
--
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
index 96249e4..30d52b9 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -56,7 +56,7 @@ private[v1] class StagesResource extends BaseAppResource {
     @PathParam("stageAttemptId") stageAttemptId: Int,
     @QueryParam("details") @DefaultValue("true") details: Boolean): StageData = withUI { ui =>
     try {
-      ui.store.stageAttempt(stageId, stageAttemptId, details = details)
+      ui.store.stageAttempt(stageId, stageAttemptId, details = details)._1
     } catch {
       case _: NoSuchElementException =>
         // Change the message depending on whether there are any attempts for the requested stage.

http://git-wip-us.apache.org/repos/asf/spark/blob/deb9588b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 7428bbe..0f74b07 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -105,7 +105,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
     val stageAttemptId = parameterAttempt.toInt
     val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
 
-    val stageData = parent.store
+    val (stageData, stageJobIds) = parent.store
       .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = false))
       .getOrElse {
         val content =
@@ -183,6 +183,15 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
             {Utils.bytesToString(stageDat
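The shape of the new API in this commit can be summarized with a small Scala sketch. The `stageAttempt` stand-in below is hypothetical and only mirrors the changed signature: it returns the stage data together with the IDs of the jobs the stage belongs to, and callers that only need the stage (like the REST resource above) take `._1`.

```scala
// Simplified stand-in for the UI's stage data; not Spark's v1.StageData.
case class StageData(stageId: Int, name: String)

// Hypothetical stand-in for AppStatusStore.stageAttempt after this change.
def stageAttempt(stageId: Int, attemptId: Int): (StageData, Seq[Int]) = {
  // In the real store, both values come from the StageDataWrapper read from the KV store;
  // attemptId selects which attempt is read. Hard-coded here for illustration only.
  val stage  = StageData(stageId, s"stage-$stageId")
  val jobIds = Seq(0, 3) // the jobs that submitted this stage
  (stage, jobIds)
}

// The Stage page destructures the pair and can render an "Associated Job Ids" field.
val (stage, jobIds) = stageAttempt(stageId = 7, attemptId = 0)
println(s"Details for Stage ${stage.stageId}: associated job IDs = ${jobIds.mkString(", ")}")

// REST callers that only need the stage keep the old behaviour by taking the first element.
val stageOnly = stageAttempt(stageId = 7, attemptId = 0)._1
```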
svn commit: r29965 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_09_04_02-e3133f4-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Oct 9 11:16:46 2018
New Revision: 29965

Log:
Apache Spark 3.0.0-SNAPSHOT-2018_10_09_04_02-e3133f4 docs

[This commit notification would consist of 1481 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r29963 - in /dev/spark/2.4.1-SNAPSHOT-2018_10_09_02_02-404c840-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Oct 9 09:16:59 2018
New Revision: 29963

Log:
Apache Spark 2.4.1-SNAPSHOT-2018_10_09_02_02-404c840 docs

[This commit notification would consist of 1472 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs
Repository: spark
Updated Branches:
  refs/heads/master 46fe40838 -> e3133f4ab

[SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs

## What changes were proposed in this pull request?

This PR is inspired by https://github.com/apache/spark/pull/22524, but proposes a safer fix.

The current whole-stage codegen support for limit has two problems:
1. It is only applied to `InputAdapter`; many leaf nodes cannot stop early with respect to the limit.
2. It requires overriding a method, which breaks if there is more than one limit in the whole stage.

The first problem is easy to fix: figure out which nodes can stop early with respect to the limit, and update them. This PR updates `RangeExec`, `ColumnarBatchScan`, `SortExec`, and `HashAggregateExec`.

The second problem is harder. This PR proposes to propagate the limit counter variable name upstream, so that upstream leaf/blocking nodes can check the limit counter and quit their loops early. For better performance, the implementation follows `CodegenSupport.needStopCheck`, so the check is only generated when the query contains a limit. For a columnar node like range, the limit counter is checked per batch instead of per row, to keep the inner loop tight and fast.

Why is this safer?
1. The leaf/blocking nodes do not have to check the limit counter and stop early; doing so is purely a performance optimization (same as before).
2. Blocking operators can stop propagating the limit counter name, because the counter of a limit placed after a blocking operator will not increase until the blocking operator has consumed all the data from its upstream operators, so those upstream operators do not care about limits after blocking operators. This is also only a performance optimization; it is fine if we forget to do it for some new blocking operator.

## How was this patch tested?

A new test.

Closes #22630 from cloud-fan/limit.

Authored-by: Wenchen Fan
Signed-off-by: Kazuaki Ishizaki

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3133f4a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3133f4a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3133f4a

Branch: refs/heads/master
Commit: e3133f4abf1cd5667abe5f0d05fa0af0df3033ae
Parents: 46fe408
Author: Wenchen Fan
Authored: Tue Oct 9 16:46:23 2018 +0900
Committer: Kazuaki Ishizaki
Committed: Tue Oct 9 16:46:23 2018 +0900

--
 .../sql/execution/BufferedRowIterator.java      |  10 --
 .../spark/sql/execution/ColumnarBatchScan.scala |   4 +-
 .../apache/spark/sql/execution/SortExec.scala   |  12 +-
 .../sql/execution/WholeStageCodegenExec.scala   |  59 +-
 .../execution/aggregate/HashAggregateExec.scala |  22 +---
 .../sql/execution/basicPhysicalOperators.scala  |  91 +--
 .../org/apache/spark/sql/execution/limit.scala  |  31 --
 .../sql/execution/metric/SQLMetricsSuite.scala  | 111 ---
 8 files changed, 215 insertions(+), 125 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e3133f4a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
--
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
index 74c9c05..3d0511b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
@@ -74,16 +74,6 @@ public abstract class BufferedRowIterator {
   }
 
   /**
-   * Returns whether this iterator should stop fetching next row from [[CodegenSupport#inputRDDs]].
-   *
-   * If it returns true, the caller should exit the loop that [[InputAdapter]] generates.
-   * This interface is mainly used to limit the number of input rows.
-   */
-  public boolean stopEarly() {
-    return false;
-  }
-
-  /**
    * Returns whether `processNext()` should stop processing next row from `input` or not.
    *
    * If it returns true, the caller should exit the loop (return from processNext()).

http://git-wip-us.apache.org/repos/asf/spark/blob/e3133f4a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 48abad9..9f6b593 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -136,7 +136,7 @@ private[sql] trait ColumnarBatchScan extends Codeg
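A hand-written Scala sketch of the loop shape this change aims for is below. It is not Spark's generated code, and all names are made up; it only illustrates the idea that the producer checks the limit counter once per batch (keeping the per-row inner loop tight) and stops fetching input as soon as the limit is reached, even though the input is unbounded.

```scala
// Sketch of a limit-aware produce/consume loop over batched, unbounded input.
val limit = 100
var count = 0 // stands in for the propagated limit counter variable
val output = scala.collection.mutable.ArrayBuffer.empty[Long]

// An unbounded source of 1000-row batches, mimicking a range/columnar scan.
val batches: Iterator[Array[Long]] =
  Iterator.from(0).map(b => Array.tabulate(1000)(i => b * 1000L + i))

var stop = false
while (!stop && batches.hasNext) { // per-batch check: quit fetching batches early
  val batch = batches.next()
  var i = 0
  while (i < batch.length && count < limit) { // per-row limit check in the consumer
    output += batch(i)
    count += 1
    i += 1
  }
  if (count >= limit) stop = true // the producer sees the counter and stops producing
}
assert(output.size == limit)
```

Without the per-batch check, the outer loop over this unbounded source would never terminate, which is exactly the "consume all the inputs" problem named in the commit title.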
svn commit: r29962 - in /dev/spark/3.0.0-SNAPSHOT-2018_10_09_00_02-46fe408-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Oct 9 07:16:42 2018
New Revision: 29962

Log:
Apache Spark 3.0.0-SNAPSHOT-2018_10_09_00_02-46fe408 docs

[This commit notification would consist of 1481 parts, which exceeds the limit of 50, so it was shortened to this summary.]