This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 300ec1a  [SPARK-27226][SQL] Reduce the code duplicate when upgrading built-in Hive
300ec1a is described below

commit 300ec1a74cb14867c22e616e657566d510426331
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Mon Mar 25 19:39:00 2019 -0500

    [SPARK-27226][SQL] Reduce the code duplicate when upgrading built-in Hive

    ## What changes were proposed in this pull request?

    This PR is related to #24119. It reduces code duplication when upgrading the built-in Hive. To achieve this, we should avoid using classes in `org.apache.orc.storage.*`, because these classes will be replaced with `org.apache.hadoop.hive.*` after the built-in Hive is upgraded. For example:

    ![image](https://user-images.githubusercontent.com/5399861/54437594-e9be1000-476f-11e9-8878-3b7414871ee5.png)

    - Move the usage of `org.apache.orc.storage.*` into `OrcShimUtils` (a simplified sketch of this shim pattern follows this message):
      1. Add a wrapper for `VectorizedRowBatch` (reduces code duplication in [OrcColumnarBatchReader](https://github.com/apache/spark/pull/24166/files#diff-e594f7295e5408c01ace8175166313b6)).
      2. Move some serializer/deserializer methods out of `OrcDeserializer` and `OrcSerializer` (reduces code duplication in [OrcDeserializer](https://github.com/apache/spark/pull/24166/files#diff-b933819e6dcaff41eee8fce1e8f2932c) and [OrcSerializer](https://github.com/apache/spark/pull/24166/files#diff-6d3849d88929f6ea25c436d71da729da)).
      3. Define two type aliases, `Operator` and `SearchArgument` (reduces code duplication in [OrcV1FilterSuite](https://github.com/apache/spark/pull/24166/files#diff-48c4fc7a3b3384a6d0aab246723a0058)).
    - Move duplicated code into a superclass:
      1. Add a trait for `OrcFilters` (reduces code duplication in [OrcFilters](https://github.com/apache/spark/pull/24166/files#diff-224b8cbedf286ecbfdd092d1e2e2f237)).
      2. Move `checkNoFilterPredicate` from `OrcFilterSuite` to `OrcTest` (reduces code duplication in [OrcFilterSuite](https://github.com/apache/spark/pull/24166/files#diff-8e05c1faaaec98edd7723e62f84066f1)).

    After this PR, we only need to copy these 4 files when upgrading: `OrcColumnVector`, `OrcFilters`, `OrcFilterSuite` and `OrcShimUtils`.

    ## How was this patch tested?

    Existing tests.

    Closes #24166 from wangyum/SPARK-27226.
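To make the shim pattern above concrete, here is a minimal, self-contained Scala sketch. The names (`Vendor`, `ShimUtils`, `FiltersBase`, `Filters`) are hypothetical stand-ins, not the actual Spark classes: the idea is that only the shim object names vendor types directly, so swapping `org.apache.orc.storage.*` for `org.apache.hadoop.hive.*` touches one file, while shared logic lives in a trait that both the pre- and post-upgrade copies extend.

```scala
// Hypothetical stand-ins for the vendor classes whose package changes
// between `org.apache.orc.storage.*` and `org.apache.hadoop.hive.*`.
object Vendor {
  class VendorSearchArgument
  object VendorOperator extends Enumeration {
    val EQUALS, IS_NULL = Value
  }
}

// The shim: the only object that refers to vendor types directly. After a
// Hive upgrade, only the aliases here (imports, in the real code) change.
object ShimUtils {
  type SearchArgument = Vendor.VendorSearchArgument
  type Operator = Vendor.VendorOperator.Value
}

// Vendor-independent logic lives in a trait, so the two concrete copies of
// the filters object inherit it instead of duplicating it.
trait FiltersBase {
  // Quote column names containing `.` so nested-column pushdown stays unambiguous.
  protected def quoteAttributeNameIfNeeded(name: String): String =
    if (!name.contains("`") && name.contains(".")) s"`$name`" else name
}

object Filters extends FiltersBase {
  import ShimUtils.Operator // call sites compile against the alias, not the vendor type

  def describe(op: Operator, column: String): String =
    s"$op(${quoteAttributeNameIfNeeded(column)})"
}
```

Under this layout, upgrading the built-in Hive means copying only the files that must name vendor types (here, `ShimUtils` and `Filters`); everything written against the aliases and the trait stays untouched, which is why the copy set shrinks to 4 files.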
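One detail from the `OrcSerializer` hunk below is also worth illustrating: `OrcShimUtils.getDateWritable(reuseObj)` preserves an object-reuse optimization by returning a closure that either mutates a single shared `DateWritable` or allocates a fresh one per row. A hedged sketch of that pattern, with a hypothetical `MutableDate` standing in for `DateWritable`:

```scala
// Hypothetical stand-in for DateWritable: a cheap mutable holder.
final class MutableDate(var daysSinceEpoch: Int = 0)

// Mirrors the shape of getDateWritable: when reuseObj is true, one instance
// is allocated once and mutated for every row, avoiding a per-row allocation;
// otherwise each call returns a fresh object.
def dateConverter(reuseObj: Boolean): Int => MutableDate =
  if (reuseObj) {
    val shared = new MutableDate() // allocated once, captured by the closure
    days => { shared.daysSinceEpoch = days; shared }
  } else {
    days => new MutableDate(days) // safe when the caller retains references
  }

// Usage: reuse is only sound if the consumer copies the value before the
// next row overwrites it, which is why it is opt-in via reuseObj.
val conv = dateConverter(reuseObj = true)
println(conv(18000).daysSinceEpoch) // 18000
```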
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../datasources/orc/OrcColumnarBatchReader.java    | 16 +++---
 .../datasources/orc/OrcDeserializer.scala          |  6 +-
 .../sql/execution/datasources/orc/OrcFilters.scala | 34 +----
 .../execution/datasources/orc/OrcFiltersBase.scala | 58 +++++++++++++++++++
 .../execution/datasources/orc/OrcSerializer.scala  | 16 +-----
 .../execution/datasources/orc/OrcShimUtils.scala   | 66 ++++++++++++++++++++++
 .../execution/datasources/orc/OrcFilterSuite.scala | 28 ---------
 .../sql/execution/datasources/orc/OrcTest.scala    | 34 ++++++++++-
 .../datasources/orc/OrcV1FilterSuite.scala         |  5 +-
 9 files changed, 174 insertions(+), 89 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index efca96e..6a4b116 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -30,9 +30,9 @@ import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.storage.ql.exec.vector.*;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.types.*;
@@ -48,8 +48,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // The capacity of vectorized batch.
   private int capacity;
 
-  // Vectorized ORC Row Batch
-  private VectorizedRowBatch batch;
+  // Vectorized ORC Row Batch wrap.
+  private VectorizedRowBatchWrap wrap;
 
   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
@@ -146,8 +146,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
       int[] requestedDataColIds,
       int[] requestedPartitionColIds,
       InternalRow partitionValues) {
-    batch = orcSchema.createRowBatch(capacity);
-    assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
+    wrap = new VectorizedRowBatchWrap(orcSchema.createRowBatch(capacity));
+    assert(!wrap.batch().selectedInUse); // `selectedInUse` should be initialized with `false`.
     assert(requiredFields.length == requestedDataColIds.length);
     assert(requiredFields.length == requestedPartitionColIds.length);
     // If a required column is also partition column, use partition value and don't read from file.
@@ -180,7 +180,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
         missingCol.setIsConstant();
         orcVectorWrappers[i] = missingCol;
       } else {
-        orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+        orcVectorWrappers[i] = new OrcColumnVector(dt, wrap.batch().cols[colId]);
       }
     }
   }
@@ -193,8 +193,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
    * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
    */
   private boolean nextBatch() throws IOException {
-    recordReader.nextBatch(batch);
-    int batchSize = batch.size;
+    recordReader.nextBatch(wrap.batch());
+    int batchSize = wrap.batch().size;
     if (batchSize == 0) {
       return false;
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 62e1670..6d52d40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import org.apache.hadoop.io._
 import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
@@ -109,14 +108,13 @@ class OrcDeserializer(
         updater.set(ordinal, bytes)
 
     case DateType => (ordinal, value) =>
-      updater.setInt(ordinal, DateTimeUtils.fromJavaDate(value.asInstanceOf[DateWritable].get))
+      updater.setInt(ordinal, DateTimeUtils.fromJavaDate(OrcShimUtils.getSqlDate(value)))
 
     case TimestampType => (ordinal, value) =>
       updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
 
     case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
-      val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
-      val v = Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
+      val v = OrcShimUtils.getDecimal(value)
       v.changePrecision(precision, scale)
       updater.set(ordinal, v)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 9848400..112dcb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -23,7 +23,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
-import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 
 /**
@@ -56,27 +56,7 @@ import org.apache.spark.sql.types._
  * builder methods mentioned above can only be found in test code, where all tested filters are
  * known to be convertible.
  */
-private[sql] object OrcFilters {
-  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
-    filters match {
-      case Seq() => None
-      case Seq(filter) => Some(filter)
-      case Seq(filter1, filter2) => Some(And(filter1, filter2))
-      case _ => // length > 2
-        val (left, right) = filters.splitAt(filters.length / 2)
-        Some(And(buildTree(left).get, buildTree(right).get))
-    }
-  }
-
-  // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
-  // in order to distinguish predicate pushdown for nested columns.
-  private def quoteAttributeNameIfNeeded(name: String) : String = {
-    if (!name.contains("`") && name.contains(".")) {
-      s"`$name`"
-    } else {
-      name
-    }
-  }
+private[sql] object OrcFilters extends OrcFiltersBase {
 
   /**
    * Create ORC filter as a SearchArgument instance.
@@ -102,16 +82,6 @@ private[sql] object OrcFilters {
   }
 
   /**
-   * Return true if this is a searchable type in ORC.
-   * Both CharType and VarcharType are cleaned at AstBuilder.
-   */
-  private def isSearchableType(dataType: DataType) = dataType match {
-    case BinaryType => false
-    case _: AtomicType => true
-    case _ => false
-  }
-
-  /**
    * Get PredicateLeafType which is corresponding to the given DataType.
    */
   private def getPredicateLeafType(dataType: DataType) = dataType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
new file mode 100644
index 0000000..8d4898a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType}
+
+/**
+ * Methods that can be shared when upgrading the built-in Hive.
+ */
+trait OrcFiltersBase {
+
+  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
+    filters match {
+      case Seq() => None
+      case Seq(filter) => Some(filter)
+      case Seq(filter1, filter2) => Some(And(filter1, filter2))
+      case _ => // length > 2
+        val (left, right) = filters.splitAt(filters.length / 2)
+        Some(And(buildTree(left).get, buildTree(right).get))
+    }
+  }
+
+  // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
+  // in order to distinguish predicate pushdown for nested columns.
+  protected def quoteAttributeNameIfNeeded(name: String) : String = {
+    if (!name.contains("`") && name.contains(".")) {
+      s"`$name`"
+    } else {
+      name
+    }
+  }
+
+  /**
+   * Return true if this is a searchable type in ORC.
+   * Both CharType and VarcharType are cleaned at AstBuilder.
+   */
+  protected def isSearchableType(dataType: DataType) = dataType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
index 90d1268..0b9cbec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.hadoop.io._
 import org.apache.orc.TypeDescription
 import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -139,14 +137,7 @@ class OrcSerializer(dataSchema: StructType) {
       new BytesWritable(getter.getBinary(ordinal))
 
     case DateType =>
-      if (reuseObj) {
-        val result = new DateWritable()
-        (getter, ordinal) =>
-          result.set(getter.getInt(ordinal))
-          result
-      } else {
-        (getter, ordinal) => new DateWritable(getter.getInt(ordinal))
-      }
+      OrcShimUtils.getDateWritable(reuseObj)
 
     // The following cases are already expensive, reusing object or not doesn't matter.
 
@@ -156,9 +147,8 @@ class OrcSerializer(dataSchema: StructType) {
         result.setNanos(ts.getNanos)
         result
 
-    case DecimalType.Fixed(precision, scale) => (getter, ordinal) =>
-      val d = getter.getDecimal(ordinal, precision, scale)
-      new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
+    case DecimalType.Fixed(precision, scale) =>
+      OrcShimUtils.getHiveDecimalWritable(precision, scale)
 
     case st: StructType => (getter, ordinal) =>
       val result = createOrcValue(st).asInstanceOf[OrcStruct]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
new file mode 100644
index 0000000..68503ab
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.sql.Date
+
+import org.apache.orc.storage.common.`type`.HiveDecimal
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
+import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
+import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
+import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types.Decimal
+
+/**
+ * Various utilities for ORC used to upgrade the built-in Hive.
+ */
+private[sql] object OrcShimUtils {
+
+  class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}
+
+  private[sql] type Operator = OrcOperator
+  private[sql] type SearchArgument = OrcSearchArgument
+
+  def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get
+
+  def getDecimal(value: Any): Decimal = {
+    val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
+    Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
+  }
+
+  def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
+    if (reuseObj) {
+      val result = new DateWritable()
+      (getter, ordinal) =>
+        result.set(getter.getInt(ordinal))
+        result
+    } else {
+      (getter: SpecializedGetters, ordinal: Int) =>
+        new DateWritable(getter.getInt(ordinal))
+    }
+  }
+
+  def getHiveDecimalWritable(precision: Int, scale: Int):
+    (SpecializedGetters, Int) => HiveDecimalWritable = {
+    (getter, ordinal) =>
+      val d = getter.getDecimal(ordinal, precision, scale)
+      new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 7b65ba8..e96c6fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -89,34 +89,6 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     checkFilterPredicate(df, predicate, checkLogicalOperator)
   }
 
-  protected def checkNoFilterPredicate
-      (predicate: Predicate, noneSupported: Boolean = false)
-      (implicit df: DataFrame): Unit = {
-    val output = predicate.collect { case a: Attribute => a }.distinct
-    val query = df
-      .select(output.map(e => Column(e)): _*)
-      .where(Column(predicate))
-
-    query.queryExecution.optimizedPlan match {
-      case PhysicalOperation(_, filters,
-          DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
-        assert(filters.nonEmpty, "No filter is analyzed from the given query")
-        val scanBuilder = orcTable.newScanBuilder(options)
-        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
-        val pushedFilters = scanBuilder.pushedFilters()
-        if (noneSupported) {
-          assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
-        } else {
-          assert(pushedFilters.nonEmpty, "No filter is pushed down")
-          val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
-          assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
-        }
-
-      case _ =>
-        throw new AnalysisException("Can not match OrcTable in the query.")
-    }
-  }
-
   test("filter pushdown - integer") {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 411e632..adbd93d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -25,7 +25,11 @@ import scala.reflect.runtime.universe.TypeTag
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 
@@ -104,4 +108,32 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
       assert(actual < numRows)
     }
   }
+
+  protected def checkNoFilterPredicate
+      (predicate: Predicate, noneSupported: Boolean = false)
+      (implicit df: DataFrame): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    query.queryExecution.optimizedPlan match {
+      case PhysicalOperation(_, filters,
+          DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
+        assert(filters.nonEmpty, "No filter is analyzed from the given query")
+        val scanBuilder = orcTable.newScanBuilder(options)
+        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
+        val pushedFilters = scanBuilder.pushedFilters()
+        if (noneSupported) {
+          assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
+        } else {
+          assert(pushedFilters.nonEmpty, "No filter is pushed down")
+          val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
+          assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
+        }
+
+      case _ =>
+        throw new AnalysisException("Can not match OrcTable in the query.")
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
index 5a1bf9b..4452780 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
@@ -18,14 +18,13 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import scala.collection.JavaConverters._
 
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.{Operator, SearchArgument}
 import org.apache.spark.sql.internal.SQLConf
 
 class OrcV1FilterSuite extends OrcFilterSuite {
@@ -63,7 +62,7 @@ class OrcV1FilterSuite extends OrcFilterSuite {
   }
 
   override def checkFilterPredicate
-      (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
+      (predicate: Predicate, filterOperator: Operator)
       (implicit df: DataFrame): Unit = {
     def checkComparisonOperator(filter: SearchArgument) = {
       val operator = filter.getLeaves.asScala

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org